[forwardport] refactor(oonimkall): merge internal/task into oonimkall (#617) (#618)

This diff forward ports bc4b9f1ea89158bfa7b7a80ae59a90b43c784ed2 to `master`.

See https://github.com/ooni/probe/issues/1903
This commit is contained in:
Simone Basso 2021-11-26 20:21:42 +01:00 committed by GitHub
parent ece6f3d48d
commit ee5be24900
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 182 additions and 194 deletions

View File

@ -1,22 +1,20 @@
package tasks package oonimkall
import ( import "fmt"
"fmt"
)
// ChanLogger is a logger targeting a channel // chanLogger is a logger targeting a channel
type ChanLogger struct { type chanLogger struct {
emitter *EventEmitter emitter *eventEmitter
hasdebug bool hasdebug bool
hasinfo bool hasinfo bool
haswarning bool haswarning bool
out chan<- *Event out chan<- *event
} }
// Debug implements Logger.Debug // Debug implements Logger.Debug
func (cl *ChanLogger) Debug(msg string) { func (cl *chanLogger) Debug(msg string) {
if cl.hasdebug { if cl.hasdebug {
cl.emitter.Emit("log", EventLog{ cl.emitter.Emit("log", eventLog{
LogLevel: "DEBUG", LogLevel: "DEBUG",
Message: msg, Message: msg,
}) })
@ -24,16 +22,16 @@ func (cl *ChanLogger) Debug(msg string) {
} }
// Debugf implements Logger.Debugf // Debugf implements Logger.Debugf
func (cl *ChanLogger) Debugf(format string, v ...interface{}) { func (cl *chanLogger) Debugf(format string, v ...interface{}) {
if cl.hasdebug { if cl.hasdebug {
cl.Debug(fmt.Sprintf(format, v...)) cl.Debug(fmt.Sprintf(format, v...))
} }
} }
// Info implements Logger.Info // Info implements Logger.Info
func (cl *ChanLogger) Info(msg string) { func (cl *chanLogger) Info(msg string) {
if cl.hasinfo { if cl.hasinfo {
cl.emitter.Emit("log", EventLog{ cl.emitter.Emit("log", eventLog{
LogLevel: "INFO", LogLevel: "INFO",
Message: msg, Message: msg,
}) })
@ -41,16 +39,16 @@ func (cl *ChanLogger) Info(msg string) {
} }
// Infof implements Logger.Infof // Infof implements Logger.Infof
func (cl *ChanLogger) Infof(format string, v ...interface{}) { func (cl *chanLogger) Infof(format string, v ...interface{}) {
if cl.hasinfo { if cl.hasinfo {
cl.Info(fmt.Sprintf(format, v...)) cl.Info(fmt.Sprintf(format, v...))
} }
} }
// Warn implements Logger.Warn // Warn implements Logger.Warn
func (cl *ChanLogger) Warn(msg string) { func (cl *chanLogger) Warn(msg string) {
if cl.haswarning { if cl.haswarning {
cl.emitter.Emit("log", EventLog{ cl.emitter.Emit("log", eventLog{
LogLevel: "WARNING", LogLevel: "WARNING",
Message: msg, Message: msg,
}) })
@ -58,16 +56,16 @@ func (cl *ChanLogger) Warn(msg string) {
} }
// Warnf implements Logger.Warnf // Warnf implements Logger.Warnf
func (cl *ChanLogger) Warnf(format string, v ...interface{}) { func (cl *chanLogger) Warnf(format string, v ...interface{}) {
if cl.haswarning { if cl.haswarning {
cl.Warn(fmt.Sprintf(format, v...)) cl.Warn(fmt.Sprintf(format, v...))
} }
} }
// NewChanLogger creates a new ChanLogger instance. // newChanLogger creates a new ChanLogger instance.
func NewChanLogger(emitter *EventEmitter, logLevel string, func newChanLogger(emitter *eventEmitter, logLevel string,
out chan<- *Event) *ChanLogger { out chan<- *event) *chanLogger {
cl := &ChanLogger{ cl := &chanLogger{
emitter: emitter, emitter: emitter,
out: out, out: out,
} }

View File

@ -1,14 +1,14 @@
package tasks package oonimkall
type eventEmpty struct{} type eventEmpty struct{}
// EventFailure contains information on a failure. // eventFailure contains information on a failure.
type EventFailure struct { type eventFailure struct {
Failure string `json:"failure"` Failure string `json:"failure"`
} }
// EventLog is an event containing a log message. // eventLog is an event containing a log message.
type EventLog struct { type eventLog struct {
LogLevel string `json:"log_level"` LogLevel string `json:"log_level"`
Message string `json:"message"` Message string `json:"message"`
} }
@ -33,8 +33,8 @@ type eventStatusGeoIPLookup struct {
ProbeNetworkName string `json:"probe_network_name"` ProbeNetworkName string `json:"probe_network_name"`
} }
// EventStatusProgress reports progress information. // eventStatusProgress reports progress information.
type EventStatusProgress struct { type eventStatusProgress struct {
Message string `json:"message"` Message string `json:"message"`
Percentage float64 `json:"percentage"` Percentage float64 `json:"percentage"`
} }
@ -49,9 +49,9 @@ type eventStatusResolverLookup struct {
ResolverNetworkName string `json:"resolver_network_name"` ResolverNetworkName string `json:"resolver_network_name"`
} }
// Event is an event emitted by a task. This structure extends the event // event is an event emitted by a task. This structure extends the event
// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv). // described by MK v0.10.9 FFI API (https://git.io/Jv4Rv).
type Event struct { type event struct {
Key string `json:"key"` Key string `json:"key"`
Value interface{} `json:"value"` Value interface{} `json:"value"`
} }

View File

@ -0,0 +1,40 @@
package oonimkall
// eventEmitter emits event on a channel
type eventEmitter struct {
disabled map[string]bool
out chan<- *event
}
// newEventEmitter creates a new Emitter
func newEventEmitter(disabledEvents []string, out chan<- *event) *eventEmitter {
ee := &eventEmitter{out: out}
ee.disabled = make(map[string]bool)
for _, eventname := range disabledEvents {
ee.disabled[eventname] = true
}
return ee
}
// EmitFailureStartup emits the failureStartup event
func (ee *eventEmitter) EmitFailureStartup(failure string) {
ee.EmitFailureGeneric(failureStartup, failure)
}
// EmitFailureGeneric emits a failure event
func (ee *eventEmitter) EmitFailureGeneric(name, failure string) {
ee.Emit(name, eventFailure{Failure: failure})
}
// EmitStatusProgress emits the status.Progress event
func (ee *eventEmitter) EmitStatusProgress(percentage float64, message string) {
ee.Emit(statusProgress, eventStatusProgress{Message: message, Percentage: percentage})
}
// Emit emits the specified event
func (ee *eventEmitter) Emit(key string, value interface{}) {
if ee.disabled[key] {
return
}
ee.out <- &event{Key: key, Value: value}
}

View File

@ -1,16 +1,12 @@
package tasks_test package oonimkall
import ( import "testing"
"testing"
"github.com/ooni/probe-cli/v3/pkg/oonimkall/internal/tasks"
)
func TestDisabledEvents(t *testing.T) { func TestDisabledEvents(t *testing.T) {
out := make(chan *tasks.Event) out := make(chan *event)
emitter := tasks.NewEventEmitter([]string{"log"}, out) emitter := newEventEmitter([]string{"log"}, out)
go func() { go func() {
emitter.Emit("log", tasks.EventLog{Message: "foo"}) emitter.Emit("log", eventLog{Message: "foo"})
close(out) close(out)
}() }()
var count int64 var count int64
@ -25,8 +21,8 @@ func TestDisabledEvents(t *testing.T) {
} }
func TestEmitFailureStartup(t *testing.T) { func TestEmitFailureStartup(t *testing.T) {
out := make(chan *tasks.Event) out := make(chan *event)
emitter := tasks.NewEventEmitter([]string{}, out) emitter := newEventEmitter([]string{}, out)
go func() { go func() {
emitter.EmitFailureStartup("mocked error") emitter.EmitFailureStartup("mocked error")
close(out) close(out)
@ -34,7 +30,7 @@ func TestEmitFailureStartup(t *testing.T) {
var found bool var found bool
for ev := range out { for ev := range out {
if ev.Key == "failure.startup" { if ev.Key == "failure.startup" {
evv := ev.Value.(tasks.EventFailure) // panic if not castable evv := ev.Value.(eventFailure) // panic if not castable
if evv.Failure == "mocked error" { if evv.Failure == "mocked error" {
found = true found = true
} }
@ -46,8 +42,8 @@ func TestEmitFailureStartup(t *testing.T) {
} }
func TestEmitStatusProgress(t *testing.T) { func TestEmitStatusProgress(t *testing.T) {
out := make(chan *tasks.Event) out := make(chan *event)
emitter := tasks.NewEventEmitter([]string{}, out) emitter := newEventEmitter([]string{}, out)
go func() { go func() {
emitter.EmitStatusProgress(0.7, "foo") emitter.EmitStatusProgress(0.7, "foo")
close(out) close(out)
@ -55,7 +51,7 @@ func TestEmitStatusProgress(t *testing.T) {
var found bool var found bool
for ev := range out { for ev := range out {
if ev.Key == "status.progress" { if ev.Key == "status.progress" {
evv := ev.Value.(tasks.EventStatusProgress) // panic if not castable evv := ev.Value.(eventStatusProgress) // panic if not castable
if evv.Message == "foo" && evv.Percentage == 0.7 { if evv.Message == "foo" && evv.Percentage == 0.7 {
found = true found = true
} }

View File

@ -1,40 +0,0 @@
package tasks
// EventEmitter emits event on a channel
type EventEmitter struct {
disabled map[string]bool
out chan<- *Event
}
// NewEventEmitter creates a new Emitter
func NewEventEmitter(disabledEvents []string, out chan<- *Event) *EventEmitter {
ee := &EventEmitter{out: out}
ee.disabled = make(map[string]bool)
for _, eventname := range disabledEvents {
ee.disabled[eventname] = true
}
return ee
}
// EmitFailureStartup emits the failureStartup event
func (ee *EventEmitter) EmitFailureStartup(failure string) {
ee.EmitFailureGeneric(failureStartup, failure)
}
// EmitFailureGeneric emits a failure event
func (ee *EventEmitter) EmitFailureGeneric(name, failure string) {
ee.Emit(name, EventFailure{Failure: failure})
}
// EmitStatusProgress emits the status.Progress event
func (ee *EventEmitter) EmitStatusProgress(percentage float64, message string) {
ee.Emit(statusProgress, EventStatusProgress{Message: message, Percentage: percentage})
}
// Emit emits the specified event
func (ee *EventEmitter) Emit(key string, value interface{}) {
if ee.disabled[key] == true {
return
}
ee.out <- &Event{Key: key, Value: value}
}

View File

@ -1,5 +1,4 @@
// Package tasks implements tasks run using the oonimkall API. package oonimkall
package tasks
import ( import (
"context" "context"
@ -36,42 +35,42 @@ const (
statusStarted = "status.started" statusStarted = "status.started"
) )
// Run runs the task specified by settings.Name until completion. This is the // run runs the task specified by settings.Name until completion. This is the
// top-level API that should be called by oonimkall. // top-level API that should be called by oonimkall.
func Run(ctx context.Context, settings *Settings, out chan<- *Event) { func run(ctx context.Context, settings *settings, out chan<- *event) {
r := NewRunner(settings, out) r := newRunner(settings, out)
r.Run(ctx) r.Run(ctx)
} }
// Runner runs a specific task // runner runs a specific task
type Runner struct { type runner struct {
emitter *EventEmitter emitter *eventEmitter
maybeLookupLocation func(*engine.Session) error maybeLookupLocation func(*engine.Session) error
out chan<- *Event out chan<- *event
settings *Settings settings *settings
} }
// NewRunner creates a new task runner // newRunner creates a new task runner
func NewRunner(settings *Settings, out chan<- *Event) *Runner { func newRunner(settings *settings, out chan<- *event) *runner {
return &Runner{ return &runner{
emitter: NewEventEmitter(settings.DisabledEvents, out), emitter: newEventEmitter(settings.DisabledEvents, out),
out: out, out: out,
settings: settings, settings: settings,
} }
} }
// FailureInvalidVersion is the failure returned when Version is invalid // failureInvalidVersion is the failure returned when Version is invalid
const FailureInvalidVersion = "invalid Settings.Version number" const failureInvalidVersion = "invalid Settings.Version number"
func (r *Runner) hasUnsupportedSettings(logger *ChanLogger) bool { func (r *runner) hasUnsupportedSettings(logger *chanLogger) bool {
if r.settings.Version < 1 { if r.settings.Version < 1 {
r.emitter.EmitFailureStartup(FailureInvalidVersion) r.emitter.EmitFailureStartup(failureInvalidVersion)
return true return true
} }
return false return false
} }
func (r *Runner) newsession(ctx context.Context, logger *ChanLogger) (*engine.Session, error) { func (r *runner) newsession(ctx context.Context, logger *chanLogger) (*engine.Session, error) {
kvstore, err := kvstore.NewFS(r.settings.StateDir) kvstore, err := kvstore.NewFS(r.settings.StateDir)
if err != nil { if err != nil {
return nil, err return nil, err
@ -106,7 +105,7 @@ func (r *Runner) newsession(ctx context.Context, logger *ChanLogger) (*engine.Se
return engine.NewSession(ctx, config) return engine.NewSession(ctx, config)
} }
func (r *Runner) contextForExperiment( func (r *runner) contextForExperiment(
ctx context.Context, builder *engine.ExperimentBuilder, ctx context.Context, builder *engine.ExperimentBuilder,
) context.Context { ) context.Context {
if builder.Interruptible() { if builder.Interruptible() {
@ -116,11 +115,11 @@ func (r *Runner) contextForExperiment(
} }
type runnerCallbacks struct { type runnerCallbacks struct {
emitter *EventEmitter emitter *eventEmitter
} }
func (cb *runnerCallbacks) OnProgress(percentage float64, message string) { func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
cb.emitter.Emit(statusProgress, EventStatusProgress{ cb.emitter.Emit(statusProgress, eventStatusProgress{
Percentage: 0.4 + (percentage * 0.6), // open report is 40% Percentage: 0.4 + (percentage * 0.6), // open report is 40%
Message: message, Message: message,
}) })
@ -129,8 +128,8 @@ func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
// Run runs the runner until completion. The context argument controls // Run runs the runner until completion. The context argument controls
// when to stop when processing multiple inputs, as well as when to stop // when to stop when processing multiple inputs, as well as when to stop
// experiments explicitly marked as interruptible. // experiments explicitly marked as interruptible.
func (r *Runner) Run(ctx context.Context) { func (r *runner) Run(ctx context.Context) {
logger := NewChanLogger(r.emitter, r.settings.LogLevel, r.out) logger := newChanLogger(r.emitter, r.settings.LogLevel, r.out)
r.emitter.Emit(statusQueued, eventEmpty{}) r.emitter.Emit(statusQueued, eventEmpty{})
if r.hasUnsupportedSettings(logger) { if r.hasUnsupportedSettings(logger) {
return return

View File

@ -1,4 +1,4 @@
package tasks_test package oonimkall
import ( import (
"context" "context"
@ -8,8 +8,6 @@ import (
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/ooni/probe-cli/v3/pkg/oonimkall/internal/tasks"
) )
func TestRunnerMaybeLookupBackendsFailure(t *testing.T) { func TestRunnerMaybeLookupBackendsFailure(t *testing.T) {
@ -17,11 +15,11 @@ func TestRunnerMaybeLookupBackendsFailure(t *testing.T) {
w.WriteHeader(500) w.WriteHeader(500)
})) }))
defer server.Close() defer server.Close()
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
Name: "Example", Name: "Example",
Options: tasks.SettingsOptions{ Options: settingsOptions{
ProbeServicesBaseURL: server.URL, ProbeServicesBaseURL: server.URL,
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
@ -30,14 +28,14 @@ func TestRunnerMaybeLookupBackendsFailure(t *testing.T) {
Version: 1, Version: 1,
} }
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var failures []string var failures []string
for ev := range out { for ev := range out {
switch ev.Key { switch ev.Key {
case "failure.startup": case "failure.startup":
failure := ev.Value.(tasks.EventFailure).Failure failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure) failures = append(failures, failure)
case "status.queued", "status.started", "log", "status.end": case "status.queued", "status.started", "log", "status.end":
default: default:
@ -71,11 +69,11 @@ func TestRunnerOpenReportFailure(t *testing.T) {
w.WriteHeader(500) w.WriteHeader(500)
})) }))
defer server.Close() defer server.Close()
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
Name: "Example", Name: "Example",
Options: tasks.SettingsOptions{ Options: settingsOptions{
ProbeServicesBaseURL: server.URL, ProbeServicesBaseURL: server.URL,
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
@ -91,7 +89,7 @@ func TestRunnerOpenReportFailure(t *testing.T) {
case "failure.report_create": case "failure.report_create":
seen++ seen++
case "status.progress": case "status.progress":
evv := ev.Value.(tasks.EventStatusProgress) evv := ev.Value.(eventStatusProgress)
if evv.Percentage >= 0.4 { if evv.Percentage >= 0.4 {
panic(fmt.Sprintf("too much progress: %+v", ev)) panic(fmt.Sprintf("too much progress: %+v", ev))
} }
@ -103,7 +101,7 @@ func TestRunnerOpenReportFailure(t *testing.T) {
} }
seench <- seen seench <- seen
}() }()
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
if n := <-seench; n != 1 { if n := <-seench; n != 1 {
t.Fatal("unexpected number of events") t.Fatal("unexpected number of events")
@ -114,12 +112,12 @@ func TestRunnerGood(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG", LogLevel: "DEBUG",
Name: "Example", Name: "Example",
Options: tasks.SettingsOptions{ Options: settingsOptions{
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
}, },
@ -127,7 +125,7 @@ func TestRunnerGood(t *testing.T) {
Version: 1, Version: 1,
} }
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var found bool var found bool
@ -145,32 +143,32 @@ func TestRunnerWithUnsupportedSettings(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG", LogLevel: "DEBUG",
Name: "Example", Name: "Example",
Options: tasks.SettingsOptions{ Options: settingsOptions{
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
}, },
StateDir: "../../testdata/oonimkall/state", StateDir: "../../testdata/oonimkall/state",
} }
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var failures []string var failures []string
for ev := range out { for ev := range out {
if ev.Key == "failure.startup" { if ev.Key == "failure.startup" {
failure := ev.Value.(tasks.EventFailure).Failure failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure) failures = append(failures, failure)
} }
} }
if len(failures) != 1 { if len(failures) != 1 {
t.Fatal("invalid number of failures") t.Fatal("invalid number of failures")
} }
if failures[0] != tasks.FailureInvalidVersion { if failures[0] != failureInvalidVersion {
t.Fatal("not the failure we expected") t.Fatal("not the failure we expected")
} }
} }
@ -179,12 +177,12 @@ func TestRunnerWithInvalidKVStorePath(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG", LogLevel: "DEBUG",
Name: "Example", Name: "Example",
Options: tasks.SettingsOptions{ Options: settingsOptions{
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
}, },
@ -192,13 +190,13 @@ func TestRunnerWithInvalidKVStorePath(t *testing.T) {
Version: 1, Version: 1,
} }
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var failures []string var failures []string
for ev := range out { for ev := range out {
if ev.Key == "failure.startup" { if ev.Key == "failure.startup" {
failure := ev.Value.(tasks.EventFailure).Failure failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure) failures = append(failures, failure)
} }
} }
@ -214,12 +212,12 @@ func TestRunnerWithInvalidExperimentName(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG", LogLevel: "DEBUG",
Name: "Nonexistent", Name: "Nonexistent",
Options: tasks.SettingsOptions{ Options: settingsOptions{
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
}, },
@ -227,13 +225,13 @@ func TestRunnerWithInvalidExperimentName(t *testing.T) {
Version: 1, Version: 1,
} }
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var failures []string var failures []string
for ev := range out { for ev := range out {
if ev.Key == "failure.startup" { if ev.Key == "failure.startup" {
failure := ev.Value.(tasks.EventFailure).Failure failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure) failures = append(failures, failure)
} }
} }
@ -249,12 +247,12 @@ func TestRunnerWithMissingInput(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
LogLevel: "DEBUG", LogLevel: "DEBUG",
Name: "ExampleWithInput", Name: "ExampleWithInput",
Options: tasks.SettingsOptions{ Options: settingsOptions{
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
}, },
@ -262,13 +260,13 @@ func TestRunnerWithMissingInput(t *testing.T) {
Version: 1, Version: 1,
} }
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var failures []string var failures []string
for ev := range out { for ev := range out {
if ev.Key == "failure.startup" { if ev.Key == "failure.startup" {
failure := ev.Value.(tasks.EventFailure).Failure failure := ev.Value.(eventFailure).Failure
failures = append(failures, failure) failures = append(failures, failure)
} }
} }
@ -284,13 +282,13 @@ func TestRunnerWithMaxRuntime(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
LogLevel: "DEBUG", LogLevel: "DEBUG",
Name: "ExampleWithInput", Name: "ExampleWithInput",
Options: tasks.SettingsOptions{ Options: settingsOptions{
MaxRuntime: 1, MaxRuntime: 1,
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
@ -300,7 +298,7 @@ func TestRunnerWithMaxRuntime(t *testing.T) {
} }
begin := time.Now() begin := time.Now()
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var found bool var found bool
@ -320,7 +318,7 @@ func TestRunnerWithMaxRuntime(t *testing.T) {
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788 // 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
// //
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855 // 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
if time.Now().Sub(begin) > 10*time.Second { if time.Since(begin) > 10*time.Second {
t.Fatal("expected shorter runtime") t.Fatal("expected shorter runtime")
} }
} }
@ -329,13 +327,13 @@ func TestRunnerWithMaxRuntimeNonInterruptible(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
LogLevel: "DEBUG", LogLevel: "DEBUG",
Name: "ExampleWithInputNonInterruptible", Name: "ExampleWithInputNonInterruptible",
Options: tasks.SettingsOptions{ Options: settingsOptions{
MaxRuntime: 1, MaxRuntime: 1,
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
@ -345,7 +343,7 @@ func TestRunnerWithMaxRuntimeNonInterruptible(t *testing.T) {
} }
begin := time.Now() begin := time.Now()
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var found bool var found bool
@ -365,7 +363,7 @@ func TestRunnerWithMaxRuntimeNonInterruptible(t *testing.T) {
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788 // 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
// //
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855 // 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
if time.Now().Sub(begin) > 10*time.Second { if time.Since(begin) > 10*time.Second {
t.Fatal("expected shorter runtime") t.Fatal("expected shorter runtime")
} }
} }
@ -374,13 +372,13 @@ func TestRunnerWithFailedMeasurement(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *tasks.Event) out := make(chan *event)
settings := &tasks.Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
LogLevel: "DEBUG", LogLevel: "DEBUG",
Name: "ExampleWithFailure", Name: "ExampleWithFailure",
Options: tasks.SettingsOptions{ Options: settingsOptions{
MaxRuntime: 1, MaxRuntime: 1,
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
@ -389,7 +387,7 @@ func TestRunnerWithFailedMeasurement(t *testing.T) {
Version: 1, Version: 1,
} }
go func() { go func() {
tasks.Run(context.Background(), settings, out) run(context.Background(), settings, out)
close(out) close(out)
}() }()
var found bool var found bool

View File

@ -1,4 +1,4 @@
package tasks package oonimkall
import ( import (
"context" "context"
@ -32,11 +32,11 @@ func TestRunnerMaybeLookupLocationFailure(t *testing.T) {
// TODO(https://github.com/ooni/probe-cli/pull/518) // TODO(https://github.com/ooni/probe-cli/pull/518)
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
out := make(chan *Event) out := make(chan *event)
settings := &Settings{ settings := &settings{
AssetsDir: "../../testdata/oonimkall/assets", AssetsDir: "../../testdata/oonimkall/assets",
Name: "Example", Name: "Example",
Options: SettingsOptions{ Options: settingsOptions{
SoftwareName: "oonimkall-test", SoftwareName: "oonimkall-test",
SoftwareVersion: "0.1.0", SoftwareVersion: "0.1.0",
}, },
@ -52,7 +52,7 @@ func TestRunnerMaybeLookupLocationFailure(t *testing.T) {
"failure.cc_lookup", "failure.resolver_lookup": "failure.cc_lookup", "failure.resolver_lookup":
seen++ seen++
case "status.progress": case "status.progress":
evv := ev.Value.(EventStatusProgress) evv := ev.Value.(eventStatusProgress)
if evv.Percentage >= 0.2 { if evv.Percentage >= 0.2 {
panic(fmt.Sprintf("too much progress: %+v", ev)) panic(fmt.Sprintf("too much progress: %+v", ev))
} }
@ -64,7 +64,7 @@ func TestRunnerMaybeLookupLocationFailure(t *testing.T) {
seench <- seen seench <- seen
}() }()
expected := errors.New("mocked error") expected := errors.New("mocked error")
r := NewRunner(settings, out) r := newRunner(settings, out)
r.maybeLookupLocation = func(*engine.Session) error { r.maybeLookupLocation = func(*engine.Session) error {
return expected return expected
} }

View File

@ -1,9 +1,9 @@
package tasks package oonimkall
// Settings contains settings for a task. This structure derives from // Settings contains settings for a task. This structure derives from
// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet // the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet
// since 2020-12-03 we're not backwards compatible anymore. // since 2020-12-03 we're not backwards compatible anymore.
type Settings struct { type settings struct {
// Annotations contains the annotations to be added // Annotations contains the annotations to be added
// to every measurements performed by the task. // to every measurements performed by the task.
Annotations map[string]string `json:"annotations,omitempty"` Annotations map[string]string `json:"annotations,omitempty"`
@ -30,7 +30,7 @@ type Settings struct {
Name string `json:"name"` Name string `json:"name"`
// Options contains the task options. // Options contains the task options.
Options SettingsOptions `json:"options"` Options settingsOptions `json:"options"`
// Proxy allows you to optionally force a specific proxy // Proxy allows you to optionally force a specific proxy
// rather than using no proxy (the default). // rather than using no proxy (the default).
@ -66,8 +66,8 @@ type Settings struct {
Version int64 `json:"version"` Version int64 `json:"version"`
} }
// SettingsOptions contains the settings options // settingsOptions contains the settings options
type SettingsOptions struct { type settingsOptions struct {
// MaxRuntime is the maximum runtime expressed in seconds. A negative // MaxRuntime is the maximum runtime expressed in seconds. A negative
// value for this field disables the maximum runtime. Using // value for this field disables the maximum runtime. Using
// a zero value will also mean disabled. This is not the // a zero value will also mean disabled. This is not the

View File

@ -45,7 +45,6 @@ import (
"github.com/ooni/probe-cli/v3/internal/atomicx" "github.com/ooni/probe-cli/v3/internal/atomicx"
"github.com/ooni/probe-cli/v3/internal/runtimex" "github.com/ooni/probe-cli/v3/internal/runtimex"
"github.com/ooni/probe-cli/v3/pkg/oonimkall/internal/tasks"
) )
// Task is an asynchronous task running an experiment. It mimics the // Task is an asynchronous task running an experiment. It mimics the
@ -63,13 +62,13 @@ type Task struct {
cancel context.CancelFunc cancel context.CancelFunc
isdone *atomicx.Int64 isdone *atomicx.Int64
isstopped *atomicx.Int64 isstopped *atomicx.Int64
out chan *tasks.Event out chan *event
} }
// StartTask starts an asynchronous task. The input argument is a // StartTask starts an asynchronous task. The input argument is a
// serialized JSON conforming to MK v0.10.9's API. // serialized JSON conforming to MK v0.10.9's API.
func StartTask(input string) (*Task, error) { func StartTask(input string) (*Task, error) {
var settings tasks.Settings var settings settings
if err := json.Unmarshal([]byte(input), &settings); err != nil { if err := json.Unmarshal([]byte(input), &settings); err != nil {
return nil, err return nil, err
} }
@ -79,12 +78,12 @@ func StartTask(input string) (*Task, error) {
cancel: cancel, cancel: cancel,
isdone: &atomicx.Int64{}, isdone: &atomicx.Int64{},
isstopped: &atomicx.Int64{}, isstopped: &atomicx.Int64{},
out: make(chan *tasks.Event, bufsiz), out: make(chan *event, bufsiz),
} }
go func() { go func() {
defer close(task.out) defer close(task.out)
defer task.isstopped.Add(1) defer task.isstopped.Add(1)
tasks.Run(ctx, &settings, task.out) run(ctx, &settings, task.out)
}() }()
return task, nil return task, nil
} }

View File

@ -1,4 +1,4 @@
package oonimkall_test package oonimkall
import ( import (
"encoding/json" "encoding/json"
@ -9,8 +9,6 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/engine/model" "github.com/ooni/probe-cli/v3/internal/engine/model"
"github.com/ooni/probe-cli/v3/pkg/oonimkall"
"github.com/ooni/probe-cli/v3/pkg/oonimkall/internal/tasks"
) )
type eventlike struct { type eventlike struct {
@ -22,7 +20,7 @@ func TestGood(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG", "log_level": "DEBUG",
"name": "Example", "name": "Example",
@ -69,7 +67,7 @@ func TestWithMeasurementFailure(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG", "log_level": "DEBUG",
"name": "ExampleWithFailure", "name": "ExampleWithFailure",
@ -98,7 +96,7 @@ func TestWithMeasurementFailure(t *testing.T) {
} }
func TestInvalidJSON(t *testing.T) { func TestInvalidJSON(t *testing.T) {
task, err := oonimkall.StartTask(`{`) task, err := StartTask(`{`)
var syntaxerr *json.SyntaxError var syntaxerr *json.SyntaxError
if !errors.As(err, &syntaxerr) { if !errors.As(err, &syntaxerr) {
t.Fatal("not the expected error") t.Fatal("not the expected error")
@ -109,7 +107,7 @@ func TestInvalidJSON(t *testing.T) {
} }
func TestUnsupportedSetting(t *testing.T) { func TestUnsupportedSetting(t *testing.T) {
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG", "log_level": "DEBUG",
"name": "Example", "name": "Example",
@ -130,7 +128,7 @@ func TestUnsupportedSetting(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if event.Key == "failure.startup" { if event.Key == "failure.startup" {
if strings.Contains(eventstr, tasks.FailureInvalidVersion) { if strings.Contains(eventstr, failureInvalidVersion) {
seen = true seen = true
} }
} }
@ -141,7 +139,7 @@ func TestUnsupportedSetting(t *testing.T) {
} }
func TestEmptyStateDir(t *testing.T) { func TestEmptyStateDir(t *testing.T) {
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG", "log_level": "DEBUG",
"name": "Example", "name": "Example",
@ -173,7 +171,7 @@ func TestEmptyStateDir(t *testing.T) {
} }
func TestUnknownExperiment(t *testing.T) { func TestUnknownExperiment(t *testing.T) {
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG", "log_level": "DEBUG",
"name": "Antani", "name": "Antani",
@ -209,7 +207,7 @@ func TestInputIsRequired(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"log_level": "DEBUG", "log_level": "DEBUG",
"name": "ExampleWithInput", "name": "ExampleWithInput",
@ -246,7 +244,7 @@ func TestMaxRuntime(t *testing.T) {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
begin := time.Now() begin := time.Now()
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"inputs": ["a", "b", "c"], "inputs": ["a", "b", "c"],
"name": "ExampleWithInput", "name": "ExampleWithInput",
@ -293,7 +291,7 @@ func TestInterruptExampleWithInput(t *testing.T) {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
t.Skip("Skipping broken test; see https://github.com/ooni/probe-cli/v3/internal/engine/issues/992") t.Skip("Skipping broken test; see https://github.com/ooni/probe-cli/v3/internal/engine/issues/992")
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"inputs": [ "inputs": [
"http://www.kernel.org/", "http://www.kernel.org/",
@ -360,7 +358,7 @@ func TestInterruptNdt7(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"name": "Ndt7", "name": "Ndt7",
"options": { "options": {
@ -417,7 +415,7 @@ func TestCountBytesForExample(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"name": "Example", "name": "Example",
"options": { "options": {
@ -457,7 +455,7 @@ func TestPrivacyAndScrubbing(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"name": "Example", "name": "Example",
"options": { "options": {
@ -500,7 +498,7 @@ func TestNonblock(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
task, err := oonimkall.StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"name": "Example", "name": "Example",
"options": { "options": {