feat(engine): allow runner to return many measurements (#527)

This is required to implement websteps, which is currently tracked
by https://github.com/ooni/probe/issues/1733.

We introduce the concept of async runner. An async runner will
post measurements on a channel until it is done. When it is done,
it will close the channel to notify the reader about that.

This change causes sync experiments now to strictly return either
a non-nil measurement or a non-nil error.

While this is a pretty much obvious situation in golang, we had
some parts of the codebase that were not robust to this assumption
and attempted to submit a measurement after the measure call
returned an error.

Luckily, we had enough tests to catch this change in our assumption
and this is why there are extra docs and tests changes.
This commit is contained in:
Simone Basso 2021-09-30 00:54:52 +02:00 committed by GitHub
parent 8931a36cb3
commit ff1c170562
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 205 additions and 46 deletions

View File

@ -460,15 +460,12 @@ type experimentWrapper struct {
total int
}
func (ew *experimentWrapper) MeasureWithContext(
ctx context.Context, idx int, input string) (*model.Measurement, error) {
func (ew *experimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
if input != "" {
log.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input)
}
measurement, err := ew.child.MeasureWithContext(ctx, idx, input)
warnOnError(err, "measurement failed")
// policy: we do not stop the loop if the measurement fails
return measurement, nil
return ew.child.MeasureAsync(ctx, input, idx)
}
type submitterWrapper struct {

2
internal/engine/doc.go Normal file
View File

@ -0,0 +1,2 @@
// Package engine contains the engine API.
package engine

View File

@ -1,4 +1,3 @@
// Package engine contains the engine API.
package engine
import (
@ -92,28 +91,130 @@ func (e *Experiment) ReportID() string {
// Measure performs a measurement with input. We assume that you have
// configured the available test helpers, either manually or by calling
// the session's MaybeLookupBackends() method.
//
// Return value: strictly either a non-nil measurement and
// a nil error or a nil measurement and a non-nil error.
//
// CAVEAT: while this API is perfectly fine for experiments that
// return a single measurement, it will only return the first measurement
// when used with an asynchronous experiment. We plan on eventually
// migrating all experiments to run in asynchronous fashion.
//
// Deprecated: use MeasureWithContext instead, please.
func (e *Experiment) Measure(input string) (*model.Measurement, error) {
return e.MeasureWithContext(context.Background(), input)
}
// MeasureWithContext is like Measure but with context.
func (e *Experiment) MeasureWithContext(
ctx context.Context, input string,
) (measurement *model.Measurement, err error) {
err = e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes
// experimentAsyncWrapper makes a sync experiment behave like it was async
type experimentAsyncWrapper struct {
*Experiment
}
var _ model.ExperimentMeasurerAsync = &experimentAsyncWrapper{}
// RunAsync implements ExperimentMeasurerAsync.RunAsync.
func (eaw *experimentAsyncWrapper) RunAsync(
ctx context.Context, sess model.ExperimentSession, input string,
callbacks model.ExperimentCallbacks) (<-chan *model.ExperimentAsyncTestKeys, error) {
out := make(chan *model.ExperimentAsyncTestKeys)
measurement := eaw.Experiment.newMeasurement(input)
start := time.Now()
err := eaw.Experiment.measurer.Run(ctx, eaw.session, measurement, eaw.callbacks)
stop := time.Now()
if err != nil {
return
return nil, err
}
go func() {
defer close(out) // signal the reader we're done!
out <- &model.ExperimentAsyncTestKeys{
Extensions: measurement.Extensions,
MeasurementRuntime: stop.Sub(start).Seconds(),
TestKeys: measurement.TestKeys,
}
}()
return out, nil
}
// MeasureAsync runs an async measurement. This operation could post
// one or more measurements onto the returned channel. We'll close the
// channel when we've emitted all the measurements.
//
// Arguments:
//
// - ctx is the context for deadline/cancellation/timeout;
//
// - input is the input (typically a URL but it could also be
// just an endpoint or an empty string for input-less experiments
// such as, e.g., ndt7 and dash).
//
// Return value:
//
// - on success, channel where to post measurements (the channel
// will be closed when done) and nil error;
//
// - on failure, nil channel and non-nil error.
func (e *Experiment) MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error) {
err := e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes
if err != nil {
return nil, err
}
ctx = dialer.WithSessionByteCounter(ctx, e.session.byteCounter)
ctx = dialer.WithExperimentByteCounter(ctx, e.byteCounter)
measurement = e.newMeasurement(input)
start := time.Now()
err = e.measurer.Run(ctx, e.session, measurement, e.callbacks)
stop := time.Now()
measurement.MeasurementRuntime = stop.Sub(start).Seconds()
scrubErr := measurement.Scrub(e.session.ProbeIP())
if err == nil {
err = scrubErr
var async model.ExperimentMeasurerAsync
if v, okay := e.measurer.(model.ExperimentMeasurerAsync); okay {
async = v
} else {
async = &experimentAsyncWrapper{e}
}
in, err := async.RunAsync(ctx, e.session, input, e.callbacks)
if err != nil {
return nil, err
}
out := make(chan *model.Measurement)
go func() {
defer close(out) // we need to signal the consumer we're done
for tk := range in {
measurement := e.newMeasurement(input)
measurement.Extensions = tk.Extensions
measurement.MeasurementRuntime = tk.MeasurementRuntime
measurement.TestKeys = tk.TestKeys
if err := measurement.Scrub(e.session.ProbeIP()); err != nil {
// If we fail to scrub the measurement then we are not going to
// submit it. Most likely causes of error here are unlikely,
// e.g., the TestKeys being not serializable.
e.session.Logger().Warnf("can't scrub measurement: %s", err.Error())
continue
}
out <- measurement
}
}()
return out, nil
}
// MeasureWithContext is like Measure but with context.
//
// Return value: strictly either a non-nil measurement and
// a nil error or a nil measurement and a non-nil error.
//
// CAVEAT: while this API is perfectly fine for experiments that
// return a single measurement, it will only return the first measurement
// when used with an asynchronous experiment. We plan on eventually
// migrating all experiments to run in asynchronous fashion.
func (e *Experiment) MeasureWithContext(
ctx context.Context, input string,
) (measurement *model.Measurement, err error) {
out, err := e.MeasureAsync(ctx, input)
if err != nil {
return nil, err
}
for m := range out {
if measurement == nil {
measurement = m // as documented just return the first one
}
}
if measurement == nil {
err = errors.New("experiment returned no measurements")
}
return
}
@ -139,11 +240,12 @@ func (e *Experiment) SubmitAndUpdateMeasurement(measurement *model.Measurement)
func (e *Experiment) SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error {
if e.report == nil {
return errors.New("Report is not open")
return errors.New("report is not open")
}
return e.report.SubmitMeasurement(ctx, measurement)
}
// newMeasurement creates a new measurement for this experiment with the given input.
func (e *Experiment) newMeasurement(input string) *model.Measurement {
utctimenow := time.Now().UTC()
m := &model.Measurement{

View File

@ -213,8 +213,8 @@ func TestMeasurementFailure(t *testing.T) {
if err.Error() != "mocked error" {
t.Fatal("unexpected error type")
}
if measurement == nil {
t.Fatal("expected non nil measurement here")
if measurement != nil {
t.Fatal("expected nil measurement here")
}
}

View File

@ -10,15 +10,15 @@ import (
// InputProcessorExperiment is the Experiment
// according to InputProcessor.
type InputProcessorExperiment interface {
MeasureWithContext(
ctx context.Context, input string) (*model.Measurement, error)
MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error)
}
// InputProcessorExperimentWrapper is a wrapper for an
// Experiment that also allow to pass around the input index.
type InputProcessorExperimentWrapper interface {
MeasureWithContext(
ctx context.Context, idx int, input string) (*model.Measurement, error)
MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error)
}
// NewInputProcessorExperimentWrapper creates a new
@ -32,9 +32,9 @@ type inputProcessorExperimentWrapper struct {
exp InputProcessorExperiment
}
func (ipew inputProcessorExperimentWrapper) MeasureWithContext(
ctx context.Context, idx int, input string) (*model.Measurement, error) {
return ipew.exp.MeasureWithContext(ctx, input)
func (ipew inputProcessorExperimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
return ipew.exp.MeasureAsync(ctx, input)
}
var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{}
@ -142,21 +142,29 @@ func (ip *InputProcessor) run(ctx context.Context) (int, error) {
return stopMaxRuntime, nil
}
input := url.URL
meas, err := ip.Experiment.MeasureWithContext(ctx, idx, input)
var measurements []*model.Measurement
source, err := ip.Experiment.MeasureAsync(ctx, input, idx)
if err != nil {
return 0, err
}
meas.AddAnnotations(ip.Annotations)
meas.Options = ip.Options
err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
return 0, err
// NOTE: we don't want to intermix measuring with submitting
// therefore we collect all measurements first
for meas := range source {
measurements = append(measurements, meas)
}
// Note: must be after submission because submission modifies
// the measurement to include the report ID.
err = ip.Saver.SaveMeasurement(idx, meas)
if err != nil {
return 0, err
for _, meas := range measurements {
meas.AddAnnotations(ip.Annotations)
meas.Options = ip.Options
err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
return 0, err
}
// Note: must be after submission because submission modifies
// the measurement to include the report ID.
err = ip.Saver.SaveMeasurement(idx, meas)
if err != nil {
return 0, err
}
}
}
return stopNormal, nil

View File

@ -15,8 +15,8 @@ type FakeInputProcessorExperiment struct {
M []*model.Measurement
}
func (fipe *FakeInputProcessorExperiment) MeasureWithContext(
ctx context.Context, input string) (*model.Measurement, error) {
func (fipe *FakeInputProcessorExperiment) MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error) {
if fipe.Err != nil {
return nil, fipe.Err
}
@ -30,7 +30,12 @@ func (fipe *FakeInputProcessorExperiment) MeasureWithContext(
m.AddAnnotation("foo", "baz") // would be bar below
m.Input = model.MeasurementTarget(input)
fipe.M = append(fipe.M, m)
return m, nil
out := make(chan *model.Measurement)
go func() {
defer close(out)
out <- m
}()
return out, nil
}
func TestInputProcessorMeasurementFailed(t *testing.T) {

View File

@ -21,6 +21,47 @@ type ExperimentSession interface {
UserAgent() string
}
// ExperimentAsyncTestKeys is the type of test keys returned by an experiment
// when running in async fashion rather than in sync fashion.
type ExperimentAsyncTestKeys struct {
// MeasurementRuntime is the total measurement runtime.
MeasurementRuntime float64
// TestKeys contains the actual test keys.
TestKeys interface{}
// Extensions contains the extensions used by this experiment.
Extensions map[string]int64
}
// ExperimentMeasurerAsync is a measurer that can run in async fashion.
//
// Currently this functionality is optional, but we will likely
// migrate all experiments to use this functionality in 2022.
type ExperimentMeasurerAsync interface {
// RunAsync runs the experiment in async fashion.
//
// Arguments:
//
// - ctx is the context for deadline/timeout/cancellation
//
// - sess is the measurement session
//
// - input is the input URL to measure
//
// - callbacks contains the experiment callbacks
//
// Returns either a channel where TestKeys are posted or an error.
//
// An error indicates that specific preconditions for running the experiment
// are not met (e.g., the input URL is invalid).
//
// On success, the experiment will post on the channel each new
// measurement until it is done and closes the channel.
RunAsync(ctx context.Context, sess ExperimentSession, input string,
callbacks ExperimentCallbacks) (<-chan *ExperimentAsyncTestKeys, error)
}
// ExperimentCallbacks contains experiment event-handling callbacks
type ExperimentCallbacks interface {
// OnProgress provides information about an experiment progress.

View File

@ -273,7 +273,11 @@ func (r *Runner) Run(ctx context.Context) {
Idx: int64(idx),
Input: input,
})
// fallthrough: we want to submit the report anyway
// Historical note: here we used to fallthrough but, since we have
// implemented async measurements, the case where there is an error
// and we also have a valid measurement cant't happen anymore. So,
// now the only valid strategy here is to continue.
continue
}
data, err := json.Marshal(m)
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")