From ff1c1705625cd12d5629133275d992afa48807fc Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 30 Sep 2021 00:54:52 +0200 Subject: [PATCH] feat(engine): allow runner to return many measurements (#527) This is required to implement websteps, which is currently tracked by https://github.com/ooni/probe/issues/1733. We introduce the concept of async runner. An async runner will post measurements on a channel until it is done. When it is done, it will close the channel to notify the reader about that. This change causes sync experiments now to strictly return either a non-nil measurement or a non-nil error. While this is a pretty much obvious situation in golang, we had some parts of the codebase that were not robust to this assumption and attempted to submit a measurement after the measure call returned an error. Luckily, we had enough tests to catch this change in our assumption and this is why there are extra docs and tests changes. --- internal/cmd/miniooni/libminiooni.go | 9 +- internal/engine/doc.go | 2 + internal/engine/experiment.go | 134 +++++++++++++++--- .../engine/experiment_integration_test.go | 4 +- internal/engine/inputprocessor.go | 44 +++--- internal/engine/inputprocessor_test.go | 11 +- internal/engine/model/experiment.go | 41 ++++++ pkg/oonimkall/internal/tasks/runner.go | 6 +- 8 files changed, 205 insertions(+), 46 deletions(-) create mode 100644 internal/engine/doc.go diff --git a/internal/cmd/miniooni/libminiooni.go b/internal/cmd/miniooni/libminiooni.go index 9f7213a..387f917 100644 --- a/internal/cmd/miniooni/libminiooni.go +++ b/internal/cmd/miniooni/libminiooni.go @@ -460,15 +460,12 @@ type experimentWrapper struct { total int } -func (ew *experimentWrapper) MeasureWithContext( - ctx context.Context, idx int, input string) (*model.Measurement, error) { +func (ew *experimentWrapper) MeasureAsync( + ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { if input != "" { log.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input) } - measurement, err := ew.child.MeasureWithContext(ctx, idx, input) - warnOnError(err, "measurement failed") - // policy: we do not stop the loop if the measurement fails - return measurement, nil + return ew.child.MeasureAsync(ctx, input, idx) } type submitterWrapper struct { diff --git a/internal/engine/doc.go b/internal/engine/doc.go new file mode 100644 index 0000000..b6a139c --- /dev/null +++ b/internal/engine/doc.go @@ -0,0 +1,2 @@ +// Package engine contains the engine API. +package engine diff --git a/internal/engine/experiment.go b/internal/engine/experiment.go index acf7150..025077f 100644 --- a/internal/engine/experiment.go +++ b/internal/engine/experiment.go @@ -1,4 +1,3 @@ -// Package engine contains the engine API. package engine import ( @@ -92,28 +91,130 @@ func (e *Experiment) ReportID() string { // Measure performs a measurement with input. We assume that you have // configured the available test helpers, either manually or by calling // the session's MaybeLookupBackends() method. +// +// Return value: strictly either a non-nil measurement and +// a nil error or a nil measurement and a non-nil error. +// +// CAVEAT: while this API is perfectly fine for experiments that +// return a single measurement, it will only return the first measurement +// when used with an asynchronous experiment. We plan on eventually +// migrating all experiments to run in asynchronous fashion. +// +// Deprecated: use MeasureWithContext instead, please. func (e *Experiment) Measure(input string) (*model.Measurement, error) { return e.MeasureWithContext(context.Background(), input) } -// MeasureWithContext is like Measure but with context. -func (e *Experiment) MeasureWithContext( - ctx context.Context, input string, -) (measurement *model.Measurement, err error) { - err = e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes +// experimentAsyncWrapper makes a sync experiment behave like it was async +type experimentAsyncWrapper struct { + *Experiment +} + +var _ model.ExperimentMeasurerAsync = &experimentAsyncWrapper{} + +// RunAsync implements ExperimentMeasurerAsync.RunAsync. +func (eaw *experimentAsyncWrapper) RunAsync( + ctx context.Context, sess model.ExperimentSession, input string, + callbacks model.ExperimentCallbacks) (<-chan *model.ExperimentAsyncTestKeys, error) { + out := make(chan *model.ExperimentAsyncTestKeys) + measurement := eaw.Experiment.newMeasurement(input) + start := time.Now() + err := eaw.Experiment.measurer.Run(ctx, eaw.session, measurement, eaw.callbacks) + stop := time.Now() if err != nil { - return + return nil, err + } + go func() { + defer close(out) // signal the reader we're done! + out <- &model.ExperimentAsyncTestKeys{ + Extensions: measurement.Extensions, + MeasurementRuntime: stop.Sub(start).Seconds(), + TestKeys: measurement.TestKeys, + } + }() + return out, nil +} + +// MeasureAsync runs an async measurement. This operation could post +// one or more measurements onto the returned channel. We'll close the +// channel when we've emitted all the measurements. +// +// Arguments: +// +// - ctx is the context for deadline/cancellation/timeout; +// +// - input is the input (typically a URL but it could also be +// just an endpoint or an empty string for input-less experiments +// such as, e.g., ndt7 and dash). +// +// Return value: +// +// - on success, channel where to post measurements (the channel +// will be closed when done) and nil error; +// +// - on failure, nil channel and non-nil error. +func (e *Experiment) MeasureAsync( + ctx context.Context, input string) (<-chan *model.Measurement, error) { + err := e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes + if err != nil { + return nil, err } ctx = dialer.WithSessionByteCounter(ctx, e.session.byteCounter) ctx = dialer.WithExperimentByteCounter(ctx, e.byteCounter) - measurement = e.newMeasurement(input) - start := time.Now() - err = e.measurer.Run(ctx, e.session, measurement, e.callbacks) - stop := time.Now() - measurement.MeasurementRuntime = stop.Sub(start).Seconds() - scrubErr := measurement.Scrub(e.session.ProbeIP()) - if err == nil { - err = scrubErr + var async model.ExperimentMeasurerAsync + if v, okay := e.measurer.(model.ExperimentMeasurerAsync); okay { + async = v + } else { + async = &experimentAsyncWrapper{e} + } + in, err := async.RunAsync(ctx, e.session, input, e.callbacks) + if err != nil { + return nil, err + } + out := make(chan *model.Measurement) + go func() { + defer close(out) // we need to signal the consumer we're done + for tk := range in { + measurement := e.newMeasurement(input) + measurement.Extensions = tk.Extensions + measurement.MeasurementRuntime = tk.MeasurementRuntime + measurement.TestKeys = tk.TestKeys + if err := measurement.Scrub(e.session.ProbeIP()); err != nil { + // If we fail to scrub the measurement then we are not going to + // submit it. Most likely causes of error here are unlikely, + // e.g., the TestKeys being not serializable. + e.session.Logger().Warnf("can't scrub measurement: %s", err.Error()) + continue + } + out <- measurement + } + }() + return out, nil +} + +// MeasureWithContext is like Measure but with context. +// +// Return value: strictly either a non-nil measurement and +// a nil error or a nil measurement and a non-nil error. +// +// CAVEAT: while this API is perfectly fine for experiments that +// return a single measurement, it will only return the first measurement +// when used with an asynchronous experiment. We plan on eventually +// migrating all experiments to run in asynchronous fashion. +func (e *Experiment) MeasureWithContext( + ctx context.Context, input string, +) (measurement *model.Measurement, err error) { + out, err := e.MeasureAsync(ctx, input) + if err != nil { + return nil, err + } + for m := range out { + if measurement == nil { + measurement = m // as documented just return the first one + } + } + if measurement == nil { + err = errors.New("experiment returned no measurements") } return } @@ -139,11 +240,12 @@ func (e *Experiment) SubmitAndUpdateMeasurement(measurement *model.Measurement) func (e *Experiment) SubmitAndUpdateMeasurementContext( ctx context.Context, measurement *model.Measurement) error { if e.report == nil { - return errors.New("Report is not open") + return errors.New("report is not open") } return e.report.SubmitMeasurement(ctx, measurement) } +// newMeasurement creates a new measurement for this experiment with the given input. func (e *Experiment) newMeasurement(input string) *model.Measurement { utctimenow := time.Now().UTC() m := &model.Measurement{ diff --git a/internal/engine/experiment_integration_test.go b/internal/engine/experiment_integration_test.go index f7b8a68..d10aea4 100644 --- a/internal/engine/experiment_integration_test.go +++ b/internal/engine/experiment_integration_test.go @@ -213,8 +213,8 @@ func TestMeasurementFailure(t *testing.T) { if err.Error() != "mocked error" { t.Fatal("unexpected error type") } - if measurement == nil { - t.Fatal("expected non nil measurement here") + if measurement != nil { + t.Fatal("expected nil measurement here") } } diff --git a/internal/engine/inputprocessor.go b/internal/engine/inputprocessor.go index c828cac..b9e5651 100644 --- a/internal/engine/inputprocessor.go +++ b/internal/engine/inputprocessor.go @@ -10,15 +10,15 @@ import ( // InputProcessorExperiment is the Experiment // according to InputProcessor. type InputProcessorExperiment interface { - MeasureWithContext( - ctx context.Context, input string) (*model.Measurement, error) + MeasureAsync( + ctx context.Context, input string) (<-chan *model.Measurement, error) } // InputProcessorExperimentWrapper is a wrapper for an // Experiment that also allow to pass around the input index. type InputProcessorExperimentWrapper interface { - MeasureWithContext( - ctx context.Context, idx int, input string) (*model.Measurement, error) + MeasureAsync( + ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) } // NewInputProcessorExperimentWrapper creates a new @@ -32,9 +32,9 @@ type inputProcessorExperimentWrapper struct { exp InputProcessorExperiment } -func (ipew inputProcessorExperimentWrapper) MeasureWithContext( - ctx context.Context, idx int, input string) (*model.Measurement, error) { - return ipew.exp.MeasureWithContext(ctx, input) +func (ipew inputProcessorExperimentWrapper) MeasureAsync( + ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { + return ipew.exp.MeasureAsync(ctx, input) } var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{} @@ -142,21 +142,29 @@ func (ip *InputProcessor) run(ctx context.Context) (int, error) { return stopMaxRuntime, nil } input := url.URL - meas, err := ip.Experiment.MeasureWithContext(ctx, idx, input) + var measurements []*model.Measurement + source, err := ip.Experiment.MeasureAsync(ctx, input, idx) if err != nil { return 0, err } - meas.AddAnnotations(ip.Annotations) - meas.Options = ip.Options - err = ip.Submitter.Submit(ctx, idx, meas) - if err != nil { - return 0, err + // NOTE: we don't want to intermix measuring with submitting + // therefore we collect all measurements first + for meas := range source { + measurements = append(measurements, meas) } - // Note: must be after submission because submission modifies - // the measurement to include the report ID. - err = ip.Saver.SaveMeasurement(idx, meas) - if err != nil { - return 0, err + for _, meas := range measurements { + meas.AddAnnotations(ip.Annotations) + meas.Options = ip.Options + err = ip.Submitter.Submit(ctx, idx, meas) + if err != nil { + return 0, err + } + // Note: must be after submission because submission modifies + // the measurement to include the report ID. + err = ip.Saver.SaveMeasurement(idx, meas) + if err != nil { + return 0, err + } } } return stopNormal, nil diff --git a/internal/engine/inputprocessor_test.go b/internal/engine/inputprocessor_test.go index abb2beb..3a2677f 100644 --- a/internal/engine/inputprocessor_test.go +++ b/internal/engine/inputprocessor_test.go @@ -15,8 +15,8 @@ type FakeInputProcessorExperiment struct { M []*model.Measurement } -func (fipe *FakeInputProcessorExperiment) MeasureWithContext( - ctx context.Context, input string) (*model.Measurement, error) { +func (fipe *FakeInputProcessorExperiment) MeasureAsync( + ctx context.Context, input string) (<-chan *model.Measurement, error) { if fipe.Err != nil { return nil, fipe.Err } @@ -30,7 +30,12 @@ func (fipe *FakeInputProcessorExperiment) MeasureWithContext( m.AddAnnotation("foo", "baz") // would be bar below m.Input = model.MeasurementTarget(input) fipe.M = append(fipe.M, m) - return m, nil + out := make(chan *model.Measurement) + go func() { + defer close(out) + out <- m + }() + return out, nil } func TestInputProcessorMeasurementFailed(t *testing.T) { diff --git a/internal/engine/model/experiment.go b/internal/engine/model/experiment.go index 053fe96..dcaeece 100644 --- a/internal/engine/model/experiment.go +++ b/internal/engine/model/experiment.go @@ -21,6 +21,47 @@ type ExperimentSession interface { UserAgent() string } +// ExperimentAsyncTestKeys is the type of test keys returned by an experiment +// when running in async fashion rather than in sync fashion. +type ExperimentAsyncTestKeys struct { + // MeasurementRuntime is the total measurement runtime. + MeasurementRuntime float64 + + // TestKeys contains the actual test keys. + TestKeys interface{} + + // Extensions contains the extensions used by this experiment. + Extensions map[string]int64 +} + +// ExperimentMeasurerAsync is a measurer that can run in async fashion. +// +// Currently this functionality is optional, but we will likely +// migrate all experiments to use this functionality in 2022. +type ExperimentMeasurerAsync interface { + // RunAsync runs the experiment in async fashion. + // + // Arguments: + // + // - ctx is the context for deadline/timeout/cancellation + // + // - sess is the measurement session + // + // - input is the input URL to measure + // + // - callbacks contains the experiment callbacks + // + // Returns either a channel where TestKeys are posted or an error. + // + // An error indicates that specific preconditions for running the experiment + // are not met (e.g., the input URL is invalid). + // + // On success, the experiment will post on the channel each new + // measurement until it is done and closes the channel. + RunAsync(ctx context.Context, sess ExperimentSession, input string, + callbacks ExperimentCallbacks) (<-chan *ExperimentAsyncTestKeys, error) +} + // ExperimentCallbacks contains experiment event-handling callbacks type ExperimentCallbacks interface { // OnProgress provides information about an experiment progress. diff --git a/pkg/oonimkall/internal/tasks/runner.go b/pkg/oonimkall/internal/tasks/runner.go index d98a02b..f57ef74 100644 --- a/pkg/oonimkall/internal/tasks/runner.go +++ b/pkg/oonimkall/internal/tasks/runner.go @@ -273,7 +273,11 @@ func (r *Runner) Run(ctx context.Context) { Idx: int64(idx), Input: input, }) - // fallthrough: we want to submit the report anyway + // Historical note: here we used to fallthrough but, since we have + // implemented async measurements, the case where there is an error + // and we also have a valid measurement cant't happen anymore. So, + // now the only valid strategy here is to continue. + continue } data, err := json.Marshal(m) runtimex.PanicOnError(err, "measurement.MarshalJSON failed")