ooni-probe-cli/internal/engine/experiment/dash/dash.go
Simone Basso d57c78bc71
chore: merge probe-engine into probe-cli (#201)
This is how I did it:

1. `git clone https://github.com/ooni/probe-engine internal/engine`

2. ```
(cd internal/engine && git describe --tags)
v0.23.0
```

3. `nvim go.mod` (merging `go.mod` with `internal/engine/go.mod`)

4. `rm -rf internal/.git internal/engine/go.{mod,sum}`

5. `git add internal/engine`

6. `find . -type f -name \*.go -exec sed -i 's@/ooni/probe-engine@/ooni/probe-cli/v3/internal/engine@g' {} \;`

7. `go build ./...` (passes)

8. `go test -race ./...` (temporary failure on RiseupVPN)

9. `go mod tidy`

10. this commit message

Once this piece of work is done, we can build a new version of `ooniprobe` that
uses `internal/engine` directly. We need to do more work to ensure that all the
other functionality in `probe-engine` (e.g. making mobile packages) is still working as intended (WAI).

Part of https://github.com/ooni/probe/issues/1335
2021-02-02 12:05:47 +01:00

308 lines
8.7 KiB
Go

// Package dash implements the DASH network experiment.
//
// Spec: https://github.com/ooni/spec/blob/master/nettests/ts-021-dash.md
package dash
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"runtime"
"time"
"github.com/montanaflynn/stats"
"github.com/ooni/probe-cli/v3/internal/engine/internal/humanizex"
"github.com/ooni/probe-cli/v3/internal/engine/model"
"github.com/ooni/probe-cli/v3/internal/engine/netx"
"github.com/ooni/probe-cli/v3/internal/engine/netx/errorx"
"github.com/ooni/probe-cli/v3/internal/engine/netx/trace"
)
const (
// defaultTimeout is the deadline that Run applies to the context of the
// whole experiment.
defaultTimeout = 120 * time.Second
// magicVersion is the value measure stores in the Version field of every
// client result.
magicVersion = "0.008000000"
// testName is the value returned by Measurer.ExperimentName.
testName = "dash"
// testVersion is the value returned by Measurer.ExperimentVersion.
testVersion = "0.12.0"
// totalStep is not referenced by the code visible in this file.
// NOTE(review): presumably used elsewhere in the package; confirm before
// removing it.
totalStep = 15.0
)
// Sentinel errors of the dash package. Neither is returned by the code
// visible in this file.
// NOTE(review): presumably returned by the negotiate/download/collect
// helpers defined elsewhere in the package; confirm against those files.
var (
// errServerBusy carries the message "dash: server busy; try again later".
errServerBusy = errors.New("dash: server busy; try again later")
// errHTTPRequestFailed carries the message "dash: request failed".
errHTTPRequestFailed = errors.New("dash: request failed")
)
// Config contains the experiment config. It currently declares no fields.
type Config struct{}
// Simple contains the experiment total summary. Its fields are filled in
// by TestKeys.analyze once all the iterations have been collected.
type Simple struct {
// ConnectLatency is copied from the ConnectTime of the collected results,
// which measure records in seconds.
ConnectLatency float64 `json:"connect_latency"`
// MedianBitrate is the median of the Rate values of the collected results.
// measure computes those rates in kbit/s.
MedianBitrate int64 `json:"median_bitrate"`
// MinPlayoutDelay is the largest stall observed by analyze across the
// collected results.
// NOTE(review): presumably expressed in seconds, like ElapsedTarget; confirm.
MinPlayoutDelay float64 `json:"min_playout_delay"`
}
// ServerInfo contains information on the selected server.
//
// This is currently an extension to the DASH specification
// until the data format of the new mlab locate is clear.
type ServerInfo struct {
// Hostname is set by runner.loop to the FQDN returned by locate.
Hostname string `json:"hostname"`
// Site is set by runner.loop to the Site returned by locate; it is
// omitted from the JSON when empty.
Site string `json:"site,omitempty"`
}
// TestKeys contains the test keys, i.e. the experiment-specific results
// that Run attaches to the measurement.
type TestKeys struct {
// Server describes the server selected by the locate step.
Server ServerInfo `json:"server"`
// Simple is the summary computed by analyze.
Simple Simple `json:"simple"`
// Failure is nil unless the run failed, in which case runner.do stores
// the error string here.
Failure *string `json:"failure"`
// ReceiverData holds one entry per completed download iteration,
// appended by runner.measure.
ReceiverData []clientResults `json:"receiver_data"`
}
// runner bundles the state of a single DASH run. Through the methods
// declared on it in this file, a runner value is also what loop and
// measure pass as the dependencies argument to the locate, negotiate,
// download, and collect helpers.
type runner struct {
// callbacks receives the progress updates emitted during the run.
callbacks model.ExperimentCallbacks
// httpClient is the client returned by HTTPClient.
httpClient *http.Client
// saver collects the events that measure reads to find the duration of
// the connect operation.
saver *trace.Saver
// sess provides the logger and the user agent.
sess model.ExperimentSession
// tk points to the test keys that the run fills in.
tk *TestKeys
}
// HTTPClient returns the HTTP client the runner was created with.
func (r runner) HTTPClient() *http.Client {
	clnt := r.httpClient
	return clnt
}
// JSONMarshal serializes v by delegating to json.Marshal and returns its
// result unchanged.
func (r runner) JSONMarshal(v interface{}) ([]byte, error) {
	data, err := json.Marshal(v)
	return data, err
}
// Logger returns the logger of the session the runner is bound to.
func (r runner) Logger() model.Logger {
	logger := r.sess.Logger()
	return logger
}
// NewHTTPRequest builds a request by delegating to http.NewRequest and
// returns its result unchanged.
func (r runner) NewHTTPRequest(meth, url string, body io.Reader) (*http.Request, error) {
	req, err := http.NewRequest(meth, url, body)
	return req, err
}
// ReadAll consumes reader by delegating to ioutil.ReadAll and returns its
// result unchanged.
func (r runner) ReadAll(reader io.Reader) ([]byte, error) {
	data, err := ioutil.ReadAll(reader)
	return data, err
}
// Scheme returns the URL scheme to use, which is always "https".
func (r runner) Scheme() string {
	const scheme = "https"
	return scheme
}
// UserAgent returns the user agent of the session the runner is bound to.
func (r runner) UserAgent() string {
	ua := r.sess.UserAgent()
	return ua
}
// loop runs the DASH flow end to end: it locates a server, records it in
// the test keys, negotiates with it, performs numIterations download
// rounds, submits the collected results, and finally computes the summary.
// It stops at the first failing step and returns that step's error.
func (r runner) loop(ctx context.Context, numIterations int64) error {
	located, err := locate(ctx, r)
	if err != nil {
		return err
	}
	fqdn := located.FQDN
	r.tk.Server = ServerInfo{Hostname: fqdn, Site: located.Site}
	r.callbacks.OnProgress(0.0, fmt.Sprintf("streaming: server: %s", fqdn))

	negotiated, err := negotiate(ctx, fqdn, r)
	if err != nil {
		return err
	}

	err = r.measure(ctx, fqdn, negotiated, numIterations)
	if err != nil {
		return err
	}

	// TODO(bassosimone): it seems we're not saving the server data?
	if err := collect(ctx, fqdn, negotiated.Authorization, r.tk.ReceiverData, r); err != nil {
		return err
	}

	return r.tk.analyze()
}
// measure performs numIterations download rounds against fqdn. After each
// round it appends one clientResults entry to r.tk.ReceiverData and reports
// progress through r.callbacks. The rate requested in a round is the speed
// measured in the previous one, starting from initialBitrate.
//
// It returns the first error reported by download, or nil once all the
// rounds have completed.
func (r runner) measure(
	ctx context.Context, fqdn string, negotiateResp negotiateResponse,
	numIterations int64) error {
	// Note: according to a comment in MK sources 3000 kbit/s was the
	// minimum speed recommended by Netflix for SD quality in 2017.
	//
	// See: <https://help.netflix.com/en/node/306>.
	const initialBitrate = 3000
	current := clientResults{
		ElapsedTarget: 2,
		Platform:      runtime.GOOS,
		Rate:          initialBitrate,
		RealAddress:   negotiateResp.RealAddress,
		Version:       magicVersion,
	}
	var (
		begin       = time.Now()
		connectTime float64
		total       int64
	)
	for current.Iteration < numIterations {
		result, err := download(ctx, downloadConfig{
			authorization: negotiateResp.Authorization,
			begin:         begin,
			currentRate:   current.Rate,
			deps:          r,
			elapsedTarget: current.ElapsedTarget,
			fqdn:          fqdn,
		})
		if err != nil {
			// Implementation note: ndt7 controls the connection much
			// more than us and it can tell whether an error occurs when
			// connecting or later. We cannot say that very precisely
			// because, in principle, we may reconnect. So we always
			// return error here. This comment is being introduced so
			// that we don't do https://github.com/ooni/probe-engine/pull/526
			// again, because that isn't accurate.
			return err
		}
		current.Elapsed = result.elapsed
		current.Received = result.received
		current.RequestTicks = result.requestTicks
		current.Timestamp = result.timestamp
		current.ServerURL = result.serverURL
		// Read the events so far and possibly update our measurement
		// of the latest connect time. We should have one sample in most
		// cases, because the connection should be persistent.
		for _, ev := range r.saver.Read() {
			if ev.Name == errorx.ConnectOperation {
				connectTime = ev.Duration.Seconds()
			}
		}
		current.ConnectTime = connectTime
		r.tk.ReceiverData = append(r.tk.ReceiverData, current)
		total += current.Received
		avgspeed := 8 * float64(total) / time.Since(begin).Seconds()
		percentage := float64(current.Iteration) / float64(numIterations)
		message := fmt.Sprintf("streaming: speed: %s", humanizex.SI(avgspeed, "bit/s"))
		r.callbacks.OnProgress(percentage, message)
		current.Iteration++
		// Only derive the next rate from a positive elapsed time. Dividing
		// by zero would yield +Inf or NaN, and converting those to int64
		// gives an implementation-dependent value that we would then
		// request from the server. In that case keep the previous rate.
		if current.Elapsed > 0 {
			speed := float64(current.Received) / float64(current.Elapsed)
			speed *= 8.0    // to bits per second
			speed /= 1000.0 // to kbit/s
			current.Rate = int64(speed)
		}
	}
	return nil
}
// analyze fills tk.Simple from the samples in tk.ReceiverData: the connect
// latency, the largest stall (as MinPlayoutDelay), and the median of the
// per-sample rates (as MedianBitrate).
//
// It returns the error reported by stats.Median, if any; in that case
// MedianBitrate is left untouched.
func (tk *TestKeys) analyze() error {
	var (
		rates          = make([]float64, 0, len(tk.ReceiverData))
		frameReadyTime float64
		playTime       float64
	)
	for _, results := range tk.ReceiverData {
		rates = append(rates, float64(results.Rate))
		// Same in all samples if we're using a single connection
		tk.Simple.ConnectLatency = results.ConnectTime
		// Rationale: the first segment plays as soon as it arrives. Each
		// subsequent segment is due ElapsedTarget seconds after the previous
		// one, but it can only play once it has arrived. The stall is the
		// time we need to wait for a segment to arrive with the video
		// stopped and the spinning icon.
		frameReadyTime += results.Elapsed
		if playTime == 0.0 {
			playTime += frameReadyTime
		} else {
			playTime += float64(results.ElapsedTarget)
		}
		stall := frameReadyTime - playTime
		if stall > tk.Simple.MinPlayoutDelay {
			tk.Simple.MinPlayoutDelay = stall
		}
	}
	median, err := stats.Median(rates)
	if err != nil {
		// The value returned alongside an error is not meaningful, so do
		// not convert it and store it in the summary.
		return err
	}
	tk.Simple.MedianBitrate = int64(median)
	return nil
}
// do runs the experiment loop with a fixed number of iterations. When the
// loop fails, the error string is stored in the Failure test key and the
// error is returned to the caller. A final "streaming: done" progress
// update is emitted in all cases.
func (r runner) do(ctx context.Context) error {
	const numIterations = 15
	defer r.callbacks.OnProgress(1, "streaming: done")
	err := r.loop(ctx, numIterations)
	if err == nil {
		return nil
	}
	failure := err.Error()
	r.tk.Failure = &failure
	return err
}
// Measurer performs the measurement.
type Measurer struct {
// config is the Config passed to NewExperimentMeasurer. The code visible
// in this file stores it but does not read it.
config Config
}
// ExperimentName implements model.ExperimentMeasurer.ExperimentName. It
// returns the testName constant.
func (m Measurer) ExperimentName() string {
	name := testName
	return name
}
// ExperimentVersion implements model.ExperimentMeasurer.ExperimentVersion.
// It returns the testVersion constant.
func (m Measurer) ExperimentVersion() string {
	version := testVersion
	return version
}
// Run implements model.ExperimentMeasurer.Run.
//
// It attaches a fresh TestKeys to the measurement, builds an HTTP client
// whose dial events are recorded by a trace.Saver, and runs the experiment
// with a context bounded by defaultTimeout. The returned error is the one
// reported by the run, if any.
func (m Measurer) Run(
	ctx context.Context, sess model.ExperimentSession,
	measurement *model.Measurement, callbacks model.ExperimentCallbacks,
) error {
	testkeys := &TestKeys{}
	measurement.TestKeys = testkeys

	saver := new(trace.Saver)
	txp := netx.NewHTTPTransport(netx.Config{
		ContextByteCounting: true,
		DialSaver:           saver,
		Logger:              sess.Logger(),
	})
	clnt := &http.Client{Transport: txp}
	defer clnt.CloseIdleConnections()

	var rnr runner
	rnr.callbacks = callbacks
	rnr.httpClient = clnt
	rnr.saver = saver
	rnr.sess = sess
	rnr.tk = testkeys

	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
	defer cancel()
	return rnr.do(ctx)
}
// NewExperimentMeasurer creates a new ExperimentMeasurer that stores the
// given config.
func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
	var measurer Measurer
	measurer.config = config
	return measurer
}
// SummaryKeys contains summary keys for this experiment.
//
// Note that this structure is part of the ABI contract with probe-cli
// therefore we should be careful when changing it.
type SummaryKeys struct {
// Latency is copied from TestKeys.Simple.ConnectLatency.
Latency float64 `json:"connect_latency"`
// Bitrate is TestKeys.Simple.MedianBitrate converted to float64.
Bitrate float64 `json:"median_bitrate"`
// Delay is copied from TestKeys.Simple.MinPlayoutDelay.
Delay float64 `json:"min_playout_delay"`
// IsAnomaly is excluded from the JSON output; GetSummaryKeys always
// sets it to false.
IsAnomaly bool `json:"-"`
}
// GetSummaryKeys implements model.ExperimentMeasurer.GetSummaryKeys.
//
// It returns a SummaryKeys value built from the Simple section of the
// measurement's test keys. When the test keys are not a *TestKeys, it
// returns a zero SummaryKeys together with an error.
func (m Measurer) GetSummaryKeys(measurement *model.Measurement) (interface{}, error) {
	testkeys, good := measurement.TestKeys.(*TestKeys)
	if !good {
		return SummaryKeys{}, errors.New("invalid test keys type")
	}
	summary := SummaryKeys{
		Latency:   testkeys.Simple.ConnectLatency,
		Bitrate:   float64(testkeys.Simple.MedianBitrate),
		Delay:     testkeys.Simple.MinPlayoutDelay,
		IsAnomaly: false,
	}
	return summary, nil
}