2021-02-02 12:05:47 +01:00
|
|
|
package engine
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2021-03-29 20:38:23 +02:00
|
|
|
"time"
|
2021-02-02 12:05:47 +01:00
|
|
|
|
|
|
|
"github.com/ooni/probe-cli/v3/internal/engine/model"
|
|
|
|
)
|
|
|
|
|
|
|
|
// InputProcessorExperiment is the Experiment
|
|
|
|
// according to InputProcessor.
|
|
|
|
type InputProcessorExperiment interface {
|
2021-09-30 00:54:52 +02:00
|
|
|
MeasureAsync(
|
|
|
|
ctx context.Context, input string) (<-chan *model.Measurement, error)
|
2021-02-02 12:05:47 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// InputProcessorExperimentWrapper is a wrapper for an
|
|
|
|
// Experiment that also allow to pass around the input index.
|
|
|
|
type InputProcessorExperimentWrapper interface {
|
2021-09-30 00:54:52 +02:00
|
|
|
MeasureAsync(
|
|
|
|
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error)
|
2021-02-02 12:05:47 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewInputProcessorExperimentWrapper creates a new
|
|
|
|
// instance of InputProcessorExperimentWrapper.
|
|
|
|
func NewInputProcessorExperimentWrapper(
|
|
|
|
exp InputProcessorExperiment) InputProcessorExperimentWrapper {
|
|
|
|
return inputProcessorExperimentWrapper{exp: exp}
|
|
|
|
}
|
|
|
|
|
|
|
|
type inputProcessorExperimentWrapper struct {
|
|
|
|
exp InputProcessorExperiment
|
|
|
|
}
|
|
|
|
|
2021-09-30 00:54:52 +02:00
|
|
|
func (ipew inputProcessorExperimentWrapper) MeasureAsync(
|
|
|
|
ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) {
|
|
|
|
return ipew.exp.MeasureAsync(ctx, input)
|
2021-02-02 12:05:47 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
var _ InputProcessorExperimentWrapper = inputProcessorExperimentWrapper{}
|
|
|
|
|
|
|
|
// InputProcessor processes inputs. We perform a Measurement
|
|
|
|
// for each input using the given Experiment.
|
|
|
|
type InputProcessor struct {
|
|
|
|
// Annotations contains the measurement annotations
|
|
|
|
Annotations map[string]string
|
|
|
|
|
|
|
|
// Experiment is the code that will run the experiment.
|
|
|
|
Experiment InputProcessorExperimentWrapper
|
|
|
|
|
|
|
|
// Inputs is the list of inputs to measure.
|
|
|
|
Inputs []model.URLInfo
|
|
|
|
|
2021-03-29 20:38:23 +02:00
|
|
|
// MaxRuntime is the optional maximum runtime
|
|
|
|
// when looping over a list of inputs (e.g. when
|
|
|
|
// running Web Connectivity). Zero means that
|
|
|
|
// there will be no MaxRuntime limit.
|
|
|
|
MaxRuntime time.Duration
|
|
|
|
|
2021-02-02 12:05:47 +01:00
|
|
|
// Options contains command line options for this experiment.
|
|
|
|
Options []string
|
|
|
|
|
|
|
|
// Saver is the code that will save measurement results
|
|
|
|
// on persistent storage (e.g. the file system).
|
|
|
|
Saver InputProcessorSaverWrapper
|
|
|
|
|
|
|
|
// Submitter is the code that will submit measurements
|
|
|
|
// to the OONI collector.
|
|
|
|
Submitter InputProcessorSubmitterWrapper
|
|
|
|
}
|
|
|
|
|
|
|
|
// InputProcessorSaverWrapper is InputProcessor's
|
|
|
|
// wrapper for a Saver implementation.
|
|
|
|
type InputProcessorSaverWrapper interface {
|
|
|
|
SaveMeasurement(idx int, m *model.Measurement) error
|
|
|
|
}
|
|
|
|
|
|
|
|
type inputProcessorSaverWrapper struct {
|
|
|
|
saver Saver
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewInputProcessorSaverWrapper wraps a Saver for InputProcessor.
|
|
|
|
func NewInputProcessorSaverWrapper(saver Saver) InputProcessorSaverWrapper {
|
|
|
|
return inputProcessorSaverWrapper{saver: saver}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ipsw inputProcessorSaverWrapper) SaveMeasurement(
|
|
|
|
idx int, m *model.Measurement) error {
|
|
|
|
return ipsw.saver.SaveMeasurement(m)
|
|
|
|
}
|
|
|
|
|
|
|
|
// InputProcessorSubmitterWrapper is InputProcessor's
|
|
|
|
// wrapper for a Submitter implementation.
|
|
|
|
type InputProcessorSubmitterWrapper interface {
|
|
|
|
Submit(ctx context.Context, idx int, m *model.Measurement) error
|
|
|
|
}
|
|
|
|
|
|
|
|
type inputProcessorSubmitterWrapper struct {
|
|
|
|
submitter Submitter
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewInputProcessorSubmitterWrapper wraps a Submitter
|
|
|
|
// for the InputProcessor.
|
|
|
|
func NewInputProcessorSubmitterWrapper(submitter Submitter) InputProcessorSubmitterWrapper {
|
|
|
|
return inputProcessorSubmitterWrapper{submitter: submitter}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ipsw inputProcessorSubmitterWrapper) Submit(
|
|
|
|
ctx context.Context, idx int, m *model.Measurement) error {
|
|
|
|
return ipsw.submitter.Submit(ctx, m)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run processes all the input subject to the duration of the
|
|
|
|
// context. The code will perform measurements using the given
|
|
|
|
// experiment; submit measurements using the given submitter;
|
|
|
|
// save measurements using the given saver.
|
|
|
|
//
|
|
|
|
// Annotations and Options will be saved in the measurement.
|
|
|
|
//
|
|
|
|
// The default behaviour of this code is that an error while
|
|
|
|
// measuring, while submitting, or while saving a measurement
|
|
|
|
// is always causing us to break out of the loop. The user
|
|
|
|
// though is free to choose different policies by configuring
|
|
|
|
// the Experiment, Submitter, and Saver fields properly.
|
2021-03-29 20:38:23 +02:00
|
|
|
func (ip *InputProcessor) Run(ctx context.Context) error {
|
refactor: flatten and separate (#353)
* refactor(atomicx): move outside the engine package
After merging probe-engine into probe-cli, my impression is that we have
too much unnecessary nesting of packages in this repository.
The idea of this commit and of a bunch of following commits will instead
be to reduce the nesting and simplify the structure.
While there, improve the documentation.
* fix: always use the atomicx package
For consistency, never use sync/atomic and always use ./internal/atomicx
so we can just grep and make sure we're not risking to crash if we make
a subtle mistake on a 32 bit platform.
While there, mention in the contributing guidelines that we want to
always prefer the ./internal/atomicx package over sync/atomic.
* fix(atomicx): remove unnecessary constructor
We don't need a constructor here. The default constructed `&Int64{}`
instance is already usable and the constructor does not add anything to
what we are doing, rather it just creates extra confusion.
* cleanup(atomicx): we are not using Float64
Because atomicx.Float64 is unused, we can safely zap it.
* cleanup(atomicx): simplify impl and improve tests
We can simplify the implementation by using defer and by letting
the Load() method call Add(0).
We can improve tests by making many goroutines updated the
atomic int64 value concurrently.
* refactor(fsx): can live in the ./internal pkg
Let us reduce the amount of nesting. While there, ensure that the
package only exports the bare minimum, and improve the documentation
of the tests, to ease reading the code.
* refactor: move runtimex to ./internal
* refactor: move shellx into the ./internal package
While there, remove unnecessary dependency between packages.
While there, specify in the contributing guidelines that
one should use x/sys/execabs instead of os/exec.
* refactor: move ooapi into the ./internal pkg
* refactor(humanize): move to ./internal and better docs
* refactor: move platform to ./internal
* refactor(randx): move to ./internal
* refactor(multierror): move into the ./internal pkg
* refactor(kvstore): all kvstores in ./internal
Rather than having part of the kvstore inside ./internal/engine/kvstore
and part in ./internal/engine/kvstore.go, let us put every piece of code
that is kvstore related into the ./internal/kvstore package.
* fix(kvstore): always return ErrNoSuchKey on Get() error
It should help to use the kvstore everywhere removing all the
copies that are lingering around the tree.
* sessionresolver: make KVStore mandatory
Simplifies implementation. While there, use the ./internal/kvstore
package rather than having our private implementation.
* fix(ooapi): use the ./internal/kvstore package
* fix(platform): better documentation
2021-06-04 10:34:18 +02:00
|
|
|
_, err := ip.run(ctx)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// These are the reasons why run could stop.
|
|
|
|
const (
|
|
|
|
stopNormal = (1 << iota)
|
|
|
|
stopMaxRuntime
|
|
|
|
)
|
|
|
|
|
|
|
|
// run is like Run but, in addition to returning an error, it
|
|
|
|
// also returns the reason why we stopped.
|
|
|
|
func (ip *InputProcessor) run(ctx context.Context) (int, error) {
|
2021-03-29 20:38:23 +02:00
|
|
|
start := time.Now()
|
2021-02-02 12:05:47 +01:00
|
|
|
for idx, url := range ip.Inputs {
|
2021-03-29 20:38:23 +02:00
|
|
|
if ip.MaxRuntime > 0 && time.Since(start) > ip.MaxRuntime {
|
refactor: flatten and separate (#353)
* refactor(atomicx): move outside the engine package
After merging probe-engine into probe-cli, my impression is that we have
too much unnecessary nesting of packages in this repository.
The idea of this commit and of a bunch of following commits will instead
be to reduce the nesting and simplify the structure.
While there, improve the documentation.
* fix: always use the atomicx package
For consistency, never use sync/atomic and always use ./internal/atomicx
so we can just grep and make sure we're not risking to crash if we make
a subtle mistake on a 32 bit platform.
While there, mention in the contributing guidelines that we want to
always prefer the ./internal/atomicx package over sync/atomic.
* fix(atomicx): remove unnecessary constructor
We don't need a constructor here. The default constructed `&Int64{}`
instance is already usable and the constructor does not add anything to
what we are doing, rather it just creates extra confusion.
* cleanup(atomicx): we are not using Float64
Because atomicx.Float64 is unused, we can safely zap it.
* cleanup(atomicx): simplify impl and improve tests
We can simplify the implementation by using defer and by letting
the Load() method call Add(0).
We can improve tests by making many goroutines updated the
atomic int64 value concurrently.
* refactor(fsx): can live in the ./internal pkg
Let us reduce the amount of nesting. While there, ensure that the
package only exports the bare minimum, and improve the documentation
of the tests, to ease reading the code.
* refactor: move runtimex to ./internal
* refactor: move shellx into the ./internal package
While there, remove unnecessary dependency between packages.
While there, specify in the contributing guidelines that
one should use x/sys/execabs instead of os/exec.
* refactor: move ooapi into the ./internal pkg
* refactor(humanize): move to ./internal and better docs
* refactor: move platform to ./internal
* refactor(randx): move to ./internal
* refactor(multierror): move into the ./internal pkg
* refactor(kvstore): all kvstores in ./internal
Rather than having part of the kvstore inside ./internal/engine/kvstore
and part in ./internal/engine/kvstore.go, let us put every piece of code
that is kvstore related into the ./internal/kvstore package.
* fix(kvstore): always return ErrNoSuchKey on Get() error
It should help to use the kvstore everywhere removing all the
copies that are lingering around the tree.
* sessionresolver: make KVStore mandatory
Simplifies implementation. While there, use the ./internal/kvstore
package rather than having our private implementation.
* fix(ooapi): use the ./internal/kvstore package
* fix(platform): better documentation
2021-06-04 10:34:18 +02:00
|
|
|
return stopMaxRuntime, nil
|
2021-03-29 20:38:23 +02:00
|
|
|
}
|
2021-02-02 12:05:47 +01:00
|
|
|
input := url.URL
|
2021-09-30 00:54:52 +02:00
|
|
|
var measurements []*model.Measurement
|
|
|
|
source, err := ip.Experiment.MeasureAsync(ctx, input, idx)
|
2021-02-02 12:05:47 +01:00
|
|
|
if err != nil {
|
refactor: flatten and separate (#353)
* refactor(atomicx): move outside the engine package
After merging probe-engine into probe-cli, my impression is that we have
too much unnecessary nesting of packages in this repository.
The idea of this commit and of a bunch of following commits will instead
be to reduce the nesting and simplify the structure.
While there, improve the documentation.
* fix: always use the atomicx package
For consistency, never use sync/atomic and always use ./internal/atomicx
so we can just grep and make sure we're not risking to crash if we make
a subtle mistake on a 32 bit platform.
While there, mention in the contributing guidelines that we want to
always prefer the ./internal/atomicx package over sync/atomic.
* fix(atomicx): remove unnecessary constructor
We don't need a constructor here. The default constructed `&Int64{}`
instance is already usable and the constructor does not add anything to
what we are doing, rather it just creates extra confusion.
* cleanup(atomicx): we are not using Float64
Because atomicx.Float64 is unused, we can safely zap it.
* cleanup(atomicx): simplify impl and improve tests
We can simplify the implementation by using defer and by letting
the Load() method call Add(0).
We can improve tests by making many goroutines updated the
atomic int64 value concurrently.
* refactor(fsx): can live in the ./internal pkg
Let us reduce the amount of nesting. While there, ensure that the
package only exports the bare minimum, and improve the documentation
of the tests, to ease reading the code.
* refactor: move runtimex to ./internal
* refactor: move shellx into the ./internal package
While there, remove unnecessary dependency between packages.
While there, specify in the contributing guidelines that
one should use x/sys/execabs instead of os/exec.
* refactor: move ooapi into the ./internal pkg
* refactor(humanize): move to ./internal and better docs
* refactor: move platform to ./internal
* refactor(randx): move to ./internal
* refactor(multierror): move into the ./internal pkg
* refactor(kvstore): all kvstores in ./internal
Rather than having part of the kvstore inside ./internal/engine/kvstore
and part in ./internal/engine/kvstore.go, let us put every piece of code
that is kvstore related into the ./internal/kvstore package.
* fix(kvstore): always return ErrNoSuchKey on Get() error
It should help to use the kvstore everywhere removing all the
copies that are lingering around the tree.
* sessionresolver: make KVStore mandatory
Simplifies implementation. While there, use the ./internal/kvstore
package rather than having our private implementation.
* fix(ooapi): use the ./internal/kvstore package
* fix(platform): better documentation
2021-06-04 10:34:18 +02:00
|
|
|
return 0, err
|
2021-02-02 12:05:47 +01:00
|
|
|
}
|
2021-09-30 00:54:52 +02:00
|
|
|
// NOTE: we don't want to intermix measuring with submitting
|
|
|
|
// therefore we collect all measurements first
|
|
|
|
for meas := range source {
|
|
|
|
measurements = append(measurements, meas)
|
2021-02-02 12:05:47 +01:00
|
|
|
}
|
2021-09-30 00:54:52 +02:00
|
|
|
for _, meas := range measurements {
|
|
|
|
meas.AddAnnotations(ip.Annotations)
|
|
|
|
meas.Options = ip.Options
|
|
|
|
err = ip.Submitter.Submit(ctx, idx, meas)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
// Note: must be after submission because submission modifies
|
|
|
|
// the measurement to include the report ID.
|
|
|
|
err = ip.Saver.SaveMeasurement(idx, meas)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2021-02-02 12:05:47 +01:00
|
|
|
}
|
|
|
|
}
|
refactor: flatten and separate (#353)
* refactor(atomicx): move outside the engine package
After merging probe-engine into probe-cli, my impression is that we have
too much unnecessary nesting of packages in this repository.
The idea of this commit and of a bunch of following commits will instead
be to reduce the nesting and simplify the structure.
While there, improve the documentation.
* fix: always use the atomicx package
For consistency, never use sync/atomic and always use ./internal/atomicx
so we can just grep and make sure we're not risking to crash if we make
a subtle mistake on a 32 bit platform.
While there, mention in the contributing guidelines that we want to
always prefer the ./internal/atomicx package over sync/atomic.
* fix(atomicx): remove unnecessary constructor
We don't need a constructor here. The default constructed `&Int64{}`
instance is already usable and the constructor does not add anything to
what we are doing, rather it just creates extra confusion.
* cleanup(atomicx): we are not using Float64
Because atomicx.Float64 is unused, we can safely zap it.
* cleanup(atomicx): simplify impl and improve tests
We can simplify the implementation by using defer and by letting
the Load() method call Add(0).
We can improve tests by making many goroutines updated the
atomic int64 value concurrently.
* refactor(fsx): can live in the ./internal pkg
Let us reduce the amount of nesting. While there, ensure that the
package only exports the bare minimum, and improve the documentation
of the tests, to ease reading the code.
* refactor: move runtimex to ./internal
* refactor: move shellx into the ./internal package
While there, remove unnecessary dependency between packages.
While there, specify in the contributing guidelines that
one should use x/sys/execabs instead of os/exec.
* refactor: move ooapi into the ./internal pkg
* refactor(humanize): move to ./internal and better docs
* refactor: move platform to ./internal
* refactor(randx): move to ./internal
* refactor(multierror): move into the ./internal pkg
* refactor(kvstore): all kvstores in ./internal
Rather than having part of the kvstore inside ./internal/engine/kvstore
and part in ./internal/engine/kvstore.go, let us put every piece of code
that is kvstore related into the ./internal/kvstore package.
* fix(kvstore): always return ErrNoSuchKey on Get() error
It should help to use the kvstore everywhere removing all the
copies that are lingering around the tree.
* sessionresolver: make KVStore mandatory
Simplifies implementation. While there, use the ./internal/kvstore
package rather than having our private implementation.
* fix(ooapi): use the ./internal/kvstore package
* fix(platform): better documentation
2021-06-04 10:34:18 +02:00
|
|
|
return stopNormal, nil
|
2021-02-02 12:05:47 +01:00
|
|
|
}
|