refactor: we don't wanna export pkg/oonimkall/tasks (#216)
* doc: merge the engine and the cli readmes Part of https://github.com/ooni/probe/issues/1335 * refactor: we don't wanna export pkg/oonimkall/tasks See https://github.com/ooni/probe/issues/1335
This commit is contained in:
@@ -0,0 +1,87 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// ChanLogger is a logger targeting a channel
|
||||
type ChanLogger struct {
|
||||
emitter *EventEmitter
|
||||
hasdebug bool
|
||||
hasinfo bool
|
||||
haswarning bool
|
||||
out chan<- *Event
|
||||
}
|
||||
|
||||
// Debug implements Logger.Debug
|
||||
func (cl *ChanLogger) Debug(msg string) {
|
||||
if cl.hasdebug {
|
||||
cl.emitter.Emit("log", EventLog{
|
||||
LogLevel: "DEBUG",
|
||||
Message: msg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Debugf implements Logger.Debugf
|
||||
func (cl *ChanLogger) Debugf(format string, v ...interface{}) {
|
||||
if cl.hasdebug {
|
||||
cl.Debug(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
// Info implements Logger.Info
|
||||
func (cl *ChanLogger) Info(msg string) {
|
||||
if cl.hasinfo {
|
||||
cl.emitter.Emit("log", EventLog{
|
||||
LogLevel: "INFO",
|
||||
Message: msg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Infof implements Logger.Infof
|
||||
func (cl *ChanLogger) Infof(format string, v ...interface{}) {
|
||||
if cl.hasinfo {
|
||||
cl.Info(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
// Warn implements Logger.Warn
|
||||
func (cl *ChanLogger) Warn(msg string) {
|
||||
if cl.haswarning {
|
||||
cl.emitter.Emit("log", EventLog{
|
||||
LogLevel: "WARNING",
|
||||
Message: msg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Warnf implements Logger.Warnf
|
||||
func (cl *ChanLogger) Warnf(format string, v ...interface{}) {
|
||||
if cl.haswarning {
|
||||
cl.Warn(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
// NewChanLogger creates a new ChanLogger instance.
|
||||
func NewChanLogger(emitter *EventEmitter, logLevel string,
|
||||
out chan<- *Event) *ChanLogger {
|
||||
cl := &ChanLogger{
|
||||
emitter: emitter,
|
||||
out: out,
|
||||
}
|
||||
switch logLevel {
|
||||
case "DEBUG", "DEBUG2":
|
||||
cl.hasdebug = true
|
||||
fallthrough
|
||||
case "INFO":
|
||||
cl.hasinfo = true
|
||||
fallthrough
|
||||
case "ERR", "WARNING":
|
||||
fallthrough
|
||||
default:
|
||||
cl.haswarning = true
|
||||
}
|
||||
return cl
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package tasks
|
||||
|
||||
type eventEmpty struct{}
|
||||
|
||||
// EventFailure contains information on a failure.
|
||||
type EventFailure struct {
|
||||
Failure string `json:"failure"`
|
||||
}
|
||||
|
||||
// EventLog is an event containing a log message.
|
||||
type EventLog struct {
|
||||
LogLevel string `json:"log_level"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type eventMeasurementGeneric struct {
|
||||
Failure string `json:"failure,omitempty"`
|
||||
Idx int64 `json:"idx"`
|
||||
Input string `json:"input"`
|
||||
JSONStr string `json:"json_str,omitempty"`
|
||||
}
|
||||
|
||||
type eventStatusEnd struct {
|
||||
DownloadedKB float64 `json:"downloaded_kb"`
|
||||
Failure string `json:"failure"`
|
||||
UploadedKB float64 `json:"uploaded_kb"`
|
||||
}
|
||||
|
||||
type eventStatusGeoIPLookup struct {
|
||||
ProbeASN string `json:"probe_asn"`
|
||||
ProbeCC string `json:"probe_cc"`
|
||||
ProbeIP string `json:"probe_ip"`
|
||||
ProbeNetworkName string `json:"probe_network_name"`
|
||||
}
|
||||
|
||||
// EventStatusProgress reports progress information.
|
||||
type EventStatusProgress struct {
|
||||
Message string `json:"message"`
|
||||
Percentage float64 `json:"percentage"`
|
||||
}
|
||||
|
||||
type eventStatusReportGeneric struct {
|
||||
ReportID string `json:"report_id"`
|
||||
}
|
||||
|
||||
type eventStatusResolverLookup struct {
|
||||
ResolverASN string `json:"resolver_asn"`
|
||||
ResolverIP string `json:"resolver_ip"`
|
||||
ResolverNetworkName string `json:"resolver_network_name"`
|
||||
}
|
||||
|
||||
// Event is an event emitted by a task. This structure extends the event
|
||||
// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv).
|
||||
type Event struct {
|
||||
Key string `json:"key"`
|
||||
Value interface{} `json:"value"`
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package tasks
|
||||
|
||||
// EventEmitter emits event on a channel
|
||||
type EventEmitter struct {
|
||||
disabled map[string]bool
|
||||
out chan<- *Event
|
||||
}
|
||||
|
||||
// NewEventEmitter creates a new Emitter
|
||||
func NewEventEmitter(disabledEvents []string, out chan<- *Event) *EventEmitter {
|
||||
ee := &EventEmitter{out: out}
|
||||
ee.disabled = make(map[string]bool)
|
||||
for _, eventname := range disabledEvents {
|
||||
ee.disabled[eventname] = true
|
||||
}
|
||||
return ee
|
||||
}
|
||||
|
||||
// EmitFailureStartup emits the failureStartup event
|
||||
func (ee *EventEmitter) EmitFailureStartup(failure string) {
|
||||
ee.EmitFailureGeneric(failureStartup, failure)
|
||||
}
|
||||
|
||||
// EmitFailureGeneric emits a failure event
|
||||
func (ee *EventEmitter) EmitFailureGeneric(name, failure string) {
|
||||
ee.Emit(name, EventFailure{Failure: failure})
|
||||
}
|
||||
|
||||
// EmitStatusProgress emits the status.Progress event
|
||||
func (ee *EventEmitter) EmitStatusProgress(percentage float64, message string) {
|
||||
ee.Emit(statusProgress, EventStatusProgress{Message: message, Percentage: percentage})
|
||||
}
|
||||
|
||||
// Emit emits the specified event
|
||||
func (ee *EventEmitter) Emit(key string, value interface{}) {
|
||||
if ee.disabled[key] == true {
|
||||
return
|
||||
}
|
||||
ee.out <- &Event{Key: key, Value: value}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package tasks_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/ooni/probe-cli/v3/pkg/oonimkall/internal/tasks"
|
||||
)
|
||||
|
||||
func TestDisabledEvents(t *testing.T) {
|
||||
out := make(chan *tasks.Event)
|
||||
emitter := tasks.NewEventEmitter([]string{"log"}, out)
|
||||
go func() {
|
||||
emitter.Emit("log", tasks.EventLog{Message: "foo"})
|
||||
close(out)
|
||||
}()
|
||||
var count int64
|
||||
for ev := range out {
|
||||
if ev.Key == "log" {
|
||||
count++
|
||||
}
|
||||
}
|
||||
if count > 0 {
|
||||
t.Fatal("cannot disable events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmitFailureStartup(t *testing.T) {
|
||||
out := make(chan *tasks.Event)
|
||||
emitter := tasks.NewEventEmitter([]string{}, out)
|
||||
go func() {
|
||||
emitter.EmitFailureStartup("mocked error")
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
evv := ev.Value.(tasks.EventFailure) // panic if not castable
|
||||
if evv.Failure == "mocked error" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("did not see expected event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmitStatusProgress(t *testing.T) {
|
||||
out := make(chan *tasks.Event)
|
||||
emitter := tasks.NewEventEmitter([]string{}, out)
|
||||
go func() {
|
||||
emitter.EmitStatusProgress(0.7, "foo")
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "status.progress" {
|
||||
evv := ev.Value.(tasks.EventStatusProgress) // panic if not castable
|
||||
if evv.Message == "foo" && evv.Percentage == 0.7 {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("did not see expected event")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,299 @@
|
||||
// Package tasks implements tasks run using the oonimkall API.
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
engine "github.com/ooni/probe-cli/v3/internal/engine"
|
||||
"github.com/ooni/probe-cli/v3/internal/engine/runtimex"
|
||||
"github.com/ooni/probe-cli/v3/internal/engine/model"
|
||||
)
|
||||
|
||||
const (
|
||||
failureIPLookup = "failure.ip_lookup"
|
||||
failureASNLookup = "failure.asn_lookup"
|
||||
failureCCLookup = "failure.cc_lookup"
|
||||
failureMeasurement = "failure.measurement"
|
||||
failureMeasurementSubmission = "failure.measurement_submission"
|
||||
failureReportCreate = "failure.report_create"
|
||||
failureResolverLookup = "failure.resolver_lookup"
|
||||
failureStartup = "failure.startup"
|
||||
measurement = "measurement"
|
||||
statusEnd = "status.end"
|
||||
statusGeoIPLookup = "status.geoip_lookup"
|
||||
statusMeasurementDone = "status.measurement_done"
|
||||
statusMeasurementStart = "status.measurement_start"
|
||||
statusMeasurementSubmission = "status.measurement_submission"
|
||||
statusProgress = "status.progress"
|
||||
statusQueued = "status.queued"
|
||||
statusReportCreate = "status.report_create"
|
||||
statusResolverLookup = "status.resolver_lookup"
|
||||
statusStarted = "status.started"
|
||||
)
|
||||
|
||||
// Run runs the task specified by settings.Name until completion. This is the
|
||||
// top-level API that should be called by oonimkall.
|
||||
func Run(ctx context.Context, settings *Settings, out chan<- *Event) {
|
||||
r := NewRunner(settings, out)
|
||||
r.Run(ctx)
|
||||
}
|
||||
|
||||
// Runner runs a specific task
|
||||
type Runner struct {
|
||||
emitter *EventEmitter
|
||||
maybeLookupLocation func(*engine.Session) error
|
||||
out chan<- *Event
|
||||
settings *Settings
|
||||
}
|
||||
|
||||
// NewRunner creates a new task runner
|
||||
func NewRunner(settings *Settings, out chan<- *Event) *Runner {
|
||||
return &Runner{
|
||||
emitter: NewEventEmitter(settings.DisabledEvents, out),
|
||||
out: out,
|
||||
settings: settings,
|
||||
}
|
||||
}
|
||||
|
||||
// FailureInvalidVersion is the failure returned when Version is invalid
|
||||
const FailureInvalidVersion = "invalid Settings.Version number"
|
||||
|
||||
func (r *Runner) hasUnsupportedSettings(logger *ChanLogger) bool {
|
||||
if r.settings.Version < 1 {
|
||||
r.emitter.EmitFailureStartup(FailureInvalidVersion)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (r *Runner) newsession(logger *ChanLogger) (*engine.Session, error) {
|
||||
kvstore, err := engine.NewFileSystemKVStore(r.settings.StateDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := engine.SessionConfig{
|
||||
AssetsDir: r.settings.AssetsDir,
|
||||
KVStore: kvstore,
|
||||
Logger: logger,
|
||||
SoftwareName: r.settings.Options.SoftwareName,
|
||||
SoftwareVersion: r.settings.Options.SoftwareVersion,
|
||||
TempDir: r.settings.TempDir,
|
||||
}
|
||||
if r.settings.Options.ProbeServicesBaseURL != "" {
|
||||
config.AvailableProbeServices = []model.Service{{
|
||||
Type: "https",
|
||||
Address: r.settings.Options.ProbeServicesBaseURL,
|
||||
}}
|
||||
}
|
||||
return engine.NewSession(config)
|
||||
}
|
||||
|
||||
func (r *Runner) contextForExperiment(
|
||||
ctx context.Context, builder *engine.ExperimentBuilder,
|
||||
) context.Context {
|
||||
if builder.Interruptible() {
|
||||
return ctx
|
||||
}
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
type runnerCallbacks struct {
|
||||
emitter *EventEmitter
|
||||
}
|
||||
|
||||
func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
|
||||
cb.emitter.Emit(statusProgress, EventStatusProgress{
|
||||
Percentage: 0.4 + (percentage * 0.6), // open report is 40%
|
||||
Message: message,
|
||||
})
|
||||
}
|
||||
|
||||
// Run runs the runner until completion. The context argument controls
|
||||
// when to stop when processing multiple inputs, as well as when to stop
|
||||
// experiments explicitly marked as interruptible.
|
||||
func (r *Runner) Run(ctx context.Context) {
|
||||
logger := NewChanLogger(r.emitter, r.settings.LogLevel, r.out)
|
||||
r.emitter.Emit(statusQueued, eventEmpty{})
|
||||
if r.hasUnsupportedSettings(logger) {
|
||||
return
|
||||
}
|
||||
r.emitter.Emit(statusStarted, eventEmpty{})
|
||||
sess, err := r.newsession(logger)
|
||||
if err != nil {
|
||||
r.emitter.EmitFailureStartup(err.Error())
|
||||
return
|
||||
}
|
||||
endEvent := new(eventStatusEnd)
|
||||
defer func() {
|
||||
sess.Close()
|
||||
r.emitter.Emit(statusEnd, endEvent)
|
||||
}()
|
||||
|
||||
builder, err := sess.NewExperimentBuilder(r.settings.Name)
|
||||
if err != nil {
|
||||
r.emitter.EmitFailureStartup(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Looking up OONI backends... please, be patient")
|
||||
if err := sess.MaybeLookupBackends(); err != nil {
|
||||
r.emitter.EmitFailureStartup(err.Error())
|
||||
return
|
||||
}
|
||||
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")
|
||||
|
||||
logger.Info("Looking up your location... please, be patient")
|
||||
maybeLookupLocation := r.maybeLookupLocation
|
||||
if maybeLookupLocation == nil {
|
||||
maybeLookupLocation = func(sess *engine.Session) error {
|
||||
return sess.MaybeLookupLocation()
|
||||
}
|
||||
}
|
||||
if err := maybeLookupLocation(sess); err != nil {
|
||||
r.emitter.EmitFailureGeneric(failureIPLookup, err.Error())
|
||||
r.emitter.EmitFailureGeneric(failureASNLookup, err.Error())
|
||||
r.emitter.EmitFailureGeneric(failureCCLookup, err.Error())
|
||||
r.emitter.EmitFailureGeneric(failureResolverLookup, err.Error())
|
||||
return
|
||||
}
|
||||
r.emitter.EmitStatusProgress(0.2, "geoip lookup")
|
||||
r.emitter.EmitStatusProgress(0.3, "resolver lookup")
|
||||
r.emitter.Emit(statusGeoIPLookup, eventStatusGeoIPLookup{
|
||||
ProbeIP: sess.ProbeIP(),
|
||||
ProbeASN: sess.ProbeASNString(),
|
||||
ProbeCC: sess.ProbeCC(),
|
||||
ProbeNetworkName: sess.ProbeNetworkName(),
|
||||
})
|
||||
r.emitter.Emit(statusResolverLookup, eventStatusResolverLookup{
|
||||
ResolverASN: sess.ResolverASNString(),
|
||||
ResolverIP: sess.ResolverIP(),
|
||||
ResolverNetworkName: sess.ResolverNetworkName(),
|
||||
})
|
||||
|
||||
builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter})
|
||||
if len(r.settings.Inputs) <= 0 {
|
||||
switch builder.InputPolicy() {
|
||||
case engine.InputOrQueryTestLists, engine.InputStrictlyRequired:
|
||||
r.emitter.EmitFailureStartup("no input provided")
|
||||
return
|
||||
}
|
||||
r.settings.Inputs = append(r.settings.Inputs, "")
|
||||
}
|
||||
experiment := builder.NewExperiment()
|
||||
defer func() {
|
||||
endEvent.DownloadedKB = experiment.KibiBytesReceived()
|
||||
endEvent.UploadedKB = experiment.KibiBytesSent()
|
||||
}()
|
||||
if !r.settings.Options.NoCollector {
|
||||
logger.Info("Opening report... please, be patient")
|
||||
if err := experiment.OpenReport(); err != nil {
|
||||
r.emitter.EmitFailureGeneric(failureReportCreate, err.Error())
|
||||
return
|
||||
}
|
||||
r.emitter.EmitStatusProgress(0.4, "open report")
|
||||
r.emitter.Emit(statusReportCreate, eventStatusReportGeneric{
|
||||
ReportID: experiment.ReportID(),
|
||||
})
|
||||
}
|
||||
// This deviates a little bit from measurement-kit, for which
|
||||
// a zero timeout is actually valid. Since it does not make much
|
||||
// sense, here we're changing the behaviour.
|
||||
//
|
||||
// See https://github.com/measurement-kit/measurement-kit/issues/1922
|
||||
if r.settings.Options.MaxRuntime > 0 {
|
||||
// We want to honour max_runtime only when we're running an
|
||||
// experiment that clearly wants specific input. We could refine
|
||||
// this policy in the future, but for now this covers in a
|
||||
// reasonable way web connectivity, so we should be ok.
|
||||
switch builder.InputPolicy() {
|
||||
case engine.InputOrQueryTestLists, engine.InputStrictlyRequired:
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(
|
||||
ctx, time.Duration(r.settings.Options.MaxRuntime)*time.Second,
|
||||
)
|
||||
defer cancel()
|
||||
}
|
||||
}
|
||||
inputCount := len(r.settings.Inputs)
|
||||
start := time.Now()
|
||||
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
|
||||
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)
|
||||
for idx, input := range r.settings.Inputs {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
logger.Infof("Starting measurement with index %d", idx)
|
||||
r.emitter.Emit(statusMeasurementStart, eventMeasurementGeneric{
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
})
|
||||
if input != "" && inputCount > 0 {
|
||||
var percentage float64
|
||||
if r.settings.Options.MaxRuntime > 0 {
|
||||
now := time.Now()
|
||||
percentage = (now.Sub(start).Seconds()/eta.Sub(start).Seconds())*0.6 + 0.4
|
||||
} else {
|
||||
percentage = (float64(idx)/float64(inputCount))*0.6 + 0.4
|
||||
}
|
||||
r.emitter.EmitStatusProgress(percentage, fmt.Sprintf(
|
||||
"processing %s", input,
|
||||
))
|
||||
}
|
||||
m, err := experiment.MeasureWithContext(
|
||||
r.contextForExperiment(ctx, builder),
|
||||
input,
|
||||
)
|
||||
if builder.Interruptible() && ctx.Err() != nil {
|
||||
// We want to stop here only if interruptible otherwise we want to
|
||||
// submit measurement and stop at beginning of next iteration
|
||||
break
|
||||
}
|
||||
m.AddAnnotations(r.settings.Annotations)
|
||||
if err != nil {
|
||||
r.emitter.Emit(failureMeasurement, eventMeasurementGeneric{
|
||||
Failure: err.Error(),
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
})
|
||||
// fallthrough: we want to submit the report anyway
|
||||
}
|
||||
data, err := json.Marshal(m)
|
||||
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
|
||||
r.emitter.Emit(measurement, eventMeasurementGeneric{
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
JSONStr: string(data),
|
||||
})
|
||||
if !r.settings.Options.NoCollector {
|
||||
logger.Info("Submitting measurement... please, be patient")
|
||||
err := experiment.SubmitAndUpdateMeasurement(m)
|
||||
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
JSONStr: string(data),
|
||||
Failure: measurementSubmissionFailure(err),
|
||||
})
|
||||
}
|
||||
r.emitter.Emit(statusMeasurementDone, eventMeasurementGeneric{
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func measurementSubmissionEventName(err error) string {
|
||||
if err != nil {
|
||||
return failureMeasurementSubmission
|
||||
}
|
||||
return statusMeasurementSubmission
|
||||
}
|
||||
|
||||
func measurementSubmissionFailure(err error) string {
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -0,0 +1,404 @@
|
||||
package tasks_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ooni/probe-cli/v3/pkg/oonimkall/internal/tasks"
|
||||
)
|
||||
|
||||
func TestRunnerMaybeLookupBackendsFailure(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(500)
|
||||
}))
|
||||
defer server.Close()
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
ProbeServicesBaseURL: server.URL,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
switch ev.Key {
|
||||
case "failure.startup":
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
case "status.queued", "status.started", "log", "status.end":
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected key: %s", ev.Key))
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("unexpected number of failures")
|
||||
}
|
||||
if failures[0] != "all available probe services failed" {
|
||||
t.Fatal("invalid failure")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerOpenReportFailure(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
var (
|
||||
nreq int64
|
||||
mu sync.Mutex
|
||||
)
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
nreq++
|
||||
if nreq == 1 {
|
||||
w.Write([]byte(`{}`))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(500)
|
||||
}))
|
||||
defer server.Close()
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
ProbeServicesBaseURL: server.URL,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
seench := make(chan int64)
|
||||
go func() {
|
||||
var seen int64
|
||||
for ev := range out {
|
||||
switch ev.Key {
|
||||
case "failure.report_create":
|
||||
seen++
|
||||
case "status.progress":
|
||||
evv := ev.Value.(tasks.EventStatusProgress)
|
||||
if evv.Percentage >= 0.4 {
|
||||
panic(fmt.Sprintf("too much progress: %+v", ev))
|
||||
}
|
||||
case "status.queued", "status.started", "log", "status.end",
|
||||
"status.geoip_lookup", "status.resolver_lookup":
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected key: %s", ev.Key))
|
||||
}
|
||||
}
|
||||
seench <- seen
|
||||
}()
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
if n := <-seench; n != 1 {
|
||||
t.Fatal("unexpected number of events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerGood(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "status.end" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("status.end event not found")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithUnsupportedSettings(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("invalid number of failures")
|
||||
}
|
||||
if failures[0] != tasks.FailureInvalidVersion {
|
||||
t.Fatal("not the failure we expected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithInvalidKVStorePath(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "", // must be empty to cause the failure below
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("invalid number of failures")
|
||||
}
|
||||
if failures[0] != "mkdir : no such file or directory" {
|
||||
t.Fatal("not the failure we expected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithInvalidExperimentName(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "Nonexistent",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("invalid number of failures")
|
||||
}
|
||||
if failures[0] != "no such experiment: Nonexistent" {
|
||||
t.Fatalf("not the failure we expected: %s", failures[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithMissingInput(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "ExampleWithInput",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("invalid number of failures")
|
||||
}
|
||||
if failures[0] != "no input provided" {
|
||||
t.Fatalf("not the failure we expected: %s", failures[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithMaxRuntime(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
|
||||
LogLevel: "DEBUG",
|
||||
Name: "ExampleWithInput",
|
||||
Options: tasks.SettingsOptions{
|
||||
MaxRuntime: 1,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
begin := time.Now()
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "status.end" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("status.end event not found")
|
||||
}
|
||||
// The runtime is long because of ancillary operations and is even more
|
||||
// longer because of self shaping we may be performing (especially in
|
||||
// CI builds) using `-tags shaping`). We have experimentally determined
|
||||
// that ~10 seconds is the typical CI test run time. See:
|
||||
//
|
||||
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
|
||||
//
|
||||
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
|
||||
if time.Now().Sub(begin) > 10*time.Second {
|
||||
t.Fatal("expected shorter runtime")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithMaxRuntimeNonInterruptible(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
|
||||
LogLevel: "DEBUG",
|
||||
Name: "ExampleWithInputNonInterruptible",
|
||||
Options: tasks.SettingsOptions{
|
||||
MaxRuntime: 1,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
begin := time.Now()
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "status.end" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("status.end event not found")
|
||||
}
|
||||
// The runtime is long because of ancillary operations and is even more
|
||||
// longer because of self shaping we may be performing (especially in
|
||||
// CI builds) using `-tags shaping`). We have experimentally determined
|
||||
// that ~10 seconds is the typical CI test run time. See:
|
||||
//
|
||||
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
|
||||
//
|
||||
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
|
||||
if time.Now().Sub(begin) > 10*time.Second {
|
||||
t.Fatal("expected shorter runtime")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithFailedMeasurement(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
|
||||
LogLevel: "DEBUG",
|
||||
Name: "ExampleWithFailure",
|
||||
Options: tasks.SettingsOptions{
|
||||
MaxRuntime: 1,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.measurement" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("failure.measurement event not found")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
engine "github.com/ooni/probe-cli/v3/internal/engine"
|
||||
)
|
||||
|
||||
func TestMeasurementSubmissionEventName(t *testing.T) {
|
||||
if measurementSubmissionEventName(nil) != statusMeasurementSubmission {
|
||||
t.Fatal("unexpected submission event name")
|
||||
}
|
||||
if measurementSubmissionEventName(errors.New("mocked error")) != failureMeasurementSubmission {
|
||||
t.Fatal("unexpected submission event name")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurementSubmissionFailure(t *testing.T) {
|
||||
if measurementSubmissionFailure(nil) != "" {
|
||||
t.Fatal("unexpected submission failure")
|
||||
}
|
||||
if measurementSubmissionFailure(errors.New("mocked error")) != "mocked error" {
|
||||
t.Fatal("unexpected submission failure")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerMaybeLookupLocationFailure(t *testing.T) {
|
||||
out := make(chan *Event)
|
||||
settings := &Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Name: "Example",
|
||||
Options: SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
seench := make(chan int64)
|
||||
go func() {
|
||||
var seen int64
|
||||
for ev := range out {
|
||||
switch ev.Key {
|
||||
case "failure.ip_lookup", "failure.asn_lookup",
|
||||
"failure.cc_lookup", "failure.resolver_lookup":
|
||||
seen++
|
||||
case "status.progress":
|
||||
evv := ev.Value.(EventStatusProgress)
|
||||
if evv.Percentage >= 0.2 {
|
||||
panic(fmt.Sprintf("too much progress: %+v", ev))
|
||||
}
|
||||
case "status.queued", "status.started", "status.end":
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected key: %s", ev.Key))
|
||||
}
|
||||
}
|
||||
seench <- seen
|
||||
}()
|
||||
expected := errors.New("mocked error")
|
||||
r := NewRunner(settings, out)
|
||||
r.maybeLookupLocation = func(*engine.Session) error {
|
||||
return expected
|
||||
}
|
||||
r.Run(context.Background())
|
||||
close(out)
|
||||
if n := <-seench; n != 4 {
|
||||
t.Fatal("unexpected number of events")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package tasks
|
||||
|
||||
// Settings contains settings for a task. This structure derives from
|
||||
// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet
|
||||
// since 2020-12-03 we're not backwards compatible anymore.
|
||||
type Settings struct {
|
||||
// Annotations contains the annotations to be added
|
||||
// to every measurements performed by the task.
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
|
||||
// AssetsDir is the directory where to store assets. This
|
||||
// field is an extension of MK's specification. If
|
||||
// this field is empty, the task won't start.
|
||||
AssetsDir string `json:"assets_dir"`
|
||||
|
||||
// DisabledEvents contains disabled events. See
|
||||
// https://git.io/Jv4Rv for the events names.
|
||||
DisabledEvents []string `json:"disabled_events,omitempty"`
|
||||
|
||||
// Inputs contains the inputs. The task will fail if it
|
||||
// requires input and you provide no input.
|
||||
Inputs []string `json:"inputs,omitempty"`
|
||||
|
||||
// LogLevel contains the logs level. See https://git.io/Jv4Rv
|
||||
// for the names of the available log levels.
|
||||
LogLevel string `json:"log_level,omitempty"`
|
||||
|
||||
// Name contains the task name. By https://git.io/Jv4Rv the
|
||||
// names are in camel case, e.g. `Ndt`.
|
||||
Name string `json:"name"`
|
||||
|
||||
// Options contains the task options.
|
||||
Options SettingsOptions `json:"options"`
|
||||
|
||||
// StateDir is the directory where to store persistent data. This
|
||||
// field is an extension of MK's specification. If
|
||||
// this field is empty, the task won't start.
|
||||
StateDir string `json:"state_dir"`
|
||||
|
||||
// TempDir is the temporary directory. This field is an extension of MK's
|
||||
// specification. If this field is empty, we will pick the tempdir that
|
||||
// ioutil.TempDir uses by default, which may not work on mobile. According
|
||||
// to our experiments as of 2020-06-10, leaving the TempDir empty works
|
||||
// for iOS and does not work for Android.
|
||||
TempDir string `json:"temp_dir"`
|
||||
|
||||
// Version indicates the version of this structure.
|
||||
Version int64 `json:"version"`
|
||||
}
|
||||
|
||||
// SettingsOptions contains the settings options
|
||||
type SettingsOptions struct {
|
||||
// MaxRuntime is the maximum runtime expressed in seconds. A negative
|
||||
// value for this field disables the maximum runtime. Using
|
||||
// a zero value will also mean disabled. This is not the
|
||||
// original behaviour of Measurement Kit, which used to run
|
||||
// for zero time in such case.
|
||||
MaxRuntime float64 `json:"max_runtime,omitempty"`
|
||||
|
||||
// NoCollector indicates whether to use a collector
|
||||
NoCollector bool `json:"no_collector,omitempty"`
|
||||
|
||||
// ProbeServicesBaseURL contains the probe services base URL.
|
||||
ProbeServicesBaseURL string `json:"probe_services_base_url,omitempty"`
|
||||
|
||||
// SoftwareName is the software name. If this option is not
|
||||
// present, then the library startup will fail.
|
||||
SoftwareName string `json:"software_name,omitempty"`
|
||||
|
||||
// SoftwareVersion is the software version. If this option is not
|
||||
// present, then the library startup will fail.
|
||||
SoftwareVersion string `json:"software_version,omitempty"`
|
||||
}
|
||||
Reference in New Issue
Block a user