refactor: start building an Android package (#205)
* refactor: start building an Android package Part of https://github.com/ooni/probe/issues/1335. This seems also a good moment to move some packages out of the engine, e.g., oonimkall. This package, for example, is a consumer of the engine, so it makes sense it's not _inside_ it. * fix: committed some stuff I didn't need to commit * fix: oonimkall needs to be public to build The side effect is that we will probably need to bump the major version number every time we change one of these APIs. (We can also of course choose to violate the basic guidelines of Go software, but I believe this is bad form.) I have no problem in bumping the major quite frequently and in any case this monorepo solution is convinving me more than continuing to keep a split between engine and cli. The need to embed assets to make the probe more reliable trumps the negative effects of having to ~frequently bump major because we expose a public API. * fix: let's not forget about libooniffi Honestly, I don't know what to do with this library. I added it to provide a drop in replacement for MK but I have no idea whether it's used and useful. I would not feel comfortable exposing it, unlike oonimkall, since we're not using it. It may be that the right thing to do here is just to delete the package and reduce the amount of code we're maintaining? * woops, we're still missing the publish android script * fix(publish-android.bash): add proper API key * ouch fix another place where the name changed
This commit is contained in:
@@ -0,0 +1,87 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// ChanLogger is a logger targeting a channel
|
||||
type ChanLogger struct {
|
||||
emitter *EventEmitter
|
||||
hasdebug bool
|
||||
hasinfo bool
|
||||
haswarning bool
|
||||
out chan<- *Event
|
||||
}
|
||||
|
||||
// Debug implements Logger.Debug
|
||||
func (cl *ChanLogger) Debug(msg string) {
|
||||
if cl.hasdebug {
|
||||
cl.emitter.Emit("log", EventLog{
|
||||
LogLevel: "DEBUG",
|
||||
Message: msg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Debugf implements Logger.Debugf
|
||||
func (cl *ChanLogger) Debugf(format string, v ...interface{}) {
|
||||
if cl.hasdebug {
|
||||
cl.Debug(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
// Info implements Logger.Info
|
||||
func (cl *ChanLogger) Info(msg string) {
|
||||
if cl.hasinfo {
|
||||
cl.emitter.Emit("log", EventLog{
|
||||
LogLevel: "INFO",
|
||||
Message: msg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Infof implements Logger.Infof
|
||||
func (cl *ChanLogger) Infof(format string, v ...interface{}) {
|
||||
if cl.hasinfo {
|
||||
cl.Info(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
// Warn implements Logger.Warn
|
||||
func (cl *ChanLogger) Warn(msg string) {
|
||||
if cl.haswarning {
|
||||
cl.emitter.Emit("log", EventLog{
|
||||
LogLevel: "WARNING",
|
||||
Message: msg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Warnf implements Logger.Warnf
|
||||
func (cl *ChanLogger) Warnf(format string, v ...interface{}) {
|
||||
if cl.haswarning {
|
||||
cl.Warn(fmt.Sprintf(format, v...))
|
||||
}
|
||||
}
|
||||
|
||||
// NewChanLogger creates a new ChanLogger instance.
|
||||
func NewChanLogger(emitter *EventEmitter, logLevel string,
|
||||
out chan<- *Event) *ChanLogger {
|
||||
cl := &ChanLogger{
|
||||
emitter: emitter,
|
||||
out: out,
|
||||
}
|
||||
switch logLevel {
|
||||
case "DEBUG", "DEBUG2":
|
||||
cl.hasdebug = true
|
||||
fallthrough
|
||||
case "INFO":
|
||||
cl.hasinfo = true
|
||||
fallthrough
|
||||
case "ERR", "WARNING":
|
||||
fallthrough
|
||||
default:
|
||||
cl.haswarning = true
|
||||
}
|
||||
return cl
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package tasks
|
||||
|
||||
type eventEmpty struct{}
|
||||
|
||||
// EventFailure contains information on a failure.
|
||||
type EventFailure struct {
|
||||
Failure string `json:"failure"`
|
||||
}
|
||||
|
||||
// EventLog is an event containing a log message.
|
||||
type EventLog struct {
|
||||
LogLevel string `json:"log_level"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type eventMeasurementGeneric struct {
|
||||
Failure string `json:"failure,omitempty"`
|
||||
Idx int64 `json:"idx"`
|
||||
Input string `json:"input"`
|
||||
JSONStr string `json:"json_str,omitempty"`
|
||||
}
|
||||
|
||||
type eventStatusEnd struct {
|
||||
DownloadedKB float64 `json:"downloaded_kb"`
|
||||
Failure string `json:"failure"`
|
||||
UploadedKB float64 `json:"uploaded_kb"`
|
||||
}
|
||||
|
||||
type eventStatusGeoIPLookup struct {
|
||||
ProbeASN string `json:"probe_asn"`
|
||||
ProbeCC string `json:"probe_cc"`
|
||||
ProbeIP string `json:"probe_ip"`
|
||||
ProbeNetworkName string `json:"probe_network_name"`
|
||||
}
|
||||
|
||||
// EventStatusProgress reports progress information.
|
||||
type EventStatusProgress struct {
|
||||
Message string `json:"message"`
|
||||
Percentage float64 `json:"percentage"`
|
||||
}
|
||||
|
||||
type eventStatusReportGeneric struct {
|
||||
ReportID string `json:"report_id"`
|
||||
}
|
||||
|
||||
type eventStatusResolverLookup struct {
|
||||
ResolverASN string `json:"resolver_asn"`
|
||||
ResolverIP string `json:"resolver_ip"`
|
||||
ResolverNetworkName string `json:"resolver_network_name"`
|
||||
}
|
||||
|
||||
// Event is an event emitted by a task. This structure extends the event
|
||||
// described by MK v0.10.9 FFI API (https://git.io/Jv4Rv).
|
||||
type Event struct {
|
||||
Key string `json:"key"`
|
||||
Value interface{} `json:"value"`
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package tasks
|
||||
|
||||
// EventEmitter emits event on a channel
|
||||
type EventEmitter struct {
|
||||
disabled map[string]bool
|
||||
out chan<- *Event
|
||||
}
|
||||
|
||||
// NewEventEmitter creates a new Emitter
|
||||
func NewEventEmitter(disabledEvents []string, out chan<- *Event) *EventEmitter {
|
||||
ee := &EventEmitter{out: out}
|
||||
ee.disabled = make(map[string]bool)
|
||||
for _, eventname := range disabledEvents {
|
||||
ee.disabled[eventname] = true
|
||||
}
|
||||
return ee
|
||||
}
|
||||
|
||||
// EmitFailureStartup emits the failureStartup event
|
||||
func (ee *EventEmitter) EmitFailureStartup(failure string) {
|
||||
ee.EmitFailureGeneric(failureStartup, failure)
|
||||
}
|
||||
|
||||
// EmitFailureGeneric emits a failure event
|
||||
func (ee *EventEmitter) EmitFailureGeneric(name, failure string) {
|
||||
ee.Emit(name, EventFailure{Failure: failure})
|
||||
}
|
||||
|
||||
// EmitStatusProgress emits the status.Progress event
|
||||
func (ee *EventEmitter) EmitStatusProgress(percentage float64, message string) {
|
||||
ee.Emit(statusProgress, EventStatusProgress{Message: message, Percentage: percentage})
|
||||
}
|
||||
|
||||
// Emit emits the specified event
|
||||
func (ee *EventEmitter) Emit(key string, value interface{}) {
|
||||
if ee.disabled[key] == true {
|
||||
return
|
||||
}
|
||||
ee.out <- &Event{Key: key, Value: value}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package tasks_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/ooni/probe-cli/v3/pkg/oonimkall/tasks"
|
||||
)
|
||||
|
||||
func TestDisabledEvents(t *testing.T) {
|
||||
out := make(chan *tasks.Event)
|
||||
emitter := tasks.NewEventEmitter([]string{"log"}, out)
|
||||
go func() {
|
||||
emitter.Emit("log", tasks.EventLog{Message: "foo"})
|
||||
close(out)
|
||||
}()
|
||||
var count int64
|
||||
for ev := range out {
|
||||
if ev.Key == "log" {
|
||||
count++
|
||||
}
|
||||
}
|
||||
if count > 0 {
|
||||
t.Fatal("cannot disable events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmitFailureStartup(t *testing.T) {
|
||||
out := make(chan *tasks.Event)
|
||||
emitter := tasks.NewEventEmitter([]string{}, out)
|
||||
go func() {
|
||||
emitter.EmitFailureStartup("mocked error")
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
evv := ev.Value.(tasks.EventFailure) // panic if not castable
|
||||
if evv.Failure == "mocked error" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("did not see expected event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmitStatusProgress(t *testing.T) {
|
||||
out := make(chan *tasks.Event)
|
||||
emitter := tasks.NewEventEmitter([]string{}, out)
|
||||
go func() {
|
||||
emitter.EmitStatusProgress(0.7, "foo")
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "status.progress" {
|
||||
evv := ev.Value.(tasks.EventStatusProgress) // panic if not castable
|
||||
if evv.Message == "foo" && evv.Percentage == 0.7 {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("did not see expected event")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,299 @@
|
||||
// Package tasks implements tasks run using the oonimkall API.
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
engine "github.com/ooni/probe-cli/v3/internal/engine"
|
||||
"github.com/ooni/probe-cli/v3/internal/engine/runtimex"
|
||||
"github.com/ooni/probe-cli/v3/internal/engine/model"
|
||||
)
|
||||
|
||||
const (
|
||||
failureIPLookup = "failure.ip_lookup"
|
||||
failureASNLookup = "failure.asn_lookup"
|
||||
failureCCLookup = "failure.cc_lookup"
|
||||
failureMeasurement = "failure.measurement"
|
||||
failureMeasurementSubmission = "failure.measurement_submission"
|
||||
failureReportCreate = "failure.report_create"
|
||||
failureResolverLookup = "failure.resolver_lookup"
|
||||
failureStartup = "failure.startup"
|
||||
measurement = "measurement"
|
||||
statusEnd = "status.end"
|
||||
statusGeoIPLookup = "status.geoip_lookup"
|
||||
statusMeasurementDone = "status.measurement_done"
|
||||
statusMeasurementStart = "status.measurement_start"
|
||||
statusMeasurementSubmission = "status.measurement_submission"
|
||||
statusProgress = "status.progress"
|
||||
statusQueued = "status.queued"
|
||||
statusReportCreate = "status.report_create"
|
||||
statusResolverLookup = "status.resolver_lookup"
|
||||
statusStarted = "status.started"
|
||||
)
|
||||
|
||||
// Run runs the task specified by settings.Name until completion. This is the
|
||||
// top-level API that should be called by oonimkall.
|
||||
func Run(ctx context.Context, settings *Settings, out chan<- *Event) {
|
||||
r := NewRunner(settings, out)
|
||||
r.Run(ctx)
|
||||
}
|
||||
|
||||
// Runner runs a specific task
|
||||
type Runner struct {
|
||||
emitter *EventEmitter
|
||||
maybeLookupLocation func(*engine.Session) error
|
||||
out chan<- *Event
|
||||
settings *Settings
|
||||
}
|
||||
|
||||
// NewRunner creates a new task runner
|
||||
func NewRunner(settings *Settings, out chan<- *Event) *Runner {
|
||||
return &Runner{
|
||||
emitter: NewEventEmitter(settings.DisabledEvents, out),
|
||||
out: out,
|
||||
settings: settings,
|
||||
}
|
||||
}
|
||||
|
||||
// FailureInvalidVersion is the failure returned when Version is invalid
|
||||
const FailureInvalidVersion = "invalid Settings.Version number"
|
||||
|
||||
func (r *Runner) hasUnsupportedSettings(logger *ChanLogger) bool {
|
||||
if r.settings.Version < 1 {
|
||||
r.emitter.EmitFailureStartup(FailureInvalidVersion)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (r *Runner) newsession(logger *ChanLogger) (*engine.Session, error) {
|
||||
kvstore, err := engine.NewFileSystemKVStore(r.settings.StateDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := engine.SessionConfig{
|
||||
AssetsDir: r.settings.AssetsDir,
|
||||
KVStore: kvstore,
|
||||
Logger: logger,
|
||||
SoftwareName: r.settings.Options.SoftwareName,
|
||||
SoftwareVersion: r.settings.Options.SoftwareVersion,
|
||||
TempDir: r.settings.TempDir,
|
||||
}
|
||||
if r.settings.Options.ProbeServicesBaseURL != "" {
|
||||
config.AvailableProbeServices = []model.Service{{
|
||||
Type: "https",
|
||||
Address: r.settings.Options.ProbeServicesBaseURL,
|
||||
}}
|
||||
}
|
||||
return engine.NewSession(config)
|
||||
}
|
||||
|
||||
func (r *Runner) contextForExperiment(
|
||||
ctx context.Context, builder *engine.ExperimentBuilder,
|
||||
) context.Context {
|
||||
if builder.Interruptible() {
|
||||
return ctx
|
||||
}
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
type runnerCallbacks struct {
|
||||
emitter *EventEmitter
|
||||
}
|
||||
|
||||
func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
|
||||
cb.emitter.Emit(statusProgress, EventStatusProgress{
|
||||
Percentage: 0.4 + (percentage * 0.6), // open report is 40%
|
||||
Message: message,
|
||||
})
|
||||
}
|
||||
|
||||
// Run runs the runner until completion. The context argument controls
|
||||
// when to stop when processing multiple inputs, as well as when to stop
|
||||
// experiments explicitly marked as interruptible.
|
||||
func (r *Runner) Run(ctx context.Context) {
|
||||
logger := NewChanLogger(r.emitter, r.settings.LogLevel, r.out)
|
||||
r.emitter.Emit(statusQueued, eventEmpty{})
|
||||
if r.hasUnsupportedSettings(logger) {
|
||||
return
|
||||
}
|
||||
r.emitter.Emit(statusStarted, eventEmpty{})
|
||||
sess, err := r.newsession(logger)
|
||||
if err != nil {
|
||||
r.emitter.EmitFailureStartup(err.Error())
|
||||
return
|
||||
}
|
||||
endEvent := new(eventStatusEnd)
|
||||
defer func() {
|
||||
sess.Close()
|
||||
r.emitter.Emit(statusEnd, endEvent)
|
||||
}()
|
||||
|
||||
builder, err := sess.NewExperimentBuilder(r.settings.Name)
|
||||
if err != nil {
|
||||
r.emitter.EmitFailureStartup(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Looking up OONI backends... please, be patient")
|
||||
if err := sess.MaybeLookupBackends(); err != nil {
|
||||
r.emitter.EmitFailureStartup(err.Error())
|
||||
return
|
||||
}
|
||||
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")
|
||||
|
||||
logger.Info("Looking up your location... please, be patient")
|
||||
maybeLookupLocation := r.maybeLookupLocation
|
||||
if maybeLookupLocation == nil {
|
||||
maybeLookupLocation = func(sess *engine.Session) error {
|
||||
return sess.MaybeLookupLocation()
|
||||
}
|
||||
}
|
||||
if err := maybeLookupLocation(sess); err != nil {
|
||||
r.emitter.EmitFailureGeneric(failureIPLookup, err.Error())
|
||||
r.emitter.EmitFailureGeneric(failureASNLookup, err.Error())
|
||||
r.emitter.EmitFailureGeneric(failureCCLookup, err.Error())
|
||||
r.emitter.EmitFailureGeneric(failureResolverLookup, err.Error())
|
||||
return
|
||||
}
|
||||
r.emitter.EmitStatusProgress(0.2, "geoip lookup")
|
||||
r.emitter.EmitStatusProgress(0.3, "resolver lookup")
|
||||
r.emitter.Emit(statusGeoIPLookup, eventStatusGeoIPLookup{
|
||||
ProbeIP: sess.ProbeIP(),
|
||||
ProbeASN: sess.ProbeASNString(),
|
||||
ProbeCC: sess.ProbeCC(),
|
||||
ProbeNetworkName: sess.ProbeNetworkName(),
|
||||
})
|
||||
r.emitter.Emit(statusResolverLookup, eventStatusResolverLookup{
|
||||
ResolverASN: sess.ResolverASNString(),
|
||||
ResolverIP: sess.ResolverIP(),
|
||||
ResolverNetworkName: sess.ResolverNetworkName(),
|
||||
})
|
||||
|
||||
builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter})
|
||||
if len(r.settings.Inputs) <= 0 {
|
||||
switch builder.InputPolicy() {
|
||||
case engine.InputOrQueryTestLists, engine.InputStrictlyRequired:
|
||||
r.emitter.EmitFailureStartup("no input provided")
|
||||
return
|
||||
}
|
||||
r.settings.Inputs = append(r.settings.Inputs, "")
|
||||
}
|
||||
experiment := builder.NewExperiment()
|
||||
defer func() {
|
||||
endEvent.DownloadedKB = experiment.KibiBytesReceived()
|
||||
endEvent.UploadedKB = experiment.KibiBytesSent()
|
||||
}()
|
||||
if !r.settings.Options.NoCollector {
|
||||
logger.Info("Opening report... please, be patient")
|
||||
if err := experiment.OpenReport(); err != nil {
|
||||
r.emitter.EmitFailureGeneric(failureReportCreate, err.Error())
|
||||
return
|
||||
}
|
||||
r.emitter.EmitStatusProgress(0.4, "open report")
|
||||
r.emitter.Emit(statusReportCreate, eventStatusReportGeneric{
|
||||
ReportID: experiment.ReportID(),
|
||||
})
|
||||
}
|
||||
// This deviates a little bit from measurement-kit, for which
|
||||
// a zero timeout is actually valid. Since it does not make much
|
||||
// sense, here we're changing the behaviour.
|
||||
//
|
||||
// See https://github.com/measurement-kit/measurement-kit/issues/1922
|
||||
if r.settings.Options.MaxRuntime > 0 {
|
||||
// We want to honour max_runtime only when we're running an
|
||||
// experiment that clearly wants specific input. We could refine
|
||||
// this policy in the future, but for now this covers in a
|
||||
// reasonable way web connectivity, so we should be ok.
|
||||
switch builder.InputPolicy() {
|
||||
case engine.InputOrQueryTestLists, engine.InputStrictlyRequired:
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(
|
||||
ctx, time.Duration(r.settings.Options.MaxRuntime)*time.Second,
|
||||
)
|
||||
defer cancel()
|
||||
}
|
||||
}
|
||||
inputCount := len(r.settings.Inputs)
|
||||
start := time.Now()
|
||||
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
|
||||
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)
|
||||
for idx, input := range r.settings.Inputs {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
logger.Infof("Starting measurement with index %d", idx)
|
||||
r.emitter.Emit(statusMeasurementStart, eventMeasurementGeneric{
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
})
|
||||
if input != "" && inputCount > 0 {
|
||||
var percentage float64
|
||||
if r.settings.Options.MaxRuntime > 0 {
|
||||
now := time.Now()
|
||||
percentage = (now.Sub(start).Seconds()/eta.Sub(start).Seconds())*0.6 + 0.4
|
||||
} else {
|
||||
percentage = (float64(idx)/float64(inputCount))*0.6 + 0.4
|
||||
}
|
||||
r.emitter.EmitStatusProgress(percentage, fmt.Sprintf(
|
||||
"processing %s", input,
|
||||
))
|
||||
}
|
||||
m, err := experiment.MeasureWithContext(
|
||||
r.contextForExperiment(ctx, builder),
|
||||
input,
|
||||
)
|
||||
if builder.Interruptible() && ctx.Err() != nil {
|
||||
// We want to stop here only if interruptible otherwise we want to
|
||||
// submit measurement and stop at beginning of next iteration
|
||||
break
|
||||
}
|
||||
m.AddAnnotations(r.settings.Annotations)
|
||||
if err != nil {
|
||||
r.emitter.Emit(failureMeasurement, eventMeasurementGeneric{
|
||||
Failure: err.Error(),
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
})
|
||||
// fallthrough: we want to submit the report anyway
|
||||
}
|
||||
data, err := json.Marshal(m)
|
||||
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
|
||||
r.emitter.Emit(measurement, eventMeasurementGeneric{
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
JSONStr: string(data),
|
||||
})
|
||||
if !r.settings.Options.NoCollector {
|
||||
logger.Info("Submitting measurement... please, be patient")
|
||||
err := experiment.SubmitAndUpdateMeasurement(m)
|
||||
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
JSONStr: string(data),
|
||||
Failure: measurementSubmissionFailure(err),
|
||||
})
|
||||
}
|
||||
r.emitter.Emit(statusMeasurementDone, eventMeasurementGeneric{
|
||||
Idx: int64(idx),
|
||||
Input: input,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func measurementSubmissionEventName(err error) string {
|
||||
if err != nil {
|
||||
return failureMeasurementSubmission
|
||||
}
|
||||
return statusMeasurementSubmission
|
||||
}
|
||||
|
||||
func measurementSubmissionFailure(err error) string {
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -0,0 +1,404 @@
|
||||
package tasks_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ooni/probe-cli/v3/pkg/oonimkall/tasks"
|
||||
)
|
||||
|
||||
func TestRunnerMaybeLookupBackendsFailure(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(500)
|
||||
}))
|
||||
defer server.Close()
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
ProbeServicesBaseURL: server.URL,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
switch ev.Key {
|
||||
case "failure.startup":
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
case "status.queued", "status.started", "log", "status.end":
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected key: %s", ev.Key))
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("unexpected number of failures")
|
||||
}
|
||||
if failures[0] != "all available probe services failed" {
|
||||
t.Fatal("invalid failure")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerOpenReportFailure(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
var (
|
||||
nreq int64
|
||||
mu sync.Mutex
|
||||
)
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
nreq++
|
||||
if nreq == 1 {
|
||||
w.Write([]byte(`{}`))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(500)
|
||||
}))
|
||||
defer server.Close()
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
ProbeServicesBaseURL: server.URL,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
seench := make(chan int64)
|
||||
go func() {
|
||||
var seen int64
|
||||
for ev := range out {
|
||||
switch ev.Key {
|
||||
case "failure.report_create":
|
||||
seen++
|
||||
case "status.progress":
|
||||
evv := ev.Value.(tasks.EventStatusProgress)
|
||||
if evv.Percentage >= 0.4 {
|
||||
panic(fmt.Sprintf("too much progress: %+v", ev))
|
||||
}
|
||||
case "status.queued", "status.started", "log", "status.end",
|
||||
"status.geoip_lookup", "status.resolver_lookup":
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected key: %s", ev.Key))
|
||||
}
|
||||
}
|
||||
seench <- seen
|
||||
}()
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
if n := <-seench; n != 1 {
|
||||
t.Fatal("unexpected number of events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerGood(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "status.end" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("status.end event not found")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithUnsupportedSettings(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("invalid number of failures")
|
||||
}
|
||||
if failures[0] != tasks.FailureInvalidVersion {
|
||||
t.Fatal("not the failure we expected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithInvalidKVStorePath(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "Example",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "", // must be empty to cause the failure below
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("invalid number of failures")
|
||||
}
|
||||
if failures[0] != "mkdir : no such file or directory" {
|
||||
t.Fatal("not the failure we expected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithInvalidExperimentName(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "Nonexistent",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("invalid number of failures")
|
||||
}
|
||||
if failures[0] != "no such experiment: Nonexistent" {
|
||||
t.Fatalf("not the failure we expected: %s", failures[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithMissingInput(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
LogLevel: "DEBUG",
|
||||
Name: "ExampleWithInput",
|
||||
Options: tasks.SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var failures []string
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.startup" {
|
||||
failure := ev.Value.(tasks.EventFailure).Failure
|
||||
failures = append(failures, failure)
|
||||
}
|
||||
}
|
||||
if len(failures) != 1 {
|
||||
t.Fatal("invalid number of failures")
|
||||
}
|
||||
if failures[0] != "no input provided" {
|
||||
t.Fatalf("not the failure we expected: %s", failures[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithMaxRuntime(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
|
||||
LogLevel: "DEBUG",
|
||||
Name: "ExampleWithInput",
|
||||
Options: tasks.SettingsOptions{
|
||||
MaxRuntime: 1,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
begin := time.Now()
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "status.end" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("status.end event not found")
|
||||
}
|
||||
// The runtime is long because of ancillary operations and is even more
|
||||
// longer because of self shaping we may be performing (especially in
|
||||
// CI builds) using `-tags shaping`). We have experimentally determined
|
||||
// that ~10 seconds is the typical CI test run time. See:
|
||||
//
|
||||
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
|
||||
//
|
||||
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
|
||||
if time.Now().Sub(begin) > 10*time.Second {
|
||||
t.Fatal("expected shorter runtime")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithMaxRuntimeNonInterruptible(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
|
||||
LogLevel: "DEBUG",
|
||||
Name: "ExampleWithInputNonInterruptible",
|
||||
Options: tasks.SettingsOptions{
|
||||
MaxRuntime: 1,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
begin := time.Now()
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "status.end" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("status.end event not found")
|
||||
}
|
||||
// The runtime is long because of ancillary operations and is even more
|
||||
// longer because of self shaping we may be performing (especially in
|
||||
// CI builds) using `-tags shaping`). We have experimentally determined
|
||||
// that ~10 seconds is the typical CI test run time. See:
|
||||
//
|
||||
// 1. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263788
|
||||
//
|
||||
// 2. https://github.com/ooni/probe-cli/v3/internal/engine/pull/588/checks?check_run_id=667263855
|
||||
if time.Now().Sub(begin) > 10*time.Second {
|
||||
t.Fatal("expected shorter runtime")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerWithFailedMeasurement(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skip test in short mode")
|
||||
}
|
||||
out := make(chan *tasks.Event)
|
||||
settings := &tasks.Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Inputs: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"},
|
||||
LogLevel: "DEBUG",
|
||||
Name: "ExampleWithFailure",
|
||||
Options: tasks.SettingsOptions{
|
||||
MaxRuntime: 1,
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
go func() {
|
||||
tasks.Run(context.Background(), settings, out)
|
||||
close(out)
|
||||
}()
|
||||
var found bool
|
||||
for ev := range out {
|
||||
if ev.Key == "failure.measurement" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("failure.measurement event not found")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
engine "github.com/ooni/probe-cli/v3/internal/engine"
|
||||
)
|
||||
|
||||
func TestMeasurementSubmissionEventName(t *testing.T) {
|
||||
if measurementSubmissionEventName(nil) != statusMeasurementSubmission {
|
||||
t.Fatal("unexpected submission event name")
|
||||
}
|
||||
if measurementSubmissionEventName(errors.New("mocked error")) != failureMeasurementSubmission {
|
||||
t.Fatal("unexpected submission event name")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurementSubmissionFailure(t *testing.T) {
|
||||
if measurementSubmissionFailure(nil) != "" {
|
||||
t.Fatal("unexpected submission failure")
|
||||
}
|
||||
if measurementSubmissionFailure(errors.New("mocked error")) != "mocked error" {
|
||||
t.Fatal("unexpected submission failure")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerMaybeLookupLocationFailure(t *testing.T) {
|
||||
out := make(chan *Event)
|
||||
settings := &Settings{
|
||||
AssetsDir: "../../testdata/oonimkall/assets",
|
||||
Name: "Example",
|
||||
Options: SettingsOptions{
|
||||
SoftwareName: "oonimkall-test",
|
||||
SoftwareVersion: "0.1.0",
|
||||
},
|
||||
StateDir: "../../testdata/oonimkall/state",
|
||||
Version: 1,
|
||||
}
|
||||
seench := make(chan int64)
|
||||
go func() {
|
||||
var seen int64
|
||||
for ev := range out {
|
||||
switch ev.Key {
|
||||
case "failure.ip_lookup", "failure.asn_lookup",
|
||||
"failure.cc_lookup", "failure.resolver_lookup":
|
||||
seen++
|
||||
case "status.progress":
|
||||
evv := ev.Value.(EventStatusProgress)
|
||||
if evv.Percentage >= 0.2 {
|
||||
panic(fmt.Sprintf("too much progress: %+v", ev))
|
||||
}
|
||||
case "status.queued", "status.started", "status.end":
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected key: %s", ev.Key))
|
||||
}
|
||||
}
|
||||
seench <- seen
|
||||
}()
|
||||
expected := errors.New("mocked error")
|
||||
r := NewRunner(settings, out)
|
||||
r.maybeLookupLocation = func(*engine.Session) error {
|
||||
return expected
|
||||
}
|
||||
r.Run(context.Background())
|
||||
close(out)
|
||||
if n := <-seench; n != 4 {
|
||||
t.Fatal("unexpected number of events")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package tasks
|
||||
|
||||
// Settings contains settings for a task. This structure derives from
|
||||
// the one described by MK v0.10.9 FFI API (https://git.io/Jv4Rv), yet
|
||||
// since 2020-12-03 we're not backwards compatible anymore.
|
||||
type Settings struct {
|
||||
// Annotations contains the annotations to be added
|
||||
// to every measurements performed by the task.
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
|
||||
// AssetsDir is the directory where to store assets. This
|
||||
// field is an extension of MK's specification. If
|
||||
// this field is empty, the task won't start.
|
||||
AssetsDir string `json:"assets_dir"`
|
||||
|
||||
// DisabledEvents contains disabled events. See
|
||||
// https://git.io/Jv4Rv for the events names.
|
||||
DisabledEvents []string `json:"disabled_events,omitempty"`
|
||||
|
||||
// Inputs contains the inputs. The task will fail if it
|
||||
// requires input and you provide no input.
|
||||
Inputs []string `json:"inputs,omitempty"`
|
||||
|
||||
// LogLevel contains the logs level. See https://git.io/Jv4Rv
|
||||
// for the names of the available log levels.
|
||||
LogLevel string `json:"log_level,omitempty"`
|
||||
|
||||
// Name contains the task name. By https://git.io/Jv4Rv the
|
||||
// names are in camel case, e.g. `Ndt`.
|
||||
Name string `json:"name"`
|
||||
|
||||
// Options contains the task options.
|
||||
Options SettingsOptions `json:"options"`
|
||||
|
||||
// StateDir is the directory where to store persistent data. This
|
||||
// field is an extension of MK's specification. If
|
||||
// this field is empty, the task won't start.
|
||||
StateDir string `json:"state_dir"`
|
||||
|
||||
// TempDir is the temporary directory. This field is an extension of MK's
|
||||
// specification. If this field is empty, we will pick the tempdir that
|
||||
// ioutil.TempDir uses by default, which may not work on mobile. According
|
||||
// to our experiments as of 2020-06-10, leaving the TempDir empty works
|
||||
// for iOS and does not work for Android.
|
||||
TempDir string `json:"temp_dir"`
|
||||
|
||||
// Version indicates the version of this structure.
|
||||
Version int64 `json:"version"`
|
||||
}
|
||||
|
||||
// SettingsOptions contains the settings options
|
||||
type SettingsOptions struct {
|
||||
// MaxRuntime is the maximum runtime expressed in seconds. A negative
|
||||
// value for this field disables the maximum runtime. Using
|
||||
// a zero value will also mean disabled. This is not the
|
||||
// original behaviour of Measurement Kit, which used to run
|
||||
// for zero time in such case.
|
||||
MaxRuntime float64 `json:"max_runtime,omitempty"`
|
||||
|
||||
// NoCollector indicates whether to use a collector
|
||||
NoCollector bool `json:"no_collector,omitempty"`
|
||||
|
||||
// ProbeServicesBaseURL contains the probe services base URL.
|
||||
ProbeServicesBaseURL string `json:"probe_services_base_url,omitempty"`
|
||||
|
||||
// SoftwareName is the software name. If this option is not
|
||||
// present, then the library startup will fail.
|
||||
SoftwareName string `json:"software_name,omitempty"`
|
||||
|
||||
// SoftwareVersion is the software version. If this option is not
|
||||
// present, then the library startup will fail.
|
||||
SoftwareVersion string `json:"software_version,omitempty"`
|
||||
}
|
||||
Reference in New Issue
Block a user