From 9cdca4137d53c44671ac5c3c522521b035140b02 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 2 Dec 2021 12:47:07 +0100 Subject: [PATCH] forwardport: pull the patches mentioned in ooni/probe#1908 (#629) * [forwardport] fix(oonimkall): make logger used by tasks unit testable (#623) This diff forward ports e4b04642c51e7461728b25941624e1b97ef0ec83. Reference issue: https://github.com/ooni/probe/issues/1903 * [forwardport] feat(oonimkall): improve taskEmitter testability (#624) This diff forward ports 3e0f01a389c1f4cdd7878ec151aff91870a0bdff. 1. rename eventemitter{,_test}.go => taskemitter{,_test}.go because the new name is more proper after we merged the internal/task package inside of the oonimkall package; 2. rename runner.go's `run` function to `runTask`; 3. modify `runTask` to use the new `taskEmitterUsingChan` abstraction on which we will spend more works in a later point of this list; 4. introduce `runTaskWithEmitter` factory that is called by `runTask` and allows us to more easily write unit tests; 5. acknowledge that `runner` was not using its `out` field; 6. use the new `taskEmitterWrapper` in `newRunner`; 7. acknowledge that `runnerCallbacks` could use a generic `taskEmitter` as field type rather than a specific type; 8. rewrite tests to use `runTaskWithEmitter` which leads to simpler code that does not require a goroutine; 9. acknowledge that the code has been ignoring the `DisabledEvents` settings for quite some time, so stop supporting it; 10. refactor the `taskEmitter` implementation to be like: 1. we still have the `taskEmitter` interface; 2. `taskEmitterUsingChan` wraps the channel and allows for emitting events using the channel; 3. `taskEmitterUsingChan` owns an `eof` channel that is closed by `Close` (which is idempotent) and signals we should be stop emitting; 4. make sure `runTask` creates a `taskEmitterUsingChan` and calls its `Close` method when done; 5. completely remove the code for disabling events since the code was actually ignoring the stting; 6. add a `taskEmitterWrapper` that adds common functions for emitting events to _any_ `taskWrapper`; 7. write unit tests for `taskEmitterUsingChan` and for `taskEmitterWrapper`; 11. acknowledge that the abstraction we need for testing is actually a thread-safe thing that collects events into a vector containing events and refactor all tests accordingly. See https://github.com/ooni/probe/issues/1903 * [forwardport] refactor(oonimkall): make the runner unit-testable (#625) This diff forward ports 9423947faf6980d92d2fe67efe3829e8fef76586. See https://github.com/ooni/probe/issues/1903 * [forwardport] feat(oonimkall): write unit tests for the runner component (#626) This diff forward ports 35dd0e3788b8fa99c541452bbb5e0ae4871239e1. Forward porting note: compared to 35dd0e3788b8fa99c541452bbb5e0ae4871239e1, the diff I'm committing here is slightly different. In `master` we do not have the case where a measurement fails and a measurement is returned, thus I needed to adapt the test to become like this: ```diff diff --git a/pkg/oonimkall/runner_internal_test.go b/pkg/oonimkall/runner_internal_test.go index 334b574..84c7436 100644 --- a/pkg/oonimkall/runner_internal_test.go +++ b/pkg/oonimkall/runner_internal_test.go @@ -568,15 +568,6 @@ func TestTaskRunnerRun(t *testing.T) { }, { Key: failureMeasurement, Count: 1, - }, { - Key: measurement, - Count: 1, - }, { - Key: statusMeasurementSubmission, - Count: 1, - }, { - Key: statusMeasurementDone, - Count: 1, }, { Key: statusEnd, Count: 1, ``` I still need to write more assertions for each emitted event but the code we've here is already a great starting point. See https://github.com/ooni/probe/issues/1903 * [forwardport] refactor(oonimkall): merge files, use proper names, zap unneeded integration tests (#627) This diff forward ports f894427d24edc9a03fc78306d0093e7b51c46c25. Forward porting note: this diff is slightly different from the original mentioned above because it carries forward changes mentioned in the previous diff caused by a different way of handling a failed measurement in the master branch compared to the release/3.11 branch. Move everything that looked like "task's model" inside of the taskmodel.go file, for consistency. Make sure it's clear some variables are event types. Rename the concrete `runner` as `runnerForTask`. Also, remove now-unnecessary (and flaky!) integration tests for the `runnerForTask` type. While there, notice there were wrong URLs that were generated during the probe-engine => probe-cli move and fix them. See https://github.com/ooni/probe/issues/1903 * [forwardport] refactor(oonimkall): we can simplify StartTask tests (#628) This diff forward ports dcf2986c2032d8185d58d24130a7f2c2d61ef2fb. * refactor(oonimkall): we can simplify StartTask tests We have enough checks for runnerForTask. So we do not need to duplicate them when checking for StartTask. While there, refactor how we start tasks to remove the need for extra runner functions. This is the objective I wanted to achieve for oonimkall: 1. less duplicate tests, and 2. more unit tests (which are less flaky) At this point, we're basically done (pending forwardporting to master) with https://github.com/ooni/probe/issues/1903. * fix(oonimkall): TestStartTaskGood shouldn't cancel the test This creates a race condition where the test may fail if we cannot complete the whole "Example" test in less than one second. This should explain the build failures I've seen so far and why I didn't see those failures when running locally. --- internal/engine/allexperiments.go | 53 -- internal/engine/experiment/dash/dash.go | 2 +- internal/engine/legacy/netx/resolver_test.go | 2 +- .../engine/netx/archival/archival_test.go | 2 +- .../engine/netx/resolver/integration_test.go | 2 +- pkg/oonimkall/chanlogger.go | 85 -- pkg/oonimkall/event.go | 57 -- pkg/oonimkall/eventemitter.go | 47 -- pkg/oonimkall/eventemitter_test.go | 84 -- pkg/oonimkall/runner_integration_test.go | 402 ---------- pkg/oonimkall/runner_internal_test.go | 83 -- pkg/oonimkall/settings.go | 91 --- pkg/oonimkall/task.go | 9 +- pkg/oonimkall/task_integration_test.go | 528 ------------- pkg/oonimkall/task_test.go | 171 ++++ pkg/oonimkall/taskemitter.go | 63 ++ pkg/oonimkall/taskemitter_test.go | 109 +++ pkg/oonimkall/tasklogger.go | 116 +++ pkg/oonimkall/tasklogger_test.go | 106 +++ pkg/oonimkall/taskmocks_test.go | 222 ++++++ pkg/oonimkall/taskmodel.go | 374 +++++++++ pkg/oonimkall/{runner.go => taskrunner.go} | 134 ++-- pkg/oonimkall/taskrunner_test.go | 741 ++++++++++++++++++ pkg/oonimkall/tasksession.go | 78 ++ pkg/oonimkall/tasksession_test.go | 128 +++ 25 files changed, 2167 insertions(+), 1522 deletions(-) delete mode 100644 pkg/oonimkall/chanlogger.go delete mode 100644 pkg/oonimkall/event.go delete mode 100644 pkg/oonimkall/eventemitter.go delete mode 100644 pkg/oonimkall/eventemitter_test.go delete mode 100644 pkg/oonimkall/runner_integration_test.go delete mode 100644 pkg/oonimkall/runner_internal_test.go delete mode 100644 pkg/oonimkall/settings.go delete mode 100644 pkg/oonimkall/task_integration_test.go create mode 100644 pkg/oonimkall/task_test.go create mode 100644 pkg/oonimkall/taskemitter.go create mode 100644 pkg/oonimkall/taskemitter_test.go create mode 100644 pkg/oonimkall/tasklogger.go create mode 100644 pkg/oonimkall/tasklogger_test.go create mode 100644 pkg/oonimkall/taskmocks_test.go create mode 100644 pkg/oonimkall/taskmodel.go rename pkg/oonimkall/{runner.go => taskrunner.go} (61%) create mode 100644 pkg/oonimkall/taskrunner_test.go create mode 100644 pkg/oonimkall/tasksession.go create mode 100644 pkg/oonimkall/tasksession_test.go diff --git a/internal/engine/allexperiments.go b/internal/engine/allexperiments.go index dd12d3d..62619b9 100644 --- a/internal/engine/allexperiments.go +++ b/internal/engine/allexperiments.go @@ -69,59 +69,6 @@ var experimentsByName = map[string]func(*Session) *ExperimentBuilder{ } }, - "example_with_input": func(session *Session) *ExperimentBuilder { - return &ExperimentBuilder{ - build: func(config interface{}) *Experiment { - return NewExperiment(session, example.NewExperimentMeasurer( - *config.(*example.Config), "example_with_input", - )) - }, - config: &example.Config{ - Message: "Good day from the example with input experiment!", - SleepTime: int64(time.Second), - }, - interruptible: true, - inputPolicy: InputStrictlyRequired, - } - }, - - // TODO(bassosimone): when we can set experiment options using the JSON - // we need to get rid of all these multiple experiments. - // - // See https://github.com/ooni/probe-engine/issues/413 - "example_with_input_non_interruptible": func(session *Session) *ExperimentBuilder { - return &ExperimentBuilder{ - build: func(config interface{}) *Experiment { - return NewExperiment(session, example.NewExperimentMeasurer( - *config.(*example.Config), "example_with_input_non_interruptible", - )) - }, - config: &example.Config{ - Message: "Good day from the example with input experiment!", - SleepTime: int64(time.Second), - }, - interruptible: false, - inputPolicy: InputStrictlyRequired, - } - }, - - "example_with_failure": func(session *Session) *ExperimentBuilder { - return &ExperimentBuilder{ - build: func(config interface{}) *Experiment { - return NewExperiment(session, example.NewExperimentMeasurer( - *config.(*example.Config), "example_with_failure", - )) - }, - config: &example.Config{ - Message: "Good day from the example with failure experiment!", - ReturnError: true, - SleepTime: int64(time.Second), - }, - interruptible: true, - inputPolicy: InputNone, - } - }, - "facebook_messenger": func(session *Session) *ExperimentBuilder { return &ExperimentBuilder{ build: func(config interface{}) *Experiment { diff --git a/internal/engine/experiment/dash/dash.go b/internal/engine/experiment/dash/dash.go index 3792de8..6414085 100644 --- a/internal/engine/experiment/dash/dash.go +++ b/internal/engine/experiment/dash/dash.go @@ -158,7 +158,7 @@ func (r runner) measure( // connecting or later. We cannot say that very precisely // because, in principle, we may reconnect. So we always // return error here. This comment is being introduced so - // that we don't do https://github.com/ooni/probe-cli/v3/internal/engine/pull/526 + // that we don't do https://github.com/ooni/probe-engine/pull/526 // again, because that isn't accurate. return err } diff --git a/internal/engine/legacy/netx/resolver_test.go b/internal/engine/legacy/netx/resolver_test.go index eba00bc..724194c 100644 --- a/internal/engine/legacy/netx/resolver_test.go +++ b/internal/engine/legacy/netx/resolver_test.go @@ -28,7 +28,7 @@ func testresolverquick(t *testing.T, network, address string) { } var foundquad8 bool for _, addr := range addrs { - // See https://github.com/ooni/probe-cli/v3/internal/engine/pull/954/checks?check_run_id=1182269025 + // See https://github.com/ooni/probe-engine/pull/954/checks?check_run_id=1182269025 if addr == "8.8.8.8" || addr == "2001:4860:4860::8888" { foundquad8 = true } diff --git a/internal/engine/netx/archival/archival_test.go b/internal/engine/netx/archival/archival_test.go index dfd0f52..f531e4d 100644 --- a/internal/engine/netx/archival/archival_test.go +++ b/internal/engine/netx/archival/archival_test.go @@ -206,7 +206,7 @@ func TestNewRequestList(t *testing.T) { }}, }, { // for an example of why we need to sort headers, see - // https://github.com/ooni/probe-cli/v3/internal/engine/pull/751/checks?check_run_id=853562310 + // https://github.com/ooni/probe-engine/pull/751/checks?check_run_id=853562310 name: "run with redirect and headers to sort", args: args{ begin: begin, diff --git a/internal/engine/netx/resolver/integration_test.go b/internal/engine/netx/resolver/integration_test.go index 280214e..6181e5e 100644 --- a/internal/engine/netx/resolver/integration_test.go +++ b/internal/engine/netx/resolver/integration_test.go @@ -32,7 +32,7 @@ func testresolverquick(t *testing.T, reso resolver.Resolver) { } var foundquad8 bool for _, addr := range addrs { - // See https://github.com/ooni/probe-cli/v3/internal/engine/pull/954/checks?check_run_id=1182269025 + // See https://github.com/ooni/probe-engine/pull/954/checks?check_run_id=1182269025 if addr == "8.8.8.8" || addr == "2001:4860:4860::8888" { foundquad8 = true } diff --git a/pkg/oonimkall/chanlogger.go b/pkg/oonimkall/chanlogger.go deleted file mode 100644 index 0f8b957..0000000 --- a/pkg/oonimkall/chanlogger.go +++ /dev/null @@ -1,85 +0,0 @@ -package oonimkall - -import "fmt" - -// chanLogger is a logger targeting a channel -type chanLogger struct { - emitter *eventEmitter - hasdebug bool - hasinfo bool - haswarning bool - out chan<- *event -} - -// Debug implements Logger.Debug -func (cl *chanLogger) Debug(msg string) { - if cl.hasdebug { - cl.emitter.Emit("log", eventLog{ - LogLevel: "DEBUG", - Message: msg, - }) - } -} - -// Debugf implements Logger.Debugf -func (cl *chanLogger) Debugf(format string, v ...interface{}) { - if cl.hasdebug { - cl.Debug(fmt.Sprintf(format, v...)) - } -} - -// Info implements Logger.Info -func (cl *chanLogger) Info(msg string) { - if cl.hasinfo { - cl.emitter.Emit("log", eventLog{ - LogLevel: "INFO", - Message: msg, - }) - } -} - -// Infof implements Logger.Infof -func (cl *chanLogger) Infof(format string, v ...interface{}) { - if cl.hasinfo { - cl.Info(fmt.Sprintf(format, v...)) - } -} - -// Warn implements Logger.Warn -func (cl *chanLogger) Warn(msg string) { - if cl.haswarning { - cl.emitter.Emit("log", eventLog{ - LogLevel: "WARNING", - Message: msg, - }) - } -} - -// Warnf implements Logger.Warnf -func (cl *chanLogger) Warnf(format string, v ...interface{}) { - if cl.haswarning { - cl.Warn(fmt.Sprintf(format, v...)) - } -} - -// newChanLogger creates a new ChanLogger instance. -func newChanLogger(emitter *eventEmitter, logLevel string, - out chan<- *event) *chanLogger { - cl := &chanLogger{ - emitter: emitter, - out: out, - } - switch logLevel { - case "DEBUG", "DEBUG2": - cl.hasdebug = true - fallthrough - case "INFO": - cl.hasinfo = true - fallthrough - case "ERR", "WARNING": - fallthrough - default: - cl.haswarning = true - } - return cl -} diff --git a/pkg/oonimkall/event.go b/pkg/oonimkall/event.go deleted file mode 100644 index 08e93f4..0000000 --- a/pkg/oonimkall/event.go +++ /dev/null @@ -1,57 +0,0 @@ -package oonimkall - -type eventEmpty struct{} - -// eventFailure contains information on a failure. -type eventFailure struct { - Failure string `json:"failure"` -} - -// eventLog is an event containing a log message. -type eventLog struct { - LogLevel string `json:"log_level"` - Message string `json:"message"` -} - -type eventMeasurementGeneric struct { - Failure string `json:"failure,omitempty"` - Idx int64 `json:"idx"` - Input string `json:"input"` - JSONStr string `json:"json_str,omitempty"` -} - -type eventStatusEnd struct { - DownloadedKB float64 `json:"downloaded_kb"` - Failure string `json:"failure"` - UploadedKB float64 `json:"uploaded_kb"` -} - -type eventStatusGeoIPLookup struct { - ProbeASN string `json:"probe_asn"` - ProbeCC string `json:"probe_cc"` - ProbeIP string `json:"probe_ip"` - ProbeNetworkName string `json:"probe_network_name"` -} - -// eventStatusProgress reports progress information. -type eventStatusProgress struct { - Message string `json:"message"` - Percentage float64 `json:"percentage"` -} - -type eventStatusReportGeneric struct { - ReportID string `json:"report_id"` -} - -type eventStatusResolverLookup struct { - ResolverASN string `json:"resolver_asn"` - ResolverIP string `json:"resolver_ip"` - ResolverNetworkName string `json:"resolver_network_name"` -} - -// event is an event emitted by a task. This structure extends the event -// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv). -type event struct { - Key string `json:"key"` - Value interface{} `json:"value"` -} diff --git a/pkg/oonimkall/eventemitter.go b/pkg/oonimkall/eventemitter.go deleted file mode 100644 index 83b19b5..0000000 --- a/pkg/oonimkall/eventemitter.go +++ /dev/null @@ -1,47 +0,0 @@ -package oonimkall - -// eventEmitter emits event on a channel -type eventEmitter struct { - disabled map[string]bool - eof <-chan interface{} - out chan<- *event -} - -// newEventEmitter creates a new Emitter -func newEventEmitter(disabledEvents []string, out chan<- *event, - eof <-chan interface{}) *eventEmitter { - ee := &eventEmitter{eof: eof, out: out} - ee.disabled = make(map[string]bool) - for _, eventname := range disabledEvents { - ee.disabled[eventname] = true - } - return ee -} - -// EmitFailureStartup emits the failureStartup event -func (ee *eventEmitter) EmitFailureStartup(failure string) { - ee.EmitFailureGeneric(failureStartup, failure) -} - -// EmitFailureGeneric emits a failure event -func (ee *eventEmitter) EmitFailureGeneric(name, failure string) { - ee.Emit(name, eventFailure{Failure: failure}) -} - -// EmitStatusProgress emits the status.Progress event -func (ee *eventEmitter) EmitStatusProgress(percentage float64, message string) { - ee.Emit(statusProgress, eventStatusProgress{Message: message, Percentage: percentage}) -} - -// Emit emits the specified event -func (ee *eventEmitter) Emit(key string, value interface{}) { - if ee.disabled[key] { - return - } - // Prevent this goroutine from blocking on `ee.out` if the caller - // has already told us it's not going to accept more events. - select { - case ee.out <- &event{Key: key, Value: value}: - case <-ee.eof: - } -} diff --git a/pkg/oonimkall/eventemitter_test.go b/pkg/oonimkall/eventemitter_test.go deleted file mode 100644 index e0c18e5..0000000 --- a/pkg/oonimkall/eventemitter_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package oonimkall - -import "testing" - -func TestDisabledEvents(t *testing.T) { - out := make(chan *event) - eof := make(chan interface{}) - emitter := newEventEmitter([]string{"log"}, out, eof) - go func() { - emitter.Emit("log", eventLog{Message: "foo"}) - close(eof) - }() - var count int64 -Loop: - for { - select { - case ev := <-out: - if ev.Key == "log" { - count++ - } - case <-eof: - break Loop - } - } - if count > 0 { - t.Fatal("cannot disable events") - } -} - -func TestEmitFailureStartup(t *testing.T) { - out := make(chan *event) - eof := make(chan interface{}) - emitter := newEventEmitter([]string{}, out, eof) - go func() { - emitter.EmitFailureStartup("mocked error") - close(eof) - }() - var found bool -Loop: - for { - select { - case ev := <-out: - if ev.Key == "failure.startup" { - evv := ev.Value.(eventFailure) // panic if not castable - if evv.Failure == "mocked error" { - found = true - } - } - case <-eof: - break Loop - } - } - if !found { - t.Fatal("did not see expected event") - } -} - -func TestEmitStatusProgress(t *testing.T) { - out := make(chan *event) - eof := make(chan interface{}) - emitter := newEventEmitter([]string{}, out, eof) - go func() { - emitter.EmitStatusProgress(0.7, "foo") - close(eof) - }() - var found bool -Loop: - for { - select { - case ev := <-out: - if ev.Key == "status.progress" { - evv := ev.Value.(eventStatusProgress) // panic if not castable - if evv.Message == "foo" && evv.Percentage == 0.7 { - found = true - } - } - case <-eof: - break Loop - } - } - if !found { - t.Fatal("did not see expected event") - } -} diff --git a/pkg/oonimkall/runner_integration_test.go b/pkg/oonimkall/runner_integration_test.go deleted file mode 100644 index e4c2988..0000000 --- a/pkg/oonimkall/runner_integration_test.go +++ /dev/null @@ -1,402 +0,0 @@ -package oonimkall - -import ( - "context" - "fmt" - "net/http" - "net/http/httptest" - "sync" - "testing" - "time" -) - -func TestRunnerMaybeLookupBackendsFailure(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(500) - })) - defer server.Close() - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Name: "Example", - Options: settingsOptions{ - ProbeServicesBaseURL: server.URL, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - switch ev.Key { - case "failure.startup": - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - case "status.queued", "status.started", "log", "status.end": - default: - panic(fmt.Sprintf("unexpected key: %s", ev.Key)) - } - } - if len(failures) != 1 { - t.Fatal("unexpected number of failures") - } - if failures[0] != "all available probe services failed" { - t.Fatal("invalid failure") - } -} - -func TestRunnerOpenReportFailure(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - var ( - nreq int64 - mu sync.Mutex - ) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - mu.Lock() - defer mu.Unlock() - nreq++ - if nreq == 1 { - w.Write([]byte(`{}`)) - return - } - w.WriteHeader(500) - })) - defer server.Close() - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Name: "Example", - Options: settingsOptions{ - ProbeServicesBaseURL: server.URL, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - seench := make(chan int64) - go func() { - var seen int64 - for ev := range out { - switch ev.Key { - case "failure.report_create": - seen++ - case "status.progress": - evv := ev.Value.(eventStatusProgress) - if evv.Percentage >= 0.4 { - panic(fmt.Sprintf("too much progress: %+v", ev)) - } - case "status.queued", "status.started", "log", "status.end", - "status.geoip_lookup", "status.resolver_lookup": - default: - panic(fmt.Sprintf("unexpected key: %s", ev.Key)) - } - } - seench <- seen - }() - run(context.Background(), settings, out) - close(out) - if n := <-seench; n != 1 { - t.Fatal("unexpected number of events") - } -} - -func TestRunnerGood(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "Example", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var found bool - for ev := range out { - if ev.Key == "status.end" { - found = true - } - } - if !found { - t.Fatal("status.end event not found") - } -} - -func TestRunnerWithUnsupportedSettings(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "Example", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - if ev.Key == "failure.startup" { - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - } - } - if len(failures) != 1 { - t.Fatal("invalid number of failures") - } - if failures[0] != failureInvalidVersion { - t.Fatal("not the failure we expected") - } -} - -func TestRunnerWithInvalidKVStorePath(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "Example", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "", // must be empty to cause the failure below - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - if ev.Key == "failure.startup" { - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - } - } - if len(failures) != 1 { - t.Fatal("invalid number of failures") - } - if failures[0] != "mkdir : no such file or directory" { - t.Fatal("not the failure we expected") - } -} - -func TestRunnerWithInvalidExperimentName(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "Nonexistent", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - if ev.Key == "failure.startup" { - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - } - } - if len(failures) != 1 { - t.Fatal("invalid number of failures") - } - if failures[0] != "no such experiment: Nonexistent" { - t.Fatalf("not the failure we expected: %s", failures[0]) - } -} - -func TestRunnerWithMissingInput(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "ExampleWithInput", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - if ev.Key == "failure.startup" { - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - } - } - if len(failures) != 1 { - t.Fatal("invalid number of failures") - } - if failures[0] != "no input provided" { - t.Fatalf("not the failure we expected: %s", failures[0]) - } -} - -func TestRunnerWithMaxRuntime(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, - LogLevel: "DEBUG", - Name: "ExampleWithInput", - Options: settingsOptions{ - MaxRuntime: 1, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - begin := time.Now() - go func() { - run(context.Background(), settings, out) - close(out) - }() - var found bool - for ev := range out { - if ev.Key == "status.end" { - found = true - } - } - if !found { - t.Fatal("status.end event not found") - } - // The runtime is long because of ancillary operations and is even more - // longer because of self shaping we may be performing (especially in - // CI builds) using `-tags shaping`). We have experimentally determined - // that ~10 seconds is the typical CI test run time. See: - // - // 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788 - // - // 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855 - if time.Since(begin) > 10*time.Second { - t.Fatal("expected shorter runtime") - } -} - -func TestRunnerWithMaxRuntimeNonInterruptible(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, - LogLevel: "DEBUG", - Name: "ExampleWithInputNonInterruptible", - Options: settingsOptions{ - MaxRuntime: 1, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - begin := time.Now() - go func() { - run(context.Background(), settings, out) - close(out) - }() - var found bool - for ev := range out { - if ev.Key == "status.end" { - found = true - } - } - if !found { - t.Fatal("status.end event not found") - } - // The runtime is long because of ancillary operations and is even more - // longer because of self shaping we may be performing (especially in - // CI builds) using `-tags shaping`). We have experimentally determined - // that ~10 seconds is the typical CI test run time. See: - // - // 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788 - // - // 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855 - if time.Since(begin) > 10*time.Second { - t.Fatal("expected shorter runtime") - } -} - -func TestRunnerWithFailedMeasurement(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, - LogLevel: "DEBUG", - Name: "ExampleWithFailure", - Options: settingsOptions{ - MaxRuntime: 1, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var found bool - for ev := range out { - if ev.Key == "failure.measurement" { - found = true - } - } - if !found { - t.Fatal("failure.measurement event not found") - } -} diff --git a/pkg/oonimkall/runner_internal_test.go b/pkg/oonimkall/runner_internal_test.go deleted file mode 100644 index 0f3f604..0000000 --- a/pkg/oonimkall/runner_internal_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package oonimkall - -import ( - "context" - "errors" - "fmt" - "testing" - - engine "github.com/ooni/probe-cli/v3/internal/engine" -) - -func TestMeasurementSubmissionEventName(t *testing.T) { - if measurementSubmissionEventName(nil) != statusMeasurementSubmission { - t.Fatal("unexpected submission event name") - } - if measurementSubmissionEventName(errors.New("mocked error")) != failureMeasurementSubmission { - t.Fatal("unexpected submission event name") - } -} - -func TestMeasurementSubmissionFailure(t *testing.T) { - if measurementSubmissionFailure(nil) != "" { - t.Fatal("unexpected submission failure") - } - if measurementSubmissionFailure(errors.New("mocked error")) != "mocked error" { - t.Fatal("unexpected submission failure") - } -} - -func TestRunnerMaybeLookupLocationFailure(t *testing.T) { - if testing.Short() { - // TODO(https://github.com/ooni/probe-cli/pull/518) - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Name: "Example", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - seench := make(chan int64) - eof := make(chan interface{}) - go func() { - var seen int64 - Loop: - for { - select { - case ev := <-out: - switch ev.Key { - case "failure.ip_lookup", "failure.asn_lookup", - "failure.cc_lookup", "failure.resolver_lookup": - seen++ - case "status.progress": - evv := ev.Value.(eventStatusProgress) - if evv.Percentage >= 0.2 { - panic(fmt.Sprintf("too much progress: %+v", ev)) - } - case "status.queued", "status.started", "status.end": - default: - panic(fmt.Sprintf("unexpected key: %s - %+v", ev.Key, ev.Value)) - } - case <-eof: - break Loop - } - } - seench <- seen - }() - expected := errors.New("mocked error") - r := newRunner(settings, out, eof) - r.maybeLookupLocation = func(*engine.Session) error { - return expected - } - r.Run(context.Background()) - close(eof) - if n := <-seench; n != 4 { - t.Fatal("unexpected number of events") - } -} diff --git a/pkg/oonimkall/settings.go b/pkg/oonimkall/settings.go deleted file mode 100644 index d18eafd..0000000 --- a/pkg/oonimkall/settings.go +++ /dev/null @@ -1,91 +0,0 @@ -package oonimkall - -// Settings contains settings for a task. This structure derives from -// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet -// since 2020-12-03 we're not backwards compatible anymore. -type settings struct { - // Annotations contains the annotations to be added - // to every measurements performed by the task. - Annotations map[string]string `json:"annotations,omitempty"` - - // AssetsDir is the directory where to store assets. This - // field is an extension of MK's specification. If - // this field is empty, the task won't start. - AssetsDir string `json:"assets_dir"` - - // DisabledEvents contains disabled events. See - // https://git.io/Jv4Rv for the events names. - DisabledEvents []string `json:"disabled_events,omitempty"` - - // Inputs contains the inputs. The task will fail if it - // requires input and you provide no input. - Inputs []string `json:"inputs,omitempty"` - - // LogLevel contains the logs level. See https://git.io/Jv4Rv - // for the names of the available log levels. - LogLevel string `json:"log_level,omitempty"` - - // Name contains the task name. By https://git.io/Jv4Rv the - // names are in camel case, e.g. `Ndt`. - Name string `json:"name"` - - // Options contains the task options. - Options settingsOptions `json:"options"` - - // Proxy allows you to optionally force a specific proxy - // rather than using no proxy (the default). - // - // Use `psiphon:///` to force using Psiphon with the - // embedded configuration file. Not all builds have - // an embedded configuration file, but OONI builds have - // such a file, so they can use this functionality. - // - // Use `socks5://10.0.0.1:9050/` to connect to a SOCKS5 - // proxy running on 10.0.0.1:9050. This could be, for - // example, a suitably configured `tor` instance. - Proxy string - - // StateDir is the directory where to store persistent data. This - // field is an extension of MK's specification. If - // this field is empty, the task won't start. - StateDir string `json:"state_dir"` - - // TempDir is the temporary directory. This field is an extension of MK's - // specification. If this field is empty, we will pick the tempdir that - // ioutil.TempDir uses by default, which may not work on mobile. According - // to our experiments as of 2020-06-10, leaving the TempDir empty works - // for iOS and does not work for Android. - TempDir string `json:"temp_dir"` - - // TunnelDir is the directory where to store persistent state - // related to circumvention tunnels. This directory is required - // only if you want to use the tunnels. Added since 3.10.0. - TunnelDir string `json:"tunnel_dir"` - - // Version indicates the version of this structure. - Version int64 `json:"version"` -} - -// settingsOptions contains the settings options -type settingsOptions struct { - // MaxRuntime is the maximum runtime expressed in seconds. A negative - // value for this field disables the maximum runtime. Using - // a zero value will also mean disabled. This is not the - // original behaviour of Measurement Kit, which used to run - // for zero time in such case. - MaxRuntime float64 `json:"max_runtime,omitempty"` - - // NoCollector indicates whether to use a collector - NoCollector bool `json:"no_collector,omitempty"` - - // ProbeServicesBaseURL contains the probe services base URL. - ProbeServicesBaseURL string `json:"probe_services_base_url,omitempty"` - - // SoftwareName is the software name. If this option is not - // present, then the library startup will fail. - SoftwareName string `json:"software_name,omitempty"` - - // SoftwareVersion is the software version. If this option is not - // present, then the library startup will fail. - SoftwareVersion string `json:"software_version,omitempty"` -} diff --git a/pkg/oonimkall/task.go b/pkg/oonimkall/task.go index 2262750..5eec0a0 100644 --- a/pkg/oonimkall/task.go +++ b/pkg/oonimkall/task.go @@ -23,7 +23,7 @@ // (measurement-kit/measurement-kit@v0.10.9) for a comprehensive // description of MK's FFI API. // -// See also https://github.com/ooni/probe-cli/v3/internal/engine/pull/347 for the +// See also https://github.com/ooni/probe-engine/pull/347 for the // design document describing the task API. // // See also https://github.com/ooni/probe-cli/v3/internal/engine/blob/master/DESIGN.md, @@ -33,7 +33,7 @@ // // The Session API is a Go API that can be exported to mobile apps // using the gomobile tool. The latest design document for this API is -// at https://github.com/ooni/probe-cli/v3/internal/engine/pull/954. +// at https://github.com/ooni/probe-engine/pull/954. // // The basic tenet of the session API is that you create an instance // of `Session` and use it to perform the operations you need. @@ -84,8 +84,11 @@ func StartTask(input string) (*Task, error) { } go func() { close(task.isstarted) - run(ctx, &settings, task.out) + emitter := newTaskEmitterUsingChan(task.out) + r := newRunner(&settings, emitter) + r.Run(ctx) task.out <- nil // signal that we're done w/o closing the channel + emitter.Close() close(task.isstopped) }() return task, nil diff --git a/pkg/oonimkall/task_integration_test.go b/pkg/oonimkall/task_integration_test.go deleted file mode 100644 index 8ac50b1..0000000 --- a/pkg/oonimkall/task_integration_test.go +++ /dev/null @@ -1,528 +0,0 @@ -package oonimkall - -import ( - "encoding/json" - "errors" - "strings" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/ooni/probe-cli/v3/internal/engine/model" -) - -type eventlike struct { - Key string `json:"key"` - Value map[string]interface{} `json:"value"` -} - -func TestGood(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - // interrupt the task so we also exercise this functionality - go func() { - <-time.After(time.Second) - task.Interrupt() - }() - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - t.Fatal("unexpected failure.startup event") - } - } - // make sure we only see task_terminated at this point - for { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "task_terminated" { - break - } - t.Fatalf("unexpected event.Key: %s", event.Key) - } -} - -func TestWithMeasurementFailure(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "ExampleWithFailure", - "options": { - "no_geoip": true, - "no_resolver_lookup": true, - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - t.Fatal("unexpected failure.startup event") - } - } -} - -func TestInvalidJSON(t *testing.T) { - task, err := StartTask(`{`) - var syntaxerr *json.SyntaxError - if !errors.As(err, &syntaxerr) { - t.Fatal("not the expected error") - } - if task != nil { - t.Fatal("task is not nil") - } -} - -func TestUnsupportedSetting(t *testing.T) { - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state" - }`) - if err != nil { - t.Fatal(err) - } - var seen bool - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - if strings.Contains(eventstr, failureInvalidVersion) { - seen = true - } - } - } - if !seen { - t.Fatal("did not see failure.startup with invalid version info") - } -} - -func TestEmptyStateDir(t *testing.T) { - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var seen bool - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - if strings.Contains(eventstr, "mkdir : no such file or directory") { - seen = true - } - } - } - if !seen { - t.Fatal("did not see failure.startup with info that state dir is empty") - } -} - -func TestUnknownExperiment(t *testing.T) { - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "Antani", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var seen bool - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - if strings.Contains(eventstr, "no such experiment: ") { - seen = true - } - } - } - if !seen { - t.Fatal("did not see failure.startup") - } -} - -func TestInputIsRequired(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "ExampleWithInput", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var seen bool - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - if strings.Contains(eventstr, "no input provided") { - seen = true - } - } - } - if !seen { - t.Fatal("did not see failure.startup") - } -} - -func TestMaxRuntime(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - begin := time.Now() - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "inputs": ["a", "b", "c"], - "name": "ExampleWithInput", - "options": { - "max_runtime": 1, - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - t.Fatal(eventstr) - } - } - // The runtime is long because of ancillary operations and is even more - // longer because of self shaping we may be performing (especially in - // CI builds) using `-tags shaping`). We have experimentally determined - // that ~10 seconds is the typical CI test run time. See: - // - // 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788 - // - // 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855 - // - // In case there are further timeouts, e.g. in the sessionresolver, the - // time used by the experiment will be much more. This is for example the - // case in https://github.com/ooni/probe-engine/issues/1005. - if time.Since(begin) > 10*time.Second { - t.Fatal("expected shorter runtime") - } -} - -func TestInterruptExampleWithInput(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - t.Skip("Skipping broken test; see https://github.com/ooni/probe-engine/issues/992") - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "inputs": [ - "http://www.kernel.org/", - "http://www.x.org/", - "http://www.microsoft.com/", - "http://www.slashdot.org/", - "http://www.repubblica.it/", - "http://www.google.it/", - "http://ooni.org/" - ], - "name": "ExampleWithInputNonInterruptible", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var keys []string - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - switch event.Key { - case "failure.startup": - t.Fatal(eventstr) - case "status.measurement_start": - go task.Interrupt() - } - // We compress the keys. What matters is basically that we - // see just one of the many possible measurements here. - if keys == nil || keys[len(keys)-1] != event.Key { - keys = append(keys, event.Key) - } - } - expect := []string{ - "status.queued", - "status.started", - "status.progress", - "status.geoip_lookup", - "status.resolver_lookup", - "status.progress", - "status.report_create", - "status.measurement_start", - "log", - "status.progress", - "measurement", - "status.measurement_submission", - "status.measurement_done", - "status.end", - "task_terminated", - } - if diff := cmp.Diff(expect, keys); diff != "" { - t.Fatal(diff) - } -} - -func TestInterruptNdt7(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "name": "Ndt7", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - go func() { - <-time.After(11 * time.Second) - task.Interrupt() - }() - var keys []string - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - t.Fatal(eventstr) - } - // We compress the keys because we don't know how many - // status.progress we will see. What matters is that we - // don't see a measurement submission, since it means - // that we have interrupted the measurement. - if keys == nil || keys[len(keys)-1] != event.Key { - keys = append(keys, event.Key) - } - } - expect := []string{ - "status.queued", - "status.started", - "status.progress", - "status.geoip_lookup", - "status.resolver_lookup", - "status.progress", - "status.report_create", - "status.measurement_start", - "status.progress", - "status.end", - "task_terminated", - } - if diff := cmp.Diff(expect, keys); diff != "" { - t.Fatal(diff) - } -} - -func TestCountBytesForExample(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var downloadKB, uploadKB float64 - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - switch event.Key { - case "failure.startup": - t.Fatal(eventstr) - case "status.end": - downloadKB = event.Value["downloaded_kb"].(float64) - uploadKB = event.Value["uploaded_kb"].(float64) - } - } - if downloadKB == 0 { - t.Fatal("downloadKB is zero") - } - if uploadKB == 0 { - t.Fatal("uploadKB is zero") - } -} - -func TestPrivacyAndScrubbing(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var m *model.Measurement - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - switch event.Key { - case "failure.startup": - t.Fatal(eventstr) - case "measurement": - v := []byte(event.Value["json_str"].(string)) - m = new(model.Measurement) - if err := json.Unmarshal(v, &m); err != nil { - t.Fatal(err) - } - } - } - if m == nil { - t.Fatal("measurement is nil") - } - if m.ProbeASN == "AS0" || m.ProbeCC == "ZZ" || m.ProbeIP != "127.0.0.1" { - t.Fatal("unexpected result") - } -} - -func TestNonblockWithFewEvents(t *testing.T) { - // This test tests whether we won't block for a small - // number of events emitted by the task - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - // Wait for the task thread to start - <-task.isstarted - // Wait for the task thread to complete - <-task.isstopped - var count int - for !task.IsDone() { - task.WaitForNextEvent() - count++ - } - if count < 5 { - t.Fatal("too few events") - } -} diff --git a/pkg/oonimkall/task_test.go b/pkg/oonimkall/task_test.go new file mode 100644 index 0000000..034a1de --- /dev/null +++ b/pkg/oonimkall/task_test.go @@ -0,0 +1,171 @@ +package oonimkall + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +type eventlike struct { + Key string `json:"key"` + Value map[string]interface{} `json:"value"` +} + +func TestStartTaskGood(t *testing.T) { + task, err := StartTask(`{ + "log_level": "DEBUG", + "name": "Example", + "options": { + "software_name": "oonimkall-test", + "software_version": "0.1.0" + }, + "state_dir": "testdata/state", + "version": 1 + }`) + if err != nil { + t.Fatal(err) + } + for !task.IsDone() { + eventstr := task.WaitForNextEvent() + var event eventlike + if err := json.Unmarshal([]byte(eventstr), &event); err != nil { + t.Fatal(err) + } + if event.Key == "failure.startup" { + t.Fatal("unexpected failure.startup event") + } + } + // make sure we only see task_terminated at this point + for { + eventstr := task.WaitForNextEvent() + var event eventlike + if err := json.Unmarshal([]byte(eventstr), &event); err != nil { + t.Fatal(err) + } + if event.Key == "task_terminated" { + break + } + t.Fatalf("unexpected event.Key: %s", event.Key) + } +} + +func TestStartTaskInvalidJSON(t *testing.T) { + task, err := StartTask(`{`) + var syntaxerr *json.SyntaxError + if !errors.As(err, &syntaxerr) { + t.Fatal("not the expected error") + } + if task != nil { + t.Fatal("task is not nil") + } +} + +func TestStartTaskCountBytesForExample(t *testing.T) { + task, err := StartTask(`{ + "name": "Example", + "options": { + "software_name": "oonimkall-test", + "software_version": "0.1.0" + }, + "state_dir": "testdata/state", + "version": 1 + }`) + if err != nil { + t.Fatal(err) + } + var downloadKB, uploadKB float64 + for !task.IsDone() { + eventstr := task.WaitForNextEvent() + var event eventlike + if err := json.Unmarshal([]byte(eventstr), &event); err != nil { + t.Fatal(err) + } + switch event.Key { + case "failure.startup": + t.Fatal(eventstr) + case "status.end": + downloadKB = event.Value["downloaded_kb"].(float64) + uploadKB = event.Value["uploaded_kb"].(float64) + } + } + if downloadKB == 0 { + t.Fatal("downloadKB is zero") + } + if uploadKB == 0 { + t.Fatal("uploadKB is zero") + } +} + +func TestPrivacyAndScrubbing(t *testing.T) { + if testing.Short() { + t.Skip("skip test in short mode") + } + task, err := StartTask(`{ + "assets_dir": "../testdata/oonimkall/assets", + "name": "Example", + "options": { + "software_name": "oonimkall-test", + "software_version": "0.1.0" + }, + "state_dir": "../testdata/oonimkall/state", + "version": 1 + }`) + if err != nil { + t.Fatal(err) + } + var m *model.Measurement + for !task.IsDone() { + eventstr := task.WaitForNextEvent() + var event eventlike + if err := json.Unmarshal([]byte(eventstr), &event); err != nil { + t.Fatal(err) + } + switch event.Key { + case "failure.startup": + t.Fatal(eventstr) + case "measurement": + v := []byte(event.Value["json_str"].(string)) + m = new(model.Measurement) + if err := json.Unmarshal(v, &m); err != nil { + t.Fatal(err) + } + } + } + if m == nil { + t.Fatal("measurement is nil") + } + if m.ProbeASN == "AS0" || m.ProbeCC == "ZZ" || m.ProbeIP != "127.0.0.1" { + t.Fatal("unexpected result") + } +} + +func TestNonblockWithFewEvents(t *testing.T) { + // This test tests whether we won't block for a small + // number of events emitted by the task + task, err := StartTask(`{ + "name": "Example", + "options": { + "software_name": "oonimkall-test", + "software_version": "0.1.0" + }, + "state_dir": "testdata/state", + "version": 1 + }`) + if err != nil { + t.Fatal(err) + } + // Wait for the task thread to start + <-task.isstarted + // Wait for the task thread to complete + <-task.isstopped + var count int + for !task.IsDone() { + task.WaitForNextEvent() + count++ + } + if count < 5 { + t.Fatal("too few events") + } +} diff --git a/pkg/oonimkall/taskemitter.go b/pkg/oonimkall/taskemitter.go new file mode 100644 index 0000000..9506ba8 --- /dev/null +++ b/pkg/oonimkall/taskemitter.go @@ -0,0 +1,63 @@ +package oonimkall + +import "sync" + +// taskEmitterUsingChan is a task emitter using a channel. +type taskEmitterUsingChan struct { + // eof indicates we should not emit anymore. + eof chan interface{} + + // once ensures we close the eof channel just once. + once sync.Once + + // out is the possibly buffered channel where to emit events. + out chan<- *event +} + +// ensure that taskEmitterUsingChan is a taskEmitter. +var _ taskEmitterCloser = &taskEmitterUsingChan{} + +// newTaskEmitterUsingChan creates a taskEmitterUsingChan. +func newTaskEmitterUsingChan(out chan<- *event) *taskEmitterUsingChan { + return &taskEmitterUsingChan{ + eof: make(chan interface{}), + once: sync.Once{}, + out: out, + } +} + +// Emit implements taskEmitter.Emit. +func (ee *taskEmitterUsingChan) Emit(key string, value interface{}) { + // Prevent this goroutine from blocking on `ee.out` if the caller + // has already told us it's not going to accept more events. + select { + case ee.out <- &event{Key: key, Value: value}: + case <-ee.eof: + } +} + +// Close implements taskEmitterCloser.Closer. +func (ee *taskEmitterUsingChan) Close() error { + ee.once.Do(func() { close(ee.eof) }) + return nil +} + +// taskEmitterWrapper is a convenient wrapper for taskEmitter. +type taskEmitterWrapper struct { + taskEmitter +} + +// EmitFailureStartup emits the failureStartup event +func (ee *taskEmitterWrapper) EmitFailureStartup(failure string) { + ee.EmitFailureGeneric(eventTypeFailureStartup, failure) +} + +// EmitFailureGeneric emits a failure event +func (ee *taskEmitterWrapper) EmitFailureGeneric(name, failure string) { + ee.Emit(name, eventFailure{Failure: failure}) +} + +// EmitStatusProgress emits the status.Progress event +func (ee *taskEmitterWrapper) EmitStatusProgress(percentage float64, message string) { + ee.Emit(eventTypeStatusProgress, eventStatusProgress{Message: message, Percentage: percentage}) +} diff --git a/pkg/oonimkall/taskemitter_test.go b/pkg/oonimkall/taskemitter_test.go new file mode 100644 index 0000000..dd950f1 --- /dev/null +++ b/pkg/oonimkall/taskemitter_test.go @@ -0,0 +1,109 @@ +package oonimkall + +import "testing" + +func TestTaskEmitterUsingChan(t *testing.T) { + t.Run("ordinary emit", func(t *testing.T) { + out := make(chan *event) + emitter := newTaskEmitterUsingChan(out) + go func() { + emitter.Emit("foo", nil) + }() + ev := <-out + if ev.Key != "foo" { + t.Fatal("invalid key") + } + if ev.Value != nil { + t.Fatal("invalid value") + } + }) + + t.Run("emit after close", func(t *testing.T) { + out := make(chan *event) + emitter := newTaskEmitterUsingChan(out) + emitter.Close() + done := make(chan interface{}) + go func() { + emitter.Emit("foo", nil) + close(done) + }() + <-done + select { + case <-out: + t.Fatal("should not receive event here") + default: + } + }) + + t.Run("close is idempotent", func(t *testing.T) { + out := make(chan *event) + emitter := newTaskEmitterUsingChan(out) + for i := 0; i < 4; i++ { + emitter.Close() + } + }) +} + +func TestTaskEmitterWrapper(t *testing.T) { + t.Run("emit failureStartup", func(t *testing.T) { + expect := "antani" + collector := &CollectorTaskEmitter{} + emitter := &taskEmitterWrapper{collector} + emitter.EmitFailureStartup(expect) + events := collector.Collect() + if len(events) != 1 { + t.Fatal("invalid number of events") + } + ev := events[0] + if ev.Key != eventTypeFailureStartup { + t.Fatal("invalid key") + } + value := ev.Value.(eventFailure) + if value.Failure != expect { + t.Fatal("invalid failure value") + } + }) + + t.Run("emit failureGeneric", func(t *testing.T) { + expectName := "mascetti" + expectFailure := "antani" + collector := &CollectorTaskEmitter{} + emitter := &taskEmitterWrapper{collector} + emitter.EmitFailureGeneric(expectName, expectFailure) + events := collector.Collect() + if len(events) != 1 { + t.Fatal("invalid number of events") + } + ev := events[0] + if ev.Key != expectName { + t.Fatal("invalid key") + } + value := ev.Value.(eventFailure) + if value.Failure != expectFailure { + t.Fatal("invalid failure value") + } + }) + + t.Run("emit statusProgress", func(t *testing.T) { + percentage := 0.66 + message := "mascetti" + collector := &CollectorTaskEmitter{} + emitter := &taskEmitterWrapper{collector} + emitter.EmitStatusProgress(percentage, message) + events := collector.Collect() + if len(events) != 1 { + t.Fatal("invalid number of events") + } + ev := events[0] + if ev.Key != eventTypeStatusProgress { + t.Fatal("invalid key") + } + value := ev.Value.(eventStatusProgress) + if value.Percentage != percentage { + t.Fatal("invalid percentage value") + } + if value.Message != message { + t.Fatal("invalid message value") + } + }) +} diff --git a/pkg/oonimkall/tasklogger.go b/pkg/oonimkall/tasklogger.go new file mode 100644 index 0000000..d50c970 --- /dev/null +++ b/pkg/oonimkall/tasklogger.go @@ -0,0 +1,116 @@ +package oonimkall + +import ( + "fmt" + + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +// +// This file implements the logger used by a task. Outside +// of this file, the rest of the codebase just sees a generic +// model.Logger that can log events. +// + +// taskLogger is the logger used by a task. +type taskLogger struct { + // emitter is the event emitter. + emitter taskEmitter + + // hasDebug indicates whether to emit debug logs. + hasDebug bool + + // hasInfo indicates whether to emit info logs. + hasInfo bool + + // hasWarning indicates whether to emit warning logs. + hasWarning bool +} + +// ensure that taskLogger implements model.Logger. +var _ model.Logger = &taskLogger{} + +// Debug implements model.Logger.Debug. +func (cl *taskLogger) Debug(msg string) { + if cl.hasDebug { + cl.emit(logLevelDebug, msg) + } +} + +// Debugf implements model.Logger.Debugf. +func (cl *taskLogger) Debugf(format string, v ...interface{}) { + if cl.hasDebug { + cl.Debug(fmt.Sprintf(format, v...)) + } +} + +// Info implements model.Logger.Info. +func (cl *taskLogger) Info(msg string) { + if cl.hasInfo { + cl.emit(logLevelInfo, msg) + } +} + +// Infof implements model.Logger.Infof. +func (cl *taskLogger) Infof(format string, v ...interface{}) { + if cl.hasInfo { + cl.Info(fmt.Sprintf(format, v...)) + } +} + +// Warn implements model.Logger.Warn. +func (cl *taskLogger) Warn(msg string) { + if cl.hasWarning { + cl.emit(logLevelWarning, msg) + } +} + +// Warnf implements model.Logger.Warnf. +func (cl *taskLogger) Warnf(format string, v ...interface{}) { + if cl.hasWarning { + cl.Warn(fmt.Sprintf(format, v...)) + } +} + +// emit is the code that actually emits the log event. +func (cl *taskLogger) emit(level string, message string) { + cl.emitter.Emit(eventTypeLog, eventLog{ + LogLevel: level, + Message: message, + }) +} + +// newTaskLogger creates a new taskLogger instance. +// +// Arguments: +// +// - emitter is the emitter that will emit log events; +// +// - logLevel is the maximum log level that will be emitted. +// +// Returns: +// +// - a properly configured instance of taskLogger. +// +// Remarks: +// +// - log levels are sorted as usual: ERR is more sever than +// WARNING, WARNING is more sever than INFO, etc. +func newTaskLogger(emitter taskEmitter, logLevel string) *taskLogger { + cl := &taskLogger{ + emitter: emitter, + } + switch logLevel { + case logLevelDebug, logLevelDebug2: + cl.hasDebug = true + fallthrough + case logLevelInfo: + cl.hasInfo = true + fallthrough + case logLevelErr, logLevelWarning: + fallthrough + default: + cl.hasWarning = true + } + return cl +} diff --git a/pkg/oonimkall/tasklogger_test.go b/pkg/oonimkall/tasklogger_test.go new file mode 100644 index 0000000..240c711 --- /dev/null +++ b/pkg/oonimkall/tasklogger_test.go @@ -0,0 +1,106 @@ +package oonimkall + +import ( + "testing" + + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +// +// This file contains tests for the taskLogger type. +// + +func TestTaskLogger(t *testing.T) { + // debugMessage is the debug message we expect to see. + debugMessage := "debug message" + + // infoMessage is the info message we expect to see. + infoMessage := "info message" + + // warningMessage is the warning message we expect to see. + warningMessage := "warning message" + + // emitMessages is an helper function for implementing this test. + emitMessages := func(logger model.Logger) { + logger.Debug(debugMessage) + logger.Debugf("%s", debugMessage) + logger.Info(infoMessage) + logger.Infof("%s", infoMessage) + logger.Warn(warningMessage) + logger.Warnf("%s", warningMessage) + } + + // convertEventsToLogEvents converts the generic events to + // logEvents and fails if this operation is not possible. + convertEventsToLogEvents := func(t *testing.T, in []*event) (out []eventLog) { + for _, ev := range in { + if ev.Key != eventTypeLog { + t.Fatalf("expected log event, found %s", ev.Key) + } + out = append(out, ev.Value.(eventLog)) + } + return + } + + // checkNumberOfEvents ensures we've the right number of events. + checkNumberOfEvents := func(t *testing.T, events []eventLog, expect int) { + if len(events) != expect { + t.Fatalf( + "invalid number of log events %d (expected %d)", + len(events), expect, + ) + } + } + + // matchEvent ensures the given event has the right level and message. + matchEvent := func(t *testing.T, event eventLog, level, msg string) { + if event.LogLevel != level { + t.Fatalf( + "invalid log level %s (expected %s)", + event.LogLevel, level, + ) + } + if event.Message != msg { + t.Fatalf( + "invalid log message '%s' (expected '%s')", + event.Message, msg, + ) + } + } + + t.Run("debug logger", func(t *testing.T) { + emitter := &CollectorTaskEmitter{} + logger := newTaskLogger(emitter, logLevelDebug) + emitMessages(logger) + logEvents := convertEventsToLogEvents(t, emitter.Collect()) + checkNumberOfEvents(t, logEvents, 6) + matchEvent(t, logEvents[0], logLevelDebug, debugMessage) + matchEvent(t, logEvents[1], logLevelDebug, debugMessage) + matchEvent(t, logEvents[2], logLevelInfo, infoMessage) + matchEvent(t, logEvents[3], logLevelInfo, infoMessage) + matchEvent(t, logEvents[4], logLevelWarning, warningMessage) + matchEvent(t, logEvents[5], logLevelWarning, warningMessage) + }) + + t.Run("info logger", func(t *testing.T) { + emitter := &CollectorTaskEmitter{} + logger := newTaskLogger(emitter, logLevelInfo) + emitMessages(logger) + logEvents := convertEventsToLogEvents(t, emitter.Collect()) + checkNumberOfEvents(t, logEvents, 4) + matchEvent(t, logEvents[0], logLevelInfo, infoMessage) + matchEvent(t, logEvents[1], logLevelInfo, infoMessage) + matchEvent(t, logEvents[2], logLevelWarning, warningMessage) + matchEvent(t, logEvents[3], logLevelWarning, warningMessage) + }) + + t.Run("warn logger", func(t *testing.T) { + emitter := &CollectorTaskEmitter{} + logger := newTaskLogger(emitter, logLevelWarning) + emitMessages(logger) + logEvents := convertEventsToLogEvents(t, emitter.Collect()) + checkNumberOfEvents(t, logEvents, 2) + matchEvent(t, logEvents[0], logLevelWarning, warningMessage) + matchEvent(t, logEvents[1], logLevelWarning, warningMessage) + }) +} diff --git a/pkg/oonimkall/taskmocks_test.go b/pkg/oonimkall/taskmocks_test.go new file mode 100644 index 0000000..8165916 --- /dev/null +++ b/pkg/oonimkall/taskmocks_test.go @@ -0,0 +1,222 @@ +package oonimkall + +import ( + "context" + "errors" + "sync" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +// +// This file contains mocks for types used by tasks. Because +// we only use mocks when testing, this file is a `_test.go` file. +// + +// CollectorTaskEmitter is a thread-safe taskEmitter +// that stores all the events inside itself. +type CollectorTaskEmitter struct { + // events contains the events + events []*event + + // mu provides mutual exclusion + mu sync.Mutex +} + +// ensures that a CollectorTaskEmitter is a taskEmitter. +var _ taskEmitter = &CollectorTaskEmitter{} + +// Emit implements the taskEmitter.Emit method. +func (e *CollectorTaskEmitter) Emit(key string, value interface{}) { + e.mu.Lock() + e.events = append(e.events, &event{Key: key, Value: value}) + e.mu.Unlock() +} + +// Collect returns a copy of the collected events. It is safe +// to read the events. It's a data race to modify them. +// +// After this function has been called, the internal array +// of events will now be empty. +func (e *CollectorTaskEmitter) Collect() (out []*event) { + e.mu.Lock() + out = e.events + e.events = nil + e.mu.Unlock() + return +} + +// SessionBuilderConfigSaver is a session builder that +// saves the received config and returns an error. +type SessionBuilderConfigSaver struct { + Config engine.SessionConfig +} + +var _ taskSessionBuilder = &SessionBuilderConfigSaver{} + +func (b *SessionBuilderConfigSaver) NewSession( + ctx context.Context, config engine.SessionConfig) (taskSession, error) { + b.Config = config + return nil, errors.New("mocked error") +} + +// MockableTaskRunnerDependencies allows to mock all the +// dependencies of taskRunner using a single structure. +type MockableTaskRunnerDependencies struct { + + // taskSessionBuilder: + + MockNewSession func(ctx context.Context, + config engine.SessionConfig) (taskSession, error) + + // taskSession: + + MockClose func() error + MockNewExperimentBuilderByName func(name string) (taskExperimentBuilder, error) + MockMaybeLookupBackendsContext func(ctx context.Context) error + MockMaybeLookupLocationContext func(ctx context.Context) error + MockProbeIP func() string + MockProbeASNString func() string + MockProbeCC func() string + MockProbeNetworkName func() string + MockResolverASNString func() string + MockResolverIP func() string + MockResolverNetworkName func() string + + // taskExperimentBuilder: + + MockableSetCallbacks func(callbacks model.ExperimentCallbacks) + MockableInputPolicy func() engine.InputPolicy + MockableNewExperimentInstance func() taskExperiment + MockableInterruptible func() bool + + // taskExperiment: + + MockableKibiBytesReceived func() float64 + MockableKibiBytesSent func() float64 + MockableOpenReportContext func(ctx context.Context) error + MockableReportID func() string + MockableMeasureWithContext func(ctx context.Context, input string) ( + measurement *model.Measurement, err error) + MockableSubmitAndUpdateMeasurementContext func( + ctx context.Context, measurement *model.Measurement) error +} + +var ( + _ taskSessionBuilder = &MockableTaskRunnerDependencies{} + _ taskSession = &MockableTaskRunnerDependencies{} + _ taskExperimentBuilder = &MockableTaskRunnerDependencies{} + _ taskExperiment = &MockableTaskRunnerDependencies{} +) + +func (dep *MockableTaskRunnerDependencies) NewSession( + ctx context.Context, config engine.SessionConfig) (taskSession, error) { + if f := dep.MockNewSession; f != nil { + return f(ctx, config) + } + return dep, nil +} + +func (dep *MockableTaskRunnerDependencies) Close() error { + return dep.MockClose() +} + +func (dep *MockableTaskRunnerDependencies) NewExperimentBuilderByName(name string) (taskExperimentBuilder, error) { + if f := dep.MockNewExperimentBuilderByName; f != nil { + return f(name) + } + return dep, nil +} + +func (dep *MockableTaskRunnerDependencies) MaybeLookupBackendsContext(ctx context.Context) error { + return dep.MockMaybeLookupBackendsContext(ctx) +} + +func (dep *MockableTaskRunnerDependencies) MaybeLookupLocationContext(ctx context.Context) error { + return dep.MockMaybeLookupLocationContext(ctx) +} + +func (dep *MockableTaskRunnerDependencies) ProbeIP() string { + return dep.MockProbeIP() +} + +func (dep *MockableTaskRunnerDependencies) ProbeASNString() string { + return dep.MockProbeASNString() +} + +func (dep *MockableTaskRunnerDependencies) ProbeCC() string { + return dep.MockProbeCC() +} + +func (dep *MockableTaskRunnerDependencies) ProbeNetworkName() string { + return dep.MockProbeNetworkName() +} + +func (dep *MockableTaskRunnerDependencies) ResolverASNString() string { + return dep.MockResolverASNString() +} + +func (dep *MockableTaskRunnerDependencies) ResolverIP() string { + return dep.MockResolverIP() +} + +func (dep *MockableTaskRunnerDependencies) ResolverNetworkName() string { + return dep.MockResolverNetworkName() +} + +func (dep *MockableTaskRunnerDependencies) SetCallbacks(callbacks model.ExperimentCallbacks) { + dep.MockableSetCallbacks(callbacks) +} + +func (dep *MockableTaskRunnerDependencies) InputPolicy() engine.InputPolicy { + return dep.MockableInputPolicy() +} + +func (dep *MockableTaskRunnerDependencies) NewExperimentInstance() taskExperiment { + if f := dep.MockableNewExperimentInstance; f != nil { + return f() + } + return dep +} + +func (dep *MockableTaskRunnerDependencies) Interruptible() bool { + return dep.MockableInterruptible() +} + +func (dep *MockableTaskRunnerDependencies) KibiBytesReceived() float64 { + return dep.MockableKibiBytesReceived() +} + +func (dep *MockableTaskRunnerDependencies) KibiBytesSent() float64 { + return dep.MockableKibiBytesSent() +} + +func (dep *MockableTaskRunnerDependencies) OpenReportContext(ctx context.Context) error { + return dep.MockableOpenReportContext(ctx) +} + +func (dep *MockableTaskRunnerDependencies) ReportID() string { + return dep.MockableReportID() +} + +func (dep *MockableTaskRunnerDependencies) MeasureWithContext(ctx context.Context, input string) ( + measurement *model.Measurement, err error) { + return dep.MockableMeasureWithContext(ctx, input) +} + +func (dep *MockableTaskRunnerDependencies) SubmitAndUpdateMeasurementContext( + ctx context.Context, measurement *model.Measurement) error { + return dep.MockableSubmitAndUpdateMeasurementContext(ctx, measurement) +} + +// MockableKVStoreFSBuilder is a mockable taskKVStoreFSBuilder. +type MockableKVStoreFSBuilder struct { + MockNewFS func(path string) (model.KeyValueStore, error) +} + +var _ taskKVStoreFSBuilder = &MockableKVStoreFSBuilder{} + +func (m *MockableKVStoreFSBuilder) NewFS(path string) (model.KeyValueStore, error) { + return m.MockNewFS(path) +} diff --git a/pkg/oonimkall/taskmodel.go b/pkg/oonimkall/taskmodel.go new file mode 100644 index 0000000..5b75930 --- /dev/null +++ b/pkg/oonimkall/taskmodel.go @@ -0,0 +1,374 @@ +package oonimkall + +import ( + "context" + "io" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +// +// Task Model +// +// The oonimkall package allows you to run OONI network +// experiments as "tasks". This file defines all the +// underlying model entailed by running such tasks. +// +// Logging +// +// This section of the file defines the types and the +// interfaces required to implement logging. +// +// The rest of the codebase will use a generic model.Logger +// as a logger. This is a pretty fundamental interface in +// ooni/probe-cli and so it's not defined in this file. +// + +const taskABIVersion = 1 + +// Running tasks emit logs using different log levels. We +// define log levels with the usual semantics. +// +// The logger used by a task _may_ be configured to not +// emit log events that are less severe than a given +// severity. +// +// We use the following definitions both for defining the +// log level of a log and for configuring the maximum +// acceptable log level emitted by a logger. +const ( + logLevelDebug2 = "DEBUG2" + logLevelDebug = "DEBUG" + logLevelInfo = "INFO" + logLevelErr = "ERR" + logLevelWarning = "WARNING" +) + +// +// Emitting Events +// +// While it is running, a task emits events. This section +// of the file defines the types needed to emit events. +// + +// type of emitted events. +const ( + eventTypeFailureIPLookup = "failure.ip_lookup" + eventTypeFailureASNLookup = "failure.asn_lookup" + eventTypeFailureCCLookup = "failure.cc_lookup" + eventTypeFailureMeasurement = "failure.measurement" + eventTypeFailureMeasurementSubmission = "failure.measurement_submission" + eventTypeFailureReportCreate = "failure.report_create" + eventTypeFailureResolverLookup = "failure.resolver_lookup" + eventTypeFailureStartup = "failure.startup" + eventTypeLog = "log" + eventTypeMeasurement = "measurement" + eventTypeStatusEnd = "status.end" + eventTypeStatusGeoIPLookup = "status.geoip_lookup" + eventTypeStatusMeasurementDone = "status.measurement_done" + eventTypeStatusMeasurementStart = "status.measurement_start" + eventTypeStatusMeasurementSubmission = "status.measurement_submission" + eventTypeStatusProgress = "status.progress" + eventTypeStatusQueued = "status.queued" + eventTypeStatusReportCreate = "status.report_create" + eventTypeStatusResolverLookup = "status.resolver_lookup" + eventTypeStatusStarted = "status.started" +) + +// taskEmitter is anything that allows us to +// emit events while running a task. +// +// Note that a task emitter _may_ be configured +// to ignore _some_ events though. +type taskEmitter interface { + // Emit emits the event (unless the emitter is + // configured to ignore this event key). + Emit(key string, value interface{}) +} + +// taskEmitterCloser is a closeable taskEmitter. +type taskEmitterCloser interface { + taskEmitter + io.Closer +} + +type eventEmpty struct{} + +// eventFailure contains information on a failure. +type eventFailure struct { + Failure string `json:"failure"` +} + +// eventLog is an event containing a log message. +type eventLog struct { + LogLevel string `json:"log_level"` + Message string `json:"message"` +} + +type eventMeasurementGeneric struct { + Failure string `json:"failure,omitempty"` + Idx int64 `json:"idx"` + Input string `json:"input"` + JSONStr string `json:"json_str,omitempty"` +} + +type eventStatusEnd struct { + DownloadedKB float64 `json:"downloaded_kb"` + Failure string `json:"failure"` + UploadedKB float64 `json:"uploaded_kb"` +} + +type eventStatusGeoIPLookup struct { + ProbeASN string `json:"probe_asn"` + ProbeCC string `json:"probe_cc"` + ProbeIP string `json:"probe_ip"` + ProbeNetworkName string `json:"probe_network_name"` +} + +// eventStatusProgress reports progress information. +type eventStatusProgress struct { + Message string `json:"message"` + Percentage float64 `json:"percentage"` +} + +type eventStatusReportGeneric struct { + ReportID string `json:"report_id"` +} + +type eventStatusResolverLookup struct { + ResolverASN string `json:"resolver_asn"` + ResolverIP string `json:"resolver_ip"` + ResolverNetworkName string `json:"resolver_network_name"` +} + +// event is an event emitted by a task. This structure extends the event +// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv). +type event struct { + Key string `json:"key"` + Value interface{} `json:"value"` +} + +// +// OONI Session +// +// For performing several operations, including running +// experiments, we need to create an OONI session. +// +// This section of the file defines the interface between +// our oonimkall API and the real session. +// +// The abstraction representing a OONI session is taskSession. +// + +// taskKVStoreFSBuilder constructs a KVStore with +// filesystem backing for running tests. +type taskKVStoreFSBuilder interface { + // NewFS creates a new KVStore using the filesystem. + NewFS(path string) (model.KeyValueStore, error) +} + +// taskSessionBuilder constructs a new Session. +type taskSessionBuilder interface { + // NewSession creates a new taskSession. + NewSession(ctx context.Context, + config engine.SessionConfig) (taskSession, error) +} + +// taskSession abstracts a OONI session. +type taskSession interface { + // A session can be closed. + io.Closer + + // NewExperimentBuilderByName creates the builder for constructing + // a new experiment given the experiment's name. + NewExperimentBuilderByName(name string) (taskExperimentBuilder, error) + + // MaybeLookupBackendsContext lookups the OONI backend unless + // this operation has already been performed. + MaybeLookupBackendsContext(ctx context.Context) error + + // MaybeLookupLocationContext lookups the probe location unless + // this operation has already been performed. + MaybeLookupLocationContext(ctx context.Context) error + + // ProbeIP must be called after MaybeLookupLocationContext + // and returns the resolved probe IP. + ProbeIP() string + + // ProbeASNString must be called after MaybeLookupLocationContext + // and returns the resolved probe ASN as a string. + ProbeASNString() string + + // ProbeCC must be called after MaybeLookupLocationContext + // and returns the resolved probe country code. + ProbeCC() string + + // ProbeNetworkName must be called after MaybeLookupLocationContext + // and returns the resolved probe country code. + ProbeNetworkName() string + + // ResolverANSString must be called after MaybeLookupLocationContext + // and returns the resolved resolver's ASN as a string. + ResolverASNString() string + + // ResolverIP must be called after MaybeLookupLocationContext + // and returns the resolved resolver's IP. + ResolverIP() string + + // ResolverNetworkName must be called after MaybeLookupLocationContext + // and returns the resolved resolver's network name. + ResolverNetworkName() string +} + +// taskExperimentBuilder builds a taskExperiment. +type taskExperimentBuilder interface { + // SetCallbacks sets the experiment callbacks. + SetCallbacks(callbacks model.ExperimentCallbacks) + + // InputPolicy returns the experiment's input policy. + InputPolicy() engine.InputPolicy + + // NewExperiment creates the new experiment. + NewExperimentInstance() taskExperiment + + // Interruptible returns whether this experiment is interruptible. + Interruptible() bool +} + +// taskExperiment is a runnable experiment. +type taskExperiment interface { + // KibiBytesReceived returns the KiB received by the experiment. + KibiBytesReceived() float64 + + // KibiBytesSent returns the KiB sent by the experiment. + KibiBytesSent() float64 + + // OpenReportContext opens a new report. + OpenReportContext(ctx context.Context) error + + // ReportID must be called after a successful OpenReportContext + // and returns the report ID for this measurement. + ReportID() string + + // MeasureWithContext runs the measurement. + MeasureWithContext(ctx context.Context, input string) ( + measurement *model.Measurement, err error) + + // SubmitAndUpdateMeasurementContext submits the measurement + // and updates its report ID on success. + SubmitAndUpdateMeasurementContext( + ctx context.Context, measurement *model.Measurement) error +} + +// +// Task Running +// +// This section contains the interfaces allowing us +// to run a task until completion. +// + +// taskRunner runs a task until completion. +type taskRunner interface { + // Run runs until completion. + Run(ctx context.Context) +} + +// +// Task Settings +// +// This section defines the settings used by a task. +// + +// Settings contains settings for a task. This structure derives from +// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet +// since 2020-12-03 we're not backwards compatible anymore. +type settings struct { + // Annotations contains the annotations to be added + // to every measurements performed by the task. + Annotations map[string]string `json:"annotations,omitempty"` + + // AssetsDir is the directory where to store assets. This + // field is an extension of MK's specification. If + // this field is empty, the task won't start. + AssetsDir string `json:"assets_dir"` + + // DisabledEvents contains disabled events. See + // https://git.io/Jv4Rv for the events names. + // + // This setting is currently ignored. We noticed the + // code was ignoring it on 2021-12-01. + DisabledEvents []string `json:"disabled_events,omitempty"` + + // Inputs contains the inputs. The task will fail if it + // requires input and you provide no input. + Inputs []string `json:"inputs,omitempty"` + + // LogLevel contains the logs level. See https://git.io/Jv4Rv + // for the names of the available log levels. + LogLevel string `json:"log_level,omitempty"` + + // Name contains the task name. By https://git.io/Jv4Rv the + // names are in camel case, e.g. `Ndt`. + Name string `json:"name"` + + // Options contains the task options. + Options settingsOptions `json:"options"` + + // Proxy allows you to optionally force a specific proxy + // rather than using no proxy (the default). + // + // Use `psiphon:///` to force using Psiphon with the + // embedded configuration file. Not all builds have + // an embedded configuration file, but OONI builds have + // such a file, so they can use this functionality. + // + // Use `socks5://10.0.0.1:9050/` to connect to a SOCKS5 + // proxy running on 10.0.0.1:9050. This could be, for + // example, a suitably configured `tor` instance. + Proxy string + + // StateDir is the directory where to store persistent data. This + // field is an extension of MK's specification. If + // this field is empty, the task won't start. + StateDir string `json:"state_dir"` + + // TempDir is the temporary directory. This field is an extension of MK's + // specification. If this field is empty, we will pick the tempdir that + // ioutil.TempDir uses by default, which may not work on mobile. According + // to our experiments as of 2020-06-10, leaving the TempDir empty works + // for iOS and does not work for Android. + TempDir string `json:"temp_dir"` + + // TunnelDir is the directory where to store persistent state + // related to circumvention tunnels. This directory is required + // only if you want to use the tunnels. Added since 3.10.0. + TunnelDir string `json:"tunnel_dir"` + + // Version indicates the version of this structure. + Version int64 `json:"version"` +} + +// settingsOptions contains the settings options +type settingsOptions struct { + // MaxRuntime is the maximum runtime expressed in seconds. A negative + // value for this field disables the maximum runtime. Using + // a zero value will also mean disabled. This is not the + // original behaviour of Measurement Kit, which used to run + // for zero time in such case. + MaxRuntime float64 `json:"max_runtime,omitempty"` + + // NoCollector indicates whether to use a collector + NoCollector bool `json:"no_collector,omitempty"` + + // ProbeServicesBaseURL contains the probe services base URL. + ProbeServicesBaseURL string `json:"probe_services_base_url,omitempty"` + + // SoftwareName is the software name. If this option is not + // present, then the library startup will fail. + SoftwareName string `json:"software_name,omitempty"` + + // SoftwareVersion is the software version. If this option is not + // present, then the library startup will fail. + SoftwareVersion string `json:"software_version,omitempty"` +} diff --git a/pkg/oonimkall/runner.go b/pkg/oonimkall/taskrunner.go similarity index 61% rename from pkg/oonimkall/runner.go rename to pkg/oonimkall/taskrunner.go index 316f115..da35737 100644 --- a/pkg/oonimkall/runner.go +++ b/pkg/oonimkall/taskrunner.go @@ -9,77 +9,46 @@ import ( "github.com/ooni/probe-cli/v3/internal/engine" "github.com/ooni/probe-cli/v3/internal/engine/model" - "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/runtimex" ) -const ( - failureIPLookup = "failure.ip_lookup" - failureASNLookup = "failure.asn_lookup" - failureCCLookup = "failure.cc_lookup" - failureMeasurement = "failure.measurement" - failureMeasurementSubmission = "failure.measurement_submission" - failureReportCreate = "failure.report_create" - failureResolverLookup = "failure.resolver_lookup" - failureStartup = "failure.startup" - measurement = "measurement" - statusEnd = "status.end" - statusGeoIPLookup = "status.geoip_lookup" - statusMeasurementDone = "status.measurement_done" - statusMeasurementStart = "status.measurement_start" - statusMeasurementSubmission = "status.measurement_submission" - statusProgress = "status.progress" - statusQueued = "status.queued" - statusReportCreate = "status.report_create" - statusResolverLookup = "status.resolver_lookup" - statusStarted = "status.started" -) - -// run runs the task specified by settings.Name until completion. This is the -// top-level API that should be called by oonimkall. -func run(ctx context.Context, settings *settings, out chan<- *event) { - eof := make(chan interface{}) - defer close(eof) // tell the emitter to not emit anymore. - r := newRunner(settings, out, eof) - r.Run(ctx) +// runnerForTask runs a specific task +type runnerForTask struct { + emitter *taskEmitterWrapper + kvStoreBuilder taskKVStoreFSBuilder + sessionBuilder taskSessionBuilder + settings *settings } -// runner runs a specific task -type runner struct { - emitter *eventEmitter - maybeLookupLocation func(*engine.Session) error - out chan<- *event - settings *settings -} +var _ taskRunner = &runnerForTask{} // newRunner creates a new task runner -func newRunner(settings *settings, out chan<- *event, eof <-chan interface{}) *runner { - return &runner{ - emitter: newEventEmitter(settings.DisabledEvents, out, eof), - out: out, - settings: settings, +func newRunner(settings *settings, emitter taskEmitter) *runnerForTask { + return &runnerForTask{ + emitter: &taskEmitterWrapper{emitter}, + kvStoreBuilder: &taskKVStoreFSBuilderEngine{}, + sessionBuilder: &taskSessionBuilderEngine{}, + settings: settings, } } // failureInvalidVersion is the failure returned when Version is invalid const failureInvalidVersion = "invalid Settings.Version number" -func (r *runner) hasUnsupportedSettings(logger *chanLogger) bool { - if r.settings.Version < 1 { +func (r *runnerForTask) hasUnsupportedSettings() bool { + if r.settings.Version < taskABIVersion { r.emitter.EmitFailureStartup(failureInvalidVersion) return true } return false } -func (r *runner) newsession(ctx context.Context, logger *chanLogger) (*engine.Session, error) { - kvstore, err := kvstore.NewFS(r.settings.StateDir) +func (r *runnerForTask) newsession(ctx context.Context, logger model.Logger) (taskSession, error) { + kvstore, err := r.kvStoreBuilder.NewFS(r.settings.StateDir) if err != nil { return nil, err } - // TODO(bassosimone): write tests for this functionality - // See https://github.com/ooni/probe/issues/1465. var proxyURL *url.URL if r.settings.Proxy != "" { var err error @@ -104,11 +73,11 @@ func (r *runner) newsession(ctx context.Context, logger *chanLogger) (*engine.Se Address: r.settings.Options.ProbeServicesBaseURL, }} } - return engine.NewSession(ctx, config) + return r.sessionBuilder.NewSession(ctx, config) } -func (r *runner) contextForExperiment( - ctx context.Context, builder *engine.ExperimentBuilder, +func (r *runnerForTask) contextForExperiment( + ctx context.Context, builder taskExperimentBuilder, ) context.Context { if builder.Interruptible() { return ctx @@ -117,11 +86,11 @@ func (r *runner) contextForExperiment( } type runnerCallbacks struct { - emitter *eventEmitter + emitter taskEmitter } func (cb *runnerCallbacks) OnProgress(percentage float64, message string) { - cb.emitter.Emit(statusProgress, eventStatusProgress{ + cb.emitter.Emit(eventTypeStatusProgress, eventStatusProgress{ Percentage: 0.4 + (percentage * 0.6), // open report is 40% Message: message, }) @@ -130,13 +99,14 @@ func (cb *runnerCallbacks) OnProgress(percentage float64, message string) { // Run runs the runner until completion. The context argument controls // when to stop when processing multiple inputs, as well as when to stop // experiments explicitly marked as interruptible. -func (r *runner) Run(ctx context.Context) { - logger := newChanLogger(r.emitter, r.settings.LogLevel, r.out) - r.emitter.Emit(statusQueued, eventEmpty{}) - if r.hasUnsupportedSettings(logger) { +func (r *runnerForTask) Run(ctx context.Context) { + var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel) + r.emitter.Emit(eventTypeStatusQueued, eventEmpty{}) + if r.hasUnsupportedSettings() { + // event failureStartup already emitted return } - r.emitter.Emit(statusStarted, eventEmpty{}) + r.emitter.Emit(eventTypeStatusStarted, eventEmpty{}) sess, err := r.newsession(ctx, logger) if err != nil { r.emitter.EmitFailureStartup(err.Error()) @@ -145,45 +115,39 @@ func (r *runner) Run(ctx context.Context) { endEvent := new(eventStatusEnd) defer func() { sess.Close() - r.emitter.Emit(statusEnd, endEvent) + r.emitter.Emit(eventTypeStatusEnd, endEvent) }() - builder, err := sess.NewExperimentBuilder(r.settings.Name) + builder, err := sess.NewExperimentBuilderByName(r.settings.Name) if err != nil { r.emitter.EmitFailureStartup(err.Error()) return } logger.Info("Looking up OONI backends... please, be patient") - if err := sess.MaybeLookupBackends(); err != nil { + if err := sess.MaybeLookupBackendsContext(ctx); err != nil { r.emitter.EmitFailureStartup(err.Error()) return } r.emitter.EmitStatusProgress(0.1, "contacted bouncer") logger.Info("Looking up your location... please, be patient") - maybeLookupLocation := r.maybeLookupLocation - if maybeLookupLocation == nil { - maybeLookupLocation = func(sess *engine.Session) error { - return sess.MaybeLookupLocation() - } - } - if err := maybeLookupLocation(sess); err != nil { - r.emitter.EmitFailureGeneric(failureIPLookup, err.Error()) - r.emitter.EmitFailureGeneric(failureASNLookup, err.Error()) - r.emitter.EmitFailureGeneric(failureCCLookup, err.Error()) - r.emitter.EmitFailureGeneric(failureResolverLookup, err.Error()) + if err := sess.MaybeLookupLocationContext(ctx); err != nil { + r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error()) + r.emitter.EmitFailureGeneric(eventTypeFailureASNLookup, err.Error()) + r.emitter.EmitFailureGeneric(eventTypeFailureCCLookup, err.Error()) + r.emitter.EmitFailureGeneric(eventTypeFailureResolverLookup, err.Error()) return } r.emitter.EmitStatusProgress(0.2, "geoip lookup") r.emitter.EmitStatusProgress(0.3, "resolver lookup") - r.emitter.Emit(statusGeoIPLookup, eventStatusGeoIPLookup{ + r.emitter.Emit(eventTypeStatusGeoIPLookup, eventStatusGeoIPLookup{ ProbeIP: sess.ProbeIP(), ProbeASN: sess.ProbeASNString(), ProbeCC: sess.ProbeCC(), ProbeNetworkName: sess.ProbeNetworkName(), }) - r.emitter.Emit(statusResolverLookup, eventStatusResolverLookup{ + r.emitter.Emit(eventTypeStatusResolverLookup, eventStatusResolverLookup{ ResolverASN: sess.ResolverASNString(), ResolverIP: sess.ResolverIP(), ResolverNetworkName: sess.ResolverNetworkName(), @@ -198,19 +162,19 @@ func (r *runner) Run(ctx context.Context) { } r.settings.Inputs = append(r.settings.Inputs, "") } - experiment := builder.NewExperiment() + experiment := builder.NewExperimentInstance() defer func() { endEvent.DownloadedKB = experiment.KibiBytesReceived() endEvent.UploadedKB = experiment.KibiBytesSent() }() if !r.settings.Options.NoCollector { logger.Info("Opening report... please, be patient") - if err := experiment.OpenReport(); err != nil { - r.emitter.EmitFailureGeneric(failureReportCreate, err.Error()) + if err := experiment.OpenReportContext(ctx); err != nil { + r.emitter.EmitFailureGeneric(eventTypeFailureReportCreate, err.Error()) return } r.emitter.EmitStatusProgress(0.4, "open report") - r.emitter.Emit(statusReportCreate, eventStatusReportGeneric{ + r.emitter.Emit(eventTypeStatusReportCreate, eventStatusReportGeneric{ ReportID: experiment.ReportID(), }) } @@ -242,7 +206,7 @@ func (r *runner) Run(ctx context.Context) { break } logger.Infof("Starting measurement with index %d", idx) - r.emitter.Emit(statusMeasurementStart, eventMeasurementGeneric{ + r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{ Idx: int64(idx), Input: input, }) @@ -269,7 +233,7 @@ func (r *runner) Run(ctx context.Context) { } m.AddAnnotations(r.settings.Annotations) if err != nil { - r.emitter.Emit(failureMeasurement, eventMeasurementGeneric{ + r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{ Failure: err.Error(), Idx: int64(idx), Input: input, @@ -282,14 +246,14 @@ func (r *runner) Run(ctx context.Context) { } data, err := json.Marshal(m) runtimex.PanicOnError(err, "measurement.MarshalJSON failed") - r.emitter.Emit(measurement, eventMeasurementGeneric{ + r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{ Idx: int64(idx), Input: input, JSONStr: string(data), }) if !r.settings.Options.NoCollector { logger.Info("Submitting measurement... please, be patient") - err := experiment.SubmitAndUpdateMeasurement(m) + err := experiment.SubmitAndUpdateMeasurementContext(ctx, m) r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{ Idx: int64(idx), Input: input, @@ -297,7 +261,7 @@ func (r *runner) Run(ctx context.Context) { Failure: measurementSubmissionFailure(err), }) } - r.emitter.Emit(statusMeasurementDone, eventMeasurementGeneric{ + r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{ Idx: int64(idx), Input: input, }) @@ -306,9 +270,9 @@ func (r *runner) Run(ctx context.Context) { func measurementSubmissionEventName(err error) string { if err != nil { - return failureMeasurementSubmission + return eventTypeFailureMeasurementSubmission } - return statusMeasurementSubmission + return eventTypeStatusMeasurementSubmission } func measurementSubmissionFailure(err error) string { diff --git a/pkg/oonimkall/taskrunner_test.go b/pkg/oonimkall/taskrunner_test.go new file mode 100644 index 0000000..fc34f19 --- /dev/null +++ b/pkg/oonimkall/taskrunner_test.go @@ -0,0 +1,741 @@ +package oonimkall + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + engine "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +func TestMeasurementSubmissionEventName(t *testing.T) { + if measurementSubmissionEventName(nil) != eventTypeStatusMeasurementSubmission { + t.Fatal("unexpected submission event name") + } + if measurementSubmissionEventName(errors.New("mocked error")) != eventTypeFailureMeasurementSubmission { + t.Fatal("unexpected submission event name") + } +} + +func TestMeasurementSubmissionFailure(t *testing.T) { + if measurementSubmissionFailure(nil) != "" { + t.Fatal("unexpected submission failure") + } + if measurementSubmissionFailure(errors.New("mocked error")) != "mocked error" { + t.Fatal("unexpected submission failure") + } +} + +func TestTaskRunnerRun(t *testing.T) { + + // newRunnerForTesting is a factory for creating a new + // runner that wraps newRunner and also sets a specific + // taskSessionBuilder for testing purposes. + newRunnerForTesting := func() (*runnerForTask, *CollectorTaskEmitter) { + settings := &settings{ + Name: "Example", + Options: settingsOptions{ + SoftwareName: "oonimkall-test", + SoftwareVersion: "0.1.0", + }, + StateDir: "testdata/state", + Version: 1, + } + e := &CollectorTaskEmitter{} + r := newRunner(settings, e) + return r, e + } + + // runAndCollectContext runs the task until completion + // and collects the emitted events. Remember that + // it's not race safe to modify the events. + runAndCollectContext := func(ctx context.Context, r taskRunner, e *CollectorTaskEmitter) []*event { + r.Run(ctx) + return e.Collect() + } + + // runAndCollect is like runAndCollectContext + // but uses context.Background() + runAndCollect := func(r taskRunner, e *CollectorTaskEmitter) []*event { + return runAndCollectContext(context.Background(), r, e) + } + + // countEventsByKey returns the number of events + // with the given key inside of the list. + countEventsByKey := func(events []*event, key string) (count int) { + for _, ev := range events { + if ev.Key == key { + count++ + } + } + return + } + + // assertCountEventsByKey fails is the number of events + // of the given type is not the expected one. + assertCountEventsByKey := func(events []*event, key string, count int) { + if countEventsByKey(events, key) != count { + t.Fatalf("unexpected number of '%s' events", key) + } + } + + t.Run("with unsupported settings", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Version = 0 // force unsupported version + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + }) + + t.Run("with failure when creating a new kvstore", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + // override the kvstore builder to provoke an error + runner.kvStoreBuilder = &MockableKVStoreFSBuilder{ + MockNewFS: func(path string) (model.KeyValueStore, error) { + return nil, errors.New("generic error") + }, + } + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + }) + + t.Run("with unparsable proxyURL", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Proxy = "\t" // invalid proxy URL + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + }) + + t.Run("with a parsable proxyURL", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + // set a valid URL + runner.settings.Proxy = "https://127.0.0.1/" + // set a fake session builder that causes the startup to + // fail but records the config passed to NewSession + saver := &SessionBuilderConfigSaver{} + runner.sessionBuilder = saver + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + if saver.Config.ProxyURL.String() != runner.settings.Proxy { + t.Fatal("invalid proxy URL") + } + }) + + t.Run("with custom probe services URL", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + // set a probe services URL + runner.settings.Options.ProbeServicesBaseURL = "https://127.0.0.1" + // set a fake session builder that causes the startup to + // fail but records the config passed to NewSession + saver := &SessionBuilderConfigSaver{} + runner.sessionBuilder = saver + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + psu := saver.Config.AvailableProbeServices + if len(psu) != 1 { + t.Fatal("invalid length") + } + if psu[0].Type != "https" { + t.Fatal("invalid type") + } + if psu[0].Address != runner.settings.Options.ProbeServicesBaseURL { + t.Fatal("invalid address") + } + if psu[0].Front != "" { + t.Fatal("invalid front") + } + }) + + type eventKeyCount struct { + Key string + Count int + } + + // reduceEventsKeysIgnoreLog reduces the list of event keys + // counting equal subsequent keys and ignoring log events + reduceEventsKeysIgnoreLog := func(events []*event) (out []eventKeyCount) { + var current eventKeyCount + for _, ev := range events { + if ev.Key == eventTypeLog { + continue + } + if current.Key == ev.Key { + current.Count++ + continue + } + if current.Key != "" { + out = append(out, current) + } + current.Key = ev.Key + current.Count = 1 + } + if current.Key != "" { + out = append(out, current) + } + return + } + + // fakeSuccessfulRun returns a new set of dependencies that + // will perform a fully successful, but fake, run. + fakeSuccessfulRun := func() *MockableTaskRunnerDependencies { + return &MockableTaskRunnerDependencies{ + MockableKibiBytesReceived: func() float64 { + return 10 + }, + MockableKibiBytesSent: func() float64 { + return 4 + }, + MockableOpenReportContext: func(ctx context.Context) error { + return nil + }, + MockableReportID: func() string { + return "20211202T074907Z_example_IT_30722_n1_axDLHNUfJaV1IbuU" + }, + MockableMeasureWithContext: func(ctx context.Context, input string) (*model.Measurement, error) { + return &model.Measurement{}, nil + }, + MockableSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) error { + return nil + }, + MockableSetCallbacks: func(callbacks model.ExperimentCallbacks) { + }, + MockableInputPolicy: func() engine.InputPolicy { + return engine.InputNone + }, + MockableInterruptible: func() bool { + return false + }, + MockClose: func() error { + return nil + }, + MockMaybeLookupBackendsContext: func(ctx context.Context) error { + return nil + }, + MockMaybeLookupLocationContext: func(ctx context.Context) error { + return nil + }, + MockProbeIP: func() string { + return "130.192.91.211" + }, + MockProbeASNString: func() string { + return "AS137" + }, + MockProbeCC: func() string { + return "IT" + }, + MockProbeNetworkName: func() string { + return "GARR" + }, + MockResolverASNString: func() string { + return "AS137" + }, + MockResolverIP: func() string { + return "130.192.3.24" + }, + MockResolverNetworkName: func() string { + return "GARR" + }, + } + } + + assertReducedEventsLike := func(t *testing.T, expected, got []eventKeyCount) { + if diff := cmp.Diff(expected, got); diff != "" { + t.Fatal(diff) + } + } + + t.Run("with invalid experiment name", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockNewExperimentBuilderByName = func(name string) (taskExperimentBuilder, error) { + return nil, errors.New("invalid experiment name") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeFailureStartup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with error during backends lookup", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockMaybeLookupBackendsContext = func(ctx context.Context) error { + return errors.New("mocked error") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeFailureStartup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with error during location lookup", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockMaybeLookupLocationContext = func(ctx context.Context) error { + return errors.New("mocked error") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeFailureIPLookup, + Count: 1, + }, { + Key: eventTypeFailureASNLookup, + Count: 1, + }, { + Key: eventTypeFailureCCLookup, + Count: 1, + }, { + Key: eventTypeFailureResolverLookup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with missing input and input or query backend", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputOrQueryBackend + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeFailureStartup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with missing input and input strictly required", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeFailureStartup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with failure opening report", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockableOpenReportContext = func(ctx context.Context) error { + return errors.New("mocked error") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeFailureReportCreate, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with success and no input", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeStatusReportCreate, + Count: 1, + }, { + Key: eventTypeStatusMeasurementStart, + Count: 1, + }, { + Key: eventTypeMeasurement, + Count: 1, + }, { + Key: eventTypeStatusMeasurementSubmission, + Count: 1, + }, { + Key: eventTypeStatusMeasurementDone, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with measurement failure and no input", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) { + return nil, errors.New("preconditions error") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeStatusReportCreate, + Count: 1, + }, { + Key: eventTypeStatusMeasurementStart, + Count: 1, + }, { + Key: eventTypeFailureMeasurement, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with success and input", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Inputs = []string{"a", "b", "c", "d"} + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{ + {Key: eventTypeStatusQueued, Count: 1}, + {Key: eventTypeStatusStarted, Count: 1}, + {Key: eventTypeStatusProgress, Count: 3}, + {Key: eventTypeStatusGeoIPLookup, Count: 1}, + {Key: eventTypeStatusResolverLookup, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeStatusReportCreate, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusEnd, Count: 1}, + } + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with succes and max runtime", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Inputs = []string{"a", "b", "c", "d"} + runner.settings.Options.MaxRuntime = 2 + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) { + time.Sleep(1 * time.Second) + return &model.Measurement{}, nil + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{ + {Key: eventTypeStatusQueued, Count: 1}, + {Key: eventTypeStatusStarted, Count: 1}, + {Key: eventTypeStatusProgress, Count: 3}, + {Key: eventTypeStatusGeoIPLookup, Count: 1}, + {Key: eventTypeStatusResolverLookup, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeStatusReportCreate, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusEnd, Count: 1}, + } + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with interrupted experiment", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Inputs = []string{"a", "b", "c", "d"} + runner.settings.Options.MaxRuntime = 2 + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + fake.MockableInterruptible = func() bool { + return true + } + ctx, cancel := context.WithCancel(context.Background()) + fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) { + cancel() + return &model.Measurement{}, nil + } + runner.sessionBuilder = fake + events := runAndCollectContext(ctx, runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{ + {Key: eventTypeStatusQueued, Count: 1}, + {Key: eventTypeStatusStarted, Count: 1}, + {Key: eventTypeStatusProgress, Count: 3}, + {Key: eventTypeStatusGeoIPLookup, Count: 1}, + {Key: eventTypeStatusResolverLookup, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeStatusReportCreate, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + // + {Key: eventTypeStatusEnd, Count: 1}, + } + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with measurement submission failure", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Inputs = []string{"a"} + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + fake.MockableSubmitAndUpdateMeasurementContext = func(ctx context.Context, measurement *model.Measurement) error { + return errors.New("cannot submit") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{ + {Key: eventTypeStatusQueued, Count: 1}, + {Key: eventTypeStatusStarted, Count: 1}, + {Key: eventTypeStatusProgress, Count: 3}, + {Key: eventTypeStatusGeoIPLookup, Count: 1}, + {Key: eventTypeStatusResolverLookup, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeStatusReportCreate, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeFailureMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusEnd, Count: 1}, + } + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with success and progress", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + var callbacks model.ExperimentCallbacks + fake.MockableSetCallbacks = func(cbs model.ExperimentCallbacks) { + callbacks = cbs + } + fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) { + callbacks.OnProgress(1, "hello from the fake experiment") + return &model.Measurement{}, nil + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeStatusReportCreate, + Count: 1, + }, { + Key: eventTypeStatusMeasurementStart, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeMeasurement, + Count: 1, + }, { + Key: eventTypeStatusMeasurementSubmission, + Count: 1, + }, { + Key: eventTypeStatusMeasurementDone, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) +} diff --git a/pkg/oonimkall/tasksession.go b/pkg/oonimkall/tasksession.go new file mode 100644 index 0000000..c6d31f7 --- /dev/null +++ b/pkg/oonimkall/tasksession.go @@ -0,0 +1,78 @@ +package oonimkall + +import ( + "context" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/model" + "github.com/ooni/probe-cli/v3/internal/kvstore" +) + +// +// This file implements taskSession and derived types. +// + +// taskKVStoreFSBuilderEngine creates a new KVStore +// using the ./internal/engine package. +type taskKVStoreFSBuilderEngine struct{} + +var _ taskKVStoreFSBuilder = &taskKVStoreFSBuilderEngine{} + +func (*taskKVStoreFSBuilderEngine) NewFS(path string) (model.KeyValueStore, error) { + return kvstore.NewFS(path) +} + +// taskSessionBuilderEngine builds a new session +// using the ./internal/engine package. +type taskSessionBuilderEngine struct{} + +var _ taskSessionBuilder = &taskSessionBuilderEngine{} + +// NewSession implements taskSessionBuilder.NewSession. +func (b *taskSessionBuilderEngine) NewSession(ctx context.Context, + config engine.SessionConfig) (taskSession, error) { + sess, err := engine.NewSession(ctx, config) + if err != nil { + return nil, err + } + return &taskSessionEngine{sess}, nil +} + +// taskSessionEngine wraps ./internal/engine's Session. +type taskSessionEngine struct { + *engine.Session +} + +var _ taskSession = &taskSessionEngine{} + +// NewExperimentBuilderByName implements +// taskSessionEngine.NewExperimentBuilderByName. +func (sess *taskSessionEngine) NewExperimentBuilderByName( + name string) (taskExperimentBuilder, error) { + builder, err := sess.NewExperimentBuilder(name) + if err != nil { + return nil, err + } + return &taskExperimentBuilderEngine{builder}, err +} + +// taskExperimentBuilderEngine wraps ./internal/engine's +// ExperimentBuilder type. +type taskExperimentBuilderEngine struct { + *engine.ExperimentBuilder +} + +var _ taskExperimentBuilder = &taskExperimentBuilderEngine{} + +// NewExperimentInstance implements +// taskExperimentBuilder.NewExperimentInstance. +func (b *taskExperimentBuilderEngine) NewExperimentInstance() taskExperiment { + return &taskExperimentEngine{b.NewExperiment()} +} + +// taskExperimentEngine wraps ./internal/engine's Experiment. +type taskExperimentEngine struct { + *engine.Experiment +} + +var _ taskExperiment = &taskExperimentEngine{} diff --git a/pkg/oonimkall/tasksession_test.go b/pkg/oonimkall/tasksession_test.go new file mode 100644 index 0000000..f525491 --- /dev/null +++ b/pkg/oonimkall/tasksession_test.go @@ -0,0 +1,128 @@ +package oonimkall + +import ( + "context" + "testing" + + "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/version" +) + +func TestTaskKVSToreFSBuilderEngine(t *testing.T) { + b := &taskKVStoreFSBuilderEngine{} + store, err := b.NewFS("testdata/state") + if err != nil { + t.Fatal(err) + } + if store == nil { + t.Fatal("expected non-nil store here") + } +} + +func TestTaskSessionBuilderEngine(t *testing.T) { + t.Run("NewSession", func(t *testing.T) { + t.Run("on success", func(t *testing.T) { + builder := &taskSessionBuilderEngine{} + ctx := context.Background() + config := engine.SessionConfig{ + Logger: log.Log, + SoftwareName: "ooniprobe-cli", + SoftwareVersion: version.Version, + } + sess, err := builder.NewSession(ctx, config) + if err != nil { + t.Fatal(err) + } + sess.Close() + }) + + t.Run("on failure", func(t *testing.T) { + builder := &taskSessionBuilderEngine{} + ctx := context.Background() + config := engine.SessionConfig{} + sess, err := builder.NewSession(ctx, config) + if err == nil { + t.Fatal("expected an error here") + } + if sess != nil { + t.Fatal("expected nil session here") + } + }) + }) +} + +func TestTaskSessionEngine(t *testing.T) { + + // newSession is a helper function for creating a new session. + newSession := func(t *testing.T) taskSession { + builder := &taskSessionBuilderEngine{} + ctx := context.Background() + config := engine.SessionConfig{ + Logger: log.Log, + SoftwareName: "ooniprobe-cli", + SoftwareVersion: version.Version, + } + sess, err := builder.NewSession(ctx, config) + if err != nil { + t.Fatal(err) + } + return sess + } + + t.Run("NewExperimentBuilderByName", func(t *testing.T) { + t.Run("on success", func(t *testing.T) { + sess := newSession(t) + builder, err := sess.NewExperimentBuilderByName("ndt") + if err != nil { + t.Fatal(err) + } + if builder == nil { + t.Fatal("expected non-nil builder") + } + }) + + t.Run("on failure", func(t *testing.T) { + sess := newSession(t) + builder, err := sess.NewExperimentBuilderByName("antani") + if err == nil { + t.Fatal("expected an error here") + } + if builder != nil { + t.Fatal("expected nil builder") + } + }) + }) +} + +func TestTaskExperimentBuilderEngine(t *testing.T) { + + // newBuilder is a helper function for creating a new session + // as well as a new experiment builder + newBuilder := func(t *testing.T) (taskSession, taskExperimentBuilder) { + builder := &taskSessionBuilderEngine{} + ctx := context.Background() + config := engine.SessionConfig{ + Logger: log.Log, + SoftwareName: "ooniprobe-cli", + SoftwareVersion: version.Version, + } + sess, err := builder.NewSession(ctx, config) + if err != nil { + t.Fatal(err) + } + expBuilder, err := sess.NewExperimentBuilderByName("ndt") + if err != nil { + t.Fatal(err) + } + return sess, expBuilder + } + + t.Run("NewExperiment", func(t *testing.T) { + _, builder := newBuilder(t) + exp := builder.NewExperimentInstance() + if exp == nil { + t.Fatal("expected non-nil experiment here") + } + }) +}