forwardport: pull the patches mentioned in ooni/probe#1908 (#629)

* [forwardport] fix(oonimkall): make logger used by tasks unit testable (#623)

This diff forward ports e4b04642c51e7461728b25941624e1b97ef0ec83.

Reference issue: https://github.com/ooni/probe/issues/1903

* [forwardport] feat(oonimkall): improve taskEmitter testability (#624)

This diff forward ports 3e0f01a389c1f4cdd7878ec151aff91870a0bdff.

1. rename eventemitter{,_test}.go => taskemitter{,_test}.go because
the new name is more proper after we merged the internal/task package
inside of the oonimkall package;

2. rename runner.go's `run` function to `runTask`;

3. modify `runTask` to use the new `taskEmitterUsingChan` abstraction
on which we will spend more works in a later point of this list;

4. introduce `runTaskWithEmitter` factory that is called by `runTask`
and allows us to more easily write unit tests;

5. acknowledge that `runner` was not using its `out` field;

6. use the new `taskEmitterWrapper` in `newRunner`;

7. acknowledge that `runnerCallbacks` could use a generic
`taskEmitter` as field type rather than a specific type;

8. rewrite tests to use `runTaskWithEmitter` which leads to
simpler code that does not require a goroutine;

9. acknowledge that the code has been ignoring the `DisabledEvents`
settings for quite some time, so stop supporting it;

10. refactor the `taskEmitter` implementation to be like:

    1. we still have the `taskEmitter` interface;

    2. `taskEmitterUsingChan` wraps the channel and allows for
    emitting events using the channel;

    3. `taskEmitterUsingChan` owns an `eof` channel that is
    closed by `Close` (which is idempotent) and signals we
    should be stop emitting;

    4. make sure `runTask` creates a `taskEmitterUsingChan`
    and calls its `Close` method when done;

    5. completely remove the code for disabling events
    since the code was actually ignoring the stting;

    6. add a `taskEmitterWrapper` that adds common functions
    for emitting events to _any_ `taskWrapper`;

    7. write unit tests for `taskEmitterUsingChan` and
    for `taskEmitterWrapper`;

11. acknowledge that the abstraction we need for testing is
actually a thread-safe thing that collects events into a
vector containing events and refactor all tests accordingly.

See https://github.com/ooni/probe/issues/1903

* [forwardport] refactor(oonimkall): make the runner unit-testable (#625)

This diff forward ports 9423947faf6980d92d2fe67efe3829e8fef76586.

See https://github.com/ooni/probe/issues/1903

* [forwardport] feat(oonimkall): write unit tests for the runner component (#626)

This diff forward ports 35dd0e3788b8fa99c541452bbb5e0ae4871239e1.

Forward porting note: compared to 35dd0e3788b8fa99c541452bbb5e0ae4871239e1,
the diff I'm committing here is slightly different. In `master` we do not
have the case where a measurement fails and a measurement is returned, thus
I needed to adapt the test to become like this:

```diff
diff --git a/pkg/oonimkall/runner_internal_test.go b/pkg/oonimkall/runner_internal_test.go
index 334b574..84c7436 100644
--- a/pkg/oonimkall/runner_internal_test.go
+++ b/pkg/oonimkall/runner_internal_test.go
@@ -568,15 +568,6 @@ func TestTaskRunnerRun(t *testing.T) {
                }, {
                        Key:   failureMeasurement,
                        Count: 1,
-               }, {
-                       Key:   measurement,
-                       Count: 1,
-               }, {
-                       Key:   statusMeasurementSubmission,
-                       Count: 1,
-               }, {
-                       Key:   statusMeasurementDone,
-                       Count: 1,
                }, {
                        Key:   statusEnd,
                        Count: 1,
```

I still need to write more assertions for each emitted event
but the code we've here is already a great starting point.

See https://github.com/ooni/probe/issues/1903

* [forwardport] refactor(oonimkall): merge files, use proper names, zap unneeded integration tests (#627)

This diff forward ports f894427d24edc9a03fc78306d0093e7b51c46c25.

Forward porting note: this diff is slightly different from the original
mentioned above because it carries forward changes mentioned in the
previous diff caused by a different way of handling a failed measurement
in the master branch compared to the release/3.11 branch.

Move everything that looked like "task's model" inside of the
taskmodel.go file, for consistency.

Make sure it's clear some variables are event types.

Rename the concrete `runner` as `runnerForTask`.

Also, remove now-unnecessary (and flaky!) integration tests
for the `runnerForTask` type.

While there, notice there were wrong URLs that were generated
during the probe-engine => probe-cli move and fix them.

See https://github.com/ooni/probe/issues/1903

* [forwardport] refactor(oonimkall): we can simplify StartTask tests (#628)

This diff forward ports dcf2986c2032d8185d58d24130a7f2c2d61ef2fb.

* refactor(oonimkall): we can simplify StartTask tests

We have enough checks for runnerForTask. So we do not need to
duplicate them when checking for StartTask.

While there, refactor how we start tasks to remove the need for
extra runner functions.

This is the objective I wanted to achieve for oonimkall:

1. less duplicate tests, and

2. more unit tests (which are less flaky)

At this point, we're basically done (pending forwardporting to
master) with https://github.com/ooni/probe/issues/1903.

* fix(oonimkall): TestStartTaskGood shouldn't cancel the test

This creates a race condition where the test may fail if we cannot
complete the whole "Example" test in less than one second.

This should explain the build failures I've seen so far and why
I didn't see those failures when running locally.
This commit is contained in:
Simone Basso 2021-12-02 12:47:07 +01:00 committed by GitHub
parent 120f2b9fbf
commit 9cdca4137d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 2167 additions and 1522 deletions

View File

@ -69,59 +69,6 @@ var experimentsByName = map[string]func(*Session) *ExperimentBuilder{
}
},
"example_with_input": func(session *Session) *ExperimentBuilder {
return &ExperimentBuilder{
build: func(config interface{}) *Experiment {
return NewExperiment(session, example.NewExperimentMeasurer(
*config.(*example.Config), "example_with_input",
))
},
config: &example.Config{
Message: "Good day from the example with input experiment!",
SleepTime: int64(time.Second),
},
interruptible: true,
inputPolicy: InputStrictlyRequired,
}
},
// TODO(bassosimone): when we can set experiment options using the JSON
// we need to get rid of all these multiple experiments.
//
// See https://github.com/ooni/probe-engine/issues/413
"example_with_input_non_interruptible": func(session *Session) *ExperimentBuilder {
return &ExperimentBuilder{
build: func(config interface{}) *Experiment {
return NewExperiment(session, example.NewExperimentMeasurer(
*config.(*example.Config), "example_with_input_non_interruptible",
))
},
config: &example.Config{
Message: "Good day from the example with input experiment!",
SleepTime: int64(time.Second),
},
interruptible: false,
inputPolicy: InputStrictlyRequired,
}
},
"example_with_failure": func(session *Session) *ExperimentBuilder {
return &ExperimentBuilder{
build: func(config interface{}) *Experiment {
return NewExperiment(session, example.NewExperimentMeasurer(
*config.(*example.Config), "example_with_failure",
))
},
config: &example.Config{
Message: "Good day from the example with failure experiment!",
ReturnError: true,
SleepTime: int64(time.Second),
},
interruptible: true,
inputPolicy: InputNone,
}
},
"facebook_messenger": func(session *Session) *ExperimentBuilder {
return &ExperimentBuilder{
build: func(config interface{}) *Experiment {

View File

@ -158,7 +158,7 @@ func (r runner) measure(
// connecting or later. We cannot say that very precisely
// because, in principle, we may reconnect. So we always
// return error here. This comment is being introduced so
// that we don't do https://github.com/ooni/probe-cli/v3/internal/engine/pull/526
// that we don't do https://github.com/ooni/probe-engine/pull/526
// again, because that isn't accurate.
return err
}

View File

@ -28,7 +28,7 @@ func testresolverquick(t *testing.T, network, address string) {
}
var foundquad8 bool
for _, addr := range addrs {
// See https://github.com/ooni/probe-cli/v3/internal/engine/pull/954/checks?check_run_id=1182269025
// See https://github.com/ooni/probe-engine/pull/954/checks?check_run_id=1182269025
if addr == "8.8.8.8" || addr == "2001:4860:4860::8888" {
foundquad8 = true
}

View File

@ -206,7 +206,7 @@ func TestNewRequestList(t *testing.T) {
}},
}, {
// for an example of why we need to sort headers, see
// https://github.com/ooni/probe-cli/v3/internal/engine/pull/751/checks?check_run_id=853562310
// https://github.com/ooni/probe-engine/pull/751/checks?check_run_id=853562310
name: "run with redirect and headers to sort",
args: args{
begin: begin,

View File

@ -32,7 +32,7 @@ func testresolverquick(t *testing.T, reso resolver.Resolver) {
}
var foundquad8 bool
for _, addr := range addrs {
// See https://github.com/ooni/probe-cli/v3/internal/engine/pull/954/checks?check_run_id=1182269025
// See https://github.com/ooni/probe-engine/pull/954/checks?check_run_id=1182269025
if addr == "8.8.8.8" || addr == "2001:4860:4860::8888" {
foundquad8 = true
}

View File

@ -1,85 +0,0 @@
package oonimkall
import "fmt"
// chanLogger is a logger targeting a channel
type chanLogger struct {
emitter *eventEmitter
hasdebug bool
hasinfo bool
haswarning bool
out chan<- *event
}
// Debug implements Logger.Debug
func (cl *chanLogger) Debug(msg string) {
if cl.hasdebug {
cl.emitter.Emit("log", eventLog{
LogLevel: "DEBUG",
Message: msg,
})
}
}
// Debugf implements Logger.Debugf
func (cl *chanLogger) Debugf(format string, v ...interface{}) {
if cl.hasdebug {
cl.Debug(fmt.Sprintf(format, v...))
}
}
// Info implements Logger.Info
func (cl *chanLogger) Info(msg string) {
if cl.hasinfo {
cl.emitter.Emit("log", eventLog{
LogLevel: "INFO",
Message: msg,
})
}
}
// Infof implements Logger.Infof
func (cl *chanLogger) Infof(format string, v ...interface{}) {
if cl.hasinfo {
cl.Info(fmt.Sprintf(format, v...))
}
}
// Warn implements Logger.Warn
func (cl *chanLogger) Warn(msg string) {
if cl.haswarning {
cl.emitter.Emit("log", eventLog{
LogLevel: "WARNING",
Message: msg,
})
}
}
// Warnf implements Logger.Warnf
func (cl *chanLogger) Warnf(format string, v ...interface{}) {
if cl.haswarning {
cl.Warn(fmt.Sprintf(format, v...))
}
}
// newChanLogger creates a new ChanLogger instance.
func newChanLogger(emitter *eventEmitter, logLevel string,
out chan<- *event) *chanLogger {
cl := &chanLogger{
emitter: emitter,
out: out,
}
switch logLevel {
case "DEBUG", "DEBUG2":
cl.hasdebug = true
fallthrough
case "INFO":
cl.hasinfo = true
fallthrough
case "ERR", "WARNING":
fallthrough
default:
cl.haswarning = true
}
return cl
}

View File

@ -1,57 +0,0 @@
package oonimkall
type eventEmpty struct{}
// eventFailure contains information on a failure.
type eventFailure struct {
Failure string `json:"failure"`
}
// eventLog is an event containing a log message.
type eventLog struct {
LogLevel string `json:"log_level"`
Message string `json:"message"`
}
type eventMeasurementGeneric struct {
Failure string `json:"failure,omitempty"`
Idx int64 `json:"idx"`
Input string `json:"input"`
JSONStr string `json:"json_str,omitempty"`
}
type eventStatusEnd struct {
DownloadedKB float64 `json:"downloaded_kb"`
Failure string `json:"failure"`
UploadedKB float64 `json:"uploaded_kb"`
}
type eventStatusGeoIPLookup struct {
ProbeASN string `json:"probe_asn"`
ProbeCC string `json:"probe_cc"`
ProbeIP string `json:"probe_ip"`
ProbeNetworkName string `json:"probe_network_name"`
}
// eventStatusProgress reports progress information.
type eventStatusProgress struct {
Message string `json:"message"`
Percentage float64 `json:"percentage"`
}
type eventStatusReportGeneric struct {
ReportID string `json:"report_id"`
}
type eventStatusResolverLookup struct {
ResolverASN string `json:"resolver_asn"`
ResolverIP string `json:"resolver_ip"`
ResolverNetworkName string `json:"resolver_network_name"`
}
// event is an event emitted by a task. This structure extends the event
// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv).
type event struct {
Key string `json:"key"`
Value interface{} `json:"value"`
}

View File

@ -1,47 +0,0 @@
package oonimkall
// eventEmitter emits event on a channel
type eventEmitter struct {
disabled map[string]bool
eof <-chan interface{}
out chan<- *event
}
// newEventEmitter creates a new Emitter
func newEventEmitter(disabledEvents []string, out chan<- *event,
eof <-chan interface{}) *eventEmitter {
ee := &eventEmitter{eof: eof, out: out}
ee.disabled = make(map[string]bool)
for _, eventname := range disabledEvents {
ee.disabled[eventname] = true
}
return ee
}
// EmitFailureStartup emits the failureStartup event
func (ee *eventEmitter) EmitFailureStartup(failure string) {
ee.EmitFailureGeneric(failureStartup, failure)
}
// EmitFailureGeneric emits a failure event
func (ee *eventEmitter) EmitFailureGeneric(name, failure string) {
ee.Emit(name, eventFailure{Failure: failure})
}
// EmitStatusProgress emits the status.Progress event
func (ee *eventEmitter) EmitStatusProgress(percentage float64, message string) {
ee.Emit(statusProgress, eventStatusProgress{Message: message, Percentage: percentage})
}
// Emit emits the specified event
func (ee *eventEmitter) Emit(key string, value interface{}) {
if ee.disabled[key] {
return
}
// Prevent this goroutine from blocking on `ee.out` if the caller
// has already told us it's not going to accept more events.
select {
case ee.out <- &event{Key: key, Value: value}:
case <-ee.eof:
}
}

View File

@ -1,84 +0,0 @@
package oonimkall
import "testing"
func TestDisabledEvents(t *testing.T) {
out := make(chan *event)
eof := make(chan interface{})
emitter := newEventEmitter([]string{"log"}, out, eof)
go func() {
emitter.Emit("log", eventLog{Message: "foo"})
close(eof)
}()
var count int64
Loop:
for {
select {
case ev := <-out:
if ev.Key == "log" {
count++
}
case <-eof:
break Loop
}
}
if count > 0 {
t.Fatal("cannot disable events")
}
}
func TestEmitFailureStartup(t *testing.T) {
out := make(chan *event)
eof := make(chan interface{})
emitter := newEventEmitter([]string{}, out, eof)
go func() {
emitter.EmitFailureStartup("mocked error")
close(eof)
}()
var found bool
Loop:
for {
select {
case ev := <-out:
if ev.Key == "failure.startup" {
evv := ev.Value.(eventFailure) // panic if not castable
if evv.Failure == "mocked error" {
found = true
}
}
case <-eof:
break Loop
}
}
if !found {
t.Fatal("did not see expected event")
}
}
func TestEmitStatusProgress(t *testing.T) {
out := make(chan *event)
eof := make(chan interface{})
emitter := newEventEmitter([]string{}, out, eof)
go func() {
emitter.EmitStatusProgress(0.7, "foo")
close(eof)
}()
var found bool
Loop:
for {
select {
case ev := <-out:
if ev.Key == "status.progress" {
evv := ev.Value.(eventStatusProgress) // panic if not castable
if evv.Message == "foo" && evv.Percentage == 0.7 {
found = true
}
}
case <-eof:
break Loop
}
}
if !found {
t.Fatal("did not see expected event")
}
}

View File

@ -1,402 +0,0 @@
package oonimkall
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
)
func TestRunnerMaybeLookupBackendsFailure(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(500)
}))
defer server.Close()
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
Name: "Example",
Options: settingsOptions{
ProbeServicesBaseURL: server.URL,
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
go func() {
run(context.Background(), settings, out)
close(out)
}()
var failures []string
for ev := range out {
switch ev.Key {
case "failure.startup":
failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure)
case "status.queued", "status.started", "log", "status.end":
default:
panic(fmt.Sprintf("unexpected key: %s", ev.Key))
}
}
if len(failures) != 1 {
t.Fatal("unexpected number of failures")
}
if failures[0] != "all available probe services failed" {
t.Fatal("invalid failure")
}
}
func TestRunnerOpenReportFailure(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
var (
nreq int64
mu sync.Mutex
)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
nreq++
if nreq == 1 {
w.Write([]byte(`{}`))
return
}
w.WriteHeader(500)
}))
defer server.Close()
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
Name: "Example",
Options: settingsOptions{
ProbeServicesBaseURL: server.URL,
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
seench := make(chan int64)
go func() {
var seen int64
for ev := range out {
switch ev.Key {
case "failure.report_create":
seen++
case "status.progress":
evv := ev.Value.(eventStatusProgress)
if evv.Percentage >= 0.4 {
panic(fmt.Sprintf("too much progress: %+v", ev))
}
case "status.queued", "status.started", "log", "status.end",
"status.geoip_lookup", "status.resolver_lookup":
default:
panic(fmt.Sprintf("unexpected key: %s", ev.Key))
}
}
seench <- seen
}()
run(context.Background(), settings, out)
close(out)
if n := <-seench; n != 1 {
t.Fatal("unexpected number of events")
}
}
func TestRunnerGood(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG",
Name: "Example",
Options: settingsOptions{
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
go func() {
run(context.Background(), settings, out)
close(out)
}()
var found bool
for ev := range out {
if ev.Key == "status.end" {
found = true
}
}
if !found {
t.Fatal("status.end event not found")
}
}
func TestRunnerWithUnsupportedSettings(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG",
Name: "Example",
Options: settingsOptions{
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
}
go func() {
run(context.Background(), settings, out)
close(out)
}()
var failures []string
for ev := range out {
if ev.Key == "failure.startup" {
failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure)
}
}
if len(failures) != 1 {
t.Fatal("invalid number of failures")
}
if failures[0] != failureInvalidVersion {
t.Fatal("not the failure we expected")
}
}
func TestRunnerWithInvalidKVStorePath(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG",
Name: "Example",
Options: settingsOptions{
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "", // must be empty to cause the failure below
Version: 1,
}
go func() {
run(context.Background(), settings, out)
close(out)
}()
var failures []string
for ev := range out {
if ev.Key == "failure.startup" {
failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure)
}
}
if len(failures) != 1 {
t.Fatal("invalid number of failures")
}
if failures[0] != "mkdir : no such file or directory" {
t.Fatal("not the failure we expected")
}
}
func TestRunnerWithInvalidExperimentName(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG",
Name: "Nonexistent",
Options: settingsOptions{
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
go func() {
run(context.Background(), settings, out)
close(out)
}()
var failures []string
for ev := range out {
if ev.Key == "failure.startup" {
failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure)
}
}
if len(failures) != 1 {
t.Fatal("invalid number of failures")
}
if failures[0] != "no such experiment: Nonexistent" {
t.Fatalf("not the failure we expected: %s", failures[0])
}
}
func TestRunnerWithMissingInput(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG",
Name: "ExampleWithInput",
Options: settingsOptions{
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
go func() {
run(context.Background(), settings, out)
close(out)
}()
var failures []string
for ev := range out {
if ev.Key == "failure.startup" {
failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure)
}
}
if len(failures) != 1 {
t.Fatal("invalid number of failures")
}
if failures[0] != "no input provided" {
t.Fatalf("not the failure we expected: %s", failures[0])
}
}
func TestRunnerWithMaxRuntime(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
LogLevel: "DEBUG",
Name: "ExampleWithInput",
Options: settingsOptions{
MaxRuntime: 1,
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
begin := time.Now()
go func() {
run(context.Background(), settings, out)
close(out)
}()
var found bool
for ev := range out {
if ev.Key == "status.end" {
found = true
}
}
if !found {
t.Fatal("status.end event not found")
}
// The runtime is long because of ancillary operations and is even more
// longer because of self shaping we may be performing (especially in
// CI builds) using `-tags shaping`). We have experimentally determined
// that ~10 seconds is the typical CI test run time. See:
//
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
//
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
if time.Since(begin) > 10*time.Second {
t.Fatal("expected shorter runtime")
}
}
func TestRunnerWithMaxRuntimeNonInterruptible(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
LogLevel: "DEBUG",
Name: "ExampleWithInputNonInterruptible",
Options: settingsOptions{
MaxRuntime: 1,
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
begin := time.Now()
go func() {
run(context.Background(), settings, out)
close(out)
}()
var found bool
for ev := range out {
if ev.Key == "status.end" {
found = true
}
}
if !found {
t.Fatal("status.end event not found")
}
// The runtime is long because of ancillary operations and is even more
// longer because of self shaping we may be performing (especially in
// CI builds) using `-tags shaping`). We have experimentally determined
// that ~10 seconds is the typical CI test run time. See:
//
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
//
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
if time.Since(begin) > 10*time.Second {
t.Fatal("expected shorter runtime")
}
}
func TestRunnerWithFailedMeasurement(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
LogLevel: "DEBUG",
Name: "ExampleWithFailure",
Options: settingsOptions{
MaxRuntime: 1,
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
go func() {
run(context.Background(), settings, out)
close(out)
}()
var found bool
for ev := range out {
if ev.Key == "failure.measurement" {
found = true
}
}
if !found {
t.Fatal("failure.measurement event not found")
}
}

View File

@ -1,83 +0,0 @@
package oonimkall
import (
"context"
"errors"
"fmt"
"testing"
engine "github.com/ooni/probe-cli/v3/internal/engine"
)
func TestMeasurementSubmissionEventName(t *testing.T) {
if measurementSubmissionEventName(nil) != statusMeasurementSubmission {
t.Fatal("unexpected submission event name")
}
if measurementSubmissionEventName(errors.New("mocked error")) != failureMeasurementSubmission {
t.Fatal("unexpected submission event name")
}
}
func TestMeasurementSubmissionFailure(t *testing.T) {
if measurementSubmissionFailure(nil) != "" {
t.Fatal("unexpected submission failure")
}
if measurementSubmissionFailure(errors.New("mocked error")) != "mocked error" {
t.Fatal("unexpected submission failure")
}
}
func TestRunnerMaybeLookupLocationFailure(t *testing.T) {
if testing.Short() {
// TODO(https://github.com/ooni/probe-cli/pull/518)
t.Skip("skip test in short mode")
}
out := make(chan *event)
settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets",
Name: "Example",
Options: settingsOptions{
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "../../testdata/oonimkall/state",
Version: 1,
}
seench := make(chan int64)
eof := make(chan interface{})
go func() {
var seen int64
Loop:
for {
select {
case ev := <-out:
switch ev.Key {
case "failure.ip_lookup", "failure.asn_lookup",
"failure.cc_lookup", "failure.resolver_lookup":
seen++
case "status.progress":
evv := ev.Value.(eventStatusProgress)
if evv.Percentage >= 0.2 {
panic(fmt.Sprintf("too much progress: %+v", ev))
}
case "status.queued", "status.started", "status.end":
default:
panic(fmt.Sprintf("unexpected key: %s - %+v", ev.Key, ev.Value))
}
case <-eof:
break Loop
}
}
seench <- seen
}()
expected := errors.New("mocked error")
r := newRunner(settings, out, eof)
r.maybeLookupLocation = func(*engine.Session) error {
return expected
}
r.Run(context.Background())
close(eof)
if n := <-seench; n != 4 {
t.Fatal("unexpected number of events")
}
}

View File

@ -1,91 +0,0 @@
package oonimkall
// Settings contains settings for a task. This structure derives from
// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet
// since 2020-12-03 we're not backwards compatible anymore.
type settings struct {
// Annotations contains the annotations to be added
// to every measurements performed by the task.
Annotations map[string]string `json:"annotations,omitempty"`
// AssetsDir is the directory where to store assets. This
// field is an extension of MK's specification. If
// this field is empty, the task won't start.
AssetsDir string `json:"assets_dir"`
// DisabledEvents contains disabled events. See
// https://git.io/Jv4Rv for the events names.
DisabledEvents []string `json:"disabled_events,omitempty"`
// Inputs contains the inputs. The task will fail if it
// requires input and you provide no input.
Inputs []string `json:"inputs,omitempty"`
// LogLevel contains the logs level. See https://git.io/Jv4Rv
// for the names of the available log levels.
LogLevel string `json:"log_level,omitempty"`
// Name contains the task name. By https://git.io/Jv4Rv the
// names are in camel case, e.g. `Ndt`.
Name string `json:"name"`
// Options contains the task options.
Options settingsOptions `json:"options"`
// Proxy allows you to optionally force a specific proxy
// rather than using no proxy (the default).
//
// Use `psiphon:///` to force using Psiphon with the
// embedded configuration file. Not all builds have
// an embedded configuration file, but OONI builds have
// such a file, so they can use this functionality.
//
// Use `socks5://10.0.0.1:9050/` to connect to a SOCKS5
// proxy running on 10.0.0.1:9050. This could be, for
// example, a suitably configured `tor` instance.
Proxy string
// StateDir is the directory where to store persistent data. This
// field is an extension of MK's specification. If
// this field is empty, the task won't start.
StateDir string `json:"state_dir"`
// TempDir is the temporary directory. This field is an extension of MK's
// specification. If this field is empty, we will pick the tempdir that
// ioutil.TempDir uses by default, which may not work on mobile. According
// to our experiments as of 2020-06-10, leaving the TempDir empty works
// for iOS and does not work for Android.
TempDir string `json:"temp_dir"`
// TunnelDir is the directory where to store persistent state
// related to circumvention tunnels. This directory is required
// only if you want to use the tunnels. Added since 3.10.0.
TunnelDir string `json:"tunnel_dir"`
// Version indicates the version of this structure.
Version int64 `json:"version"`
}
// settingsOptions contains the settings options
type settingsOptions struct {
// MaxRuntime is the maximum runtime expressed in seconds. A negative
// value for this field disables the maximum runtime. Using
// a zero value will also mean disabled. This is not the
// original behaviour of Measurement Kit, which used to run
// for zero time in such case.
MaxRuntime float64 `json:"max_runtime,omitempty"`
// NoCollector indicates whether to use a collector
NoCollector bool `json:"no_collector,omitempty"`
// ProbeServicesBaseURL contains the probe services base URL.
ProbeServicesBaseURL string `json:"probe_services_base_url,omitempty"`
// SoftwareName is the software name. If this option is not
// present, then the library startup will fail.
SoftwareName string `json:"software_name,omitempty"`
// SoftwareVersion is the software version. If this option is not
// present, then the library startup will fail.
SoftwareVersion string `json:"software_version,omitempty"`
}

View File

@ -23,7 +23,7 @@
// (measurement-kit/measurement-kit@v0.10.9) for a comprehensive
// description of MK's FFI API.
//
// See also https://github.com/ooni/probe-cli/v3/internal/engine/pull/347 for the
// See also https://github.com/ooni/probe-engine/pull/347 for the
// design document describing the task API.
//
// See also https://github.com/ooni/probe-cli/v3/internal/engine/blob/master/DESIGN.md,
@ -33,7 +33,7 @@
//
// The Session API is a Go API that can be exported to mobile apps
// using the gomobile tool. The latest design document for this API is
// at https://github.com/ooni/probe-cli/v3/internal/engine/pull/954.
// at https://github.com/ooni/probe-engine/pull/954.
//
// The basic tenet of the session API is that you create an instance
// of `Session` and use it to perform the operations you need.
@ -84,8 +84,11 @@ func StartTask(input string) (*Task, error) {
}
go func() {
close(task.isstarted)
run(ctx, &settings, task.out)
emitter := newTaskEmitterUsingChan(task.out)
r := newRunner(&settings, emitter)
r.Run(ctx)
task.out <- nil // signal that we're done w/o closing the channel
emitter.Close()
close(task.isstopped)
}()
return task, nil

View File

@ -1,528 +0,0 @@
package oonimkall
import (
"encoding/json"
"errors"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
type eventlike struct {
Key string `json:"key"`
Value map[string]interface{} `json:"value"`
}
func TestGood(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG",
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
// interrupt the task so we also exercise this functionality
go func() {
<-time.After(time.Second)
task.Interrupt()
}()
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
t.Fatal("unexpected failure.startup event")
}
}
// make sure we only see task_terminated at this point
for {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "task_terminated" {
break
}
t.Fatalf("unexpected event.Key: %s", event.Key)
}
}
func TestWithMeasurementFailure(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG",
"name": "ExampleWithFailure",
"options": {
"no_geoip": true,
"no_resolver_lookup": true,
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
t.Fatal("unexpected failure.startup event")
}
}
}
func TestInvalidJSON(t *testing.T) {
task, err := StartTask(`{`)
var syntaxerr *json.SyntaxError
if !errors.As(err, &syntaxerr) {
t.Fatal("not the expected error")
}
if task != nil {
t.Fatal("task is not nil")
}
}
func TestUnsupportedSetting(t *testing.T) {
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG",
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state"
}`)
if err != nil {
t.Fatal(err)
}
var seen bool
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
if strings.Contains(eventstr, failureInvalidVersion) {
seen = true
}
}
}
if !seen {
t.Fatal("did not see failure.startup with invalid version info")
}
}
func TestEmptyStateDir(t *testing.T) {
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG",
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
var seen bool
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
if strings.Contains(eventstr, "mkdir : no such file or directory") {
seen = true
}
}
}
if !seen {
t.Fatal("did not see failure.startup with info that state dir is empty")
}
}
func TestUnknownExperiment(t *testing.T) {
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG",
"name": "Antani",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
var seen bool
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
if strings.Contains(eventstr, "no such experiment: ") {
seen = true
}
}
}
if !seen {
t.Fatal("did not see failure.startup")
}
}
func TestInputIsRequired(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG",
"name": "ExampleWithInput",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
var seen bool
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
if strings.Contains(eventstr, "no input provided") {
seen = true
}
}
}
if !seen {
t.Fatal("did not see failure.startup")
}
}
func TestMaxRuntime(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
begin := time.Now()
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"inputs": ["a", "b", "c"],
"name": "ExampleWithInput",
"options": {
"max_runtime": 1,
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
t.Fatal(eventstr)
}
}
// The runtime is long because of ancillary operations and is even more
// longer because of self shaping we may be performing (especially in
// CI builds) using `-tags shaping`). We have experimentally determined
// that ~10 seconds is the typical CI test run time. See:
//
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
//
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
//
// In case there are further timeouts, e.g. in the sessionresolver, the
// time used by the experiment will be much more. This is for example the
// case in https://github.com/ooni/probe-engine/issues/1005.
if time.Since(begin) > 10*time.Second {
t.Fatal("expected shorter runtime")
}
}
func TestInterruptExampleWithInput(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
t.Skip("Skipping broken test; see https://github.com/ooni/probe-engine/issues/992")
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"inputs": [
"http://www.kernel.org/",
"http://www.x.org/",
"http://www.microsoft.com/",
"http://www.slashdot.org/",
"http://www.repubblica.it/",
"http://www.google.it/",
"http://ooni.org/"
],
"name": "ExampleWithInputNonInterruptible",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
var keys []string
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
switch event.Key {
case "failure.startup":
t.Fatal(eventstr)
case "status.measurement_start":
go task.Interrupt()
}
// We compress the keys. What matters is basically that we
// see just one of the many possible measurements here.
if keys == nil || keys[len(keys)-1] != event.Key {
keys = append(keys, event.Key)
}
}
expect := []string{
"status.queued",
"status.started",
"status.progress",
"status.geoip_lookup",
"status.resolver_lookup",
"status.progress",
"status.report_create",
"status.measurement_start",
"log",
"status.progress",
"measurement",
"status.measurement_submission",
"status.measurement_done",
"status.end",
"task_terminated",
}
if diff := cmp.Diff(expect, keys); diff != "" {
t.Fatal(diff)
}
}
func TestInterruptNdt7(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"name": "Ndt7",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
go func() {
<-time.After(11 * time.Second)
task.Interrupt()
}()
var keys []string
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
t.Fatal(eventstr)
}
// We compress the keys because we don't know how many
// status.progress we will see. What matters is that we
// don't see a measurement submission, since it means
// that we have interrupted the measurement.
if keys == nil || keys[len(keys)-1] != event.Key {
keys = append(keys, event.Key)
}
}
expect := []string{
"status.queued",
"status.started",
"status.progress",
"status.geoip_lookup",
"status.resolver_lookup",
"status.progress",
"status.report_create",
"status.measurement_start",
"status.progress",
"status.end",
"task_terminated",
}
if diff := cmp.Diff(expect, keys); diff != "" {
t.Fatal(diff)
}
}
func TestCountBytesForExample(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
var downloadKB, uploadKB float64
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
switch event.Key {
case "failure.startup":
t.Fatal(eventstr)
case "status.end":
downloadKB = event.Value["downloaded_kb"].(float64)
uploadKB = event.Value["uploaded_kb"].(float64)
}
}
if downloadKB == 0 {
t.Fatal("downloadKB is zero")
}
if uploadKB == 0 {
t.Fatal("uploadKB is zero")
}
}
func TestPrivacyAndScrubbing(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
var m *model.Measurement
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
switch event.Key {
case "failure.startup":
t.Fatal(eventstr)
case "measurement":
v := []byte(event.Value["json_str"].(string))
m = new(model.Measurement)
if err := json.Unmarshal(v, &m); err != nil {
t.Fatal(err)
}
}
}
if m == nil {
t.Fatal("measurement is nil")
}
if m.ProbeASN == "AS0" || m.ProbeCC == "ZZ" || m.ProbeIP != "127.0.0.1" {
t.Fatal("unexpected result")
}
}
func TestNonblockWithFewEvents(t *testing.T) {
// This test tests whether we won't block for a small
// number of events emitted by the task
if testing.Short() {
t.Skip("skip test in short mode")
}
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
// Wait for the task thread to start
<-task.isstarted
// Wait for the task thread to complete
<-task.isstopped
var count int
for !task.IsDone() {
task.WaitForNextEvent()
count++
}
if count < 5 {
t.Fatal("too few events")
}
}

171
pkg/oonimkall/task_test.go Normal file
View File

@ -0,0 +1,171 @@
package oonimkall
import (
"encoding/json"
"errors"
"testing"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
type eventlike struct {
Key string `json:"key"`
Value map[string]interface{} `json:"value"`
}
func TestStartTaskGood(t *testing.T) {
task, err := StartTask(`{
"log_level": "DEBUG",
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "testdata/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "failure.startup" {
t.Fatal("unexpected failure.startup event")
}
}
// make sure we only see task_terminated at this point
for {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
if event.Key == "task_terminated" {
break
}
t.Fatalf("unexpected event.Key: %s", event.Key)
}
}
func TestStartTaskInvalidJSON(t *testing.T) {
task, err := StartTask(`{`)
var syntaxerr *json.SyntaxError
if !errors.As(err, &syntaxerr) {
t.Fatal("not the expected error")
}
if task != nil {
t.Fatal("task is not nil")
}
}
func TestStartTaskCountBytesForExample(t *testing.T) {
task, err := StartTask(`{
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "testdata/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
var downloadKB, uploadKB float64
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
switch event.Key {
case "failure.startup":
t.Fatal(eventstr)
case "status.end":
downloadKB = event.Value["downloaded_kb"].(float64)
uploadKB = event.Value["uploaded_kb"].(float64)
}
}
if downloadKB == 0 {
t.Fatal("downloadKB is zero")
}
if uploadKB == 0 {
t.Fatal("uploadKB is zero")
}
}
func TestPrivacyAndScrubbing(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets",
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "../testdata/oonimkall/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
var m *model.Measurement
for !task.IsDone() {
eventstr := task.WaitForNextEvent()
var event eventlike
if err := json.Unmarshal([]byte(eventstr), &event); err != nil {
t.Fatal(err)
}
switch event.Key {
case "failure.startup":
t.Fatal(eventstr)
case "measurement":
v := []byte(event.Value["json_str"].(string))
m = new(model.Measurement)
if err := json.Unmarshal(v, &m); err != nil {
t.Fatal(err)
}
}
}
if m == nil {
t.Fatal("measurement is nil")
}
if m.ProbeASN == "AS0" || m.ProbeCC == "ZZ" || m.ProbeIP != "127.0.0.1" {
t.Fatal("unexpected result")
}
}
func TestNonblockWithFewEvents(t *testing.T) {
// This test tests whether we won't block for a small
// number of events emitted by the task
task, err := StartTask(`{
"name": "Example",
"options": {
"software_name": "oonimkall-test",
"software_version": "0.1.0"
},
"state_dir": "testdata/state",
"version": 1
}`)
if err != nil {
t.Fatal(err)
}
// Wait for the task thread to start
<-task.isstarted
// Wait for the task thread to complete
<-task.isstopped
var count int
for !task.IsDone() {
task.WaitForNextEvent()
count++
}
if count < 5 {
t.Fatal("too few events")
}
}

View File

@ -0,0 +1,63 @@
package oonimkall
import "sync"
// taskEmitterUsingChan is a task emitter using a channel.
type taskEmitterUsingChan struct {
// eof indicates we should not emit anymore.
eof chan interface{}
// once ensures we close the eof channel just once.
once sync.Once
// out is the possibly buffered channel where to emit events.
out chan<- *event
}
// ensure that taskEmitterUsingChan is a taskEmitter.
var _ taskEmitterCloser = &taskEmitterUsingChan{}
// newTaskEmitterUsingChan creates a taskEmitterUsingChan.
func newTaskEmitterUsingChan(out chan<- *event) *taskEmitterUsingChan {
return &taskEmitterUsingChan{
eof: make(chan interface{}),
once: sync.Once{},
out: out,
}
}
// Emit implements taskEmitter.Emit.
func (ee *taskEmitterUsingChan) Emit(key string, value interface{}) {
// Prevent this goroutine from blocking on `ee.out` if the caller
// has already told us it's not going to accept more events.
select {
case ee.out <- &event{Key: key, Value: value}:
case <-ee.eof:
}
}
// Close implements taskEmitterCloser.Closer.
func (ee *taskEmitterUsingChan) Close() error {
ee.once.Do(func() { close(ee.eof) })
return nil
}
// taskEmitterWrapper is a convenient wrapper for taskEmitter.
type taskEmitterWrapper struct {
taskEmitter
}
// EmitFailureStartup emits the failureStartup event
func (ee *taskEmitterWrapper) EmitFailureStartup(failure string) {
ee.EmitFailureGeneric(eventTypeFailureStartup, failure)
}
// EmitFailureGeneric emits a failure event
func (ee *taskEmitterWrapper) EmitFailureGeneric(name, failure string) {
ee.Emit(name, eventFailure{Failure: failure})
}
// EmitStatusProgress emits the status.Progress event
func (ee *taskEmitterWrapper) EmitStatusProgress(percentage float64, message string) {
ee.Emit(eventTypeStatusProgress, eventStatusProgress{Message: message, Percentage: percentage})
}

View File

@ -0,0 +1,109 @@
package oonimkall
import "testing"
func TestTaskEmitterUsingChan(t *testing.T) {
t.Run("ordinary emit", func(t *testing.T) {
out := make(chan *event)
emitter := newTaskEmitterUsingChan(out)
go func() {
emitter.Emit("foo", nil)
}()
ev := <-out
if ev.Key != "foo" {
t.Fatal("invalid key")
}
if ev.Value != nil {
t.Fatal("invalid value")
}
})
t.Run("emit after close", func(t *testing.T) {
out := make(chan *event)
emitter := newTaskEmitterUsingChan(out)
emitter.Close()
done := make(chan interface{})
go func() {
emitter.Emit("foo", nil)
close(done)
}()
<-done
select {
case <-out:
t.Fatal("should not receive event here")
default:
}
})
t.Run("close is idempotent", func(t *testing.T) {
out := make(chan *event)
emitter := newTaskEmitterUsingChan(out)
for i := 0; i < 4; i++ {
emitter.Close()
}
})
}
func TestTaskEmitterWrapper(t *testing.T) {
t.Run("emit failureStartup", func(t *testing.T) {
expect := "antani"
collector := &CollectorTaskEmitter{}
emitter := &taskEmitterWrapper{collector}
emitter.EmitFailureStartup(expect)
events := collector.Collect()
if len(events) != 1 {
t.Fatal("invalid number of events")
}
ev := events[0]
if ev.Key != eventTypeFailureStartup {
t.Fatal("invalid key")
}
value := ev.Value.(eventFailure)
if value.Failure != expect {
t.Fatal("invalid failure value")
}
})
t.Run("emit failureGeneric", func(t *testing.T) {
expectName := "mascetti"
expectFailure := "antani"
collector := &CollectorTaskEmitter{}
emitter := &taskEmitterWrapper{collector}
emitter.EmitFailureGeneric(expectName, expectFailure)
events := collector.Collect()
if len(events) != 1 {
t.Fatal("invalid number of events")
}
ev := events[0]
if ev.Key != expectName {
t.Fatal("invalid key")
}
value := ev.Value.(eventFailure)
if value.Failure != expectFailure {
t.Fatal("invalid failure value")
}
})
t.Run("emit statusProgress", func(t *testing.T) {
percentage := 0.66
message := "mascetti"
collector := &CollectorTaskEmitter{}
emitter := &taskEmitterWrapper{collector}
emitter.EmitStatusProgress(percentage, message)
events := collector.Collect()
if len(events) != 1 {
t.Fatal("invalid number of events")
}
ev := events[0]
if ev.Key != eventTypeStatusProgress {
t.Fatal("invalid key")
}
value := ev.Value.(eventStatusProgress)
if value.Percentage != percentage {
t.Fatal("invalid percentage value")
}
if value.Message != message {
t.Fatal("invalid message value")
}
})
}

116
pkg/oonimkall/tasklogger.go Normal file
View File

@ -0,0 +1,116 @@
package oonimkall
import (
"fmt"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
//
// This file implements the logger used by a task. Outside
// of this file, the rest of the codebase just sees a generic
// model.Logger that can log events.
//
// taskLogger is the logger used by a task.
type taskLogger struct {
// emitter is the event emitter.
emitter taskEmitter
// hasDebug indicates whether to emit debug logs.
hasDebug bool
// hasInfo indicates whether to emit info logs.
hasInfo bool
// hasWarning indicates whether to emit warning logs.
hasWarning bool
}
// ensure that taskLogger implements model.Logger.
var _ model.Logger = &taskLogger{}
// Debug implements model.Logger.Debug.
func (cl *taskLogger) Debug(msg string) {
if cl.hasDebug {
cl.emit(logLevelDebug, msg)
}
}
// Debugf implements model.Logger.Debugf.
func (cl *taskLogger) Debugf(format string, v ...interface{}) {
if cl.hasDebug {
cl.Debug(fmt.Sprintf(format, v...))
}
}
// Info implements model.Logger.Info.
func (cl *taskLogger) Info(msg string) {
if cl.hasInfo {
cl.emit(logLevelInfo, msg)
}
}
// Infof implements model.Logger.Infof.
func (cl *taskLogger) Infof(format string, v ...interface{}) {
if cl.hasInfo {
cl.Info(fmt.Sprintf(format, v...))
}
}
// Warn implements model.Logger.Warn.
func (cl *taskLogger) Warn(msg string) {
if cl.hasWarning {
cl.emit(logLevelWarning, msg)
}
}
// Warnf implements model.Logger.Warnf.
func (cl *taskLogger) Warnf(format string, v ...interface{}) {
if cl.hasWarning {
cl.Warn(fmt.Sprintf(format, v...))
}
}
// emit is the code that actually emits the log event.
func (cl *taskLogger) emit(level string, message string) {
cl.emitter.Emit(eventTypeLog, eventLog{
LogLevel: level,
Message: message,
})
}
// newTaskLogger creates a new taskLogger instance.
//
// Arguments:
//
// - emitter is the emitter that will emit log events;
//
// - logLevel is the maximum log level that will be emitted.
//
// Returns:
//
// - a properly configured instance of taskLogger.
//
// Remarks:
//
// - log levels are sorted as usual: ERR is more sever than
// WARNING, WARNING is more sever than INFO, etc.
func newTaskLogger(emitter taskEmitter, logLevel string) *taskLogger {
cl := &taskLogger{
emitter: emitter,
}
switch logLevel {
case logLevelDebug, logLevelDebug2:
cl.hasDebug = true
fallthrough
case logLevelInfo:
cl.hasInfo = true
fallthrough
case logLevelErr, logLevelWarning:
fallthrough
default:
cl.hasWarning = true
}
return cl
}

View File

@ -0,0 +1,106 @@
package oonimkall
import (
"testing"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
//
// This file contains tests for the taskLogger type.
//
func TestTaskLogger(t *testing.T) {
// debugMessage is the debug message we expect to see.
debugMessage := "debug message"
// infoMessage is the info message we expect to see.
infoMessage := "info message"
// warningMessage is the warning message we expect to see.
warningMessage := "warning message"
// emitMessages is an helper function for implementing this test.
emitMessages := func(logger model.Logger) {
logger.Debug(debugMessage)
logger.Debugf("%s", debugMessage)
logger.Info(infoMessage)
logger.Infof("%s", infoMessage)
logger.Warn(warningMessage)
logger.Warnf("%s", warningMessage)
}
// convertEventsToLogEvents converts the generic events to
// logEvents and fails if this operation is not possible.
convertEventsToLogEvents := func(t *testing.T, in []*event) (out []eventLog) {
for _, ev := range in {
if ev.Key != eventTypeLog {
t.Fatalf("expected log event, found %s", ev.Key)
}
out = append(out, ev.Value.(eventLog))
}
return
}
// checkNumberOfEvents ensures we've the right number of events.
checkNumberOfEvents := func(t *testing.T, events []eventLog, expect int) {
if len(events) != expect {
t.Fatalf(
"invalid number of log events %d (expected %d)",
len(events), expect,
)
}
}
// matchEvent ensures the given event has the right level and message.
matchEvent := func(t *testing.T, event eventLog, level, msg string) {
if event.LogLevel != level {
t.Fatalf(
"invalid log level %s (expected %s)",
event.LogLevel, level,
)
}
if event.Message != msg {
t.Fatalf(
"invalid log message '%s' (expected '%s')",
event.Message, msg,
)
}
}
t.Run("debug logger", func(t *testing.T) {
emitter := &CollectorTaskEmitter{}
logger := newTaskLogger(emitter, logLevelDebug)
emitMessages(logger)
logEvents := convertEventsToLogEvents(t, emitter.Collect())
checkNumberOfEvents(t, logEvents, 6)
matchEvent(t, logEvents[0], logLevelDebug, debugMessage)
matchEvent(t, logEvents[1], logLevelDebug, debugMessage)
matchEvent(t, logEvents[2], logLevelInfo, infoMessage)
matchEvent(t, logEvents[3], logLevelInfo, infoMessage)
matchEvent(t, logEvents[4], logLevelWarning, warningMessage)
matchEvent(t, logEvents[5], logLevelWarning, warningMessage)
})
t.Run("info logger", func(t *testing.T) {
emitter := &CollectorTaskEmitter{}
logger := newTaskLogger(emitter, logLevelInfo)
emitMessages(logger)
logEvents := convertEventsToLogEvents(t, emitter.Collect())
checkNumberOfEvents(t, logEvents, 4)
matchEvent(t, logEvents[0], logLevelInfo, infoMessage)
matchEvent(t, logEvents[1], logLevelInfo, infoMessage)
matchEvent(t, logEvents[2], logLevelWarning, warningMessage)
matchEvent(t, logEvents[3], logLevelWarning, warningMessage)
})
t.Run("warn logger", func(t *testing.T) {
emitter := &CollectorTaskEmitter{}
logger := newTaskLogger(emitter, logLevelWarning)
emitMessages(logger)
logEvents := convertEventsToLogEvents(t, emitter.Collect())
checkNumberOfEvents(t, logEvents, 2)
matchEvent(t, logEvents[0], logLevelWarning, warningMessage)
matchEvent(t, logEvents[1], logLevelWarning, warningMessage)
})
}

View File

@ -0,0 +1,222 @@
package oonimkall
import (
"context"
"errors"
"sync"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
//
// This file contains mocks for types used by tasks. Because
// we only use mocks when testing, this file is a `_test.go` file.
//
// CollectorTaskEmitter is a thread-safe taskEmitter
// that stores all the events inside itself.
type CollectorTaskEmitter struct {
// events contains the events
events []*event
// mu provides mutual exclusion
mu sync.Mutex
}
// ensures that a CollectorTaskEmitter is a taskEmitter.
var _ taskEmitter = &CollectorTaskEmitter{}
// Emit implements the taskEmitter.Emit method.
func (e *CollectorTaskEmitter) Emit(key string, value interface{}) {
e.mu.Lock()
e.events = append(e.events, &event{Key: key, Value: value})
e.mu.Unlock()
}
// Collect returns a copy of the collected events. It is safe
// to read the events. It's a data race to modify them.
//
// After this function has been called, the internal array
// of events will now be empty.
func (e *CollectorTaskEmitter) Collect() (out []*event) {
e.mu.Lock()
out = e.events
e.events = nil
e.mu.Unlock()
return
}
// SessionBuilderConfigSaver is a session builder that
// saves the received config and returns an error.
type SessionBuilderConfigSaver struct {
Config engine.SessionConfig
}
var _ taskSessionBuilder = &SessionBuilderConfigSaver{}
func (b *SessionBuilderConfigSaver) NewSession(
ctx context.Context, config engine.SessionConfig) (taskSession, error) {
b.Config = config
return nil, errors.New("mocked error")
}
// MockableTaskRunnerDependencies allows to mock all the
// dependencies of taskRunner using a single structure.
type MockableTaskRunnerDependencies struct {
// taskSessionBuilder:
MockNewSession func(ctx context.Context,
config engine.SessionConfig) (taskSession, error)
// taskSession:
MockClose func() error
MockNewExperimentBuilderByName func(name string) (taskExperimentBuilder, error)
MockMaybeLookupBackendsContext func(ctx context.Context) error
MockMaybeLookupLocationContext func(ctx context.Context) error
MockProbeIP func() string
MockProbeASNString func() string
MockProbeCC func() string
MockProbeNetworkName func() string
MockResolverASNString func() string
MockResolverIP func() string
MockResolverNetworkName func() string
// taskExperimentBuilder:
MockableSetCallbacks func(callbacks model.ExperimentCallbacks)
MockableInputPolicy func() engine.InputPolicy
MockableNewExperimentInstance func() taskExperiment
MockableInterruptible func() bool
// taskExperiment:
MockableKibiBytesReceived func() float64
MockableKibiBytesSent func() float64
MockableOpenReportContext func(ctx context.Context) error
MockableReportID func() string
MockableMeasureWithContext func(ctx context.Context, input string) (
measurement *model.Measurement, err error)
MockableSubmitAndUpdateMeasurementContext func(
ctx context.Context, measurement *model.Measurement) error
}
var (
_ taskSessionBuilder = &MockableTaskRunnerDependencies{}
_ taskSession = &MockableTaskRunnerDependencies{}
_ taskExperimentBuilder = &MockableTaskRunnerDependencies{}
_ taskExperiment = &MockableTaskRunnerDependencies{}
)
func (dep *MockableTaskRunnerDependencies) NewSession(
ctx context.Context, config engine.SessionConfig) (taskSession, error) {
if f := dep.MockNewSession; f != nil {
return f(ctx, config)
}
return dep, nil
}
func (dep *MockableTaskRunnerDependencies) Close() error {
return dep.MockClose()
}
func (dep *MockableTaskRunnerDependencies) NewExperimentBuilderByName(name string) (taskExperimentBuilder, error) {
if f := dep.MockNewExperimentBuilderByName; f != nil {
return f(name)
}
return dep, nil
}
func (dep *MockableTaskRunnerDependencies) MaybeLookupBackendsContext(ctx context.Context) error {
return dep.MockMaybeLookupBackendsContext(ctx)
}
func (dep *MockableTaskRunnerDependencies) MaybeLookupLocationContext(ctx context.Context) error {
return dep.MockMaybeLookupLocationContext(ctx)
}
func (dep *MockableTaskRunnerDependencies) ProbeIP() string {
return dep.MockProbeIP()
}
func (dep *MockableTaskRunnerDependencies) ProbeASNString() string {
return dep.MockProbeASNString()
}
func (dep *MockableTaskRunnerDependencies) ProbeCC() string {
return dep.MockProbeCC()
}
func (dep *MockableTaskRunnerDependencies) ProbeNetworkName() string {
return dep.MockProbeNetworkName()
}
func (dep *MockableTaskRunnerDependencies) ResolverASNString() string {
return dep.MockResolverASNString()
}
func (dep *MockableTaskRunnerDependencies) ResolverIP() string {
return dep.MockResolverIP()
}
func (dep *MockableTaskRunnerDependencies) ResolverNetworkName() string {
return dep.MockResolverNetworkName()
}
func (dep *MockableTaskRunnerDependencies) SetCallbacks(callbacks model.ExperimentCallbacks) {
dep.MockableSetCallbacks(callbacks)
}
func (dep *MockableTaskRunnerDependencies) InputPolicy() engine.InputPolicy {
return dep.MockableInputPolicy()
}
func (dep *MockableTaskRunnerDependencies) NewExperimentInstance() taskExperiment {
if f := dep.MockableNewExperimentInstance; f != nil {
return f()
}
return dep
}
func (dep *MockableTaskRunnerDependencies) Interruptible() bool {
return dep.MockableInterruptible()
}
func (dep *MockableTaskRunnerDependencies) KibiBytesReceived() float64 {
return dep.MockableKibiBytesReceived()
}
func (dep *MockableTaskRunnerDependencies) KibiBytesSent() float64 {
return dep.MockableKibiBytesSent()
}
func (dep *MockableTaskRunnerDependencies) OpenReportContext(ctx context.Context) error {
return dep.MockableOpenReportContext(ctx)
}
func (dep *MockableTaskRunnerDependencies) ReportID() string {
return dep.MockableReportID()
}
func (dep *MockableTaskRunnerDependencies) MeasureWithContext(ctx context.Context, input string) (
measurement *model.Measurement, err error) {
return dep.MockableMeasureWithContext(ctx, input)
}
func (dep *MockableTaskRunnerDependencies) SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error {
return dep.MockableSubmitAndUpdateMeasurementContext(ctx, measurement)
}
// MockableKVStoreFSBuilder is a mockable taskKVStoreFSBuilder.
type MockableKVStoreFSBuilder struct {
MockNewFS func(path string) (model.KeyValueStore, error)
}
var _ taskKVStoreFSBuilder = &MockableKVStoreFSBuilder{}
func (m *MockableKVStoreFSBuilder) NewFS(path string) (model.KeyValueStore, error) {
return m.MockNewFS(path)
}

374
pkg/oonimkall/taskmodel.go Normal file
View File

@ -0,0 +1,374 @@
package oonimkall
import (
"context"
"io"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
//
// Task Model
//
// The oonimkall package allows you to run OONI network
// experiments as "tasks". This file defines all the
// underlying model entailed by running such tasks.
//
// Logging
//
// This section of the file defines the types and the
// interfaces required to implement logging.
//
// The rest of the codebase will use a generic model.Logger
// as a logger. This is a pretty fundamental interface in
// ooni/probe-cli and so it's not defined in this file.
//
const taskABIVersion = 1
// Running tasks emit logs using different log levels. We
// define log levels with the usual semantics.
//
// The logger used by a task _may_ be configured to not
// emit log events that are less severe than a given
// severity.
//
// We use the following definitions both for defining the
// log level of a log and for configuring the maximum
// acceptable log level emitted by a logger.
const (
logLevelDebug2 = "DEBUG2"
logLevelDebug = "DEBUG"
logLevelInfo = "INFO"
logLevelErr = "ERR"
logLevelWarning = "WARNING"
)
//
// Emitting Events
//
// While it is running, a task emits events. This section
// of the file defines the types needed to emit events.
//
// type of emitted events.
const (
eventTypeFailureIPLookup = "failure.ip_lookup"
eventTypeFailureASNLookup = "failure.asn_lookup"
eventTypeFailureCCLookup = "failure.cc_lookup"
eventTypeFailureMeasurement = "failure.measurement"
eventTypeFailureMeasurementSubmission = "failure.measurement_submission"
eventTypeFailureReportCreate = "failure.report_create"
eventTypeFailureResolverLookup = "failure.resolver_lookup"
eventTypeFailureStartup = "failure.startup"
eventTypeLog = "log"
eventTypeMeasurement = "measurement"
eventTypeStatusEnd = "status.end"
eventTypeStatusGeoIPLookup = "status.geoip_lookup"
eventTypeStatusMeasurementDone = "status.measurement_done"
eventTypeStatusMeasurementStart = "status.measurement_start"
eventTypeStatusMeasurementSubmission = "status.measurement_submission"
eventTypeStatusProgress = "status.progress"
eventTypeStatusQueued = "status.queued"
eventTypeStatusReportCreate = "status.report_create"
eventTypeStatusResolverLookup = "status.resolver_lookup"
eventTypeStatusStarted = "status.started"
)
// taskEmitter is anything that allows us to
// emit events while running a task.
//
// Note that a task emitter _may_ be configured
// to ignore _some_ events though.
type taskEmitter interface {
// Emit emits the event (unless the emitter is
// configured to ignore this event key).
Emit(key string, value interface{})
}
// taskEmitterCloser is a closeable taskEmitter.
type taskEmitterCloser interface {
taskEmitter
io.Closer
}
type eventEmpty struct{}
// eventFailure contains information on a failure.
type eventFailure struct {
Failure string `json:"failure"`
}
// eventLog is an event containing a log message.
type eventLog struct {
LogLevel string `json:"log_level"`
Message string `json:"message"`
}
type eventMeasurementGeneric struct {
Failure string `json:"failure,omitempty"`
Idx int64 `json:"idx"`
Input string `json:"input"`
JSONStr string `json:"json_str,omitempty"`
}
type eventStatusEnd struct {
DownloadedKB float64 `json:"downloaded_kb"`
Failure string `json:"failure"`
UploadedKB float64 `json:"uploaded_kb"`
}
type eventStatusGeoIPLookup struct {
ProbeASN string `json:"probe_asn"`
ProbeCC string `json:"probe_cc"`
ProbeIP string `json:"probe_ip"`
ProbeNetworkName string `json:"probe_network_name"`
}
// eventStatusProgress reports progress information.
type eventStatusProgress struct {
Message string `json:"message"`
Percentage float64 `json:"percentage"`
}
type eventStatusReportGeneric struct {
ReportID string `json:"report_id"`
}
type eventStatusResolverLookup struct {
ResolverASN string `json:"resolver_asn"`
ResolverIP string `json:"resolver_ip"`
ResolverNetworkName string `json:"resolver_network_name"`
}
// event is an event emitted by a task. This structure extends the event
// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv).
type event struct {
Key string `json:"key"`
Value interface{} `json:"value"`
}
//
// OONI Session
//
// For performing several operations, including running
// experiments, we need to create an OONI session.
//
// This section of the file defines the interface between
// our oonimkall API and the real session.
//
// The abstraction representing a OONI session is taskSession.
//
// taskKVStoreFSBuilder constructs a KVStore with
// filesystem backing for running tests.
type taskKVStoreFSBuilder interface {
// NewFS creates a new KVStore using the filesystem.
NewFS(path string) (model.KeyValueStore, error)
}
// taskSessionBuilder constructs a new Session.
type taskSessionBuilder interface {
// NewSession creates a new taskSession.
NewSession(ctx context.Context,
config engine.SessionConfig) (taskSession, error)
}
// taskSession abstracts a OONI session.
type taskSession interface {
// A session can be closed.
io.Closer
// NewExperimentBuilderByName creates the builder for constructing
// a new experiment given the experiment's name.
NewExperimentBuilderByName(name string) (taskExperimentBuilder, error)
// MaybeLookupBackendsContext lookups the OONI backend unless
// this operation has already been performed.
MaybeLookupBackendsContext(ctx context.Context) error
// MaybeLookupLocationContext lookups the probe location unless
// this operation has already been performed.
MaybeLookupLocationContext(ctx context.Context) error
// ProbeIP must be called after MaybeLookupLocationContext
// and returns the resolved probe IP.
ProbeIP() string
// ProbeASNString must be called after MaybeLookupLocationContext
// and returns the resolved probe ASN as a string.
ProbeASNString() string
// ProbeCC must be called after MaybeLookupLocationContext
// and returns the resolved probe country code.
ProbeCC() string
// ProbeNetworkName must be called after MaybeLookupLocationContext
// and returns the resolved probe country code.
ProbeNetworkName() string
// ResolverANSString must be called after MaybeLookupLocationContext
// and returns the resolved resolver's ASN as a string.
ResolverASNString() string
// ResolverIP must be called after MaybeLookupLocationContext
// and returns the resolved resolver's IP.
ResolverIP() string
// ResolverNetworkName must be called after MaybeLookupLocationContext
// and returns the resolved resolver's network name.
ResolverNetworkName() string
}
// taskExperimentBuilder builds a taskExperiment.
type taskExperimentBuilder interface {
// SetCallbacks sets the experiment callbacks.
SetCallbacks(callbacks model.ExperimentCallbacks)
// InputPolicy returns the experiment's input policy.
InputPolicy() engine.InputPolicy
// NewExperiment creates the new experiment.
NewExperimentInstance() taskExperiment
// Interruptible returns whether this experiment is interruptible.
Interruptible() bool
}
// taskExperiment is a runnable experiment.
type taskExperiment interface {
// KibiBytesReceived returns the KiB received by the experiment.
KibiBytesReceived() float64
// KibiBytesSent returns the KiB sent by the experiment.
KibiBytesSent() float64
// OpenReportContext opens a new report.
OpenReportContext(ctx context.Context) error
// ReportID must be called after a successful OpenReportContext
// and returns the report ID for this measurement.
ReportID() string
// MeasureWithContext runs the measurement.
MeasureWithContext(ctx context.Context, input string) (
measurement *model.Measurement, err error)
// SubmitAndUpdateMeasurementContext submits the measurement
// and updates its report ID on success.
SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error
}
//
// Task Running
//
// This section contains the interfaces allowing us
// to run a task until completion.
//
// taskRunner runs a task until completion.
type taskRunner interface {
// Run runs until completion.
Run(ctx context.Context)
}
//
// Task Settings
//
// This section defines the settings used by a task.
//
// Settings contains settings for a task. This structure derives from
// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet
// since 2020-12-03 we're not backwards compatible anymore.
type settings struct {
// Annotations contains the annotations to be added
// to every measurements performed by the task.
Annotations map[string]string `json:"annotations,omitempty"`
// AssetsDir is the directory where to store assets. This
// field is an extension of MK's specification. If
// this field is empty, the task won't start.
AssetsDir string `json:"assets_dir"`
// DisabledEvents contains disabled events. See
// https://git.io/Jv4Rv for the events names.
//
// This setting is currently ignored. We noticed the
// code was ignoring it on 2021-12-01.
DisabledEvents []string `json:"disabled_events,omitempty"`
// Inputs contains the inputs. The task will fail if it
// requires input and you provide no input.
Inputs []string `json:"inputs,omitempty"`
// LogLevel contains the logs level. See https://git.io/Jv4Rv
// for the names of the available log levels.
LogLevel string `json:"log_level,omitempty"`
// Name contains the task name. By https://git.io/Jv4Rv the
// names are in camel case, e.g. `Ndt`.
Name string `json:"name"`
// Options contains the task options.
Options settingsOptions `json:"options"`
// Proxy allows you to optionally force a specific proxy
// rather than using no proxy (the default).
//
// Use `psiphon:///` to force using Psiphon with the
// embedded configuration file. Not all builds have
// an embedded configuration file, but OONI builds have
// such a file, so they can use this functionality.
//
// Use `socks5://10.0.0.1:9050/` to connect to a SOCKS5
// proxy running on 10.0.0.1:9050. This could be, for
// example, a suitably configured `tor` instance.
Proxy string
// StateDir is the directory where to store persistent data. This
// field is an extension of MK's specification. If
// this field is empty, the task won't start.
StateDir string `json:"state_dir"`
// TempDir is the temporary directory. This field is an extension of MK's
// specification. If this field is empty, we will pick the tempdir that
// ioutil.TempDir uses by default, which may not work on mobile. According
// to our experiments as of 2020-06-10, leaving the TempDir empty works
// for iOS and does not work for Android.
TempDir string `json:"temp_dir"`
// TunnelDir is the directory where to store persistent state
// related to circumvention tunnels. This directory is required
// only if you want to use the tunnels. Added since 3.10.0.
TunnelDir string `json:"tunnel_dir"`
// Version indicates the version of this structure.
Version int64 `json:"version"`
}
// settingsOptions contains the settings options
type settingsOptions struct {
// MaxRuntime is the maximum runtime expressed in seconds. A negative
// value for this field disables the maximum runtime. Using
// a zero value will also mean disabled. This is not the
// original behaviour of Measurement Kit, which used to run
// for zero time in such case.
MaxRuntime float64 `json:"max_runtime,omitempty"`
// NoCollector indicates whether to use a collector
NoCollector bool `json:"no_collector,omitempty"`
// ProbeServicesBaseURL contains the probe services base URL.
ProbeServicesBaseURL string `json:"probe_services_base_url,omitempty"`
// SoftwareName is the software name. If this option is not
// present, then the library startup will fail.
SoftwareName string `json:"software_name,omitempty"`
// SoftwareVersion is the software version. If this option is not
// present, then the library startup will fail.
SoftwareVersion string `json:"software_version,omitempty"`
}

View File

@ -9,54 +9,25 @@ import (
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/engine/model"
"github.com/ooni/probe-cli/v3/internal/kvstore"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
const (
failureIPLookup = "failure.ip_lookup"
failureASNLookup = "failure.asn_lookup"
failureCCLookup = "failure.cc_lookup"
failureMeasurement = "failure.measurement"
failureMeasurementSubmission = "failure.measurement_submission"
failureReportCreate = "failure.report_create"
failureResolverLookup = "failure.resolver_lookup"
failureStartup = "failure.startup"
measurement = "measurement"
statusEnd = "status.end"
statusGeoIPLookup = "status.geoip_lookup"
statusMeasurementDone = "status.measurement_done"
statusMeasurementStart = "status.measurement_start"
statusMeasurementSubmission = "status.measurement_submission"
statusProgress = "status.progress"
statusQueued = "status.queued"
statusReportCreate = "status.report_create"
statusResolverLookup = "status.resolver_lookup"
statusStarted = "status.started"
)
// run runs the task specified by settings.Name until completion. This is the
// top-level API that should be called by oonimkall.
func run(ctx context.Context, settings *settings, out chan<- *event) {
eof := make(chan interface{})
defer close(eof) // tell the emitter to not emit anymore.
r := newRunner(settings, out, eof)
r.Run(ctx)
}
// runner runs a specific task
type runner struct {
emitter *eventEmitter
maybeLookupLocation func(*engine.Session) error
out chan<- *event
// runnerForTask runs a specific task
type runnerForTask struct {
emitter *taskEmitterWrapper
kvStoreBuilder taskKVStoreFSBuilder
sessionBuilder taskSessionBuilder
settings *settings
}
var _ taskRunner = &runnerForTask{}
// newRunner creates a new task runner
func newRunner(settings *settings, out chan<- *event, eof <-chan interface{}) *runner {
return &runner{
emitter: newEventEmitter(settings.DisabledEvents, out, eof),
out: out,
func newRunner(settings *settings, emitter taskEmitter) *runnerForTask {
return &runnerForTask{
emitter: &taskEmitterWrapper{emitter},
kvStoreBuilder: &taskKVStoreFSBuilderEngine{},
sessionBuilder: &taskSessionBuilderEngine{},
settings: settings,
}
}
@ -64,22 +35,20 @@ func newRunner(settings *settings, out chan<- *event, eof <-chan interface{}) *r
// failureInvalidVersion is the failure returned when Version is invalid
const failureInvalidVersion = "invalid Settings.Version number"
func (r *runner) hasUnsupportedSettings(logger *chanLogger) bool {
if r.settings.Version < 1 {
func (r *runnerForTask) hasUnsupportedSettings() bool {
if r.settings.Version < taskABIVersion {
r.emitter.EmitFailureStartup(failureInvalidVersion)
return true
}
return false
}
func (r *runner) newsession(ctx context.Context, logger *chanLogger) (*engine.Session, error) {
kvstore, err := kvstore.NewFS(r.settings.StateDir)
func (r *runnerForTask) newsession(ctx context.Context, logger model.Logger) (taskSession, error) {
kvstore, err := r.kvStoreBuilder.NewFS(r.settings.StateDir)
if err != nil {
return nil, err
}
// TODO(bassosimone): write tests for this functionality
// See https://github.com/ooni/probe/issues/1465.
var proxyURL *url.URL
if r.settings.Proxy != "" {
var err error
@ -104,11 +73,11 @@ func (r *runner) newsession(ctx context.Context, logger *chanLogger) (*engine.Se
Address: r.settings.Options.ProbeServicesBaseURL,
}}
}
return engine.NewSession(ctx, config)
return r.sessionBuilder.NewSession(ctx, config)
}
func (r *runner) contextForExperiment(
ctx context.Context, builder *engine.ExperimentBuilder,
func (r *runnerForTask) contextForExperiment(
ctx context.Context, builder taskExperimentBuilder,
) context.Context {
if builder.Interruptible() {
return ctx
@ -117,11 +86,11 @@ func (r *runner) contextForExperiment(
}
type runnerCallbacks struct {
emitter *eventEmitter
emitter taskEmitter
}
func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
cb.emitter.Emit(statusProgress, eventStatusProgress{
cb.emitter.Emit(eventTypeStatusProgress, eventStatusProgress{
Percentage: 0.4 + (percentage * 0.6), // open report is 40%
Message: message,
})
@ -130,13 +99,14 @@ func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
// Run runs the runner until completion. The context argument controls
// when to stop when processing multiple inputs, as well as when to stop
// experiments explicitly marked as interruptible.
func (r *runner) Run(ctx context.Context) {
logger := newChanLogger(r.emitter, r.settings.LogLevel, r.out)
r.emitter.Emit(statusQueued, eventEmpty{})
if r.hasUnsupportedSettings(logger) {
func (r *runnerForTask) Run(ctx context.Context) {
var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel)
r.emitter.Emit(eventTypeStatusQueued, eventEmpty{})
if r.hasUnsupportedSettings() {
// event failureStartup already emitted
return
}
r.emitter.Emit(statusStarted, eventEmpty{})
r.emitter.Emit(eventTypeStatusStarted, eventEmpty{})
sess, err := r.newsession(ctx, logger)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
@ -145,45 +115,39 @@ func (r *runner) Run(ctx context.Context) {
endEvent := new(eventStatusEnd)
defer func() {
sess.Close()
r.emitter.Emit(statusEnd, endEvent)
r.emitter.Emit(eventTypeStatusEnd, endEvent)
}()
builder, err := sess.NewExperimentBuilder(r.settings.Name)
builder, err := sess.NewExperimentBuilderByName(r.settings.Name)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
logger.Info("Looking up OONI backends... please, be patient")
if err := sess.MaybeLookupBackends(); err != nil {
if err := sess.MaybeLookupBackendsContext(ctx); err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")
logger.Info("Looking up your location... please, be patient")
maybeLookupLocation := r.maybeLookupLocation
if maybeLookupLocation == nil {
maybeLookupLocation = func(sess *engine.Session) error {
return sess.MaybeLookupLocation()
}
}
if err := maybeLookupLocation(sess); err != nil {
r.emitter.EmitFailureGeneric(failureIPLookup, err.Error())
r.emitter.EmitFailureGeneric(failureASNLookup, err.Error())
r.emitter.EmitFailureGeneric(failureCCLookup, err.Error())
r.emitter.EmitFailureGeneric(failureResolverLookup, err.Error())
if err := sess.MaybeLookupLocationContext(ctx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureASNLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureCCLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureResolverLookup, err.Error())
return
}
r.emitter.EmitStatusProgress(0.2, "geoip lookup")
r.emitter.EmitStatusProgress(0.3, "resolver lookup")
r.emitter.Emit(statusGeoIPLookup, eventStatusGeoIPLookup{
r.emitter.Emit(eventTypeStatusGeoIPLookup, eventStatusGeoIPLookup{
ProbeIP: sess.ProbeIP(),
ProbeASN: sess.ProbeASNString(),
ProbeCC: sess.ProbeCC(),
ProbeNetworkName: sess.ProbeNetworkName(),
})
r.emitter.Emit(statusResolverLookup, eventStatusResolverLookup{
r.emitter.Emit(eventTypeStatusResolverLookup, eventStatusResolverLookup{
ResolverASN: sess.ResolverASNString(),
ResolverIP: sess.ResolverIP(),
ResolverNetworkName: sess.ResolverNetworkName(),
@ -198,19 +162,19 @@ func (r *runner) Run(ctx context.Context) {
}
r.settings.Inputs = append(r.settings.Inputs, "")
}
experiment := builder.NewExperiment()
experiment := builder.NewExperimentInstance()
defer func() {
endEvent.DownloadedKB = experiment.KibiBytesReceived()
endEvent.UploadedKB = experiment.KibiBytesSent()
}()
if !r.settings.Options.NoCollector {
logger.Info("Opening report... please, be patient")
if err := experiment.OpenReport(); err != nil {
r.emitter.EmitFailureGeneric(failureReportCreate, err.Error())
if err := experiment.OpenReportContext(ctx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureReportCreate, err.Error())
return
}
r.emitter.EmitStatusProgress(0.4, "open report")
r.emitter.Emit(statusReportCreate, eventStatusReportGeneric{
r.emitter.Emit(eventTypeStatusReportCreate, eventStatusReportGeneric{
ReportID: experiment.ReportID(),
})
}
@ -242,7 +206,7 @@ func (r *runner) Run(ctx context.Context) {
break
}
logger.Infof("Starting measurement with index %d", idx)
r.emitter.Emit(statusMeasurementStart, eventMeasurementGeneric{
r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
})
@ -269,7 +233,7 @@ func (r *runner) Run(ctx context.Context) {
}
m.AddAnnotations(r.settings.Annotations)
if err != nil {
r.emitter.Emit(failureMeasurement, eventMeasurementGeneric{
r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{
Failure: err.Error(),
Idx: int64(idx),
Input: input,
@ -282,14 +246,14 @@ func (r *runner) Run(ctx context.Context) {
}
data, err := json.Marshal(m)
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
r.emitter.Emit(measurement, eventMeasurementGeneric{
r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
JSONStr: string(data),
})
if !r.settings.Options.NoCollector {
logger.Info("Submitting measurement... please, be patient")
err := experiment.SubmitAndUpdateMeasurement(m)
err := experiment.SubmitAndUpdateMeasurementContext(ctx, m)
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
@ -297,7 +261,7 @@ func (r *runner) Run(ctx context.Context) {
Failure: measurementSubmissionFailure(err),
})
}
r.emitter.Emit(statusMeasurementDone, eventMeasurementGeneric{
r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
})
@ -306,9 +270,9 @@ func (r *runner) Run(ctx context.Context) {
func measurementSubmissionEventName(err error) string {
if err != nil {
return failureMeasurementSubmission
return eventTypeFailureMeasurementSubmission
}
return statusMeasurementSubmission
return eventTypeStatusMeasurementSubmission
}
func measurementSubmissionFailure(err error) string {

View File

@ -0,0 +1,741 @@
package oonimkall
import (
"context"
"errors"
"testing"
"time"
"github.com/google/go-cmp/cmp"
engine "github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/engine/model"
)
func TestMeasurementSubmissionEventName(t *testing.T) {
if measurementSubmissionEventName(nil) != eventTypeStatusMeasurementSubmission {
t.Fatal("unexpected submission event name")
}
if measurementSubmissionEventName(errors.New("mocked error")) != eventTypeFailureMeasurementSubmission {
t.Fatal("unexpected submission event name")
}
}
func TestMeasurementSubmissionFailure(t *testing.T) {
if measurementSubmissionFailure(nil) != "" {
t.Fatal("unexpected submission failure")
}
if measurementSubmissionFailure(errors.New("mocked error")) != "mocked error" {
t.Fatal("unexpected submission failure")
}
}
func TestTaskRunnerRun(t *testing.T) {
// newRunnerForTesting is a factory for creating a new
// runner that wraps newRunner and also sets a specific
// taskSessionBuilder for testing purposes.
newRunnerForTesting := func() (*runnerForTask, *CollectorTaskEmitter) {
settings := &settings{
Name: "Example",
Options: settingsOptions{
SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0",
},
StateDir: "testdata/state",
Version: 1,
}
e := &CollectorTaskEmitter{}
r := newRunner(settings, e)
return r, e
}
// runAndCollectContext runs the task until completion
// and collects the emitted events. Remember that
// it's not race safe to modify the events.
runAndCollectContext := func(ctx context.Context, r taskRunner, e *CollectorTaskEmitter) []*event {
r.Run(ctx)
return e.Collect()
}
// runAndCollect is like runAndCollectContext
// but uses context.Background()
runAndCollect := func(r taskRunner, e *CollectorTaskEmitter) []*event {
return runAndCollectContext(context.Background(), r, e)
}
// countEventsByKey returns the number of events
// with the given key inside of the list.
countEventsByKey := func(events []*event, key string) (count int) {
for _, ev := range events {
if ev.Key == key {
count++
}
}
return
}
// assertCountEventsByKey fails is the number of events
// of the given type is not the expected one.
assertCountEventsByKey := func(events []*event, key string, count int) {
if countEventsByKey(events, key) != count {
t.Fatalf("unexpected number of '%s' events", key)
}
}
t.Run("with unsupported settings", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
runner.settings.Version = 0 // force unsupported version
events := runAndCollect(runner, emitter)
assertCountEventsByKey(events, eventTypeFailureStartup, 1)
})
t.Run("with failure when creating a new kvstore", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
// override the kvstore builder to provoke an error
runner.kvStoreBuilder = &MockableKVStoreFSBuilder{
MockNewFS: func(path string) (model.KeyValueStore, error) {
return nil, errors.New("generic error")
},
}
events := runAndCollect(runner, emitter)
assertCountEventsByKey(events, eventTypeFailureStartup, 1)
})
t.Run("with unparsable proxyURL", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
runner.settings.Proxy = "\t" // invalid proxy URL
events := runAndCollect(runner, emitter)
assertCountEventsByKey(events, eventTypeFailureStartup, 1)
})
t.Run("with a parsable proxyURL", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
// set a valid URL
runner.settings.Proxy = "https://127.0.0.1/"
// set a fake session builder that causes the startup to
// fail but records the config passed to NewSession
saver := &SessionBuilderConfigSaver{}
runner.sessionBuilder = saver
events := runAndCollect(runner, emitter)
assertCountEventsByKey(events, eventTypeFailureStartup, 1)
if saver.Config.ProxyURL.String() != runner.settings.Proxy {
t.Fatal("invalid proxy URL")
}
})
t.Run("with custom probe services URL", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
// set a probe services URL
runner.settings.Options.ProbeServicesBaseURL = "https://127.0.0.1"
// set a fake session builder that causes the startup to
// fail but records the config passed to NewSession
saver := &SessionBuilderConfigSaver{}
runner.sessionBuilder = saver
events := runAndCollect(runner, emitter)
assertCountEventsByKey(events, eventTypeFailureStartup, 1)
psu := saver.Config.AvailableProbeServices
if len(psu) != 1 {
t.Fatal("invalid length")
}
if psu[0].Type != "https" {
t.Fatal("invalid type")
}
if psu[0].Address != runner.settings.Options.ProbeServicesBaseURL {
t.Fatal("invalid address")
}
if psu[0].Front != "" {
t.Fatal("invalid front")
}
})
type eventKeyCount struct {
Key string
Count int
}
// reduceEventsKeysIgnoreLog reduces the list of event keys
// counting equal subsequent keys and ignoring log events
reduceEventsKeysIgnoreLog := func(events []*event) (out []eventKeyCount) {
var current eventKeyCount
for _, ev := range events {
if ev.Key == eventTypeLog {
continue
}
if current.Key == ev.Key {
current.Count++
continue
}
if current.Key != "" {
out = append(out, current)
}
current.Key = ev.Key
current.Count = 1
}
if current.Key != "" {
out = append(out, current)
}
return
}
// fakeSuccessfulRun returns a new set of dependencies that
// will perform a fully successful, but fake, run.
fakeSuccessfulRun := func() *MockableTaskRunnerDependencies {
return &MockableTaskRunnerDependencies{
MockableKibiBytesReceived: func() float64 {
return 10
},
MockableKibiBytesSent: func() float64 {
return 4
},
MockableOpenReportContext: func(ctx context.Context) error {
return nil
},
MockableReportID: func() string {
return "20211202T074907Z_example_IT_30722_n1_axDLHNUfJaV1IbuU"
},
MockableMeasureWithContext: func(ctx context.Context, input string) (*model.Measurement, error) {
return &model.Measurement{}, nil
},
MockableSubmitAndUpdateMeasurementContext: func(ctx context.Context, measurement *model.Measurement) error {
return nil
},
MockableSetCallbacks: func(callbacks model.ExperimentCallbacks) {
},
MockableInputPolicy: func() engine.InputPolicy {
return engine.InputNone
},
MockableInterruptible: func() bool {
return false
},
MockClose: func() error {
return nil
},
MockMaybeLookupBackendsContext: func(ctx context.Context) error {
return nil
},
MockMaybeLookupLocationContext: func(ctx context.Context) error {
return nil
},
MockProbeIP: func() string {
return "130.192.91.211"
},
MockProbeASNString: func() string {
return "AS137"
},
MockProbeCC: func() string {
return "IT"
},
MockProbeNetworkName: func() string {
return "GARR"
},
MockResolverASNString: func() string {
return "AS137"
},
MockResolverIP: func() string {
return "130.192.3.24"
},
MockResolverNetworkName: func() string {
return "GARR"
},
}
}
assertReducedEventsLike := func(t *testing.T, expected, got []eventKeyCount) {
if diff := cmp.Diff(expected, got); diff != "" {
t.Fatal(diff)
}
}
t.Run("with invalid experiment name", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
fake.MockNewExperimentBuilderByName = func(name string) (taskExperimentBuilder, error) {
return nil, errors.New("invalid experiment name")
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeFailureStartup,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with error during backends lookup", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
fake.MockMaybeLookupBackendsContext = func(ctx context.Context) error {
return errors.New("mocked error")
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeFailureStartup,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with error during location lookup", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
fake.MockMaybeLookupLocationContext = func(ctx context.Context) error {
return errors.New("mocked error")
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 1,
}, {
Key: eventTypeFailureIPLookup,
Count: 1,
}, {
Key: eventTypeFailureASNLookup,
Count: 1,
}, {
Key: eventTypeFailureCCLookup,
Count: 1,
}, {
Key: eventTypeFailureResolverLookup,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with missing input and input or query backend", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
fake.MockableInputPolicy = func() engine.InputPolicy {
return engine.InputOrQueryBackend
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 3,
}, {
Key: eventTypeStatusGeoIPLookup,
Count: 1,
}, {
Key: eventTypeStatusResolverLookup,
Count: 1,
}, {
Key: eventTypeFailureStartup,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with missing input and input strictly required", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
fake.MockableInputPolicy = func() engine.InputPolicy {
return engine.InputStrictlyRequired
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 3,
}, {
Key: eventTypeStatusGeoIPLookup,
Count: 1,
}, {
Key: eventTypeStatusResolverLookup,
Count: 1,
}, {
Key: eventTypeFailureStartup,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with failure opening report", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
fake.MockableOpenReportContext = func(ctx context.Context) error {
return errors.New("mocked error")
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 3,
}, {
Key: eventTypeStatusGeoIPLookup,
Count: 1,
}, {
Key: eventTypeStatusResolverLookup,
Count: 1,
}, {
Key: eventTypeFailureReportCreate,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with success and no input", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 3,
}, {
Key: eventTypeStatusGeoIPLookup,
Count: 1,
}, {
Key: eventTypeStatusResolverLookup,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 1,
}, {
Key: eventTypeStatusReportCreate,
Count: 1,
}, {
Key: eventTypeStatusMeasurementStart,
Count: 1,
}, {
Key: eventTypeMeasurement,
Count: 1,
}, {
Key: eventTypeStatusMeasurementSubmission,
Count: 1,
}, {
Key: eventTypeStatusMeasurementDone,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with measurement failure and no input", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) {
return nil, errors.New("preconditions error")
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 3,
}, {
Key: eventTypeStatusGeoIPLookup,
Count: 1,
}, {
Key: eventTypeStatusResolverLookup,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 1,
}, {
Key: eventTypeStatusReportCreate,
Count: 1,
}, {
Key: eventTypeStatusMeasurementStart,
Count: 1,
}, {
Key: eventTypeFailureMeasurement,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with success and input", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
runner.settings.Inputs = []string{"a", "b", "c", "d"}
fake := fakeSuccessfulRun()
fake.MockableInputPolicy = func() engine.InputPolicy {
return engine.InputStrictlyRequired
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{
{Key: eventTypeStatusQueued, Count: 1},
{Key: eventTypeStatusStarted, Count: 1},
{Key: eventTypeStatusProgress, Count: 3},
{Key: eventTypeStatusGeoIPLookup, Count: 1},
{Key: eventTypeStatusResolverLookup, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeStatusReportCreate, Count: 1},
//
{Key: eventTypeStatusMeasurementStart, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeMeasurement, Count: 1},
{Key: eventTypeStatusMeasurementSubmission, Count: 1},
{Key: eventTypeStatusMeasurementDone, Count: 1},
//
{Key: eventTypeStatusMeasurementStart, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeMeasurement, Count: 1},
{Key: eventTypeStatusMeasurementSubmission, Count: 1},
{Key: eventTypeStatusMeasurementDone, Count: 1},
//
{Key: eventTypeStatusMeasurementStart, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeMeasurement, Count: 1},
{Key: eventTypeStatusMeasurementSubmission, Count: 1},
{Key: eventTypeStatusMeasurementDone, Count: 1},
//
{Key: eventTypeStatusMeasurementStart, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeMeasurement, Count: 1},
{Key: eventTypeStatusMeasurementSubmission, Count: 1},
{Key: eventTypeStatusMeasurementDone, Count: 1},
//
{Key: eventTypeStatusEnd, Count: 1},
}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with succes and max runtime", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
runner.settings.Inputs = []string{"a", "b", "c", "d"}
runner.settings.Options.MaxRuntime = 2
fake := fakeSuccessfulRun()
fake.MockableInputPolicy = func() engine.InputPolicy {
return engine.InputStrictlyRequired
}
fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) {
time.Sleep(1 * time.Second)
return &model.Measurement{}, nil
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{
{Key: eventTypeStatusQueued, Count: 1},
{Key: eventTypeStatusStarted, Count: 1},
{Key: eventTypeStatusProgress, Count: 3},
{Key: eventTypeStatusGeoIPLookup, Count: 1},
{Key: eventTypeStatusResolverLookup, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeStatusReportCreate, Count: 1},
//
{Key: eventTypeStatusMeasurementStart, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeMeasurement, Count: 1},
{Key: eventTypeStatusMeasurementSubmission, Count: 1},
{Key: eventTypeStatusMeasurementDone, Count: 1},
//
{Key: eventTypeStatusMeasurementStart, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeMeasurement, Count: 1},
{Key: eventTypeStatusMeasurementSubmission, Count: 1},
{Key: eventTypeStatusMeasurementDone, Count: 1},
//
{Key: eventTypeStatusEnd, Count: 1},
}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with interrupted experiment", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
runner.settings.Inputs = []string{"a", "b", "c", "d"}
runner.settings.Options.MaxRuntime = 2
fake := fakeSuccessfulRun()
fake.MockableInputPolicy = func() engine.InputPolicy {
return engine.InputStrictlyRequired
}
fake.MockableInterruptible = func() bool {
return true
}
ctx, cancel := context.WithCancel(context.Background())
fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) {
cancel()
return &model.Measurement{}, nil
}
runner.sessionBuilder = fake
events := runAndCollectContext(ctx, runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{
{Key: eventTypeStatusQueued, Count: 1},
{Key: eventTypeStatusStarted, Count: 1},
{Key: eventTypeStatusProgress, Count: 3},
{Key: eventTypeStatusGeoIPLookup, Count: 1},
{Key: eventTypeStatusResolverLookup, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeStatusReportCreate, Count: 1},
//
{Key: eventTypeStatusMeasurementStart, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
//
{Key: eventTypeStatusEnd, Count: 1},
}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with measurement submission failure", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
runner.settings.Inputs = []string{"a"}
fake := fakeSuccessfulRun()
fake.MockableInputPolicy = func() engine.InputPolicy {
return engine.InputStrictlyRequired
}
fake.MockableSubmitAndUpdateMeasurementContext = func(ctx context.Context, measurement *model.Measurement) error {
return errors.New("cannot submit")
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{
{Key: eventTypeStatusQueued, Count: 1},
{Key: eventTypeStatusStarted, Count: 1},
{Key: eventTypeStatusProgress, Count: 3},
{Key: eventTypeStatusGeoIPLookup, Count: 1},
{Key: eventTypeStatusResolverLookup, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeStatusReportCreate, Count: 1},
//
{Key: eventTypeStatusMeasurementStart, Count: 1},
{Key: eventTypeStatusProgress, Count: 1},
{Key: eventTypeMeasurement, Count: 1},
{Key: eventTypeFailureMeasurementSubmission, Count: 1},
{Key: eventTypeStatusMeasurementDone, Count: 1},
//
{Key: eventTypeStatusEnd, Count: 1},
}
assertReducedEventsLike(t, expect, reduced)
})
t.Run("with success and progress", func(t *testing.T) {
runner, emitter := newRunnerForTesting()
fake := fakeSuccessfulRun()
var callbacks model.ExperimentCallbacks
fake.MockableSetCallbacks = func(cbs model.ExperimentCallbacks) {
callbacks = cbs
}
fake.MockableMeasureWithContext = func(ctx context.Context, input string) (measurement *model.Measurement, err error) {
callbacks.OnProgress(1, "hello from the fake experiment")
return &model.Measurement{}, nil
}
runner.sessionBuilder = fake
events := runAndCollect(runner, emitter)
reduced := reduceEventsKeysIgnoreLog(events)
expect := []eventKeyCount{{
Key: eventTypeStatusQueued,
Count: 1,
}, {
Key: eventTypeStatusStarted,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 3,
}, {
Key: eventTypeStatusGeoIPLookup,
Count: 1,
}, {
Key: eventTypeStatusResolverLookup,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 1,
}, {
Key: eventTypeStatusReportCreate,
Count: 1,
}, {
Key: eventTypeStatusMeasurementStart,
Count: 1,
}, {
Key: eventTypeStatusProgress,
Count: 1,
}, {
Key: eventTypeMeasurement,
Count: 1,
}, {
Key: eventTypeStatusMeasurementSubmission,
Count: 1,
}, {
Key: eventTypeStatusMeasurementDone,
Count: 1,
}, {
Key: eventTypeStatusEnd,
Count: 1,
}}
assertReducedEventsLike(t, expect, reduced)
})
}

View File

@ -0,0 +1,78 @@
package oonimkall
import (
"context"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/engine/model"
"github.com/ooni/probe-cli/v3/internal/kvstore"
)
//
// This file implements taskSession and derived types.
//
// taskKVStoreFSBuilderEngine creates a new KVStore
// using the ./internal/engine package.
type taskKVStoreFSBuilderEngine struct{}
var _ taskKVStoreFSBuilder = &taskKVStoreFSBuilderEngine{}
func (*taskKVStoreFSBuilderEngine) NewFS(path string) (model.KeyValueStore, error) {
return kvstore.NewFS(path)
}
// taskSessionBuilderEngine builds a new session
// using the ./internal/engine package.
type taskSessionBuilderEngine struct{}
var _ taskSessionBuilder = &taskSessionBuilderEngine{}
// NewSession implements taskSessionBuilder.NewSession.
func (b *taskSessionBuilderEngine) NewSession(ctx context.Context,
config engine.SessionConfig) (taskSession, error) {
sess, err := engine.NewSession(ctx, config)
if err != nil {
return nil, err
}
return &taskSessionEngine{sess}, nil
}
// taskSessionEngine wraps ./internal/engine's Session.
type taskSessionEngine struct {
*engine.Session
}
var _ taskSession = &taskSessionEngine{}
// NewExperimentBuilderByName implements
// taskSessionEngine.NewExperimentBuilderByName.
func (sess *taskSessionEngine) NewExperimentBuilderByName(
name string) (taskExperimentBuilder, error) {
builder, err := sess.NewExperimentBuilder(name)
if err != nil {
return nil, err
}
return &taskExperimentBuilderEngine{builder}, err
}
// taskExperimentBuilderEngine wraps ./internal/engine's
// ExperimentBuilder type.
type taskExperimentBuilderEngine struct {
*engine.ExperimentBuilder
}
var _ taskExperimentBuilder = &taskExperimentBuilderEngine{}
// NewExperimentInstance implements
// taskExperimentBuilder.NewExperimentInstance.
func (b *taskExperimentBuilderEngine) NewExperimentInstance() taskExperiment {
return &taskExperimentEngine{b.NewExperiment()}
}
// taskExperimentEngine wraps ./internal/engine's Experiment.
type taskExperimentEngine struct {
*engine.Experiment
}
var _ taskExperiment = &taskExperimentEngine{}

View File

@ -0,0 +1,128 @@
package oonimkall
import (
"context"
"testing"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/version"
)
func TestTaskKVSToreFSBuilderEngine(t *testing.T) {
b := &taskKVStoreFSBuilderEngine{}
store, err := b.NewFS("testdata/state")
if err != nil {
t.Fatal(err)
}
if store == nil {
t.Fatal("expected non-nil store here")
}
}
func TestTaskSessionBuilderEngine(t *testing.T) {
t.Run("NewSession", func(t *testing.T) {
t.Run("on success", func(t *testing.T) {
builder := &taskSessionBuilderEngine{}
ctx := context.Background()
config := engine.SessionConfig{
Logger: log.Log,
SoftwareName: "ooniprobe-cli",
SoftwareVersion: version.Version,
}
sess, err := builder.NewSession(ctx, config)
if err != nil {
t.Fatal(err)
}
sess.Close()
})
t.Run("on failure", func(t *testing.T) {
builder := &taskSessionBuilderEngine{}
ctx := context.Background()
config := engine.SessionConfig{}
sess, err := builder.NewSession(ctx, config)
if err == nil {
t.Fatal("expected an error here")
}
if sess != nil {
t.Fatal("expected nil session here")
}
})
})
}
func TestTaskSessionEngine(t *testing.T) {
// newSession is a helper function for creating a new session.
newSession := func(t *testing.T) taskSession {
builder := &taskSessionBuilderEngine{}
ctx := context.Background()
config := engine.SessionConfig{
Logger: log.Log,
SoftwareName: "ooniprobe-cli",
SoftwareVersion: version.Version,
}
sess, err := builder.NewSession(ctx, config)
if err != nil {
t.Fatal(err)
}
return sess
}
t.Run("NewExperimentBuilderByName", func(t *testing.T) {
t.Run("on success", func(t *testing.T) {
sess := newSession(t)
builder, err := sess.NewExperimentBuilderByName("ndt")
if err != nil {
t.Fatal(err)
}
if builder == nil {
t.Fatal("expected non-nil builder")
}
})
t.Run("on failure", func(t *testing.T) {
sess := newSession(t)
builder, err := sess.NewExperimentBuilderByName("antani")
if err == nil {
t.Fatal("expected an error here")
}
if builder != nil {
t.Fatal("expected nil builder")
}
})
})
}
func TestTaskExperimentBuilderEngine(t *testing.T) {
// newBuilder is a helper function for creating a new session
// as well as a new experiment builder
newBuilder := func(t *testing.T) (taskSession, taskExperimentBuilder) {
builder := &taskSessionBuilderEngine{}
ctx := context.Background()
config := engine.SessionConfig{
Logger: log.Log,
SoftwareName: "ooniprobe-cli",
SoftwareVersion: version.Version,
}
sess, err := builder.NewSession(ctx, config)
if err != nil {
t.Fatal(err)
}
expBuilder, err := sess.NewExperimentBuilderByName("ndt")
if err != nil {
t.Fatal(err)
}
return sess, expBuilder
}
t.Run("NewExperiment", func(t *testing.T) {
_, builder := newBuilder(t)
exp := builder.NewExperimentInstance()
if exp == nil {
t.Fatal("expected non-nil experiment here")
}
})
}