ooni-probe-cli/internal/engine/inputprocessor.go
Simone Basso 273b70bacc
refactor: interfaces and data types into the model package (#642)
## Checklist

- [x] I have read the [contribution guidelines](https://github.com/ooni/probe-cli/blob/master/CONTRIBUTING.md)
- [x] reference issue for this pull request: https://github.com/ooni/probe/issues/1885
- [x] related ooni/spec pull request: N/A

Location of the issue tracker: https://github.com/ooni/probe

## Description

This PR contains a set of changes to move important interfaces and data types into the `./internal/model` package.

The criteria for including an interface or data type in here is roughly that the type should be important and used by several packages. We are especially interested to move more interfaces here to increase modularity.

An additional side effect is that, by reading this package, one should be able to understand more quickly how different parts of the codebase interact with each other.

This is what I want to move in `internal/model`:

- [x] most important interfaces from `internal/netxlite`
- [x] everything that was previously part of `internal/engine/model`
- [x] mocks from `internal/netxlite/mocks` should also be moved in here as a subpackage
2022-01-03 13:53:23 +01:00

172 lines
5.2 KiB
Go

package engine
import (
"context"
"time"
"github.com/ooni/probe-cli/v3/internal/model"
)
// InputProcessorExperiment is the Experiment
// according to InputProcessor.
type InputProcessorExperiment interface {
MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error)
}
// InputProcessorExperimentWrapper is a wrapper for an
// Experiment that also allow to pass around the input index.
type InputProcessorExperimentWrapper interface {
MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error)
}
// NewInputProcessorExperimentWrapper creates a new
// instance of InputProcessorExperimentWrapper.
func NewInputProcessorExperimentWrapper(
exp InputProcessorExperiment) InputProcessorExperimentWrapper {
return inputProcessorExperimentWrapper{exp: exp}
}
type inputProcessorExperimentWrapper struct {
exp InputProcessorExperiment
}
func (ipew inputProcessorExperimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
return ipew.exp.MeasureAsync(ctx, input)
}
var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{}
// InputProcessor processes inputs. We perform a Measurement
// for each input using the given Experiment.
type InputProcessor struct {
// Annotations contains the measurement annotations
Annotations map[string]string
// Experiment is the code that will run the experiment.
Experiment InputProcessorExperimentWrapper
// Inputs is the list of inputs to measure.
Inputs []model.OOAPIURLInfo
// MaxRuntime is the optional maximum runtime
// when looping over a list of inputs (e.g. when
// running Web Connectivity). Zero means that
// there will be no MaxRuntime limit.
MaxRuntime time.Duration
// Options contains command line options for this experiment.
Options []string
// Saver is the code that will save measurement results
// on persistent storage (e.g. the file system).
Saver InputProcessorSaverWrapper
// Submitter is the code that will submit measurements
// to the OONI collector.
Submitter InputProcessorSubmitterWrapper
}
// InputProcessorSaverWrapper is InputProcessor's
// wrapper for a Saver implementation.
type InputProcessorSaverWrapper interface {
SaveMeasurement(idx int, m *model.Measurement) error
}
type inputProcessorSaverWrapper struct {
saver Saver
}
// NewInputProcessorSaverWrapper wraps a Saver for InputProcessor.
func NewInputProcessorSaverWrapper(saver Saver) InputProcessorSaverWrapper {
return inputProcessorSaverWrapper{saver: saver}
}
func (ipsw inputProcessorSaverWrapper) SaveMeasurement(
idx int, m *model.Measurement) error {
return ipsw.saver.SaveMeasurement(m)
}
// InputProcessorSubmitterWrapper is InputProcessor's
// wrapper for a Submitter implementation.
type InputProcessorSubmitterWrapper interface {
Submit(ctx context.Context, idx int, m *model.Measurement) error
}
type inputProcessorSubmitterWrapper struct {
submitter Submitter
}
// NewInputProcessorSubmitterWrapper wraps a Submitter
// for the InputProcessor.
func NewInputProcessorSubmitterWrapper(submitter Submitter) InputProcessorSubmitterWrapper {
return inputProcessorSubmitterWrapper{submitter: submitter}
}
func (ipsw inputProcessorSubmitterWrapper) Submit(
ctx context.Context, idx int, m *model.Measurement) error {
return ipsw.submitter.Submit(ctx, m)
}
// Run processes all the input subject to the duration of the
// context. The code will perform measurements using the given
// experiment; submit measurements using the given submitter;
// save measurements using the given saver.
//
// Annotations and Options will be saved in the measurement.
//
// The default behaviour of this code is that an error while
// measuring, while submitting, or while saving a measurement
// is always causing us to break out of the loop. The user
// though is free to choose different policies by configuring
// the Experiment, Submitter, and Saver fields properly.
func (ip *InputProcessor) Run(ctx context.Context) error {
_, err := ip.run(ctx)
return err
}
// These are the reasons why run could stop.
const (
stopNormal = (1 << iota)
stopMaxRuntime
)
// run is like Run but, in addition to returning an error, it
// also returns the reason why we stopped.
func (ip *InputProcessor) run(ctx context.Context) (int, error) {
start := time.Now()
for idx, url := range ip.Inputs {
if ip.MaxRuntime > 0 && time.Since(start) > ip.MaxRuntime {
return stopMaxRuntime, nil
}
input := url.URL
var measurements []*model.Measurement
source, err := ip.Experiment.MeasureAsync(ctx, input, idx)
if err != nil {
return 0, err
}
// NOTE: we don't want to intermix measuring with submitting
// therefore we collect all measurements first
for meas := range source {
measurements = append(measurements, meas)
}
for _, meas := range measurements {
meas.AddAnnotations(ip.Annotations)
meas.Options = ip.Options
err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
return 0, err
}
// Note: must be after submission because submission modifies
// the measurement to include the report ID.
err = ip.Saver.SaveMeasurement(idx, meas)
if err != nil {
return 0, err
}
}
}
return stopNormal, nil
}