300 lines
9.4 KiB
Go
300 lines
9.4 KiB
Go
|
// Package tasks implements tasks run using the oonimkall API.
|
||
|
package tasks
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
engine "github.com/ooni/probe-cli/v3/internal/engine"
|
||
|
"github.com/ooni/probe-cli/v3/internal/engine/internal/runtimex"
|
||
|
"github.com/ooni/probe-cli/v3/internal/engine/model"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
failureIPLookup = "failure.ip_lookup"
|
||
|
failureASNLookup = "failure.asn_lookup"
|
||
|
failureCCLookup = "failure.cc_lookup"
|
||
|
failureMeasurement = "failure.measurement"
|
||
|
failureMeasurementSubmission = "failure.measurement_submission"
|
||
|
failureReportCreate = "failure.report_create"
|
||
|
failureResolverLookup = "failure.resolver_lookup"
|
||
|
failureStartup = "failure.startup"
|
||
|
measurement = "measurement"
|
||
|
statusEnd = "status.end"
|
||
|
statusGeoIPLookup = "status.geoip_lookup"
|
||
|
statusMeasurementDone = "status.measurement_done"
|
||
|
statusMeasurementStart = "status.measurement_start"
|
||
|
statusMeasurementSubmission = "status.measurement_submission"
|
||
|
statusProgress = "status.progress"
|
||
|
statusQueued = "status.queued"
|
||
|
statusReportCreate = "status.report_create"
|
||
|
statusResolverLookup = "status.resolver_lookup"
|
||
|
statusStarted = "status.started"
|
||
|
)
|
||
|
|
||
|
// Run runs the task specified by settings.Name until completion. This is the
|
||
|
// top-level API that should be called by oonimkall.
|
||
|
func Run(ctx context.Context, settings *Settings, out chan<- *Event) {
|
||
|
r := NewRunner(settings, out)
|
||
|
r.Run(ctx)
|
||
|
}
|
||
|
|
||
|
// Runner runs a specific task
|
||
|
type Runner struct {
|
||
|
emitter *EventEmitter
|
||
|
maybeLookupLocation func(*engine.Session) error
|
||
|
out chan<- *Event
|
||
|
settings *Settings
|
||
|
}
|
||
|
|
||
|
// NewRunner creates a new task runner
|
||
|
func NewRunner(settings *Settings, out chan<- *Event) *Runner {
|
||
|
return &Runner{
|
||
|
emitter: NewEventEmitter(settings.DisabledEvents, out),
|
||
|
out: out,
|
||
|
settings: settings,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// FailureInvalidVersion is the failure returned when Version is invalid
|
||
|
const FailureInvalidVersion = "invalid Settings.Version number"
|
||
|
|
||
|
func (r *Runner) hasUnsupportedSettings(logger *ChanLogger) bool {
|
||
|
if r.settings.Version < 1 {
|
||
|
r.emitter.EmitFailureStartup(FailureInvalidVersion)
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
func (r *Runner) newsession(logger *ChanLogger) (*engine.Session, error) {
|
||
|
kvstore, err := engine.NewFileSystemKVStore(r.settings.StateDir)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
config := engine.SessionConfig{
|
||
|
AssetsDir: r.settings.AssetsDir,
|
||
|
KVStore: kvstore,
|
||
|
Logger: logger,
|
||
|
SoftwareName: r.settings.Options.SoftwareName,
|
||
|
SoftwareVersion: r.settings.Options.SoftwareVersion,
|
||
|
TempDir: r.settings.TempDir,
|
||
|
}
|
||
|
if r.settings.Options.ProbeServicesBaseURL != "" {
|
||
|
config.AvailableProbeServices = []model.Service{{
|
||
|
Type: "https",
|
||
|
Address: r.settings.Options.ProbeServicesBaseURL,
|
||
|
}}
|
||
|
}
|
||
|
return engine.NewSession(config)
|
||
|
}
|
||
|
|
||
|
func (r *Runner) contextForExperiment(
|
||
|
ctx context.Context, builder *engine.ExperimentBuilder,
|
||
|
) context.Context {
|
||
|
if builder.Interruptible() {
|
||
|
return ctx
|
||
|
}
|
||
|
return context.Background()
|
||
|
}
|
||
|
|
||
|
type runnerCallbacks struct {
|
||
|
emitter *EventEmitter
|
||
|
}
|
||
|
|
||
|
func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
|
||
|
cb.emitter.Emit(statusProgress, EventStatusProgress{
|
||
|
Percentage: 0.4 + (percentage * 0.6), // open report is 40%
|
||
|
Message: message,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Run runs the runner until completion. The context argument controls
|
||
|
// when to stop when processing multiple inputs, as well as when to stop
|
||
|
// experiments explicitly marked as interruptible.
|
||
|
func (r *Runner) Run(ctx context.Context) {
|
||
|
logger := NewChanLogger(r.emitter, r.settings.LogLevel, r.out)
|
||
|
r.emitter.Emit(statusQueued, eventEmpty{})
|
||
|
if r.hasUnsupportedSettings(logger) {
|
||
|
return
|
||
|
}
|
||
|
r.emitter.Emit(statusStarted, eventEmpty{})
|
||
|
sess, err := r.newsession(logger)
|
||
|
if err != nil {
|
||
|
r.emitter.EmitFailureStartup(err.Error())
|
||
|
return
|
||
|
}
|
||
|
endEvent := new(eventStatusEnd)
|
||
|
defer func() {
|
||
|
sess.Close()
|
||
|
r.emitter.Emit(statusEnd, endEvent)
|
||
|
}()
|
||
|
|
||
|
builder, err := sess.NewExperimentBuilder(r.settings.Name)
|
||
|
if err != nil {
|
||
|
r.emitter.EmitFailureStartup(err.Error())
|
||
|
return
|
||
|
}
|
||
|
|
||
|
logger.Info("Looking up OONI backends... please, be patient")
|
||
|
if err := sess.MaybeLookupBackends(); err != nil {
|
||
|
r.emitter.EmitFailureStartup(err.Error())
|
||
|
return
|
||
|
}
|
||
|
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")
|
||
|
|
||
|
logger.Info("Looking up your location... please, be patient")
|
||
|
maybeLookupLocation := r.maybeLookupLocation
|
||
|
if maybeLookupLocation == nil {
|
||
|
maybeLookupLocation = func(sess *engine.Session) error {
|
||
|
return sess.MaybeLookupLocation()
|
||
|
}
|
||
|
}
|
||
|
if err := maybeLookupLocation(sess); err != nil {
|
||
|
r.emitter.EmitFailureGeneric(failureIPLookup, err.Error())
|
||
|
r.emitter.EmitFailureGeneric(failureASNLookup, err.Error())
|
||
|
r.emitter.EmitFailureGeneric(failureCCLookup, err.Error())
|
||
|
r.emitter.EmitFailureGeneric(failureResolverLookup, err.Error())
|
||
|
return
|
||
|
}
|
||
|
r.emitter.EmitStatusProgress(0.2, "geoip lookup")
|
||
|
r.emitter.EmitStatusProgress(0.3, "resolver lookup")
|
||
|
r.emitter.Emit(statusGeoIPLookup, eventStatusGeoIPLookup{
|
||
|
ProbeIP: sess.ProbeIP(),
|
||
|
ProbeASN: sess.ProbeASNString(),
|
||
|
ProbeCC: sess.ProbeCC(),
|
||
|
ProbeNetworkName: sess.ProbeNetworkName(),
|
||
|
})
|
||
|
r.emitter.Emit(statusResolverLookup, eventStatusResolverLookup{
|
||
|
ResolverASN: sess.ResolverASNString(),
|
||
|
ResolverIP: sess.ResolverIP(),
|
||
|
ResolverNetworkName: sess.ResolverNetworkName(),
|
||
|
})
|
||
|
|
||
|
builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter})
|
||
|
if len(r.settings.Inputs) <= 0 {
|
||
|
switch builder.InputPolicy() {
|
||
|
case engine.InputOrQueryTestLists, engine.InputStrictlyRequired:
|
||
|
r.emitter.EmitFailureStartup("no input provided")
|
||
|
return
|
||
|
}
|
||
|
r.settings.Inputs = append(r.settings.Inputs, "")
|
||
|
}
|
||
|
experiment := builder.NewExperiment()
|
||
|
defer func() {
|
||
|
endEvent.DownloadedKB = experiment.KibiBytesReceived()
|
||
|
endEvent.UploadedKB = experiment.KibiBytesSent()
|
||
|
}()
|
||
|
if !r.settings.Options.NoCollector {
|
||
|
logger.Info("Opening report... please, be patient")
|
||
|
if err := experiment.OpenReport(); err != nil {
|
||
|
r.emitter.EmitFailureGeneric(failureReportCreate, err.Error())
|
||
|
return
|
||
|
}
|
||
|
r.emitter.EmitStatusProgress(0.4, "open report")
|
||
|
r.emitter.Emit(statusReportCreate, eventStatusReportGeneric{
|
||
|
ReportID: experiment.ReportID(),
|
||
|
})
|
||
|
}
|
||
|
// This deviates a little bit from measurement-kit, for which
|
||
|
// a zero timeout is actually valid. Since it does not make much
|
||
|
// sense, here we're changing the behaviour.
|
||
|
//
|
||
|
// See https://github.com/measurement-kit/measurement-kit/issues/1922
|
||
|
if r.settings.Options.MaxRuntime > 0 {
|
||
|
// We want to honour max_runtime only when we're running an
|
||
|
// experiment that clearly wants specific input. We could refine
|
||
|
// this policy in the future, but for now this covers in a
|
||
|
// reasonable way web connectivity, so we should be ok.
|
||
|
switch builder.InputPolicy() {
|
||
|
case engine.InputOrQueryTestLists, engine.InputStrictlyRequired:
|
||
|
var cancel context.CancelFunc
|
||
|
ctx, cancel = context.WithTimeout(
|
||
|
ctx, time.Duration(r.settings.Options.MaxRuntime)*time.Second,
|
||
|
)
|
||
|
defer cancel()
|
||
|
}
|
||
|
}
|
||
|
inputCount := len(r.settings.Inputs)
|
||
|
start := time.Now()
|
||
|
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
|
||
|
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)
|
||
|
for idx, input := range r.settings.Inputs {
|
||
|
if ctx.Err() != nil {
|
||
|
break
|
||
|
}
|
||
|
logger.Infof("Starting measurement with index %d", idx)
|
||
|
r.emitter.Emit(statusMeasurementStart, eventMeasurementGeneric{
|
||
|
Idx: int64(idx),
|
||
|
Input: input,
|
||
|
})
|
||
|
if input != "" && inputCount > 0 {
|
||
|
var percentage float64
|
||
|
if r.settings.Options.MaxRuntime > 0 {
|
||
|
now := time.Now()
|
||
|
percentage = (now.Sub(start).Seconds()/eta.Sub(start).Seconds())*0.6 + 0.4
|
||
|
} else {
|
||
|
percentage = (float64(idx)/float64(inputCount))*0.6 + 0.4
|
||
|
}
|
||
|
r.emitter.EmitStatusProgress(percentage, fmt.Sprintf(
|
||
|
"processing %s", input,
|
||
|
))
|
||
|
}
|
||
|
m, err := experiment.MeasureWithContext(
|
||
|
r.contextForExperiment(ctx, builder),
|
||
|
input,
|
||
|
)
|
||
|
if builder.Interruptible() && ctx.Err() != nil {
|
||
|
// We want to stop here only if interruptible otherwise we want to
|
||
|
// submit measurement and stop at beginning of next iteration
|
||
|
break
|
||
|
}
|
||
|
m.AddAnnotations(r.settings.Annotations)
|
||
|
if err != nil {
|
||
|
r.emitter.Emit(failureMeasurement, eventMeasurementGeneric{
|
||
|
Failure: err.Error(),
|
||
|
Idx: int64(idx),
|
||
|
Input: input,
|
||
|
})
|
||
|
// fallthrough: we want to submit the report anyway
|
||
|
}
|
||
|
data, err := json.Marshal(m)
|
||
|
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
|
||
|
r.emitter.Emit(measurement, eventMeasurementGeneric{
|
||
|
Idx: int64(idx),
|
||
|
Input: input,
|
||
|
JSONStr: string(data),
|
||
|
})
|
||
|
if !r.settings.Options.NoCollector {
|
||
|
logger.Info("Submitting measurement... please, be patient")
|
||
|
err := experiment.SubmitAndUpdateMeasurement(m)
|
||
|
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
|
||
|
Idx: int64(idx),
|
||
|
Input: input,
|
||
|
JSONStr: string(data),
|
||
|
Failure: measurementSubmissionFailure(err),
|
||
|
})
|
||
|
}
|
||
|
r.emitter.Emit(statusMeasurementDone, eventMeasurementGeneric{
|
||
|
Idx: int64(idx),
|
||
|
Input: input,
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func measurementSubmissionEventName(err error) string {
|
||
|
if err != nil {
|
||
|
return failureMeasurementSubmission
|
||
|
}
|
||
|
return statusMeasurementSubmission
|
||
|
}
|
||
|
|
||
|
func measurementSubmissionFailure(err error) string {
|
||
|
if err != nil {
|
||
|
return err.Error()
|
||
|
}
|
||
|
return ""
|
||
|
}
|