From a0763756b2bf990e16f9f243fab0950e7970b9e9 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Mon, 29 Mar 2021 20:38:23 +0200 Subject: [PATCH] fix(miniooni): replace --limit with --max-runtime (#272) Part of https://github.com/ooni/probe/issues/1299 --- internal/cmd/miniooni/libminiooni.go | 27 +++++++++++---- internal/engine/inputprocessor.go | 20 ++++++++++- internal/engine/inputprocessor_test.go | 47 ++++++++++++++++++++++---- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/internal/cmd/miniooni/libminiooni.go b/internal/cmd/miniooni/libminiooni.go index 765cc1a..a126cf4 100644 --- a/internal/cmd/miniooni/libminiooni.go +++ b/internal/cmd/miniooni/libminiooni.go @@ -1,7 +1,5 @@ package main -// TODO(bassosimone): we need to deprecate or remove --limit. - import ( "context" "errors" @@ -34,6 +32,7 @@ type Options struct { Inputs []string InputFilePaths []string Limit int64 + MaxRuntime int64 NoJSON bool NoCollector bool ProbeServicesURL string @@ -83,6 +82,10 @@ func init() { &globalOptions.Limit, "limit", 0, "Limit the number of URLs tested by Web Connectivity", "N", ) + getopt.FlagLong( + &globalOptions.MaxRuntime, "max-runtime", 0, + "Maximum runtime in seconds when looping over a list of inputs (zero means infinite)", "N", + ) getopt.FlagLong( &globalOptions.NoJSON, "no-json", 'N', "Disable writing to disk", ) @@ -260,12 +263,23 @@ func maybeWriteConsentFile(yes bool, filepath string) (err error) { return } +// limitRemoved is the text printed when the user uses --limit +const limitRemoved = `USAGE CHANGE: The --limit option has been removed in favor of +the --max-runtime option. Please, update your script to use --max-runtime +instead of --limit. The argument to --max-runtime is the maximum number +of seconds after which to stop running Web Connectivity. + +This error message will be removed after 2021-11-01. +` + // MainWithConfiguration is the miniooni main with a specific configuration // represented by the experiment name and the current options. // // This function will panic in case of a fatal error. It is up to you that // integrate this function to either handle the panic of ignore it. func MainWithConfiguration(experimentName string, currentOptions Options) { + fatalIfFalse(currentOptions.Limit == 0, limitRemoved) + ctx := context.Background() extraOptions := mustMakeMap(currentOptions.ExtraOptions) @@ -403,15 +417,16 @@ func MainWithConfiguration(experimentName string, currentOptions Options) { }) fatalOnError(err, "cannot create saver") - inputProcessor := engine.InputProcessor{ + inputProcessor := &engine.InputProcessor{ Annotations: annotations, Experiment: &experimentWrapper{ child: engine.NewInputProcessorExperimentWrapper(experiment), total: len(inputs), }, - Inputs: inputs, - Options: currentOptions.ExtraOptions, - Saver: engine.NewInputProcessorSaverWrapper(saver), + Inputs: inputs, + MaxRuntime: time.Duration(currentOptions.MaxRuntime) * time.Second, + Options: currentOptions.ExtraOptions, + Saver: engine.NewInputProcessorSaverWrapper(saver), Submitter: submitterWrapper{ child: engine.NewInputProcessorSubmitterWrapper(submitter), }, diff --git a/internal/engine/inputprocessor.go b/internal/engine/inputprocessor.go index 1bf3d3a..2e890c1 100644 --- a/internal/engine/inputprocessor.go +++ b/internal/engine/inputprocessor.go @@ -2,6 +2,8 @@ package engine import ( "context" + "sync/atomic" + "time" "github.com/ooni/probe-cli/v3/internal/engine/model" ) @@ -50,6 +52,12 @@ type InputProcessor struct { // Inputs is the list of inputs to measure. Inputs []model.URLInfo + // MaxRuntime is the optional maximum runtime + // when looping over a list of inputs (e.g. when + // running Web Connectivity). Zero means that + // there will be no MaxRuntime limit. + MaxRuntime time.Duration + // Options contains command line options for this experiment. Options []string @@ -60,6 +68,11 @@ type InputProcessor struct { // Submitter is the code that will submit measurements // to the OONI collector. Submitter InputProcessorSubmitterWrapper + + // terminatedByMaxRuntime is an internal atomic variabile + // incremented when we're terminated by MaxRuntime. We + // only use this variable when testing. + terminatedByMaxRuntime int32 } // InputProcessorSaverWrapper is InputProcessor's @@ -115,8 +128,13 @@ func (ipsw inputProcessorSubmitterWrapper) Submit( // is always causing us to break out of the loop. The user // though is free to choose different policies by configuring // the Experiment, Submitter, and Saver fields properly. -func (ip InputProcessor) Run(ctx context.Context) error { +func (ip *InputProcessor) Run(ctx context.Context) error { + start := time.Now() for idx, url := range ip.Inputs { + if ip.MaxRuntime > 0 && time.Since(start) > ip.MaxRuntime { + atomic.AddInt32(&ip.terminatedByMaxRuntime, 1) + return nil + } input := url.URL meas, err := ip.Experiment.MeasureWithContext(ctx, idx, input) if err != nil { diff --git a/internal/engine/inputprocessor_test.go b/internal/engine/inputprocessor_test.go index 4d604b1..461c3bb 100644 --- a/internal/engine/inputprocessor_test.go +++ b/internal/engine/inputprocessor_test.go @@ -4,13 +4,15 @@ import ( "context" "errors" "testing" + "time" "github.com/ooni/probe-cli/v3/internal/engine/model" ) type FakeInputProcessorExperiment struct { - Err error - M []*model.Measurement + SleepTime time.Duration + Err error + M []*model.Measurement } func (fipe *FakeInputProcessorExperiment) MeasureWithContext( @@ -18,6 +20,9 @@ func (fipe *FakeInputProcessorExperiment) MeasureWithContext( if fipe.Err != nil { return nil, fipe.Err } + if fipe.SleepTime > 0 { + time.Sleep(fipe.SleepTime) + } m := new(model.Measurement) // Here we add annotations to ensure that the input processor // is MERGING annotations as opposed to overwriting them. @@ -30,7 +35,7 @@ func (fipe *FakeInputProcessorExperiment) MeasureWithContext( func TestInputProcessorMeasurementFailed(t *testing.T) { expected := errors.New("mocked error") - ip := InputProcessor{ + ip := &InputProcessor{ Experiment: NewInputProcessorExperimentWrapper( &FakeInputProcessorExperiment{Err: expected}, ), @@ -58,7 +63,7 @@ func (fips *FakeInputProcessorSubmitter) Submit( func TestInputProcessorSubmissionFailed(t *testing.T) { fipe := &FakeInputProcessorExperiment{} expected := errors.New("mocked error") - ip := InputProcessor{ + ip := &InputProcessor{ Annotations: map[string]string{ "foo": "bar", }, @@ -108,7 +113,7 @@ func (fips *FakeInputProcessorSaver) SaveMeasurement(m *model.Measurement) error func TestInputProcessorSaveOnDiskFailed(t *testing.T) { expected := errors.New("mocked error") - ip := InputProcessor{ + ip := &InputProcessor{ Experiment: NewInputProcessorExperimentWrapper( &FakeInputProcessorExperiment{}, ), @@ -133,7 +138,7 @@ func TestInputProcessorGood(t *testing.T) { fipe := &FakeInputProcessorExperiment{} saver := &FakeInputProcessorSaver{Err: nil} submitter := &FakeInputProcessorSubmitter{Err: nil} - ip := InputProcessor{ + ip := &InputProcessor{ Experiment: NewInputProcessorExperimentWrapper(fipe), Inputs: []model.URLInfo{{ URL: "https://www.kernel.org/", @@ -148,6 +153,9 @@ func TestInputProcessorGood(t *testing.T) { if err := ip.Run(ctx); err != nil { t.Fatal(err) } + if ip.terminatedByMaxRuntime > 0 { + t.Fatal("terminated by max runtime!?") + } if len(fipe.M) != 2 || len(saver.M) != 2 || len(submitter.M) != 2 { t.Fatal("not all measurements saved") } @@ -164,3 +172,30 @@ func TestInputProcessorGood(t *testing.T) { t.Fatal("invalid saver.M[1].Input") } } + +func TestInputProcessorMaxRuntime(t *testing.T) { + fipe := &FakeInputProcessorExperiment{ + SleepTime: 50 * time.Millisecond, + } + saver := &FakeInputProcessorSaver{Err: nil} + submitter := &FakeInputProcessorSubmitter{Err: nil} + ip := &InputProcessor{ + Experiment: NewInputProcessorExperimentWrapper(fipe), + Inputs: []model.URLInfo{{ + URL: "https://www.kernel.org/", + }, { + URL: "https://www.slashdot.org/", + }}, + MaxRuntime: 1 * time.Nanosecond, + Options: []string{"fake=true"}, + Saver: NewInputProcessorSaverWrapper(saver), + Submitter: NewInputProcessorSubmitterWrapper(submitter), + } + ctx := context.Background() + if err := ip.Run(ctx); err != nil { + t.Fatal(err) + } + if ip.terminatedByMaxRuntime <= 0 { + t.Fatal("not terminated by max runtime") + } +}