ooni-probe-cli/internal/engine/inputprocessor.go
Simone Basso ff1c170562
feat(engine): allow runner to return many measurements (#527)
This is required to implement websteps, which is currently tracked
by https://github.com/ooni/probe/issues/1733.

We introduce the concept of async runner. An async runner will
post measurements on a channel until it is done. When it is done,
it will close the channel to notify the reader about that.

This change causes sync experiments now to strictly return either
a non-nil measurement or a non-nil error.

While this is a pretty much obvious situation in golang, we had
some parts of the codebase that were not robust to this assumption
and attempted to submit a measurement after the measure call
returned an error.

Luckily, we had enough tests to catch this change in our assumption
and this is why there are extra docs and tests changes.
2021-09-30 00:54:52 +02:00

172 lines
5.2 KiB
Go

package engine
import (
"context"
"time"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
// InputProcessorExperiment is the Experiment
// according to InputProcessor.
type InputProcessorExperiment interface {
MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error)
}
// InputProcessorExperimentWrapper is a wrapper for an
// Experiment that also allow to pass around the input index.
type InputProcessorExperimentWrapper interface {
MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error)
}
// NewInputProcessorExperimentWrapper creates a new
// instance of InputProcessorExperimentWrapper.
func NewInputProcessorExperimentWrapper(
exp InputProcessorExperiment) InputProcessorExperimentWrapper {
return inputProcessorExperimentWrapper{exp: exp}
}
type inputProcessorExperimentWrapper struct {
exp InputProcessorExperiment
}
func (ipew inputProcessorExperimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
return ipew.exp.MeasureAsync(ctx, input)
}
var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{}
// InputProcessor processes inputs. We perform a Measurement
// for each input using the given Experiment.
type InputProcessor struct {
// Annotations contains the measurement annotations
Annotations map[string]string
// Experiment is the code that will run the experiment.
Experiment InputProcessorExperimentWrapper
// Inputs is the list of inputs to measure.
Inputs []model.URLInfo
// MaxRuntime is the optional maximum runtime
// when looping over a list of inputs (e.g. when
// running Web Connectivity). Zero means that
// there will be no MaxRuntime limit.
MaxRuntime time.Duration
// Options contains command line options for this experiment.
Options []string
// Saver is the code that will save measurement results
// on persistent storage (e.g. the file system).
Saver InputProcessorSaverWrapper
// Submitter is the code that will submit measurements
// to the OONI collector.
Submitter InputProcessorSubmitterWrapper
}
// InputProcessorSaverWrapper is InputProcessor's
// wrapper for a Saver implementation.
type InputProcessorSaverWrapper interface {
SaveMeasurement(idx int, m *model.Measurement) error
}
type inputProcessorSaverWrapper struct {
saver Saver
}
// NewInputProcessorSaverWrapper wraps a Saver for InputProcessor.
func NewInputProcessorSaverWrapper(saver Saver) InputProcessorSaverWrapper {
return inputProcessorSaverWrapper{saver: saver}
}
func (ipsw inputProcessorSaverWrapper) SaveMeasurement(
idx int, m *model.Measurement) error {
return ipsw.saver.SaveMeasurement(m)
}
// InputProcessorSubmitterWrapper is InputProcessor's
// wrapper for a Submitter implementation.
type InputProcessorSubmitterWrapper interface {
Submit(ctx context.Context, idx int, m *model.Measurement) error
}
type inputProcessorSubmitterWrapper struct {
submitter Submitter
}
// NewInputProcessorSubmitterWrapper wraps a Submitter
// for the InputProcessor.
func NewInputProcessorSubmitterWrapper(submitter Submitter) InputProcessorSubmitterWrapper {
return inputProcessorSubmitterWrapper{submitter: submitter}
}
func (ipsw inputProcessorSubmitterWrapper) Submit(
ctx context.Context, idx int, m *model.Measurement) error {
return ipsw.submitter.Submit(ctx, m)
}
// Run processes all the input subject to the duration of the
// context. The code will perform measurements using the given
// experiment; submit measurements using the given submitter;
// save measurements using the given saver.
//
// Annotations and Options will be saved in the measurement.
//
// The default behaviour of this code is that an error while
// measuring, while submitting, or while saving a measurement
// is always causing us to break out of the loop. The user
// though is free to choose different policies by configuring
// the Experiment, Submitter, and Saver fields properly.
func (ip *InputProcessor) Run(ctx context.Context) error {
_, err := ip.run(ctx)
return err
}
// These are the reasons why run could stop.
const (
stopNormal = (1 << iota)
stopMaxRuntime
)
// run is like Run but, in addition to returning an error, it
// also returns the reason why we stopped.
func (ip *InputProcessor) run(ctx context.Context) (int, error) {
start := time.Now()
for idx, url := range ip.Inputs {
if ip.MaxRuntime > 0 && time.Since(start) > ip.MaxRuntime {
return stopMaxRuntime, nil
}
input := url.URL
var measurements []*model.Measurement
source, err := ip.Experiment.MeasureAsync(ctx, input, idx)
if err != nil {
return 0, err
}
// NOTE: we don't want to intermix measuring with submitting
// therefore we collect all measurements first
for meas := range source {
measurements = append(measurements, meas)
}
for _, meas := range measurements {
meas.AddAnnotations(ip.Annotations)
meas.Options = ip.Options
err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
return 0, err
}
// Note: must be after submission because submission modifies
// the measurement to include the report ID.
err = ip.Saver.SaveMeasurement(idx, meas)
if err != nil {
return 0, err
}
}
}
return stopNormal, nil
}