diff --git a/internal/engine/allexperiments.go b/internal/engine/allexperiments.go index dd12d3d..62619b9 100644 --- a/internal/engine/allexperiments.go +++ b/internal/engine/allexperiments.go @@ -69,59 +69,6 @@ var experimentsByName = map[string]func(*Session) *ExperimentBuilder{ } }, - "example_with_input": func(session *Session) *ExperimentBuilder { - return &ExperimentBuilder{ - build: func(config interface{}) *Experiment { - return NewExperiment(session, example.NewExperimentMeasurer( - *config.(*example.Config), "example_with_input", - )) - }, - config: &example.Config{ - Message: "Good day from the example with input experiment!", - SleepTime: int64(time.Second), - }, - interruptible: true, - inputPolicy: InputStrictlyRequired, - } - }, - - // TODO(bassosimone): when we can set experiment options using the JSON - // we need to get rid of all these multiple experiments. - // - // See https://github.com/ooni/probe-engine/issues/413 - "example_with_input_non_interruptible": func(session *Session) *ExperimentBuilder { - return &ExperimentBuilder{ - build: func(config interface{}) *Experiment { - return NewExperiment(session, example.NewExperimentMeasurer( - *config.(*example.Config), "example_with_input_non_interruptible", - )) - }, - config: &example.Config{ - Message: "Good day from the example with input experiment!", - SleepTime: int64(time.Second), - }, - interruptible: false, - inputPolicy: InputStrictlyRequired, - } - }, - - "example_with_failure": func(session *Session) *ExperimentBuilder { - return &ExperimentBuilder{ - build: func(config interface{}) *Experiment { - return NewExperiment(session, example.NewExperimentMeasurer( - *config.(*example.Config), "example_with_failure", - )) - }, - config: &example.Config{ - Message: "Good day from the example with failure experiment!", - ReturnError: true, - SleepTime: int64(time.Second), - }, - interruptible: true, - inputPolicy: InputNone, - } - }, - "facebook_messenger": func(session *Session) *ExperimentBuilder { return &ExperimentBuilder{ build: func(config interface{}) *Experiment { diff --git a/internal/engine/experiment/dash/dash.go b/internal/engine/experiment/dash/dash.go index 3792de8..6414085 100644 --- a/internal/engine/experiment/dash/dash.go +++ b/internal/engine/experiment/dash/dash.go @@ -158,7 +158,7 @@ func (r runner) measure( // connecting or later. We cannot say that very precisely // because, in principle, we may reconnect. So we always // return error here. This comment is being introduced so - // that we don't do https://github.com/ooni/probe-cli/v3/internal/engine/pull/526 + // that we don't do https://github.com/ooni/probe-engine/pull/526 // again, because that isn't accurate. return err } diff --git a/internal/engine/legacy/netx/resolver_test.go b/internal/engine/legacy/netx/resolver_test.go index eba00bc..724194c 100644 --- a/internal/engine/legacy/netx/resolver_test.go +++ b/internal/engine/legacy/netx/resolver_test.go @@ -28,7 +28,7 @@ func testresolverquick(t *testing.T, network, address string) { } var foundquad8 bool for _, addr := range addrs { - // See https://github.com/ooni/probe-cli/v3/internal/engine/pull/954/checks?check_run_id=1182269025 + // See https://github.com/ooni/probe-engine/pull/954/checks?check_run_id=1182269025 if addr == "8.8.8.8" || addr == "2001:4860:4860::8888" { foundquad8 = true } diff --git a/internal/engine/netx/archival/archival_test.go b/internal/engine/netx/archival/archival_test.go index dfd0f52..f531e4d 100644 --- a/internal/engine/netx/archival/archival_test.go +++ b/internal/engine/netx/archival/archival_test.go @@ -206,7 +206,7 @@ func TestNewRequestList(t *testing.T) { }}, }, { // for an example of why we need to sort headers, see - // https://github.com/ooni/probe-cli/v3/internal/engine/pull/751/checks?check_run_id=853562310 + // https://github.com/ooni/probe-engine/pull/751/checks?check_run_id=853562310 name: "run with redirect and headers to sort", args: args{ begin: begin, diff --git a/internal/engine/netx/resolver/integration_test.go b/internal/engine/netx/resolver/integration_test.go index 280214e..6181e5e 100644 --- a/internal/engine/netx/resolver/integration_test.go +++ b/internal/engine/netx/resolver/integration_test.go @@ -32,7 +32,7 @@ func testresolverquick(t *testing.T, reso resolver.Resolver) { } var foundquad8 bool for _, addr := range addrs { - // See https://github.com/ooni/probe-cli/v3/internal/engine/pull/954/checks?check_run_id=1182269025 + // See https://github.com/ooni/probe-engine/pull/954/checks?check_run_id=1182269025 if addr == "8.8.8.8" || addr == "2001:4860:4860::8888" { foundquad8 = true } diff --git a/pkg/oonimkall/chanlogger.go b/pkg/oonimkall/chanlogger.go deleted file mode 100644 index 0f8b957..0000000 --- a/pkg/oonimkall/chanlogger.go +++ /dev/null @@ -1,85 +0,0 @@ -package oonimkall - -import "fmt" - -// chanLogger is a logger targeting a channel -type chanLogger struct { - emitter *eventEmitter - hasdebug bool - hasinfo bool - haswarning bool - out chan<- *event -} - -// Debug implements Logger.Debug -func (cl *chanLogger) Debug(msg string) { - if cl.hasdebug { - cl.emitter.Emit("log", eventLog{ - LogLevel: "DEBUG", - Message: msg, - }) - } -} - -// Debugf implements Logger.Debugf -func (cl *chanLogger) Debugf(format string, v ...interface{}) { - if cl.hasdebug { - cl.Debug(fmt.Sprintf(format, v...)) - } -} - -// Info implements Logger.Info -func (cl *chanLogger) Info(msg string) { - if cl.hasinfo { - cl.emitter.Emit("log", eventLog{ - LogLevel: "INFO", - Message: msg, - }) - } -} - -// Infof implements Logger.Infof -func (cl *chanLogger) Infof(format string, v ...interface{}) { - if cl.hasinfo { - cl.Info(fmt.Sprintf(format, v...)) - } -} - -// Warn implements Logger.Warn -func (cl *chanLogger) Warn(msg string) { - if cl.haswarning { - cl.emitter.Emit("log", eventLog{ - LogLevel: "WARNING", - Message: msg, - }) - } -} - -// Warnf implements Logger.Warnf -func (cl *chanLogger) Warnf(format string, v ...interface{}) { - if cl.haswarning { - cl.Warn(fmt.Sprintf(format, v...)) - } -} - -// newChanLogger creates a new ChanLogger instance. -func newChanLogger(emitter *eventEmitter, logLevel string, - out chan<- *event) *chanLogger { - cl := &chanLogger{ - emitter: emitter, - out: out, - } - switch logLevel { - case "DEBUG", "DEBUG2": - cl.hasdebug = true - fallthrough - case "INFO": - cl.hasinfo = true - fallthrough - case "ERR", "WARNING": - fallthrough - default: - cl.haswarning = true - } - return cl -} diff --git a/pkg/oonimkall/event.go b/pkg/oonimkall/event.go deleted file mode 100644 index 08e93f4..0000000 --- a/pkg/oonimkall/event.go +++ /dev/null @@ -1,57 +0,0 @@ -package oonimkall - -type eventEmpty struct{} - -// eventFailure contains information on a failure. -type eventFailure struct { - Failure string `json:"failure"` -} - -// eventLog is an event containing a log message. -type eventLog struct { - LogLevel string `json:"log_level"` - Message string `json:"message"` -} - -type eventMeasurementGeneric struct { - Failure string `json:"failure,omitempty"` - Idx int64 `json:"idx"` - Input string `json:"input"` - JSONStr string `json:"json_str,omitempty"` -} - -type eventStatusEnd struct { - DownloadedKB float64 `json:"downloaded_kb"` - Failure string `json:"failure"` - UploadedKB float64 `json:"uploaded_kb"` -} - -type eventStatusGeoIPLookup struct { - ProbeASN string `json:"probe_asn"` - ProbeCC string `json:"probe_cc"` - ProbeIP string `json:"probe_ip"` - ProbeNetworkName string `json:"probe_network_name"` -} - -// eventStatusProgress reports progress information. -type eventStatusProgress struct { - Message string `json:"message"` - Percentage float64 `json:"percentage"` -} - -type eventStatusReportGeneric struct { - ReportID string `json:"report_id"` -} - -type eventStatusResolverLookup struct { - ResolverASN string `json:"resolver_asn"` - ResolverIP string `json:"resolver_ip"` - ResolverNetworkName string `json:"resolver_network_name"` -} - -// event is an event emitted by a task. This structure extends the event -// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv). -type event struct { - Key string `json:"key"` - Value interface{} `json:"value"` -} diff --git a/pkg/oonimkall/eventemitter.go b/pkg/oonimkall/eventemitter.go deleted file mode 100644 index 83b19b5..0000000 --- a/pkg/oonimkall/eventemitter.go +++ /dev/null @@ -1,47 +0,0 @@ -package oonimkall - -// eventEmitter emits event on a channel -type eventEmitter struct { - disabled map[string]bool - eof <-chan interface{} - out chan<- *event -} - -// newEventEmitter creates a new Emitter -func newEventEmitter(disabledEvents []string, out chan<- *event, - eof <-chan interface{}) *eventEmitter { - ee := &eventEmitter{eof: eof, out: out} - ee.disabled = make(map[string]bool) - for _, eventname := range disabledEvents { - ee.disabled[eventname] = true - } - return ee -} - -// EmitFailureStartup emits the failureStartup event -func (ee *eventEmitter) EmitFailureStartup(failure string) { - ee.EmitFailureGeneric(failureStartup, failure) -} - -// EmitFailureGeneric emits a failure event -func (ee *eventEmitter) EmitFailureGeneric(name, failure string) { - ee.Emit(name, eventFailure{Failure: failure}) -} - -// EmitStatusProgress emits the status.Progress event -func (ee *eventEmitter) EmitStatusProgress(percentage float64, message string) { - ee.Emit(statusProgress, eventStatusProgress{Message: message, Percentage: percentage}) -} - -// Emit emits the specified event -func (ee *eventEmitter) Emit(key string, value interface{}) { - if ee.disabled[key] { - return - } - // Prevent this goroutine from blocking on `ee.out` if the caller - // has already told us it's not going to accept more events. - select { - case ee.out <- &event{Key: key, Value: value}: - case <-ee.eof: - } -} diff --git a/pkg/oonimkall/eventemitter_test.go b/pkg/oonimkall/eventemitter_test.go deleted file mode 100644 index e0c18e5..0000000 --- a/pkg/oonimkall/eventemitter_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package oonimkall - -import "testing" - -func TestDisabledEvents(t *testing.T) { - out := make(chan *event) - eof := make(chan interface{}) - emitter := newEventEmitter([]string{"log"}, out, eof) - go func() { - emitter.Emit("log", eventLog{Message: "foo"}) - close(eof) - }() - var count int64 -Loop: - for { - select { - case ev := <-out: - if ev.Key == "log" { - count++ - } - case <-eof: - break Loop - } - } - if count > 0 { - t.Fatal("cannot disable events") - } -} - -func TestEmitFailureStartup(t *testing.T) { - out := make(chan *event) - eof := make(chan interface{}) - emitter := newEventEmitter([]string{}, out, eof) - go func() { - emitter.EmitFailureStartup("mocked error") - close(eof) - }() - var found bool -Loop: - for { - select { - case ev := <-out: - if ev.Key == "failure.startup" { - evv := ev.Value.(eventFailure) // panic if not castable - if evv.Failure == "mocked error" { - found = true - } - } - case <-eof: - break Loop - } - } - if !found { - t.Fatal("did not see expected event") - } -} - -func TestEmitStatusProgress(t *testing.T) { - out := make(chan *event) - eof := make(chan interface{}) - emitter := newEventEmitter([]string{}, out, eof) - go func() { - emitter.EmitStatusProgress(0.7, "foo") - close(eof) - }() - var found bool -Loop: - for { - select { - case ev := <-out: - if ev.Key == "status.progress" { - evv := ev.Value.(eventStatusProgress) // panic if not castable - if evv.Message == "foo" && evv.Percentage == 0.7 { - found = true - } - } - case <-eof: - break Loop - } - } - if !found { - t.Fatal("did not see expected event") - } -} diff --git a/pkg/oonimkall/runner_integration_test.go b/pkg/oonimkall/runner_integration_test.go deleted file mode 100644 index e4c2988..0000000 --- a/pkg/oonimkall/runner_integration_test.go +++ /dev/null @@ -1,402 +0,0 @@ -package oonimkall - -import ( - "context" - "fmt" - "net/http" - "net/http/httptest" - "sync" - "testing" - "time" -) - -func TestRunnerMaybeLookupBackendsFailure(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(500) - })) - defer server.Close() - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Name: "Example", - Options: settingsOptions{ - ProbeServicesBaseURL: server.URL, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - switch ev.Key { - case "failure.startup": - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - case "status.queued", "status.started", "log", "status.end": - default: - panic(fmt.Sprintf("unexpected key: %s", ev.Key)) - } - } - if len(failures) != 1 { - t.Fatal("unexpected number of failures") - } - if failures[0] != "all available probe services failed" { - t.Fatal("invalid failure") - } -} - -func TestRunnerOpenReportFailure(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - var ( - nreq int64 - mu sync.Mutex - ) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - mu.Lock() - defer mu.Unlock() - nreq++ - if nreq == 1 { - w.Write([]byte(`{}`)) - return - } - w.WriteHeader(500) - })) - defer server.Close() - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Name: "Example", - Options: settingsOptions{ - ProbeServicesBaseURL: server.URL, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - seench := make(chan int64) - go func() { - var seen int64 - for ev := range out { - switch ev.Key { - case "failure.report_create": - seen++ - case "status.progress": - evv := ev.Value.(eventStatusProgress) - if evv.Percentage >= 0.4 { - panic(fmt.Sprintf("too much progress: %+v", ev)) - } - case "status.queued", "status.started", "log", "status.end", - "status.geoip_lookup", "status.resolver_lookup": - default: - panic(fmt.Sprintf("unexpected key: %s", ev.Key)) - } - } - seench <- seen - }() - run(context.Background(), settings, out) - close(out) - if n := <-seench; n != 1 { - t.Fatal("unexpected number of events") - } -} - -func TestRunnerGood(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "Example", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var found bool - for ev := range out { - if ev.Key == "status.end" { - found = true - } - } - if !found { - t.Fatal("status.end event not found") - } -} - -func TestRunnerWithUnsupportedSettings(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "Example", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - if ev.Key == "failure.startup" { - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - } - } - if len(failures) != 1 { - t.Fatal("invalid number of failures") - } - if failures[0] != failureInvalidVersion { - t.Fatal("not the failure we expected") - } -} - -func TestRunnerWithInvalidKVStorePath(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "Example", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "", // must be empty to cause the failure below - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - if ev.Key == "failure.startup" { - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - } - } - if len(failures) != 1 { - t.Fatal("invalid number of failures") - } - if failures[0] != "mkdir : no such file or directory" { - t.Fatal("not the failure we expected") - } -} - -func TestRunnerWithInvalidExperimentName(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "Nonexistent", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - if ev.Key == "failure.startup" { - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - } - } - if len(failures) != 1 { - t.Fatal("invalid number of failures") - } - if failures[0] != "no such experiment: Nonexistent" { - t.Fatalf("not the failure we expected: %s", failures[0]) - } -} - -func TestRunnerWithMissingInput(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - LogLevel: "DEBUG", - Name: "ExampleWithInput", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var failures []string - for ev := range out { - if ev.Key == "failure.startup" { - failure := ev.Value.(eventFailure).Failure - failures = append(failures, failure) - } - } - if len(failures) != 1 { - t.Fatal("invalid number of failures") - } - if failures[0] != "no input provided" { - t.Fatalf("not the failure we expected: %s", failures[0]) - } -} - -func TestRunnerWithMaxRuntime(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, - LogLevel: "DEBUG", - Name: "ExampleWithInput", - Options: settingsOptions{ - MaxRuntime: 1, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - begin := time.Now() - go func() { - run(context.Background(), settings, out) - close(out) - }() - var found bool - for ev := range out { - if ev.Key == "status.end" { - found = true - } - } - if !found { - t.Fatal("status.end event not found") - } - // The runtime is long because of ancillary operations and is even more - // longer because of self shaping we may be performing (especially in - // CI builds) using `-tags shaping`). We have experimentally determined - // that ~10 seconds is the typical CI test run time. See: - // - // 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788 - // - // 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855 - if time.Since(begin) > 10*time.Second { - t.Fatal("expected shorter runtime") - } -} - -func TestRunnerWithMaxRuntimeNonInterruptible(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, - LogLevel: "DEBUG", - Name: "ExampleWithInputNonInterruptible", - Options: settingsOptions{ - MaxRuntime: 1, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - begin := time.Now() - go func() { - run(context.Background(), settings, out) - close(out) - }() - var found bool - for ev := range out { - if ev.Key == "status.end" { - found = true - } - } - if !found { - t.Fatal("status.end event not found") - } - // The runtime is long because of ancillary operations and is even more - // longer because of self shaping we may be performing (especially in - // CI builds) using `-tags shaping`). We have experimentally determined - // that ~10 seconds is the typical CI test run time. See: - // - // 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788 - // - // 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855 - if time.Since(begin) > 10*time.Second { - t.Fatal("expected shorter runtime") - } -} - -func TestRunnerWithFailedMeasurement(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, - LogLevel: "DEBUG", - Name: "ExampleWithFailure", - Options: settingsOptions{ - MaxRuntime: 1, - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - go func() { - run(context.Background(), settings, out) - close(out) - }() - var found bool - for ev := range out { - if ev.Key == "failure.measurement" { - found = true - } - } - if !found { - t.Fatal("failure.measurement event not found") - } -} diff --git a/pkg/oonimkall/runner_internal_test.go b/pkg/oonimkall/runner_internal_test.go deleted file mode 100644 index 0f3f604..0000000 --- a/pkg/oonimkall/runner_internal_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package oonimkall - -import ( - "context" - "errors" - "fmt" - "testing" - - engine "github.com/ooni/probe-cli/v3/internal/engine" -) - -func TestMeasurementSubmissionEventName(t *testing.T) { - if measurementSubmissionEventName(nil) != statusMeasurementSubmission { - t.Fatal("unexpected submission event name") - } - if measurementSubmissionEventName(errors.New("mocked error")) != failureMeasurementSubmission { - t.Fatal("unexpected submission event name") - } -} - -func TestMeasurementSubmissionFailure(t *testing.T) { - if measurementSubmissionFailure(nil) != "" { - t.Fatal("unexpected submission failure") - } - if measurementSubmissionFailure(errors.New("mocked error")) != "mocked error" { - t.Fatal("unexpected submission failure") - } -} - -func TestRunnerMaybeLookupLocationFailure(t *testing.T) { - if testing.Short() { - // TODO(https://github.com/ooni/probe-cli/pull/518) - t.Skip("skip test in short mode") - } - out := make(chan *event) - settings := &settings{ - AssetsDir: "../../testdata/oonimkall/assets", - Name: "Example", - Options: settingsOptions{ - SoftwareName: "oonimkall-test", - SoftwareVersion: "0.1.0", - }, - StateDir: "../../testdata/oonimkall/state", - Version: 1, - } - seench := make(chan int64) - eof := make(chan interface{}) - go func() { - var seen int64 - Loop: - for { - select { - case ev := <-out: - switch ev.Key { - case "failure.ip_lookup", "failure.asn_lookup", - "failure.cc_lookup", "failure.resolver_lookup": - seen++ - case "status.progress": - evv := ev.Value.(eventStatusProgress) - if evv.Percentage >= 0.2 { - panic(fmt.Sprintf("too much progress: %+v", ev)) - } - case "status.queued", "status.started", "status.end": - default: - panic(fmt.Sprintf("unexpected key: %s - %+v", ev.Key, ev.Value)) - } - case <-eof: - break Loop - } - } - seench <- seen - }() - expected := errors.New("mocked error") - r := newRunner(settings, out, eof) - r.maybeLookupLocation = func(*engine.Session) error { - return expected - } - r.Run(context.Background()) - close(eof) - if n := <-seench; n != 4 { - t.Fatal("unexpected number of events") - } -} diff --git a/pkg/oonimkall/settings.go b/pkg/oonimkall/settings.go deleted file mode 100644 index d18eafd..0000000 --- a/pkg/oonimkall/settings.go +++ /dev/null @@ -1,91 +0,0 @@ -package oonimkall - -// Settings contains settings for a task. This structure derives from -// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet -// since 2020-12-03 we're not backwards compatible anymore. -type settings struct { - // Annotations contains the annotations to be added - // to every measurements performed by the task. - Annotations map[string]string `json:"annotations,omitempty"` - - // AssetsDir is the directory where to store assets. This - // field is an extension of MK's specification. If - // this field is empty, the task won't start. - AssetsDir string `json:"assets_dir"` - - // DisabledEvents contains disabled events. See - // https://git.io/Jv4Rv for the events names. - DisabledEvents []string `json:"disabled_events,omitempty"` - - // Inputs contains the inputs. The task will fail if it - // requires input and you provide no input. - Inputs []string `json:"inputs,omitempty"` - - // LogLevel contains the logs level. See https://git.io/Jv4Rv - // for the names of the available log levels. - LogLevel string `json:"log_level,omitempty"` - - // Name contains the task name. By https://git.io/Jv4Rv the - // names are in camel case, e.g. `Ndt`. - Name string `json:"name"` - - // Options contains the task options. - Options settingsOptions `json:"options"` - - // Proxy allows you to optionally force a specific proxy - // rather than using no proxy (the default). - // - // Use `psiphon:///` to force using Psiphon with the - // embedded configuration file. Not all builds have - // an embedded configuration file, but OONI builds have - // such a file, so they can use this functionality. - // - // Use `socks5://10.0.0.1:9050/` to connect to a SOCKS5 - // proxy running on 10.0.0.1:9050. This could be, for - // example, a suitably configured `tor` instance. - Proxy string - - // StateDir is the directory where to store persistent data. This - // field is an extension of MK's specification. If - // this field is empty, the task won't start. - StateDir string `json:"state_dir"` - - // TempDir is the temporary directory. This field is an extension of MK's - // specification. If this field is empty, we will pick the tempdir that - // ioutil.TempDir uses by default, which may not work on mobile. According - // to our experiments as of 2020-06-10, leaving the TempDir empty works - // for iOS and does not work for Android. - TempDir string `json:"temp_dir"` - - // TunnelDir is the directory where to store persistent state - // related to circumvention tunnels. This directory is required - // only if you want to use the tunnels. Added since 3.10.0. - TunnelDir string `json:"tunnel_dir"` - - // Version indicates the version of this structure. - Version int64 `json:"version"` -} - -// settingsOptions contains the settings options -type settingsOptions struct { - // MaxRuntime is the maximum runtime expressed in seconds. A negative - // value for this field disables the maximum runtime. Using - // a zero value will also mean disabled. This is not the - // original behaviour of Measurement Kit, which used to run - // for zero time in such case. - MaxRuntime float64 `json:"max_runtime,omitempty"` - - // NoCollector indicates whether to use a collector - NoCollector bool `json:"no_collector,omitempty"` - - // ProbeServicesBaseURL contains the probe services base URL. - ProbeServicesBaseURL string `json:"probe_services_base_url,omitempty"` - - // SoftwareName is the software name. If this option is not - // present, then the library startup will fail. - SoftwareName string `json:"software_name,omitempty"` - - // SoftwareVersion is the software version. If this option is not - // present, then the library startup will fail. - SoftwareVersion string `json:"software_version,omitempty"` -} diff --git a/pkg/oonimkall/task.go b/pkg/oonimkall/task.go index 2262750..5eec0a0 100644 --- a/pkg/oonimkall/task.go +++ b/pkg/oonimkall/task.go @@ -23,7 +23,7 @@ // (measurement-kit/measurement-kit@v0.10.9) for a comprehensive // description of MK's FFI API. // -// See also https://github.com/ooni/probe-cli/v3/internal/engine/pull/347 for the +// See also https://github.com/ooni/probe-engine/pull/347 for the // design document describing the task API. // // See also https://github.com/ooni/probe-cli/v3/internal/engine/blob/master/DESIGN.md, @@ -33,7 +33,7 @@ // // The Session API is a Go API that can be exported to mobile apps // using the gomobile tool. The latest design document for this API is -// at https://github.com/ooni/probe-cli/v3/internal/engine/pull/954. +// at https://github.com/ooni/probe-engine/pull/954. // // The basic tenet of the session API is that you create an instance // of `Session` and use it to perform the operations you need. @@ -84,8 +84,11 @@ func StartTask(input string) (*Task, error) { } go func() { close(task.isstarted) - run(ctx, &settings, task.out) + emitter := newTaskEmitterUsingChan(task.out) + r := newRunner(&settings, emitter) + r.Run(ctx) task.out <- nil // signal that we're done w/o closing the channel + emitter.Close() close(task.isstopped) }() return task, nil diff --git a/pkg/oonimkall/task_integration_test.go b/pkg/oonimkall/task_integration_test.go deleted file mode 100644 index 8ac50b1..0000000 --- a/pkg/oonimkall/task_integration_test.go +++ /dev/null @@ -1,528 +0,0 @@ -package oonimkall - -import ( - "encoding/json" - "errors" - "strings" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/ooni/probe-cli/v3/internal/engine/model" -) - -type eventlike struct { - Key string `json:"key"` - Value map[string]interface{} `json:"value"` -} - -func TestGood(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - // interrupt the task so we also exercise this functionality - go func() { - <-time.After(time.Second) - task.Interrupt() - }() - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - t.Fatal("unexpected failure.startup event") - } - } - // make sure we only see task_terminated at this point - for { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "task_terminated" { - break - } - t.Fatalf("unexpected event.Key: %s", event.Key) - } -} - -func TestWithMeasurementFailure(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "ExampleWithFailure", - "options": { - "no_geoip": true, - "no_resolver_lookup": true, - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - t.Fatal("unexpected failure.startup event") - } - } -} - -func TestInvalidJSON(t *testing.T) { - task, err := StartTask(`{`) - var syntaxerr *json.SyntaxError - if !errors.As(err, &syntaxerr) { - t.Fatal("not the expected error") - } - if task != nil { - t.Fatal("task is not nil") - } -} - -func TestUnsupportedSetting(t *testing.T) { - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state" - }`) - if err != nil { - t.Fatal(err) - } - var seen bool - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - if strings.Contains(eventstr, failureInvalidVersion) { - seen = true - } - } - } - if !seen { - t.Fatal("did not see failure.startup with invalid version info") - } -} - -func TestEmptyStateDir(t *testing.T) { - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var seen bool - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - if strings.Contains(eventstr, "mkdir : no such file or directory") { - seen = true - } - } - } - if !seen { - t.Fatal("did not see failure.startup with info that state dir is empty") - } -} - -func TestUnknownExperiment(t *testing.T) { - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "Antani", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var seen bool - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - if strings.Contains(eventstr, "no such experiment: ") { - seen = true - } - } - } - if !seen { - t.Fatal("did not see failure.startup") - } -} - -func TestInputIsRequired(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "log_level": "DEBUG", - "name": "ExampleWithInput", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var seen bool - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - if strings.Contains(eventstr, "no input provided") { - seen = true - } - } - } - if !seen { - t.Fatal("did not see failure.startup") - } -} - -func TestMaxRuntime(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - begin := time.Now() - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "inputs": ["a", "b", "c"], - "name": "ExampleWithInput", - "options": { - "max_runtime": 1, - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - t.Fatal(eventstr) - } - } - // The runtime is long because of ancillary operations and is even more - // longer because of self shaping we may be performing (especially in - // CI builds) using `-tags shaping`). We have experimentally determined - // that ~10 seconds is the typical CI test run time. See: - // - // 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788 - // - // 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855 - // - // In case there are further timeouts, e.g. in the sessionresolver, the - // time used by the experiment will be much more. This is for example the - // case in https://github.com/ooni/probe-engine/issues/1005. - if time.Since(begin) > 10*time.Second { - t.Fatal("expected shorter runtime") - } -} - -func TestInterruptExampleWithInput(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - t.Skip("Skipping broken test; see https://github.com/ooni/probe-engine/issues/992") - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "inputs": [ - "http://www.kernel.org/", - "http://www.x.org/", - "http://www.microsoft.com/", - "http://www.slashdot.org/", - "http://www.repubblica.it/", - "http://www.google.it/", - "http://ooni.org/" - ], - "name": "ExampleWithInputNonInterruptible", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var keys []string - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - switch event.Key { - case "failure.startup": - t.Fatal(eventstr) - case "status.measurement_start": - go task.Interrupt() - } - // We compress the keys. What matters is basically that we - // see just one of the many possible measurements here. - if keys == nil || keys[len(keys)-1] != event.Key { - keys = append(keys, event.Key) - } - } - expect := []string{ - "status.queued", - "status.started", - "status.progress", - "status.geoip_lookup", - "status.resolver_lookup", - "status.progress", - "status.report_create", - "status.measurement_start", - "log", - "status.progress", - "measurement", - "status.measurement_submission", - "status.measurement_done", - "status.end", - "task_terminated", - } - if diff := cmp.Diff(expect, keys); diff != "" { - t.Fatal(diff) - } -} - -func TestInterruptNdt7(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "name": "Ndt7", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - go func() { - <-time.After(11 * time.Second) - task.Interrupt() - }() - var keys []string - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - if event.Key == "failure.startup" { - t.Fatal(eventstr) - } - // We compress the keys because we don't know how many - // status.progress we will see. What matters is that we - // don't see a measurement submission, since it means - // that we have interrupted the measurement. - if keys == nil || keys[len(keys)-1] != event.Key { - keys = append(keys, event.Key) - } - } - expect := []string{ - "status.queued", - "status.started", - "status.progress", - "status.geoip_lookup", - "status.resolver_lookup", - "status.progress", - "status.report_create", - "status.measurement_start", - "status.progress", - "status.end", - "task_terminated", - } - if diff := cmp.Diff(expect, keys); diff != "" { - t.Fatal(diff) - } -} - -func TestCountBytesForExample(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var downloadKB, uploadKB float64 - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - switch event.Key { - case "failure.startup": - t.Fatal(eventstr) - case "status.end": - downloadKB = event.Value["downloaded_kb"].(float64) - uploadKB = event.Value["uploaded_kb"].(float64) - } - } - if downloadKB == 0 { - t.Fatal("downloadKB is zero") - } - if uploadKB == 0 { - t.Fatal("uploadKB is zero") - } -} - -func TestPrivacyAndScrubbing(t *testing.T) { - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - var m *model.Measurement - for !task.IsDone() { - eventstr := task.WaitForNextEvent() - var event eventlike - if err := json.Unmarshal([]byte(eventstr), &event); err != nil { - t.Fatal(err) - } - switch event.Key { - case "failure.startup": - t.Fatal(eventstr) - case "measurement": - v := []byte(event.Value["json_str"].(string)) - m = new(model.Measurement) - if err := json.Unmarshal(v, &m); err != nil { - t.Fatal(err) - } - } - } - if m == nil { - t.Fatal("measurement is nil") - } - if m.ProbeASN == "AS0" || m.ProbeCC == "ZZ" || m.ProbeIP != "127.0.0.1" { - t.Fatal("unexpected result") - } -} - -func TestNonblockWithFewEvents(t *testing.T) { - // This test tests whether we won't block for a small - // number of events emitted by the task - if testing.Short() { - t.Skip("skip test in short mode") - } - task, err := StartTask(`{ - "assets_dir": "../testdata/oonimkall/assets", - "name": "Example", - "options": { - "software_name": "oonimkall-test", - "software_version": "0.1.0" - }, - "state_dir": "../testdata/oonimkall/state", - "version": 1 - }`) - if err != nil { - t.Fatal(err) - } - // Wait for the task thread to start - <-task.isstarted - // Wait for the task thread to complete - <-task.isstopped - var count int - for !task.IsDone() { - task.WaitForNextEvent() - count++ - } - if count < 5 { - t.Fatal("too few events") - } -} diff --git a/pkg/oonimkall/task_test.go b/pkg/oonimkall/task_test.go new file mode 100644 index 0000000..034a1de --- /dev/null +++ b/pkg/oonimkall/task_test.go @@ -0,0 +1,171 @@ +package oonimkall + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +type eventlike struct { + Key string `json:"key"` + Value map[string]interface{} `json:"value"` +} + +func TestStartTaskGood(t *testing.T) { + task, err := StartTask(`{ + "log_level": "DEBUG", + "name": "Example", + "options": { + "software_name": "oonimkall-test", + "software_version": "0.1.0" + }, + "state_dir": "testdata/state", + "version": 1 + }`) + if err != nil { + t.Fatal(err) + } + for !task.IsDone() { + eventstr := task.WaitForNextEvent() + var event eventlike + if err := json.Unmarshal([]byte(eventstr), &event); err != nil { + t.Fatal(err) + } + if event.Key == "failure.startup" { + t.Fatal("unexpected failure.startup event") + } + } + // make sure we only see task_terminated at this point + for { + eventstr := task.WaitForNextEvent() + var event eventlike + if err := json.Unmarshal([]byte(eventstr), &event); err != nil { + t.Fatal(err) + } + if event.Key == "task_terminated" { + break + } + t.Fatalf("unexpected event.Key: %s", event.Key) + } +} + +func TestStartTaskInvalidJSON(t *testing.T) { + task, err := StartTask(`{`) + var syntaxerr *json.SyntaxError + if !errors.As(err, &syntaxerr) { + t.Fatal("not the expected error") + } + if task != nil { + t.Fatal("task is not nil") + } +} + +func TestStartTaskCountBytesForExample(t *testing.T) { + task, err := StartTask(`{ + "name": "Example", + "options": { + "software_name": "oonimkall-test", + "software_version": "0.1.0" + }, + "state_dir": "testdata/state", + "version": 1 + }`) + if err != nil { + t.Fatal(err) + } + var downloadKB, uploadKB float64 + for !task.IsDone() { + eventstr := task.WaitForNextEvent() + var event eventlike + if err := json.Unmarshal([]byte(eventstr), &event); err != nil { + t.Fatal(err) + } + switch event.Key { + case "failure.startup": + t.Fatal(eventstr) + case "status.end": + downloadKB = event.Value["downloaded_kb"].(float64) + uploadKB = event.Value["uploaded_kb"].(float64) + } + } + if downloadKB == 0 { + t.Fatal("downloadKB is zero") + } + if uploadKB == 0 { + t.Fatal("uploadKB is zero") + } +} + +func TestPrivacyAndScrubbing(t *testing.T) { + if testing.Short() { + t.Skip("skip test in short mode") + } + task, err := StartTask(`{ + "assets_dir": "../testdata/oonimkall/assets", + "name": "Example", + "options": { + "software_name": "oonimkall-test", + "software_version": "0.1.0" + }, + "state_dir": "../testdata/oonimkall/state", + "version": 1 + }`) + if err != nil { + t.Fatal(err) + } + var m *model.Measurement + for !task.IsDone() { + eventstr := task.WaitForNextEvent() + var event eventlike + if err := json.Unmarshal([]byte(eventstr), &event); err != nil { + t.Fatal(err) + } + switch event.Key { + case "failure.startup": + t.Fatal(eventstr) + case "measurement": + v := []byte(event.Value["json_str"].(string)) + m = new(model.Measurement) + if err := json.Unmarshal(v, &m); err != nil { + t.Fatal(err) + } + } + } + if m == nil { + t.Fatal("measurement is nil") + } + if m.ProbeASN == "AS0" || m.ProbeCC == "ZZ" || m.ProbeIP != "127.0.0.1" { + t.Fatal("unexpected result") + } +} + +func TestNonblockWithFewEvents(t *testing.T) { + // This test tests whether we won't block for a small + // number of events emitted by the task + task, err := StartTask(`{ + "name": "Example", + "options": { + "software_name": "oonimkall-test", + "software_version": "0.1.0" + }, + "state_dir": "testdata/state", + "version": 1 + }`) + if err != nil { + t.Fatal(err) + } + // Wait for the task thread to start + <-task.isstarted + // Wait for the task thread to complete + <-task.isstopped + var count int + for !task.IsDone() { + task.WaitForNextEvent() + count++ + } + if count < 5 { + t.Fatal("too few events") + } +} diff --git a/pkg/oonimkall/taskemitter.go b/pkg/oonimkall/taskemitter.go new file mode 100644 index 0000000..9506ba8 --- /dev/null +++ b/pkg/oonimkall/taskemitter.go @@ -0,0 +1,63 @@ +package oonimkall + +import "sync" + +// taskEmitterUsingChan is a task emitter using a channel. +type taskEmitterUsingChan struct { + // eof indicates we should not emit anymore. + eof chan interface{} + + // once ensures we close the eof channel just once. + once sync.Once + + // out is the possibly buffered channel where to emit events. + out chan<- *event +} + +// ensure that taskEmitterUsingChan is a taskEmitter. +var _ taskEmitterCloser = &taskEmitterUsingChan{} + +// newTaskEmitterUsingChan creates a taskEmitterUsingChan. +func newTaskEmitterUsingChan(out chan<- *event) *taskEmitterUsingChan { + return &taskEmitterUsingChan{ + eof: make(chan interface{}), + once: sync.Once{}, + out: out, + } +} + +// Emit implements taskEmitter.Emit. +func (ee *taskEmitterUsingChan) Emit(key string, value interface{}) { + // Prevent this goroutine from blocking on `ee.out` if the caller + // has already told us it's not going to accept more events. + select { + case ee.out <- &event{Key: key, Value: value}: + case <-ee.eof: + } +} + +// Close implements taskEmitterCloser.Closer. +func (ee *taskEmitterUsingChan) Close() error { + ee.once.Do(func() { close(ee.eof) }) + return nil +} + +// taskEmitterWrapper is a convenient wrapper for taskEmitter. +type taskEmitterWrapper struct { + taskEmitter +} + +// EmitFailureStartup emits the failureStartup event +func (ee *taskEmitterWrapper) EmitFailureStartup(failure string) { + ee.EmitFailureGeneric(eventTypeFailureStartup, failure) +} + +// EmitFailureGeneric emits a failure event +func (ee *taskEmitterWrapper) EmitFailureGeneric(name, failure string) { + ee.Emit(name, eventFailure{Failure: failure}) +} + +// EmitStatusProgress emits the status.Progress event +func (ee *taskEmitterWrapper) EmitStatusProgress(percentage float64, message string) { + ee.Emit(eventTypeStatusProgress, eventStatusProgress{Message: message, Percentage: percentage}) +} diff --git a/pkg/oonimkall/taskemitter_test.go b/pkg/oonimkall/taskemitter_test.go new file mode 100644 index 0000000..dd950f1 --- /dev/null +++ b/pkg/oonimkall/taskemitter_test.go @@ -0,0 +1,109 @@ +package oonimkall + +import "testing" + +func TestTaskEmitterUsingChan(t *testing.T) { + t.Run("ordinary emit", func(t *testing.T) { + out := make(chan *event) + emitter := newTaskEmitterUsingChan(out) + go func() { + emitter.Emit("foo", nil) + }() + ev := <-out + if ev.Key != "foo" { + t.Fatal("invalid key") + } + if ev.Value != nil { + t.Fatal("invalid value") + } + }) + + t.Run("emit after close", func(t *testing.T) { + out := make(chan *event) + emitter := newTaskEmitterUsingChan(out) + emitter.Close() + done := make(chan interface{}) + go func() { + emitter.Emit("foo", nil) + close(done) + }() + <-done + select { + case <-out: + t.Fatal("should not receive event here") + default: + } + }) + + t.Run("close is idempotent", func(t *testing.T) { + out := make(chan *event) + emitter := newTaskEmitterUsingChan(out) + for i := 0; i < 4; i++ { + emitter.Close() + } + }) +} + +func TestTaskEmitterWrapper(t *testing.T) { + t.Run("emit failureStartup", func(t *testing.T) { + expect := "antani" + collector := &CollectorTaskEmitter{} + emitter := &taskEmitterWrapper{collector} + emitter.EmitFailureStartup(expect) + events := collector.Collect() + if len(events) != 1 { + t.Fatal("invalid number of events") + } + ev := events[0] + if ev.Key != eventTypeFailureStartup { + t.Fatal("invalid key") + } + value := ev.Value.(eventFailure) + if value.Failure != expect { + t.Fatal("invalid failure value") + } + }) + + t.Run("emit failureGeneric", func(t *testing.T) { + expectName := "mascetti" + expectFailure := "antani" + collector := &CollectorTaskEmitter{} + emitter := &taskEmitterWrapper{collector} + emitter.EmitFailureGeneric(expectName, expectFailure) + events := collector.Collect() + if len(events) != 1 { + t.Fatal("invalid number of events") + } + ev := events[0] + if ev.Key != expectName { + t.Fatal("invalid key") + } + value := ev.Value.(eventFailure) + if value.Failure != expectFailure { + t.Fatal("invalid failure value") + } + }) + + t.Run("emit statusProgress", func(t *testing.T) { + percentage := 0.66 + message := "mascetti" + collector := &CollectorTaskEmitter{} + emitter := &taskEmitterWrapper{collector} + emitter.EmitStatusProgress(percentage, message) + events := collector.Collect() + if len(events) != 1 { + t.Fatal("invalid number of events") + } + ev := events[0] + if ev.Key != eventTypeStatusProgress { + t.Fatal("invalid key") + } + value := ev.Value.(eventStatusProgress) + if value.Percentage != percentage { + t.Fatal("invalid percentage value") + } + if value.Message != message { + t.Fatal("invalid message value") + } + }) +} diff --git a/pkg/oonimkall/tasklogger.go b/pkg/oonimkall/tasklogger.go new file mode 100644 index 0000000..d50c970 --- /dev/null +++ b/pkg/oonimkall/tasklogger.go @@ -0,0 +1,116 @@ +package oonimkall + +import ( + "fmt" + + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +// +// This file implements the logger used by a task. Outside +// of this file, the rest of the codebase just sees a generic +// model.Logger that can log events. +// + +// taskLogger is the logger used by a task. +type taskLogger struct { + // emitter is the event emitter. + emitter taskEmitter + + // hasDebug indicates whether to emit debug logs. + hasDebug bool + + // hasInfo indicates whether to emit info logs. + hasInfo bool + + // hasWarning indicates whether to emit warning logs. + hasWarning bool +} + +// ensure that taskLogger implements model.Logger. +var _ model.Logger = &taskLogger{} + +// Debug implements model.Logger.Debug. +func (cl *taskLogger) Debug(msg string) { + if cl.hasDebug { + cl.emit(logLevelDebug, msg) + } +} + +// Debugf implements model.Logger.Debugf. +func (cl *taskLogger) Debugf(format string, v ...interface{}) { + if cl.hasDebug { + cl.Debug(fmt.Sprintf(format, v...)) + } +} + +// Info implements model.Logger.Info. +func (cl *taskLogger) Info(msg string) { + if cl.hasInfo { + cl.emit(logLevelInfo, msg) + } +} + +// Infof implements model.Logger.Infof. +func (cl *taskLogger) Infof(format string, v ...interface{}) { + if cl.hasInfo { + cl.Info(fmt.Sprintf(format, v...)) + } +} + +// Warn implements model.Logger.Warn. +func (cl *taskLogger) Warn(msg string) { + if cl.hasWarning { + cl.emit(logLevelWarning, msg) + } +} + +// Warnf implements model.Logger.Warnf. +func (cl *taskLogger) Warnf(format string, v ...interface{}) { + if cl.hasWarning { + cl.Warn(fmt.Sprintf(format, v...)) + } +} + +// emit is the code that actually emits the log event. +func (cl *taskLogger) emit(level string, message string) { + cl.emitter.Emit(eventTypeLog, eventLog{ + LogLevel: level, + Message: message, + }) +} + +// newTaskLogger creates a new taskLogger instance. +// +// Arguments: +// +// - emitter is the emitter that will emit log events; +// +// - logLevel is the maximum log level that will be emitted. +// +// Returns: +// +// - a properly configured instance of taskLogger. +// +// Remarks: +// +// - log levels are sorted as usual: ERR is more sever than +// WARNING, WARNING is more sever than INFO, etc. +func newTaskLogger(emitter taskEmitter, logLevel string) *taskLogger { + cl := &taskLogger{ + emitter: emitter, + } + switch logLevel { + case logLevelDebug, logLevelDebug2: + cl.hasDebug = true + fallthrough + case logLevelInfo: + cl.hasInfo = true + fallthrough + case logLevelErr, logLevelWarning: + fallthrough + default: + cl.hasWarning = true + } + return cl +} diff --git a/pkg/oonimkall/tasklogger_test.go b/pkg/oonimkall/tasklogger_test.go new file mode 100644 index 0000000..240c711 --- /dev/null +++ b/pkg/oonimkall/tasklogger_test.go @@ -0,0 +1,106 @@ +package oonimkall + +import ( + "testing" + + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +// +// This file contains tests for the taskLogger type. +// + +func TestTaskLogger(t *testing.T) { + // debugMessage is the debug message we expect to see. + debugMessage := "debug message" + + // infoMessage is the info message we expect to see. + infoMessage := "info message" + + // warningMessage is the warning message we expect to see. + warningMessage := "warning message" + + // emitMessages is an helper function for implementing this test. + emitMessages := func(logger model.Logger) { + logger.Debug(debugMessage) + logger.Debugf("%s", debugMessage) + logger.Info(infoMessage) + logger.Infof("%s", infoMessage) + logger.Warn(warningMessage) + logger.Warnf("%s", warningMessage) + } + + // convertEventsToLogEvents converts the generic events to + // logEvents and fails if this operation is not possible. + convertEventsToLogEvents := func(t *testing.T, in []*event) (out []eventLog) { + for _, ev := range in { + if ev.Key != eventTypeLog { + t.Fatalf("expected log event, found %s", ev.Key) + } + out = append(out, ev.Value.(eventLog)) + } + return + } + + // checkNumberOfEvents ensures we've the right number of events. + checkNumberOfEvents := func(t *testing.T, events []eventLog, expect int) { + if len(events) != expect { + t.Fatalf( + "invalid number of log events %d (expected %d)", + len(events), expect, + ) + } + } + + // matchEvent ensures the given event has the right level and message. + matchEvent := func(t *testing.T, event eventLog, level, msg string) { + if event.LogLevel != level { + t.Fatalf( + "invalid log level %s (expected %s)", + event.LogLevel, level, + ) + } + if event.Message != msg { + t.Fatalf( + "invalid log message '%s' (expected '%s')", + event.Message, msg, + ) + } + } + + t.Run("debug logger", func(t *testing.T) { + emitter := &CollectorTaskEmitter{} + logger := newTaskLogger(emitter, logLevelDebug) + emitMessages(logger) + logEvents := convertEventsToLogEvents(t, emitter.Collect()) + checkNumberOfEvents(t, logEvents, 6) + matchEvent(t, logEvents[0], logLevelDebug, debugMessage) + matchEvent(t, logEvents[1], logLevelDebug, debugMessage) + matchEvent(t, logEvents[2], logLevelInfo, infoMessage) + matchEvent(t, logEvents[3], logLevelInfo, infoMessage) + matchEvent(t, logEvents[4], logLevelWarning, warningMessage) + matchEvent(t, logEvents[5], logLevelWarning, warningMessage) + }) + + t.Run("info logger", func(t *testing.T) { + emitter := &CollectorTaskEmitter{} + logger := newTaskLogger(emitter, logLevelInfo) + emitMessages(logger) + logEvents := convertEventsToLogEvents(t, emitter.Collect()) + checkNumberOfEvents(t, logEvents, 4) + matchEvent(t, logEvents[0], logLevelInfo, infoMessage) + matchEvent(t, logEvents[1], logLevelInfo, infoMessage) + matchEvent(t, logEvents[2], logLevelWarning, warningMessage) + matchEvent(t, logEvents[3], logLevelWarning, warningMessage) + }) + + t.Run("warn logger", func(t *testing.T) { + emitter := &CollectorTaskEmitter{} + logger := newTaskLogger(emitter, logLevelWarning) + emitMessages(logger) + logEvents := convertEventsToLogEvents(t, emitter.Collect()) + checkNumberOfEvents(t, logEvents, 2) + matchEvent(t, logEvents[0], logLevelWarning, warningMessage) + matchEvent(t, logEvents[1], logLevelWarning, warningMessage) + }) +} diff --git a/pkg/oonimkall/taskmocks_test.go b/pkg/oonimkall/taskmocks_test.go new file mode 100644 index 0000000..8165916 --- /dev/null +++ b/pkg/oonimkall/taskmocks_test.go @@ -0,0 +1,222 @@ +package oonimkall + +import ( + "context" + "errors" + "sync" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +// +// This file contains mocks for types used by tasks. Because +// we only use mocks when testing, this file is a `_test.go` file. +// + +// CollectorTaskEmitter is a thread-safe taskEmitter +// that stores all the events inside itself. +type CollectorTaskEmitter struct { + // events contains the events + events []*event + + // mu provides mutual exclusion + mu sync.Mutex +} + +// ensures that a CollectorTaskEmitter is a taskEmitter. +var _ taskEmitter = &CollectorTaskEmitter{} + +// Emit implements the taskEmitter.Emit method. +func (e *CollectorTaskEmitter) Emit(key string, value interface{}) { + e.mu.Lock() + e.events = append(e.events, &event{Key: key, Value: value}) + e.mu.Unlock() +} + +// Collect returns a copy of the collected events. It is safe +// to read the events. It's a data race to modify them. +// +// After this function has been called, the internal array +// of events will now be empty. +func (e *CollectorTaskEmitter) Collect() (out []*event) { + e.mu.Lock() + out = e.events + e.events = nil + e.mu.Unlock() + return +} + +// SessionBuilderConfigSaver is a session builder that +// saves the received config and returns an error. +type SessionBuilderConfigSaver struct { + Config engine.SessionConfig +} + +var _ taskSessionBuilder = &SessionBuilderConfigSaver{} + +func (b *SessionBuilderConfigSaver) NewSession( + ctx context.Context, config engine.SessionConfig) (taskSession, error) { + b.Config = config + return nil, errors.New("mocked error") +} + +// MockableTaskRunnerDependencies allows to mock all the +// dependencies of taskRunner using a single structure. +type MockableTaskRunnerDependencies struct { + + // taskSessionBuilder: + + MockNewSession func(ctx context.Context, + config engine.SessionConfig) (taskSession, error) + + // taskSession: + + MockClose func() error + MockNewExperimentBuilderByName func(name string) (taskExperimentBuilder, error) + MockMaybeLookupBackendsContext func(ctx context.Context) error + MockMaybeLookupLocationContext func(ctx context.Context) error + MockProbeIP func() string + MockProbeASNString func() string + MockProbeCC func() string + MockProbeNetworkName func() string + MockResolverASNString func() string + MockResolverIP func() string + MockResolverNetworkName func() string + + // taskExperimentBuilder: + + MockableSetCallbacks func(callbacks model.ExperimentCallbacks) + MockableInputPolicy func() engine.InputPolicy + MockableNewExperimentInstance func() taskExperiment + MockableInterruptible func() bool + + // taskExperiment: + + MockableKibiBytesReceived func() float64 + MockableKibiBytesSent func() float64 + MockableOpenReportContext func(ctx context.Context) error + MockableReportID func() string + MockableMeasureWithContext func(ctx context.Context, input string) ( + measurement *model.Measurement, err error) + MockableSubmitAndUpdateMeasurementContext func( + ctx context.Context, measurement *model.Measurement) error +} + +var ( + _ taskSessionBuilder = &MockableTaskRunnerDependencies{} + _ taskSession = &MockableTaskRunnerDependencies{} + _ taskExperimentBuilder = &MockableTaskRunnerDependencies{} + _ taskExperiment = &MockableTaskRunnerDependencies{} +) + +func (dep *MockableTaskRunnerDependencies) NewSession( + ctx context.Context, config engine.SessionConfig) (taskSession, error) { + if f := dep.MockNewSession; f != nil { + return f(ctx, config) + } + return dep, nil +} + +func (dep *MockableTaskRunnerDependencies) Close() error { + return dep.MockClose() +} + +func (dep *MockableTaskRunnerDependencies) NewExperimentBuilderByName(name string) (taskExperimentBuilder, error) { + if f := dep.MockNewExperimentBuilderByName; f != nil { + return f(name) + } + return dep, nil +} + +func (dep *MockableTaskRunnerDependencies) MaybeLookupBackendsContext(ctx context.Context) error { + return dep.MockMaybeLookupBackendsContext(ctx) +} + +func (dep *MockableTaskRunnerDependencies) MaybeLookupLocationContext(ctx context.Context) error { + return dep.MockMaybeLookupLocationContext(ctx) +} + +func (dep *MockableTaskRunnerDependencies) ProbeIP() string { + return dep.MockProbeIP() +} + +func (dep *MockableTaskRunnerDependencies) ProbeASNString() string { + return dep.MockProbeASNString() +} + +func (dep *MockableTaskRunnerDependencies) ProbeCC() string { + return dep.MockProbeCC() +} + +func (dep *MockableTaskRunnerDependencies) ProbeNetworkName() string { + return dep.MockProbeNetworkName() +} + +func (dep *MockableTaskRunnerDependencies) ResolverASNString() string { + return dep.MockResolverASNString() +} + +func (dep *MockableTaskRunnerDependencies) ResolverIP() string { + return dep.MockResolverIP() +} + +func (dep *MockableTaskRunnerDependencies) ResolverNetworkName() string { + return dep.MockResolverNetworkName() +} + +func (dep *MockableTaskRunnerDependencies) SetCallbacks(callbacks model.ExperimentCallbacks) { + dep.MockableSetCallbacks(callbacks) +} + +func (dep *MockableTaskRunnerDependencies) InputPolicy() engine.InputPolicy { + return dep.MockableInputPolicy() +} + +func (dep *MockableTaskRunnerDependencies) NewExperimentInstance() taskExperiment { + if f := dep.MockableNewExperimentInstance; f != nil { + return f() + } + return dep +} + +func (dep *MockableTaskRunnerDependencies) Interruptible() bool { + return dep.MockableInterruptible() +} + +func (dep *MockableTaskRunnerDependencies) KibiBytesReceived() float64 { + return dep.MockableKibiBytesReceived() +} + +func (dep *MockableTaskRunnerDependencies) KibiBytesSent() float64 { + return dep.MockableKibiBytesSent() +} + +func (dep *MockableTaskRunnerDependencies) OpenReportContext(ctx context.Context) error { + return dep.MockableOpenReportContext(ctx) +} + +func (dep *MockableTaskRunnerDependencies) ReportID() string { + return dep.MockableReportID() +} + +func (dep *MockableTaskRunnerDependencies) MeasureWithContext(ctx context.Context, input string) ( + measurement *model.Measurement, err error) { + return dep.MockableMeasureWithContext(ctx, input) +} + +func (dep *MockableTaskRunnerDependencies) SubmitAndUpdateMeasurementContext( + ctx context.Context, measurement *model.Measurement) error { + return dep.MockableSubmitAndUpdateMeasurementContext(ctx, measurement) +} + +// MockableKVStoreFSBuilder is a mockable taskKVStoreFSBuilder. +type MockableKVStoreFSBuilder struct { + MockNewFS func(path string) (model.KeyValueStore, error) +} + +var _ taskKVStoreFSBuilder = &MockableKVStoreFSBuilder{} + +func (m *MockableKVStoreFSBuilder) NewFS(path string) (model.KeyValueStore, error) { + return m.MockNewFS(path) +} diff --git a/pkg/oonimkall/taskmodel.go b/pkg/oonimkall/taskmodel.go new file mode 100644 index 0000000..5b75930 --- /dev/null +++ b/pkg/oonimkall/taskmodel.go @@ -0,0 +1,374 @@ +package oonimkall + +import ( + "context" + "io" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +// +// Task Model +// +// The oonimkall package allows you to run OONI network +// experiments as "tasks". This file defines all the +// underlying model entailed by running such tasks. +// +// Logging +// +// This section of the file defines the types and the +// interfaces required to implement logging. +// +// The rest of the codebase will use a generic model.Logger +// as a logger. This is a pretty fundamental interface in +// ooni/probe-cli and so it's not defined in this file. +// + +const taskABIVersion = 1 + +// Running tasks emit logs using different log levels. We +// define log levels with the usual semantics. +// +// The logger used by a task _may_ be configured to not +// emit log events that are less severe than a given +// severity. +// +// We use the following definitions both for defining the +// log level of a log and for configuring the maximum +// acceptable log level emitted by a logger. +const ( + logLevelDebug2 = "DEBUG2" + logLevelDebug = "DEBUG" + logLevelInfo = "INFO" + logLevelErr = "ERR" + logLevelWarning = "WARNING" +) + +// +// Emitting Events +// +// While it is running, a task emits events. This section +// of the file defines the types needed to emit events. +// + +// type of emitted events. +const ( + eventTypeFailureIPLookup = "failure.ip_lookup" + eventTypeFailureASNLookup = "failure.asn_lookup" + eventTypeFailureCCLookup = "failure.cc_lookup" + eventTypeFailureMeasurement = "failure.measurement" + eventTypeFailureMeasurementSubmission = "failure.measurement_submission" + eventTypeFailureReportCreate = "failure.report_create" + eventTypeFailureResolverLookup = "failure.resolver_lookup" + eventTypeFailureStartup = "failure.startup" + eventTypeLog = "log" + eventTypeMeasurement = "measurement" + eventTypeStatusEnd = "status.end" + eventTypeStatusGeoIPLookup = "status.geoip_lookup" + eventTypeStatusMeasurementDone = "status.measurement_done" + eventTypeStatusMeasurementStart = "status.measurement_start" + eventTypeStatusMeasurementSubmission = "status.measurement_submission" + eventTypeStatusProgress = "status.progress" + eventTypeStatusQueued = "status.queued" + eventTypeStatusReportCreate = "status.report_create" + eventTypeStatusResolverLookup = "status.resolver_lookup" + eventTypeStatusStarted = "status.started" +) + +// taskEmitter is anything that allows us to +// emit events while running a task. +// +// Note that a task emitter _may_ be configured +// to ignore _some_ events though. +type taskEmitter interface { + // Emit emits the event (unless the emitter is + // configured to ignore this event key). + Emit(key string, value interface{}) +} + +// taskEmitterCloser is a closeable taskEmitter. +type taskEmitterCloser interface { + taskEmitter + io.Closer +} + +type eventEmpty struct{} + +// eventFailure contains information on a failure. +type eventFailure struct { + Failure string `json:"failure"` +} + +// eventLog is an event containing a log message. +type eventLog struct { + LogLevel string `json:"log_level"` + Message string `json:"message"` +} + +type eventMeasurementGeneric struct { + Failure string `json:"failure,omitempty"` + Idx int64 `json:"idx"` + Input string `json:"input"` + JSONStr string `json:"json_str,omitempty"` +} + +type eventStatusEnd struct { + DownloadedKB float64 `json:"downloaded_kb"` + Failure string `json:"failure"` + UploadedKB float64 `json:"uploaded_kb"` +} + +type eventStatusGeoIPLookup struct { + ProbeASN string `json:"probe_asn"` + ProbeCC string `json:"probe_cc"` + ProbeIP string `json:"probe_ip"` + ProbeNetworkName string `json:"probe_network_name"` +} + +// eventStatusProgress reports progress information. +type eventStatusProgress struct { + Message string `json:"message"` + Percentage float64 `json:"percentage"` +} + +type eventStatusReportGeneric struct { + ReportID string `json:"report_id"` +} + +type eventStatusResolverLookup struct { + ResolverASN string `json:"resolver_asn"` + ResolverIP string `json:"resolver_ip"` + ResolverNetworkName string `json:"resolver_network_name"` +} + +// event is an event emitted by a task. This structure extends the event +// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv). +type event struct { + Key string `json:"key"` + Value interface{} `json:"value"` +} + +// +// OONI Session +// +// For performing several operations, including running +// experiments, we need to create an OONI session. +// +// This section of the file defines the interface between +// our oonimkall API and the real session. +// +// The abstraction representing a OONI session is taskSession. +// + +// taskKVStoreFSBuilder constructs a KVStore with +// filesystem backing for running tests. +type taskKVStoreFSBuilder interface { + // NewFS creates a new KVStore using the filesystem. + NewFS(path string) (model.KeyValueStore, error) +} + +// taskSessionBuilder constructs a new Session. +type taskSessionBuilder interface { + // NewSession creates a new taskSession. + NewSession(ctx context.Context, + config engine.SessionConfig) (taskSession, error) +} + +// taskSession abstracts a OONI session. +type taskSession interface { + // A session can be closed. + io.Closer + + // NewExperimentBuilderByName creates the builder for constructing + // a new experiment given the experiment's name. + NewExperimentBuilderByName(name string) (taskExperimentBuilder, error) + + // MaybeLookupBackendsContext lookups the OONI backend unless + // this operation has already been performed. + MaybeLookupBackendsContext(ctx context.Context) error + + // MaybeLookupLocationContext lookups the probe location unless + // this operation has already been performed. + MaybeLookupLocationContext(ctx context.Context) error + + // ProbeIP must be called after MaybeLookupLocationContext + // and returns the resolved probe IP. + ProbeIP() string + + // ProbeASNString must be called after MaybeLookupLocationContext + // and returns the resolved probe ASN as a string. + ProbeASNString() string + + // ProbeCC must be called after MaybeLookupLocationContext + // and returns the resolved probe country code. + ProbeCC() string + + // ProbeNetworkName must be called after MaybeLookupLocationContext + // and returns the resolved probe country code. + ProbeNetworkName() string + + // ResolverANSString must be called after MaybeLookupLocationContext + // and returns the resolved resolver's ASN as a string. + ResolverASNString() string + + // ResolverIP must be called after MaybeLookupLocationContext + // and returns the resolved resolver's IP. + ResolverIP() string + + // ResolverNetworkName must be called after MaybeLookupLocationContext + // and returns the resolved resolver's network name. + ResolverNetworkName() string +} + +// taskExperimentBuilder builds a taskExperiment. +type taskExperimentBuilder interface { + // SetCallbacks sets the experiment callbacks. + SetCallbacks(callbacks model.ExperimentCallbacks) + + // InputPolicy returns the experiment's input policy. + InputPolicy() engine.InputPolicy + + // NewExperiment creates the new experiment. + NewExperimentInstance() taskExperiment + + // Interruptible returns whether this experiment is interruptible. + Interruptible() bool +} + +// taskExperiment is a runnable experiment. +type taskExperiment interface { + // KibiBytesReceived returns the KiB received by the experiment. + KibiBytesReceived() float64 + + // KibiBytesSent returns the KiB sent by the experiment. + KibiBytesSent() float64 + + // OpenReportContext opens a new report. + OpenReportContext(ctx context.Context) error + + // ReportID must be called after a successful OpenReportContext + // and returns the report ID for this measurement. + ReportID() string + + // MeasureWithContext runs the measurement. + MeasureWithContext(ctx context.Context, input string) ( + measurement *model.Measurement, err error) + + // SubmitAndUpdateMeasurementContext submits the measurement + // and updates its report ID on success. + SubmitAndUpdateMeasurementContext( + ctx context.Context, measurement *model.Measurement) error +} + +// +// Task Running +// +// This section contains the interfaces allowing us +// to run a task until completion. +// + +// taskRunner runs a task until completion. +type taskRunner interface { + // Run runs until completion. + Run(ctx context.Context) +} + +// +// Task Settings +// +// This section defines the settings used by a task. +// + +// Settings contains settings for a task. This structure derives from +// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet +// since 2020-12-03 we're not backwards compatible anymore. +type settings struct { + // Annotations contains the annotations to be added + // to every measurements performed by the task. + Annotations map[string]string `json:"annotations,omitempty"` + + // AssetsDir is the directory where to store assets. This + // field is an extension of MK's specification. If + // this field is empty, the task won't start. + AssetsDir string `json:"assets_dir"` + + // DisabledEvents contains disabled events. See + // https://git.io/Jv4Rv for the events names. + // + // This setting is currently ignored. We noticed the + // code was ignoring it on 2021-12-01. + DisabledEvents []string `json:"disabled_events,omitempty"` + + // Inputs contains the inputs. The task will fail if it + // requires input and you provide no input. + Inputs []string `json:"inputs,omitempty"` + + // LogLevel contains the logs level. See https://git.io/Jv4Rv + // for the names of the available log levels. + LogLevel string `json:"log_level,omitempty"` + + // Name contains the task name. By https://git.io/Jv4Rv the + // names are in camel case, e.g. `Ndt`. + Name string `json:"name"` + + // Options contains the task options. + Options settingsOptions `json:"options"` + + // Proxy allows you to optionally force a specific proxy + // rather than using no proxy (the default). + // + // Use `psiphon:///` to force using Psiphon with the + // embedded configuration file. Not all builds have + // an embedded configuration file, but OONI builds have + // such a file, so they can use this functionality. + // + // Use `socks5://10.0.0.1:9050/` to connect to a SOCKS5 + // proxy running on 10.0.0.1:9050. This could be, for + // example, a suitably configured `tor` instance. + Proxy string + + // StateDir is the directory where to store persistent data. This + // field is an extension of MK's specification. If + // this field is empty, the task won't start. + StateDir string `json:"state_dir"` + + // TempDir is the temporary directory. This field is an extension of MK's + // specification. If this field is empty, we will pick the tempdir that + // ioutil.TempDir uses by default, which may not work on mobile. According + // to our experiments as of 2020-06-10, leaving the TempDir empty works + // for iOS and does not work for Android. + TempDir string `json:"temp_dir"` + + // TunnelDir is the directory where to store persistent state + // related to circumvention tunnels. This directory is required + // only if you want to use the tunnels. Added since 3.10.0. + TunnelDir string `json:"tunnel_dir"` + + // Version indicates the version of this structure. + Version int64 `json:"version"` +} + +// settingsOptions contains the settings options +type settingsOptions struct { + // MaxRuntime is the maximum runtime expressed in seconds. A negative + // value for this field disables the maximum runtime. Using + // a zero value will also mean disabled. This is not the + // original behaviour of Measurement Kit, which used to run + // for zero time in such case. + MaxRuntime float64 `json:"max_runtime,omitempty"` + + // NoCollector indicates whether to use a collector + NoCollector bool `json:"no_collector,omitempty"` + + // ProbeServicesBaseURL contains the probe services base URL. + ProbeServicesBaseURL string `json:"probe_services_base_url,omitempty"` + + // SoftwareName is the software name. If this option is not + // present, then the library startup will fail. + SoftwareName string `json:"software_name,omitempty"` + + // SoftwareVersion is the software version. If this option is not + // present, then the library startup will fail. + SoftwareVersion string `json:"software_version,omitempty"` +} diff --git a/pkg/oonimkall/runner.go b/pkg/oonimkall/taskrunner.go similarity index 61% rename from pkg/oonimkall/runner.go rename to pkg/oonimkall/taskrunner.go index 316f115..da35737 100644 --- a/pkg/oonimkall/runner.go +++ b/pkg/oonimkall/taskrunner.go @@ -9,77 +9,46 @@ import ( "github.com/ooni/probe-cli/v3/internal/engine" "github.com/ooni/probe-cli/v3/internal/engine/model" - "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/runtimex" ) -const ( - failureIPLookup = "failure.ip_lookup" - failureASNLookup = "failure.asn_lookup" - failureCCLookup = "failure.cc_lookup" - failureMeasurement = "failure.measurement" - failureMeasurementSubmission = "failure.measurement_submission" - failureReportCreate = "failure.report_create" - failureResolverLookup = "failure.resolver_lookup" - failureStartup = "failure.startup" - measurement = "measurement" - statusEnd = "status.end" - statusGeoIPLookup = "status.geoip_lookup" - statusMeasurementDone = "status.measurement_done" - statusMeasurementStart = "status.measurement_start" - statusMeasurementSubmission = "status.measurement_submission" - statusProgress = "status.progress" - statusQueued = "status.queued" - statusReportCreate = "status.report_create" - statusResolverLookup = "status.resolver_lookup" - statusStarted = "status.started" -) - -// run runs the task specified by settings.Name until completion. This is the -// top-level API that should be called by oonimkall. -func run(ctx context.Context, settings *settings, out chan<- *event) { - eof := make(chan interface{}) - defer close(eof) // tell the emitter to not emit anymore. - r := newRunner(settings, out, eof) - r.Run(ctx) +// runnerForTask runs a specific task +type runnerForTask struct { + emitter *taskEmitterWrapper + kvStoreBuilder taskKVStoreFSBuilder + sessionBuilder taskSessionBuilder + settings *settings } -// runner runs a specific task -type runner struct { - emitter *eventEmitter - maybeLookupLocation func(*engine.Session) error - out chan<- *event - settings *settings -} +var _ taskRunner = &runnerForTask{} // newRunner creates a new task runner -func newRunner(settings *settings, out chan<- *event, eof <-chan interface{}) *runner { - return &runner{ - emitter: newEventEmitter(settings.DisabledEvents, out, eof), - out: out, - settings: settings, +func newRunner(settings *settings, emitter taskEmitter) *runnerForTask { + return &runnerForTask{ + emitter: &taskEmitterWrapper{emitter}, + kvStoreBuilder: &taskKVStoreFSBuilderEngine{}, + sessionBuilder: &taskSessionBuilderEngine{}, + settings: settings, } } // failureInvalidVersion is the failure returned when Version is invalid const failureInvalidVersion = "invalid Settings.Version number" -func (r *runner) hasUnsupportedSettings(logger *chanLogger) bool { - if r.settings.Version < 1 { +func (r *runnerForTask) hasUnsupportedSettings() bool { + if r.settings.Version < taskABIVersion { r.emitter.EmitFailureStartup(failureInvalidVersion) return true } return false } -func (r *runner) newsession(ctx context.Context, logger *chanLogger) (*engine.Session, error) { - kvstore, err := kvstore.NewFS(r.settings.StateDir) +func (r *runnerForTask) newsession(ctx context.Context, logger model.Logger) (taskSession, error) { + kvstore, err := r.kvStoreBuilder.NewFS(r.settings.StateDir) if err != nil { return nil, err } - // TODO(bassosimone): write tests for this functionality - // See https://github.com/ooni/probe/issues/1465. var proxyURL *url.URL if r.settings.Proxy != "" { var err error @@ -104,11 +73,11 @@ func (r *runner) newsession(ctx context.Context, logger *chanLogger) (*engine.Se Address: r.settings.Options.ProbeServicesBaseURL, }} } - return engine.NewSession(ctx, config) + return r.sessionBuilder.NewSession(ctx, config) } -func (r *runner) contextForExperiment( - ctx context.Context, builder *engine.ExperimentBuilder, +func (r *runnerForTask) contextForExperiment( + ctx context.Context, builder taskExperimentBuilder, ) context.Context { if builder.Interruptible() { return ctx @@ -117,11 +86,11 @@ func (r *runner) contextForExperiment( } type runnerCallbacks struct { - emitter *eventEmitter + emitter taskEmitter } func (cb *runnerCallbacks) OnProgress(percentage float64, message string) { - cb.emitter.Emit(statusProgress, eventStatusProgress{ + cb.emitter.Emit(eventTypeStatusProgress, eventStatusProgress{ Percentage: 0.4 + (percentage * 0.6), // open report is 40% Message: message, }) @@ -130,13 +99,14 @@ func (cb *runnerCallbacks) OnProgress(percentage float64, message string) { // Run runs the runner until completion. The context argument controls // when to stop when processing multiple inputs, as well as when to stop // experiments explicitly marked as interruptible. -func (r *runner) Run(ctx context.Context) { - logger := newChanLogger(r.emitter, r.settings.LogLevel, r.out) - r.emitter.Emit(statusQueued, eventEmpty{}) - if r.hasUnsupportedSettings(logger) { +func (r *runnerForTask) Run(ctx context.Context) { + var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel) + r.emitter.Emit(eventTypeStatusQueued, eventEmpty{}) + if r.hasUnsupportedSettings() { + // event failureStartup already emitted return } - r.emitter.Emit(statusStarted, eventEmpty{}) + r.emitter.Emit(eventTypeStatusStarted, eventEmpty{}) sess, err := r.newsession(ctx, logger) if err != nil { r.emitter.EmitFailureStartup(err.Error()) @@ -145,45 +115,39 @@ func (r *runner) Run(ctx context.Context) { endEvent := new(eventStatusEnd) defer func() { sess.Close() - r.emitter.Emit(statusEnd, endEvent) + r.emitter.Emit(eventTypeStatusEnd, endEvent) }() - builder, err := sess.NewExperimentBuilder(r.settings.Name) + builder, err := sess.NewExperimentBuilderByName(r.settings.Name) if err != nil { r.emitter.EmitFailureStartup(err.Error()) return } logger.Info("Looking up OONI backends... please, be patient") - if err := sess.MaybeLookupBackends(); err != nil { + if err := sess.MaybeLookupBackendsContext(ctx); err != nil { r.emitter.EmitFailureStartup(err.Error()) return } r.emitter.EmitStatusProgress(0.1, "contacted bouncer") logger.Info("Looking up your location... please, be patient") - maybeLookupLocation := r.maybeLookupLocation - if maybeLookupLocation == nil { - maybeLookupLocation = func(sess *engine.Session) error { - return sess.MaybeLookupLocation() - } - } - if err := maybeLookupLocation(sess); err != nil { - r.emitter.EmitFailureGeneric(failureIPLookup, err.Error()) - r.emitter.EmitFailureGeneric(failureASNLookup, err.Error()) - r.emitter.EmitFailureGeneric(failureCCLookup, err.Error()) - r.emitter.EmitFailureGeneric(failureResolverLookup, err.Error()) + if err := sess.MaybeLookupLocationContext(ctx); err != nil { + r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error()) + r.emitter.EmitFailureGeneric(eventTypeFailureASNLookup, err.Error()) + r.emitter.EmitFailureGeneric(eventTypeFailureCCLookup, err.Error()) + r.emitter.EmitFailureGeneric(eventTypeFailureResolverLookup, err.Error()) return } r.emitter.EmitStatusProgress(0.2, "geoip lookup") r.emitter.EmitStatusProgress(0.3, "resolver lookup") - r.emitter.Emit(statusGeoIPLookup, eventStatusGeoIPLookup{ + r.emitter.Emit(eventTypeStatusGeoIPLookup, eventStatusGeoIPLookup{ ProbeIP: sess.ProbeIP(), ProbeASN: sess.ProbeASNString(), ProbeCC: sess.ProbeCC(), ProbeNetworkName: sess.ProbeNetworkName(), }) - r.emitter.Emit(statusResolverLookup, eventStatusResolverLookup{ + r.emitter.Emit(eventTypeStatusResolverLookup, eventStatusResolverLookup{ ResolverASN: sess.ResolverASNString(), ResolverIP: sess.ResolverIP(), ResolverNetworkName: sess.ResolverNetworkName(), @@ -198,19 +162,19 @@ func (r *runner) Run(ctx context.Context) { } r.settings.Inputs = append(r.settings.Inputs, "") } - experiment := builder.NewExperiment() + experiment := builder.NewExperimentInstance() defer func() { endEvent.DownloadedKB = experiment.KibiBytesReceived() endEvent.UploadedKB = experiment.KibiBytesSent() }() if !r.settings.Options.NoCollector { logger.Info("Opening report... please, be patient") - if err := experiment.OpenReport(); err != nil { - r.emitter.EmitFailureGeneric(failureReportCreate, err.Error()) + if err := experiment.OpenReportContext(ctx); err != nil { + r.emitter.EmitFailureGeneric(eventTypeFailureReportCreate, err.Error()) return } r.emitter.EmitStatusProgress(0.4, "open report") - r.emitter.Emit(statusReportCreate, eventStatusReportGeneric{ + r.emitter.Emit(eventTypeStatusReportCreate, eventStatusReportGeneric{ ReportID: experiment.ReportID(), }) } @@ -242,7 +206,7 @@ func (r *runner) Run(ctx context.Context) { break } logger.Infof("Starting measurement with index %d", idx) - r.emitter.Emit(statusMeasurementStart, eventMeasurementGeneric{ + r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{ Idx: int64(idx), Input: input, }) @@ -269,7 +233,7 @@ func (r *runner) Run(ctx context.Context) { } m.AddAnnotations(r.settings.Annotations) if err != nil { - r.emitter.Emit(failureMeasurement, eventMeasurementGeneric{ + r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{ Failure: err.Error(), Idx: int64(idx), Input: input, @@ -282,14 +246,14 @@ func (r *runner) Run(ctx context.Context) { } data, err := json.Marshal(m) runtimex.PanicOnError(err, "measurement.MarshalJSON failed") - r.emitter.Emit(measurement, eventMeasurementGeneric{ + r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{ Idx: int64(idx), Input: input, JSONStr: string(data), }) if !r.settings.Options.NoCollector { logger.Info("Submitting measurement... please, be patient") - err := experiment.SubmitAndUpdateMeasurement(m) + err := experiment.SubmitAndUpdateMeasurementContext(ctx, m) r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{ Idx: int64(idx), Input: input, @@ -297,7 +261,7 @@ func (r *runner) Run(ctx context.Context) { Failure: measurementSubmissionFailure(err), }) } - r.emitter.Emit(statusMeasurementDone, eventMeasurementGeneric{ + r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{ Idx: int64(idx), Input: input, }) @@ -306,9 +270,9 @@ func (r *runner) Run(ctx context.Context) { func measurementSubmissionEventName(err error) string { if err != nil { - return failureMeasurementSubmission + return eventTypeFailureMeasurementSubmission } - return statusMeasurementSubmission + return eventTypeStatusMeasurementSubmission } func measurementSubmissionFailure(err error) string { diff --git a/pkg/oonimkall/taskrunner_test.go b/pkg/oonimkall/taskrunner_test.go new file mode 100644 index 0000000..fc34f19 --- /dev/null +++ b/pkg/oonimkall/taskrunner_test.go @@ -0,0 +1,741 @@ +package oonimkall + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + engine "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/model" +) + +func TestMeasurementSubmissionEventName(t *testing.T) { + if measurementSubmissionEventName(nil) != eventTypeStatusMeasurementSubmission { + t.Fatal("unexpected submission event name") + } + if measurementSubmissionEventName(errors.New("mocked error")) != eventTypeFailureMeasurementSubmission { + t.Fatal("unexpected submission event name") + } +} + +func TestMeasurementSubmissionFailure(t *testing.T) { + if measurementSubmissionFailure(nil) != "" { + t.Fatal("unexpected submission failure") + } + if measurementSubmissionFailure(errors.New("mocked error")) != "mocked error" { + t.Fatal("unexpected submission failure") + } +} + +func TestTaskRunnerRun(t *testing.T) { + + // newRunnerForTesting is a factory for creating a new + // runner that wraps newRunner and also sets a specific + // taskSessionBuilder for testing purposes. + newRunnerForTesting := func() (*runnerForTask, *CollectorTaskEmitter) { + settings := &settings{ + Name: "Example", + Options: settingsOptions{ + SoftwareName: "oonimkall-test", + SoftwareVersion: "0.1.0", + }, + StateDir: "testdata/state", + Version: 1, + } + e := &CollectorTaskEmitter{} + r := newRunner(settings, e) + return r, e + } + + // runAndCollectContext runs the task until completion + // and collects the emitted events. Remember that + // it's not race safe to modify the events. + runAndCollectContext := func(ctx context.Context, r taskRunner, e *CollectorTaskEmitter) []*event { + r.Run(ctx) + return e.Collect() + } + + // runAndCollect is like runAndCollectContext + // but uses context.Background() + runAndCollect := func(r taskRunner, e *CollectorTaskEmitter) []*event { + return runAndCollectContext(context.Background(), r, e) + } + + // countEventsByKey returns the number of events + // with the given key inside of the list. + countEventsByKey := func(events []*event, key string) (count int) { + for _, ev := range events { + if ev.Key == key { + count++ + } + } + return + } + + // assertCountEventsByKey fails is the number of events + // of the given type is not the expected one. + assertCountEventsByKey := func(events []*event, key string, count int) { + if countEventsByKey(events, key) != count { + t.Fatalf("unexpected number of '%s' events", key) + } + } + + t.Run("with unsupported settings", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Version = 0 // force unsupported version + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + }) + + t.Run("with failure when creating a new kvstore", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + // override the kvstore builder to provoke an error + runner.kvStoreBuilder = &MockableKVStoreFSBuilder{ + MockNewFS: func(path string) (model.KeyValueStore, error) { + return nil, errors.New("generic error") + }, + } + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + }) + + t.Run("with unparsable proxyURL", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Proxy = "\t" // invalid proxy URL + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + }) + + t.Run("with a parsable proxyURL", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + // set a valid URL + runner.settings.Proxy = "https://127.0.0.1/" + // set a fake session builder that causes the startup to + // fail but records the config passed to NewSession + saver := &SessionBuilderConfigSaver{} + runner.sessionBuilder = saver + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + if saver.Config.ProxyURL.String() != runner.settings.Proxy { + t.Fatal("invalid proxy URL") + } + }) + + t.Run("with custom probe services URL", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + // set a probe services URL + runner.settings.Options.ProbeServicesBaseURL = "https://127.0.0.1" + // set a fake session builder that causes the startup to + // fail but records the config passed to NewSession + saver := &SessionBuilderConfigSaver{} + runner.sessionBuilder = saver + events := runAndCollect(runner, emitter) + assertCountEventsByKey(events, eventTypeFailureStartup, 1) + psu := saver.Config.AvailableProbeServices + if len(psu) != 1 { + t.Fatal("invalid length") + } + if psu[0].Type != "https" { + t.Fatal("invalid type") + } + if psu[0].Address != runner.settings.Options.ProbeServicesBaseURL { + t.Fatal("invalid address") + } + if psu[0].Front != "" { + t.Fatal("invalid front") + } + }) + + type eventKeyCount struct { + Key string + Count int + } + + // reduceEventsKeysIgnoreLog reduces the list of event keys + // counting equal subsequent keys and ignoring log events + reduceEventsKeysIgnoreLog := func(events []*event) (out []eventKeyCount) { + var current eventKeyCount + for _, ev := range events { + if ev.Key == eventTypeLog { + continue + } + if current.Key == ev.Key { + current.Count++ + continue + } + if current.Key != "" { + out = append(out, current) + } + current.Key = ev.Key + current.Count = 1 + } + if current.Key != "" { + out = append(out, current) + } + return + } + + // fakeSuccessfulRun returns a new set of dependencies that + // will perform a fully successful, but fake, run. + fakeSuccessfulRun := func() *MockableTaskRunnerDependencies { + return &MockableTaskRunnerDependencies{ + MockableKibiBytesReceived: func() float64 { + return 10 + }, + MockableKibiBytesSent: func() float64 { + return 4 + }, + MockableOpenReportContext: func(ctx context.Context) error { + return nil + }, + MockableReportID: func() string { + return "20211202T074907Z_example_IT_30722_n1_axDLHNUfJaV1IbuU" + }, + MockableMeasureWithContext: func(ctx context.Context, input string) (*model.Measurement, error) { + return &model.Measurement{}, nil + }, + MockableSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) error { + return nil + }, + MockableSetCallbacks: func(callbacks model.ExperimentCallbacks) { + }, + MockableInputPolicy: func() engine.InputPolicy { + return engine.InputNone + }, + MockableInterruptible: func() bool { + return false + }, + MockClose: func() error { + return nil + }, + MockMaybeLookupBackendsContext: func(ctx context.Context) error { + return nil + }, + MockMaybeLookupLocationContext: func(ctx context.Context) error { + return nil + }, + MockProbeIP: func() string { + return "130.192.91.211" + }, + MockProbeASNString: func() string { + return "AS137" + }, + MockProbeCC: func() string { + return "IT" + }, + MockProbeNetworkName: func() string { + return "GARR" + }, + MockResolverASNString: func() string { + return "AS137" + }, + MockResolverIP: func() string { + return "130.192.3.24" + }, + MockResolverNetworkName: func() string { + return "GARR" + }, + } + } + + assertReducedEventsLike := func(t *testing.T, expected, got []eventKeyCount) { + if diff := cmp.Diff(expected, got); diff != "" { + t.Fatal(diff) + } + } + + t.Run("with invalid experiment name", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockNewExperimentBuilderByName = func(name string) (taskExperimentBuilder, error) { + return nil, errors.New("invalid experiment name") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeFailureStartup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with error during backends lookup", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockMaybeLookupBackendsContext = func(ctx context.Context) error { + return errors.New("mocked error") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeFailureStartup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with error during location lookup", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockMaybeLookupLocationContext = func(ctx context.Context) error { + return errors.New("mocked error") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeFailureIPLookup, + Count: 1, + }, { + Key: eventTypeFailureASNLookup, + Count: 1, + }, { + Key: eventTypeFailureCCLookup, + Count: 1, + }, { + Key: eventTypeFailureResolverLookup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with missing input and input or query backend", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputOrQueryBackend + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeFailureStartup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with missing input and input strictly required", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeFailureStartup, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with failure opening report", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockableOpenReportContext = func(ctx context.Context) error { + return errors.New("mocked error") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeFailureReportCreate, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with success and no input", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeStatusReportCreate, + Count: 1, + }, { + Key: eventTypeStatusMeasurementStart, + Count: 1, + }, { + Key: eventTypeMeasurement, + Count: 1, + }, { + Key: eventTypeStatusMeasurementSubmission, + Count: 1, + }, { + Key: eventTypeStatusMeasurementDone, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with measurement failure and no input", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) { + return nil, errors.New("preconditions error") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeStatusReportCreate, + Count: 1, + }, { + Key: eventTypeStatusMeasurementStart, + Count: 1, + }, { + Key: eventTypeFailureMeasurement, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with success and input", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Inputs = []string{"a", "b", "c", "d"} + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{ + {Key: eventTypeStatusQueued, Count: 1}, + {Key: eventTypeStatusStarted, Count: 1}, + {Key: eventTypeStatusProgress, Count: 3}, + {Key: eventTypeStatusGeoIPLookup, Count: 1}, + {Key: eventTypeStatusResolverLookup, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeStatusReportCreate, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusEnd, Count: 1}, + } + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with succes and max runtime", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Inputs = []string{"a", "b", "c", "d"} + runner.settings.Options.MaxRuntime = 2 + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) { + time.Sleep(1 * time.Second) + return &model.Measurement{}, nil + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{ + {Key: eventTypeStatusQueued, Count: 1}, + {Key: eventTypeStatusStarted, Count: 1}, + {Key: eventTypeStatusProgress, Count: 3}, + {Key: eventTypeStatusGeoIPLookup, Count: 1}, + {Key: eventTypeStatusResolverLookup, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeStatusReportCreate, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeStatusMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusEnd, Count: 1}, + } + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with interrupted experiment", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Inputs = []string{"a", "b", "c", "d"} + runner.settings.Options.MaxRuntime = 2 + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + fake.MockableInterruptible = func() bool { + return true + } + ctx, cancel := context.WithCancel(context.Background()) + fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) { + cancel() + return &model.Measurement{}, nil + } + runner.sessionBuilder = fake + events := runAndCollectContext(ctx, runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{ + {Key: eventTypeStatusQueued, Count: 1}, + {Key: eventTypeStatusStarted, Count: 1}, + {Key: eventTypeStatusProgress, Count: 3}, + {Key: eventTypeStatusGeoIPLookup, Count: 1}, + {Key: eventTypeStatusResolverLookup, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeStatusReportCreate, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + // + {Key: eventTypeStatusEnd, Count: 1}, + } + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with measurement submission failure", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + runner.settings.Inputs = []string{"a"} + fake := fakeSuccessfulRun() + fake.MockableInputPolicy = func() engine.InputPolicy { + return engine.InputStrictlyRequired + } + fake.MockableSubmitAndUpdateMeasurementContext = func(ctx context.Context, measurement *model.Measurement) error { + return errors.New("cannot submit") + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{ + {Key: eventTypeStatusQueued, Count: 1}, + {Key: eventTypeStatusStarted, Count: 1}, + {Key: eventTypeStatusProgress, Count: 3}, + {Key: eventTypeStatusGeoIPLookup, Count: 1}, + {Key: eventTypeStatusResolverLookup, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeStatusReportCreate, Count: 1}, + // + {Key: eventTypeStatusMeasurementStart, Count: 1}, + {Key: eventTypeStatusProgress, Count: 1}, + {Key: eventTypeMeasurement, Count: 1}, + {Key: eventTypeFailureMeasurementSubmission, Count: 1}, + {Key: eventTypeStatusMeasurementDone, Count: 1}, + // + {Key: eventTypeStatusEnd, Count: 1}, + } + assertReducedEventsLike(t, expect, reduced) + }) + + t.Run("with success and progress", func(t *testing.T) { + runner, emitter := newRunnerForTesting() + fake := fakeSuccessfulRun() + var callbacks model.ExperimentCallbacks + fake.MockableSetCallbacks = func(cbs model.ExperimentCallbacks) { + callbacks = cbs + } + fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) { + callbacks.OnProgress(1, "hello from the fake experiment") + return &model.Measurement{}, nil + } + runner.sessionBuilder = fake + events := runAndCollect(runner, emitter) + reduced := reduceEventsKeysIgnoreLog(events) + expect := []eventKeyCount{{ + Key: eventTypeStatusQueued, + Count: 1, + }, { + Key: eventTypeStatusStarted, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 3, + }, { + Key: eventTypeStatusGeoIPLookup, + Count: 1, + }, { + Key: eventTypeStatusResolverLookup, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeStatusReportCreate, + Count: 1, + }, { + Key: eventTypeStatusMeasurementStart, + Count: 1, + }, { + Key: eventTypeStatusProgress, + Count: 1, + }, { + Key: eventTypeMeasurement, + Count: 1, + }, { + Key: eventTypeStatusMeasurementSubmission, + Count: 1, + }, { + Key: eventTypeStatusMeasurementDone, + Count: 1, + }, { + Key: eventTypeStatusEnd, + Count: 1, + }} + assertReducedEventsLike(t, expect, reduced) + }) +} diff --git a/pkg/oonimkall/tasksession.go b/pkg/oonimkall/tasksession.go new file mode 100644 index 0000000..c6d31f7 --- /dev/null +++ b/pkg/oonimkall/tasksession.go @@ -0,0 +1,78 @@ +package oonimkall + +import ( + "context" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/model" + "github.com/ooni/probe-cli/v3/internal/kvstore" +) + +// +// This file implements taskSession and derived types. +// + +// taskKVStoreFSBuilderEngine creates a new KVStore +// using the ./internal/engine package. +type taskKVStoreFSBuilderEngine struct{} + +var _ taskKVStoreFSBuilder = &taskKVStoreFSBuilderEngine{} + +func (*taskKVStoreFSBuilderEngine) NewFS(path string) (model.KeyValueStore, error) { + return kvstore.NewFS(path) +} + +// taskSessionBuilderEngine builds a new session +// using the ./internal/engine package. +type taskSessionBuilderEngine struct{} + +var _ taskSessionBuilder = &taskSessionBuilderEngine{} + +// NewSession implements taskSessionBuilder.NewSession. +func (b *taskSessionBuilderEngine) NewSession(ctx context.Context, + config engine.SessionConfig) (taskSession, error) { + sess, err := engine.NewSession(ctx, config) + if err != nil { + return nil, err + } + return &taskSessionEngine{sess}, nil +} + +// taskSessionEngine wraps ./internal/engine's Session. +type taskSessionEngine struct { + *engine.Session +} + +var _ taskSession = &taskSessionEngine{} + +// NewExperimentBuilderByName implements +// taskSessionEngine.NewExperimentBuilderByName. +func (sess *taskSessionEngine) NewExperimentBuilderByName( + name string) (taskExperimentBuilder, error) { + builder, err := sess.NewExperimentBuilder(name) + if err != nil { + return nil, err + } + return &taskExperimentBuilderEngine{builder}, err +} + +// taskExperimentBuilderEngine wraps ./internal/engine's +// ExperimentBuilder type. +type taskExperimentBuilderEngine struct { + *engine.ExperimentBuilder +} + +var _ taskExperimentBuilder = &taskExperimentBuilderEngine{} + +// NewExperimentInstance implements +// taskExperimentBuilder.NewExperimentInstance. +func (b *taskExperimentBuilderEngine) NewExperimentInstance() taskExperiment { + return &taskExperimentEngine{b.NewExperiment()} +} + +// taskExperimentEngine wraps ./internal/engine's Experiment. +type taskExperimentEngine struct { + *engine.Experiment +} + +var _ taskExperiment = &taskExperimentEngine{} diff --git a/pkg/oonimkall/tasksession_test.go b/pkg/oonimkall/tasksession_test.go new file mode 100644 index 0000000..f525491 --- /dev/null +++ b/pkg/oonimkall/tasksession_test.go @@ -0,0 +1,128 @@ +package oonimkall + +import ( + "context" + "testing" + + "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/version" +) + +func TestTaskKVSToreFSBuilderEngine(t *testing.T) { + b := &taskKVStoreFSBuilderEngine{} + store, err := b.NewFS("testdata/state") + if err != nil { + t.Fatal(err) + } + if store == nil { + t.Fatal("expected non-nil store here") + } +} + +func TestTaskSessionBuilderEngine(t *testing.T) { + t.Run("NewSession", func(t *testing.T) { + t.Run("on success", func(t *testing.T) { + builder := &taskSessionBuilderEngine{} + ctx := context.Background() + config := engine.SessionConfig{ + Logger: log.Log, + SoftwareName: "ooniprobe-cli", + SoftwareVersion: version.Version, + } + sess, err := builder.NewSession(ctx, config) + if err != nil { + t.Fatal(err) + } + sess.Close() + }) + + t.Run("on failure", func(t *testing.T) { + builder := &taskSessionBuilderEngine{} + ctx := context.Background() + config := engine.SessionConfig{} + sess, err := builder.NewSession(ctx, config) + if err == nil { + t.Fatal("expected an error here") + } + if sess != nil { + t.Fatal("expected nil session here") + } + }) + }) +} + +func TestTaskSessionEngine(t *testing.T) { + + // newSession is a helper function for creating a new session. + newSession := func(t *testing.T) taskSession { + builder := &taskSessionBuilderEngine{} + ctx := context.Background() + config := engine.SessionConfig{ + Logger: log.Log, + SoftwareName: "ooniprobe-cli", + SoftwareVersion: version.Version, + } + sess, err := builder.NewSession(ctx, config) + if err != nil { + t.Fatal(err) + } + return sess + } + + t.Run("NewExperimentBuilderByName", func(t *testing.T) { + t.Run("on success", func(t *testing.T) { + sess := newSession(t) + builder, err := sess.NewExperimentBuilderByName("ndt") + if err != nil { + t.Fatal(err) + } + if builder == nil { + t.Fatal("expected non-nil builder") + } + }) + + t.Run("on failure", func(t *testing.T) { + sess := newSession(t) + builder, err := sess.NewExperimentBuilderByName("antani") + if err == nil { + t.Fatal("expected an error here") + } + if builder != nil { + t.Fatal("expected nil builder") + } + }) + }) +} + +func TestTaskExperimentBuilderEngine(t *testing.T) { + + // newBuilder is a helper function for creating a new session + // as well as a new experiment builder + newBuilder := func(t *testing.T) (taskSession, taskExperimentBuilder) { + builder := &taskSessionBuilderEngine{} + ctx := context.Background() + config := engine.SessionConfig{ + Logger: log.Log, + SoftwareName: "ooniprobe-cli", + SoftwareVersion: version.Version, + } + sess, err := builder.NewSession(ctx, config) + if err != nil { + t.Fatal(err) + } + expBuilder, err := sess.NewExperimentBuilderByName("ndt") + if err != nil { + t.Fatal(err) + } + return sess, expBuilder + } + + t.Run("NewExperiment", func(t *testing.T) { + _, builder := newBuilder(t) + exp := builder.NewExperimentInstance() + if exp == nil { + t.Fatal("expected non-nil experiment here") + } + }) +}