diff --git a/internal/cmd/miniooni/libminiooni.go b/internal/cmd/miniooni/libminiooni.go index 9f7213a..387f917 100644 --- a/internal/cmd/miniooni/libminiooni.go +++ b/internal/cmd/miniooni/libminiooni.go @@ -460,15 +460,12 @@ type experimentWrapper struct { total int } -func (ew *experimentWrapper) MeasureWithContext( - ctx context.Context, idx int, input string) (*model.Measurement, error) { +func (ew *experimentWrapper) MeasureAsync( + ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { if input != "" { log.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input) } - measurement, err := ew.child.MeasureWithContext(ctx, idx, input) - warnOnError(err, "measurement failed") - // policy: we do not stop the loop if the measurement fails - return measurement, nil + return ew.child.MeasureAsync(ctx, input, idx) } type submitterWrapper struct { diff --git a/internal/engine/doc.go b/internal/engine/doc.go new file mode 100644 index 0000000..b6a139c --- /dev/null +++ b/internal/engine/doc.go @@ -0,0 +1,2 @@ +// Package engine contains the engine API. +package engine diff --git a/internal/engine/experiment.go b/internal/engine/experiment.go index acf7150..025077f 100644 --- a/internal/engine/experiment.go +++ b/internal/engine/experiment.go @@ -1,4 +1,3 @@ -// Package engine contains the engine API. package engine import ( @@ -92,28 +91,130 @@ func (e *Experiment) ReportID() string { // Measure performs a measurement with input. We assume that you have // configured the available test helpers, either manually or by calling // the session's MaybeLookupBackends() method. +// +// Return value: strictly either a non-nil measurement and +// a nil error or a nil measurement and a non-nil error. +// +// CAVEAT: while this API is perfectly fine for experiments that +// return a single measurement, it will only return the first measurement +// when used with an asynchronous experiment. We plan on eventually +// migrating all experiments to run in asynchronous fashion. +// +// Deprecated: use MeasureWithContext instead, please. func (e *Experiment) Measure(input string) (*model.Measurement, error) { return e.MeasureWithContext(context.Background(), input) } -// MeasureWithContext is like Measure but with context. -func (e *Experiment) MeasureWithContext( - ctx context.Context, input string, -) (measurement *model.Measurement, err error) { - err = e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes +// experimentAsyncWrapper makes a sync experiment behave like it was async +type experimentAsyncWrapper struct { + *Experiment +} + +var _ model.ExperimentMeasurerAsync = &experimentAsyncWrapper{} + +// RunAsync implements ExperimentMeasurerAsync.RunAsync. +func (eaw *experimentAsyncWrapper) RunAsync( + ctx context.Context, sess model.ExperimentSession, input string, + callbacks model.ExperimentCallbacks) (<-chan *model.ExperimentAsyncTestKeys, error) { + out := make(chan *model.ExperimentAsyncTestKeys) + measurement := eaw.Experiment.newMeasurement(input) + start := time.Now() + err := eaw.Experiment.measurer.Run(ctx, eaw.session, measurement, eaw.callbacks) + stop := time.Now() if err != nil { - return + return nil, err + } + go func() { + defer close(out) // signal the reader we're done! + out <- &model.ExperimentAsyncTestKeys{ + Extensions: measurement.Extensions, + MeasurementRuntime: stop.Sub(start).Seconds(), + TestKeys: measurement.TestKeys, + } + }() + return out, nil +} + +// MeasureAsync runs an async measurement. This operation could post +// one or more measurements onto the returned channel. We'll close the +// channel when we've emitted all the measurements. +// +// Arguments: +// +// - ctx is the context for deadline/cancellation/timeout; +// +// - input is the input (typically a URL but it could also be +// just an endpoint or an empty string for input-less experiments +// such as, e.g., ndt7 and dash). +// +// Return value: +// +// - on success, channel where to post measurements (the channel +// will be closed when done) and nil error; +// +// - on failure, nil channel and non-nil error. +func (e *Experiment) MeasureAsync( + ctx context.Context, input string) (<-chan *model.Measurement, error) { + err := e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes + if err != nil { + return nil, err } ctx = dialer.WithSessionByteCounter(ctx, e.session.byteCounter) ctx = dialer.WithExperimentByteCounter(ctx, e.byteCounter) - measurement = e.newMeasurement(input) - start := time.Now() - err = e.measurer.Run(ctx, e.session, measurement, e.callbacks) - stop := time.Now() - measurement.MeasurementRuntime = stop.Sub(start).Seconds() - scrubErr := measurement.Scrub(e.session.ProbeIP()) - if err == nil { - err = scrubErr + var async model.ExperimentMeasurerAsync + if v, okay := e.measurer.(model.ExperimentMeasurerAsync); okay { + async = v + } else { + async = &experimentAsyncWrapper{e} + } + in, err := async.RunAsync(ctx, e.session, input, e.callbacks) + if err != nil { + return nil, err + } + out := make(chan *model.Measurement) + go func() { + defer close(out) // we need to signal the consumer we're done + for tk := range in { + measurement := e.newMeasurement(input) + measurement.Extensions = tk.Extensions + measurement.MeasurementRuntime = tk.MeasurementRuntime + measurement.TestKeys = tk.TestKeys + if err := measurement.Scrub(e.session.ProbeIP()); err != nil { + // If we fail to scrub the measurement then we are not going to + // submit it. Most likely causes of error here are unlikely, + // e.g., the TestKeys being not serializable. + e.session.Logger().Warnf("can't scrub measurement: %s", err.Error()) + continue + } + out <- measurement + } + }() + return out, nil +} + +// MeasureWithContext is like Measure but with context. +// +// Return value: strictly either a non-nil measurement and +// a nil error or a nil measurement and a non-nil error. +// +// CAVEAT: while this API is perfectly fine for experiments that +// return a single measurement, it will only return the first measurement +// when used with an asynchronous experiment. We plan on eventually +// migrating all experiments to run in asynchronous fashion. +func (e *Experiment) MeasureWithContext( + ctx context.Context, input string, +) (measurement *model.Measurement, err error) { + out, err := e.MeasureAsync(ctx, input) + if err != nil { + return nil, err + } + for m := range out { + if measurement == nil { + measurement = m // as documented just return the first one + } + } + if measurement == nil { + err = errors.New("experiment returned no measurements") } return } @@ -139,11 +240,12 @@ func (e *Experiment) SubmitAndUpdateMeasurement(measurement *model.Measurement) func (e *Experiment) SubmitAndUpdateMeasurementContext( ctx context.Context, measurement *model.Measurement) error { if e.report == nil { - return errors.New("Report is not open") + return errors.New("report is not open") } return e.report.SubmitMeasurement(ctx, measurement) } +// newMeasurement creates a new measurement for this experiment with the given input. func (e *Experiment) newMeasurement(input string) *model.Measurement { utctimenow := time.Now().UTC() m := &model.Measurement{ diff --git a/internal/engine/experiment_integration_test.go b/internal/engine/experiment_integration_test.go index f7b8a68..d10aea4 100644 --- a/internal/engine/experiment_integration_test.go +++ b/internal/engine/experiment_integration_test.go @@ -213,8 +213,8 @@ func TestMeasurementFailure(t *testing.T) { if err.Error() != "mocked error" { t.Fatal("unexpected error type") } - if measurement == nil { - t.Fatal("expected non nil measurement here") + if measurement != nil { + t.Fatal("expected nil measurement here") } } diff --git a/internal/engine/inputprocessor.go b/internal/engine/inputprocessor.go index c828cac..b9e5651 100644 --- a/internal/engine/inputprocessor.go +++ b/internal/engine/inputprocessor.go @@ -10,15 +10,15 @@ import ( // InputProcessorExperiment is the Experiment // according to InputProcessor. type InputProcessorExperiment interface { - MeasureWithContext( - ctx context.Context, input string) (*model.Measurement, error) + MeasureAsync( + ctx context.Context, input string) (<-chan *model.Measurement, error) } // InputProcessorExperimentWrapper is a wrapper for an // Experiment that also allow to pass around the input index. type InputProcessorExperimentWrapper interface { - MeasureWithContext( - ctx context.Context, idx int, input string) (*model.Measurement, error) + MeasureAsync( + ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) } // NewInputProcessorExperimentWrapper creates a new @@ -32,9 +32,9 @@ type inputProcessorExperimentWrapper struct { exp InputProcessorExperiment } -func (ipew inputProcessorExperimentWrapper) MeasureWithContext( - ctx context.Context, idx int, input string) (*model.Measurement, error) { - return ipew.exp.MeasureWithContext(ctx, input) +func (ipew inputProcessorExperimentWrapper) MeasureAsync( + ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { + return ipew.exp.MeasureAsync(ctx, input) } var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{} @@ -142,21 +142,29 @@ func (ip *InputProcessor) run(ctx context.Context) (int, error) { return stopMaxRuntime, nil } input := url.URL - meas, err := ip.Experiment.MeasureWithContext(ctx, idx, input) + var measurements []*model.Measurement + source, err := ip.Experiment.MeasureAsync(ctx, input, idx) if err != nil { return 0, err } - meas.AddAnnotations(ip.Annotations) - meas.Options = ip.Options - err = ip.Submitter.Submit(ctx, idx, meas) - if err != nil { - return 0, err + // NOTE: we don't want to intermix measuring with submitting + // therefore we collect all measurements first + for meas := range source { + measurements = append(measurements, meas) } - // Note: must be after submission because submission modifies - // the measurement to include the report ID. - err = ip.Saver.SaveMeasurement(idx, meas) - if err != nil { - return 0, err + for _, meas := range measurements { + meas.AddAnnotations(ip.Annotations) + meas.Options = ip.Options + err = ip.Submitter.Submit(ctx, idx, meas) + if err != nil { + return 0, err + } + // Note: must be after submission because submission modifies + // the measurement to include the report ID. + err = ip.Saver.SaveMeasurement(idx, meas) + if err != nil { + return 0, err + } } } return stopNormal, nil diff --git a/internal/engine/inputprocessor_test.go b/internal/engine/inputprocessor_test.go index abb2beb..3a2677f 100644 --- a/internal/engine/inputprocessor_test.go +++ b/internal/engine/inputprocessor_test.go @@ -15,8 +15,8 @@ type FakeInputProcessorExperiment struct { M []*model.Measurement } -func (fipe *FakeInputProcessorExperiment) MeasureWithContext( - ctx context.Context, input string) (*model.Measurement, error) { +func (fipe *FakeInputProcessorExperiment) MeasureAsync( + ctx context.Context, input string) (<-chan *model.Measurement, error) { if fipe.Err != nil { return nil, fipe.Err } @@ -30,7 +30,12 @@ func (fipe *FakeInputProcessorExperiment) MeasureWithContext( m.AddAnnotation("foo", "baz") // would be bar below m.Input = model.MeasurementTarget(input) fipe.M = append(fipe.M, m) - return m, nil + out := make(chan *model.Measurement) + go func() { + defer close(out) + out <- m + }() + return out, nil } func TestInputProcessorMeasurementFailed(t *testing.T) { diff --git a/internal/engine/model/experiment.go b/internal/engine/model/experiment.go index 053fe96..dcaeece 100644 --- a/internal/engine/model/experiment.go +++ b/internal/engine/model/experiment.go @@ -21,6 +21,47 @@ type ExperimentSession interface { UserAgent() string } +// ExperimentAsyncTestKeys is the type of test keys returned by an experiment +// when running in async fashion rather than in sync fashion. +type ExperimentAsyncTestKeys struct { + // MeasurementRuntime is the total measurement runtime. + MeasurementRuntime float64 + + // TestKeys contains the actual test keys. + TestKeys interface{} + + // Extensions contains the extensions used by this experiment. + Extensions map[string]int64 +} + +// ExperimentMeasurerAsync is a measurer that can run in async fashion. +// +// Currently this functionality is optional, but we will likely +// migrate all experiments to use this functionality in 2022. +type ExperimentMeasurerAsync interface { + // RunAsync runs the experiment in async fashion. + // + // Arguments: + // + // - ctx is the context for deadline/timeout/cancellation + // + // - sess is the measurement session + // + // - input is the input URL to measure + // + // - callbacks contains the experiment callbacks + // + // Returns either a channel where TestKeys are posted or an error. + // + // An error indicates that specific preconditions for running the experiment + // are not met (e.g., the input URL is invalid). + // + // On success, the experiment will post on the channel each new + // measurement until it is done and closes the channel. + RunAsync(ctx context.Context, sess ExperimentSession, input string, + callbacks ExperimentCallbacks) (<-chan *ExperimentAsyncTestKeys, error) +} + // ExperimentCallbacks contains experiment event-handling callbacks type ExperimentCallbacks interface { // OnProgress provides information about an experiment progress. diff --git a/pkg/oonimkall/internal/tasks/runner.go b/pkg/oonimkall/internal/tasks/runner.go index d98a02b..f57ef74 100644 --- a/pkg/oonimkall/internal/tasks/runner.go +++ b/pkg/oonimkall/internal/tasks/runner.go @@ -273,7 +273,11 @@ func (r *Runner) Run(ctx context.Context) { Idx: int64(idx), Input: input, }) - // fallthrough: we want to submit the report anyway + // Historical note: here we used to fallthrough but, since we have + // implemented async measurements, the case where there is an error + // and we also have a valid measurement cant't happen anymore. So, + // now the only valid strategy here is to continue. + continue } data, err := json.Marshal(m) runtimex.PanicOnError(err, "measurement.MarshalJSON failed")