ooni-probe-cli/internal/engine/inputprocessor.go

172 lines
5.2 KiB
Go
Raw Normal View History

package engine
import (
"context"
"time"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
// InputProcessorExperiment is the Experiment
// according to InputProcessor.
type InputProcessorExperiment interface {
MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error)
}
// InputProcessorExperimentWrapper is a wrapper for an
// Experiment that also allow to pass around the input index.
type InputProcessorExperimentWrapper interface {
MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error)
}
// NewInputProcessorExperimentWrapper creates a new
// instance of InputProcessorExperimentWrapper.
func NewInputProcessorExperimentWrapper(
exp InputProcessorExperiment) InputProcessorExperimentWrapper {
return inputProcessorExperimentWrapper{exp: exp}
}
type inputProcessorExperimentWrapper struct {
exp InputProcessorExperiment
}
func (ipew inputProcessorExperimentWrapper) MeasureAsync(
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
return ipew.exp.MeasureAsync(ctx, input)
}
var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{}
// InputProcessor processes inputs. We perform a Measurement
// for each input using the given Experiment.
type InputProcessor struct {
// Annotations contains the measurement annotations
Annotations map[string]string
// Experiment is the code that will run the experiment.
Experiment InputProcessorExperimentWrapper
// Inputs is the list of inputs to measure.
Inputs []model.URLInfo
// MaxRuntime is the optional maximum runtime
// when looping over a list of inputs (e.g. when
// running Web Connectivity). Zero means that
// there will be no MaxRuntime limit.
MaxRuntime time.Duration
// Options contains command line options for this experiment.
Options []string
// Saver is the code that will save measurement results
// on persistent storage (e.g. the file system).
Saver InputProcessorSaverWrapper
// Submitter is the code that will submit measurements
// to the OONI collector.
Submitter InputProcessorSubmitterWrapper
}
// InputProcessorSaverWrapper is InputProcessor's
// wrapper for a Saver implementation.
type InputProcessorSaverWrapper interface {
SaveMeasurement(idx int, m *model.Measurement) error
}
type inputProcessorSaverWrapper struct {
saver Saver
}
// NewInputProcessorSaverWrapper wraps a Saver for InputProcessor.
func NewInputProcessorSaverWrapper(saver Saver) InputProcessorSaverWrapper {
return inputProcessorSaverWrapper{saver: saver}
}
func (ipsw inputProcessorSaverWrapper) SaveMeasurement(
idx int, m *model.Measurement) error {
return ipsw.saver.SaveMeasurement(m)
}
// InputProcessorSubmitterWrapper is InputProcessor's
// wrapper for a Submitter implementation.
type InputProcessorSubmitterWrapper interface {
Submit(ctx context.Context, idx int, m *model.Measurement) error
}
type inputProcessorSubmitterWrapper struct {
submitter Submitter
}
// NewInputProcessorSubmitterWrapper wraps a Submitter
// for the InputProcessor.
func NewInputProcessorSubmitterWrapper(submitter Submitter) InputProcessorSubmitterWrapper {
return inputProcessorSubmitterWrapper{submitter: submitter}
}
func (ipsw inputProcessorSubmitterWrapper) Submit(
ctx context.Context, idx int, m *model.Measurement) error {
return ipsw.submitter.Submit(ctx, m)
}
// Run processes all the input subject to the duration of the
// context. The code will perform measurements using the given
// experiment; submit measurements using the given submitter;
// save measurements using the given saver.
//
// Annotations and Options will be saved in the measurement.
//
// The default behaviour of this code is that an error while
// measuring, while submitting, or while saving a measurement
// is always causing us to break out of the loop. The user
// though is free to choose different policies by configuring
// the Experiment, Submitter, and Saver fields properly.
func (ip *InputProcessor) Run(ctx context.Context) error {
refactor: flatten and separate (#353) * refactor(atomicx): move outside the engine package After merging probe-engine into probe-cli, my impression is that we have too much unnecessary nesting of packages in this repository. The idea of this commit and of a bunch of following commits will instead be to reduce the nesting and simplify the structure. While there, improve the documentation. * fix: always use the atomicx package For consistency, never use sync/atomic and always use ./internal/atomicx so we can just grep and make sure we're not risking to crash if we make a subtle mistake on a 32 bit platform. While there, mention in the contributing guidelines that we want to always prefer the ./internal/atomicx package over sync/atomic. * fix(atomicx): remove unnecessary constructor We don't need a constructor here. The default constructed `&Int64{}` instance is already usable and the constructor does not add anything to what we are doing, rather it just creates extra confusion. * cleanup(atomicx): we are not using Float64 Because atomicx.Float64 is unused, we can safely zap it. * cleanup(atomicx): simplify impl and improve tests We can simplify the implementation by using defer and by letting the Load() method call Add(0). We can improve tests by making many goroutines updated the atomic int64 value concurrently. * refactor(fsx): can live in the ./internal pkg Let us reduce the amount of nesting. While there, ensure that the package only exports the bare minimum, and improve the documentation of the tests, to ease reading the code. * refactor: move runtimex to ./internal * refactor: move shellx into the ./internal package While there, remove unnecessary dependency between packages. While there, specify in the contributing guidelines that one should use x/sys/execabs instead of os/exec. * refactor: move ooapi into the ./internal pkg * refactor(humanize): move to ./internal and better docs * refactor: move platform to ./internal * refactor(randx): move to ./internal * refactor(multierror): move into the ./internal pkg * refactor(kvstore): all kvstores in ./internal Rather than having part of the kvstore inside ./internal/engine/kvstore and part in ./internal/engine/kvstore.go, let us put every piece of code that is kvstore related into the ./internal/kvstore package. * fix(kvstore): always return ErrNoSuchKey on Get() error It should help to use the kvstore everywhere removing all the copies that are lingering around the tree. * sessionresolver: make KVStore mandatory Simplifies implementation. While there, use the ./internal/kvstore package rather than having our private implementation. * fix(ooapi): use the ./internal/kvstore package * fix(platform): better documentation
2021-06-04 10:34:18 +02:00
_, err := ip.run(ctx)
return err
}
// These are the reasons why run could stop.
const (
stopNormal = (1 << iota)
stopMaxRuntime
)
// run is like Run but, in addition to returning an error, it
// also returns the reason why we stopped.
func (ip *InputProcessor) run(ctx context.Context) (int, error) {
start := time.Now()
for idx, url := range ip.Inputs {
if ip.MaxRuntime > 0 && time.Since(start) > ip.MaxRuntime {
refactor: flatten and separate (#353) * refactor(atomicx): move outside the engine package After merging probe-engine into probe-cli, my impression is that we have too much unnecessary nesting of packages in this repository. The idea of this commit and of a bunch of following commits will instead be to reduce the nesting and simplify the structure. While there, improve the documentation. * fix: always use the atomicx package For consistency, never use sync/atomic and always use ./internal/atomicx so we can just grep and make sure we're not risking to crash if we make a subtle mistake on a 32 bit platform. While there, mention in the contributing guidelines that we want to always prefer the ./internal/atomicx package over sync/atomic. * fix(atomicx): remove unnecessary constructor We don't need a constructor here. The default constructed `&Int64{}` instance is already usable and the constructor does not add anything to what we are doing, rather it just creates extra confusion. * cleanup(atomicx): we are not using Float64 Because atomicx.Float64 is unused, we can safely zap it. * cleanup(atomicx): simplify impl and improve tests We can simplify the implementation by using defer and by letting the Load() method call Add(0). We can improve tests by making many goroutines updated the atomic int64 value concurrently. * refactor(fsx): can live in the ./internal pkg Let us reduce the amount of nesting. While there, ensure that the package only exports the bare minimum, and improve the documentation of the tests, to ease reading the code. * refactor: move runtimex to ./internal * refactor: move shellx into the ./internal package While there, remove unnecessary dependency between packages. While there, specify in the contributing guidelines that one should use x/sys/execabs instead of os/exec. * refactor: move ooapi into the ./internal pkg * refactor(humanize): move to ./internal and better docs * refactor: move platform to ./internal * refactor(randx): move to ./internal * refactor(multierror): move into the ./internal pkg * refactor(kvstore): all kvstores in ./internal Rather than having part of the kvstore inside ./internal/engine/kvstore and part in ./internal/engine/kvstore.go, let us put every piece of code that is kvstore related into the ./internal/kvstore package. * fix(kvstore): always return ErrNoSuchKey on Get() error It should help to use the kvstore everywhere removing all the copies that are lingering around the tree. * sessionresolver: make KVStore mandatory Simplifies implementation. While there, use the ./internal/kvstore package rather than having our private implementation. * fix(ooapi): use the ./internal/kvstore package * fix(platform): better documentation
2021-06-04 10:34:18 +02:00
return stopMaxRuntime, nil
}
input := url.URL
var measurements []*model.Measurement
source, err := ip.Experiment.MeasureAsync(ctx, input, idx)
if err != nil {
refactor: flatten and separate (#353) * refactor(atomicx): move outside the engine package After merging probe-engine into probe-cli, my impression is that we have too much unnecessary nesting of packages in this repository. The idea of this commit and of a bunch of following commits will instead be to reduce the nesting and simplify the structure. While there, improve the documentation. * fix: always use the atomicx package For consistency, never use sync/atomic and always use ./internal/atomicx so we can just grep and make sure we're not risking to crash if we make a subtle mistake on a 32 bit platform. While there, mention in the contributing guidelines that we want to always prefer the ./internal/atomicx package over sync/atomic. * fix(atomicx): remove unnecessary constructor We don't need a constructor here. The default constructed `&Int64{}` instance is already usable and the constructor does not add anything to what we are doing, rather it just creates extra confusion. * cleanup(atomicx): we are not using Float64 Because atomicx.Float64 is unused, we can safely zap it. * cleanup(atomicx): simplify impl and improve tests We can simplify the implementation by using defer and by letting the Load() method call Add(0). We can improve tests by making many goroutines updated the atomic int64 value concurrently. * refactor(fsx): can live in the ./internal pkg Let us reduce the amount of nesting. While there, ensure that the package only exports the bare minimum, and improve the documentation of the tests, to ease reading the code. * refactor: move runtimex to ./internal * refactor: move shellx into the ./internal package While there, remove unnecessary dependency between packages. While there, specify in the contributing guidelines that one should use x/sys/execabs instead of os/exec. * refactor: move ooapi into the ./internal pkg * refactor(humanize): move to ./internal and better docs * refactor: move platform to ./internal * refactor(randx): move to ./internal * refactor(multierror): move into the ./internal pkg * refactor(kvstore): all kvstores in ./internal Rather than having part of the kvstore inside ./internal/engine/kvstore and part in ./internal/engine/kvstore.go, let us put every piece of code that is kvstore related into the ./internal/kvstore package. * fix(kvstore): always return ErrNoSuchKey on Get() error It should help to use the kvstore everywhere removing all the copies that are lingering around the tree. * sessionresolver: make KVStore mandatory Simplifies implementation. While there, use the ./internal/kvstore package rather than having our private implementation. * fix(ooapi): use the ./internal/kvstore package * fix(platform): better documentation
2021-06-04 10:34:18 +02:00
return 0, err
}
// NOTE: we don't want to intermix measuring with submitting
// therefore we collect all measurements first
for meas := range source {
measurements = append(measurements, meas)
}
for _, meas := range measurements {
meas.AddAnnotations(ip.Annotations)
meas.Options = ip.Options
err = ip.Submitter.Submit(ctx, idx, meas)
if err != nil {
return 0, err
}
// Note: must be after submission because submission modifies
// the measurement to include the report ID.
err = ip.Saver.SaveMeasurement(idx, meas)
if err != nil {
return 0, err
}
}
}
refactor: flatten and separate (#353) * refactor(atomicx): move outside the engine package After merging probe-engine into probe-cli, my impression is that we have too much unnecessary nesting of packages in this repository. The idea of this commit and of a bunch of following commits will instead be to reduce the nesting and simplify the structure. While there, improve the documentation. * fix: always use the atomicx package For consistency, never use sync/atomic and always use ./internal/atomicx so we can just grep and make sure we're not risking to crash if we make a subtle mistake on a 32 bit platform. While there, mention in the contributing guidelines that we want to always prefer the ./internal/atomicx package over sync/atomic. * fix(atomicx): remove unnecessary constructor We don't need a constructor here. The default constructed `&Int64{}` instance is already usable and the constructor does not add anything to what we are doing, rather it just creates extra confusion. * cleanup(atomicx): we are not using Float64 Because atomicx.Float64 is unused, we can safely zap it. * cleanup(atomicx): simplify impl and improve tests We can simplify the implementation by using defer and by letting the Load() method call Add(0). We can improve tests by making many goroutines updated the atomic int64 value concurrently. * refactor(fsx): can live in the ./internal pkg Let us reduce the amount of nesting. While there, ensure that the package only exports the bare minimum, and improve the documentation of the tests, to ease reading the code. * refactor: move runtimex to ./internal * refactor: move shellx into the ./internal package While there, remove unnecessary dependency between packages. While there, specify in the contributing guidelines that one should use x/sys/execabs instead of os/exec. * refactor: move ooapi into the ./internal pkg * refactor(humanize): move to ./internal and better docs * refactor: move platform to ./internal * refactor(randx): move to ./internal * refactor(multierror): move into the ./internal pkg * refactor(kvstore): all kvstores in ./internal Rather than having part of the kvstore inside ./internal/engine/kvstore and part in ./internal/engine/kvstore.go, let us put every piece of code that is kvstore related into the ./internal/kvstore package. * fix(kvstore): always return ErrNoSuchKey on Get() error It should help to use the kvstore everywhere removing all the copies that are lingering around the tree. * sessionresolver: make KVStore mandatory Simplifies implementation. While there, use the ./internal/kvstore package rather than having our private implementation. * fix(ooapi): use the ./internal/kvstore package * fix(platform): better documentation
2021-06-04 10:34:18 +02:00
return stopNormal, nil
}