0fdc9cafb5
* fix(all): introduce and use iox.ReadAllContext This improvement over the ioutil.ReadAll utility returns early if the context expires. This enables us to unblock stuck code in case there's censorship confounding the TCP stack. See https://github.com/ooni/probe/issues/1417. Compared to the functionality postulated in the above mentioned issue, I choose to be more generic and separate limiting the maximum body size (not implemented here) from using the context to return early when reading a body (or any other reader). After implementing iox.ReadAllContext, I made sure we always use it everywhere in the tree instead of ioutil.ReadAll. This includes many parts of the codebase where in theory we don't need iox.ReadAllContext. Though, changing all the places makes checking whether we're not using ioutil.ReadAll where we should not be using it easy: `git grep` should return no lines. * Update internal/iox/iox_test.go * fix(ndt7): treat context errors as non-errors The rationale is explained by the comment documenting reduceErr. * Update internal/engine/experiment/ndt7/download.go
308 lines
8.7 KiB
Go
308 lines
8.7 KiB
Go
// Package dash implements the DASH network experiment.
|
|
//
|
|
// Spec: https://github.com/ooni/spec/blob/master/nettests/ts-021-dash.md
|
|
package dash
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/montanaflynn/stats"
|
|
"github.com/ooni/probe-cli/v3/internal/engine/model"
|
|
"github.com/ooni/probe-cli/v3/internal/engine/netx"
|
|
"github.com/ooni/probe-cli/v3/internal/engine/netx/errorx"
|
|
"github.com/ooni/probe-cli/v3/internal/engine/netx/trace"
|
|
"github.com/ooni/probe-cli/v3/internal/humanize"
|
|
"github.com/ooni/probe-cli/v3/internal/iox"
|
|
)
|
|
|
|
const (
|
|
defaultTimeout = 120 * time.Second
|
|
magicVersion = "0.008000000"
|
|
testName = "dash"
|
|
testVersion = "0.12.0"
|
|
totalStep = 15.0
|
|
)
|
|
|
|
var (
|
|
errServerBusy = errors.New("dash: server busy; try again later")
|
|
errHTTPRequestFailed = errors.New("dash: request failed")
|
|
)
|
|
|
|
// Config contains the experiment config.
|
|
type Config struct{}
|
|
|
|
// Simple contains the experiment total summary
|
|
type Simple struct {
|
|
ConnectLatency float64 `json:"connect_latency"`
|
|
MedianBitrate int64 `json:"median_bitrate"`
|
|
MinPlayoutDelay float64 `json:"min_playout_delay"`
|
|
}
|
|
|
|
// ServerInfo contains information on the selected server
|
|
//
|
|
// This is currently an extension to the DASH specification
|
|
// until the data format of the new mlab locate is clear.
|
|
type ServerInfo struct {
|
|
Hostname string `json:"hostname"`
|
|
Site string `json:"site,omitempty"`
|
|
}
|
|
|
|
// TestKeys contains the test keys
|
|
type TestKeys struct {
|
|
Server ServerInfo `json:"server"`
|
|
Simple Simple `json:"simple"`
|
|
Failure *string `json:"failure"`
|
|
ReceiverData []clientResults `json:"receiver_data"`
|
|
}
|
|
|
|
type runner struct {
|
|
callbacks model.ExperimentCallbacks
|
|
httpClient *http.Client
|
|
saver *trace.Saver
|
|
sess model.ExperimentSession
|
|
tk *TestKeys
|
|
}
|
|
|
|
func (r runner) HTTPClient() *http.Client {
|
|
return r.httpClient
|
|
}
|
|
|
|
func (r runner) JSONMarshal(v interface{}) ([]byte, error) {
|
|
return json.Marshal(v)
|
|
}
|
|
|
|
func (r runner) Logger() model.Logger {
|
|
return r.sess.Logger()
|
|
}
|
|
|
|
func (r runner) NewHTTPRequest(meth, url string, body io.Reader) (*http.Request, error) {
|
|
return http.NewRequest(meth, url, body)
|
|
}
|
|
|
|
func (r runner) ReadAllContext(ctx context.Context, reader io.Reader) ([]byte, error) {
|
|
return iox.ReadAllContext(ctx, reader)
|
|
}
|
|
|
|
func (r runner) Scheme() string {
|
|
return "https"
|
|
}
|
|
|
|
func (r runner) UserAgent() string {
|
|
return r.sess.UserAgent()
|
|
}
|
|
|
|
func (r runner) loop(ctx context.Context, numIterations int64) error {
|
|
locateResult, err := locate(ctx, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.tk.Server = ServerInfo{
|
|
Hostname: locateResult.FQDN,
|
|
Site: locateResult.Site,
|
|
}
|
|
fqdn := locateResult.FQDN
|
|
r.callbacks.OnProgress(0.0, fmt.Sprintf("streaming: server: %s", fqdn))
|
|
negotiateResp, err := negotiate(ctx, fqdn, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := r.measure(ctx, fqdn, negotiateResp, numIterations); err != nil {
|
|
return err
|
|
}
|
|
// TODO(bassosimone): it seems we're not saving the server data?
|
|
err = collect(ctx, fqdn, negotiateResp.Authorization, r.tk.ReceiverData, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return r.tk.analyze()
|
|
}
|
|
|
|
func (r runner) measure(
|
|
ctx context.Context, fqdn string, negotiateResp negotiateResponse,
|
|
numIterations int64) error {
|
|
// Note: according to a comment in MK sources 3000 kbit/s was the
|
|
// minimum speed recommended by Netflix for SD quality in 2017.
|
|
//
|
|
// See: <https://help.netflix.com/en/node/306>.
|
|
const initialBitrate = 3000
|
|
current := clientResults{
|
|
ElapsedTarget: 2,
|
|
Platform: runtime.GOOS,
|
|
Rate: initialBitrate,
|
|
RealAddress: negotiateResp.RealAddress,
|
|
Version: magicVersion,
|
|
}
|
|
var (
|
|
begin = time.Now()
|
|
connectTime float64
|
|
total int64
|
|
)
|
|
for current.Iteration < numIterations {
|
|
result, err := download(ctx, downloadConfig{
|
|
authorization: negotiateResp.Authorization,
|
|
begin: begin,
|
|
currentRate: current.Rate,
|
|
deps: r,
|
|
elapsedTarget: current.ElapsedTarget,
|
|
fqdn: fqdn,
|
|
})
|
|
if err != nil {
|
|
// Implementation note: ndt7 controls the connection much
|
|
// more than us and it can tell whether an error occurs when
|
|
// connecting or later. We cannot say that very precisely
|
|
// because, in principle, we may reconnect. So we always
|
|
// return error here. This comment is being introduced so
|
|
// that we don't do https://github.com/ooni/probe-cli/v3/internal/engine/pull/526
|
|
// again, because that isn't accurate.
|
|
return err
|
|
}
|
|
current.Elapsed = result.elapsed
|
|
current.Received = result.received
|
|
current.RequestTicks = result.requestTicks
|
|
current.Timestamp = result.timestamp
|
|
current.ServerURL = result.serverURL
|
|
// Read the events so far and possibly update our measurement
|
|
// of the latest connect time. We should have one sample in most
|
|
// cases, because the connection should be persistent.
|
|
for _, ev := range r.saver.Read() {
|
|
if ev.Name == errorx.ConnectOperation {
|
|
connectTime = ev.Duration.Seconds()
|
|
}
|
|
}
|
|
current.ConnectTime = connectTime
|
|
r.tk.ReceiverData = append(r.tk.ReceiverData, current)
|
|
total += current.Received
|
|
avgspeed := 8 * float64(total) / time.Since(begin).Seconds()
|
|
percentage := float64(current.Iteration) / float64(numIterations)
|
|
message := fmt.Sprintf("streaming: speed: %s", humanize.SI(avgspeed, "bit/s"))
|
|
r.callbacks.OnProgress(percentage, message)
|
|
current.Iteration++
|
|
speed := float64(current.Received) / float64(current.Elapsed)
|
|
speed *= 8.0 // to bits per second
|
|
speed /= 1000.0 // to kbit/s
|
|
current.Rate = int64(speed)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (tk *TestKeys) analyze() error {
|
|
var (
|
|
rates []float64
|
|
frameReadyTime float64
|
|
playTime float64
|
|
)
|
|
for _, results := range tk.ReceiverData {
|
|
rates = append(rates, float64(results.Rate))
|
|
// Same in all samples if we're using a single connection
|
|
tk.Simple.ConnectLatency = results.ConnectTime
|
|
// Rationale: first segment plays when it arrives. Subsequent segments
|
|
// would play in ElapsedTarget seconds. However, will play when they
|
|
// arrive. Stall is the time we need to wait for a frame to arrive with
|
|
// the video stopped and the spinning icon.
|
|
frameReadyTime += results.Elapsed
|
|
if playTime == 0.0 {
|
|
playTime += frameReadyTime
|
|
} else {
|
|
playTime += float64(results.ElapsedTarget)
|
|
}
|
|
stall := frameReadyTime - playTime
|
|
if stall > tk.Simple.MinPlayoutDelay {
|
|
tk.Simple.MinPlayoutDelay = stall
|
|
}
|
|
}
|
|
median, err := stats.Median(rates)
|
|
tk.Simple.MedianBitrate = int64(median)
|
|
return err
|
|
}
|
|
|
|
func (r runner) do(ctx context.Context) error {
|
|
defer r.callbacks.OnProgress(1, "streaming: done")
|
|
const numIterations = 15
|
|
err := r.loop(ctx, numIterations)
|
|
if err != nil {
|
|
s := err.Error()
|
|
r.tk.Failure = &s
|
|
// fallthrough
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Measurer performs the measurement.
|
|
type Measurer struct {
|
|
config Config
|
|
}
|
|
|
|
// ExperimentName implements model.ExperimentMeasurer.ExperimentName.
|
|
func (m Measurer) ExperimentName() string {
|
|
return testName
|
|
}
|
|
|
|
// ExperimentVersion implements model.ExperimentMeasurer.ExperimentVersion.
|
|
func (m Measurer) ExperimentVersion() string {
|
|
return testVersion
|
|
}
|
|
|
|
// Run implements model.ExperimentMeasurer.Run.
|
|
func (m Measurer) Run(
|
|
ctx context.Context, sess model.ExperimentSession,
|
|
measurement *model.Measurement, callbacks model.ExperimentCallbacks,
|
|
) error {
|
|
tk := new(TestKeys)
|
|
measurement.TestKeys = tk
|
|
saver := &trace.Saver{}
|
|
httpClient := &http.Client{
|
|
Transport: netx.NewHTTPTransport(netx.Config{
|
|
ContextByteCounting: true,
|
|
DialSaver: saver,
|
|
Logger: sess.Logger(),
|
|
}),
|
|
}
|
|
defer httpClient.CloseIdleConnections()
|
|
r := runner{
|
|
callbacks: callbacks,
|
|
httpClient: httpClient,
|
|
saver: saver,
|
|
sess: sess,
|
|
tk: tk,
|
|
}
|
|
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
|
|
defer cancel()
|
|
return r.do(ctx)
|
|
}
|
|
|
|
// NewExperimentMeasurer creates a new ExperimentMeasurer.
|
|
func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
|
|
return Measurer{config: config}
|
|
}
|
|
|
|
// SummaryKeys contains summary keys for this experiment.
|
|
//
|
|
// Note that this structure is part of the ABI contract with probe-cli
|
|
// therefore we should be careful when changing it.
|
|
type SummaryKeys struct {
|
|
Latency float64 `json:"connect_latency"`
|
|
Bitrate float64 `json:"median_bitrate"`
|
|
Delay float64 `json:"min_playout_delay"`
|
|
IsAnomaly bool `json:"-"`
|
|
}
|
|
|
|
// GetSummaryKeys implements model.ExperimentMeasurer.GetSummaryKeys.
|
|
func (m Measurer) GetSummaryKeys(measurement *model.Measurement) (interface{}, error) {
|
|
sk := SummaryKeys{IsAnomaly: false}
|
|
tk, ok := measurement.TestKeys.(*TestKeys)
|
|
if !ok {
|
|
return sk, errors.New("invalid test keys type")
|
|
}
|
|
sk.Latency = tk.Simple.ConnectLatency
|
|
sk.Bitrate = float64(tk.Simple.MedianBitrate)
|
|
sk.Delay = tk.Simple.MinPlayoutDelay
|
|
return sk, nil
|
|
}
|