351 lines
11 KiB
Go
351 lines
11 KiB
Go
package oonimkall
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/ooni/probe-cli/v3/internal/engine"
|
|
"github.com/ooni/probe-cli/v3/internal/model"
|
|
"github.com/ooni/probe-cli/v3/internal/runtimex"
|
|
)
|
|
|
|
// runnerForTask runs a specific task
|
|
type runnerForTask struct {
|
|
emitter *taskEmitterWrapper
|
|
kvStoreBuilder taskKVStoreFSBuilder
|
|
sessionBuilder taskSessionBuilder
|
|
settings *settings
|
|
}
|
|
|
|
var _ taskRunner = &runnerForTask{}
|
|
|
|
// newRunner creates a new task runner
|
|
func newRunner(settings *settings, emitter taskEmitter) *runnerForTask {
|
|
return &runnerForTask{
|
|
emitter: &taskEmitterWrapper{emitter},
|
|
kvStoreBuilder: &taskKVStoreFSBuilderEngine{},
|
|
sessionBuilder: &taskSessionBuilderEngine{},
|
|
settings: settings,
|
|
}
|
|
}
|
|
|
|
// failureInvalidVersion is the failure returned when Version is invalid
|
|
const failureInvalidVersion = "invalid Settings.Version number"
|
|
|
|
func (r *runnerForTask) hasUnsupportedSettings() bool {
|
|
if r.settings.Version < taskABIVersion {
|
|
r.emitter.EmitFailureStartup(failureInvalidVersion)
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (r *runnerForTask) newsession(ctx context.Context, logger model.Logger) (taskSession, error) {
|
|
kvstore, err := r.kvStoreBuilder.NewFS(r.settings.StateDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var proxyURL *url.URL
|
|
if r.settings.Proxy != "" {
|
|
var err error
|
|
proxyURL, err = url.Parse(r.settings.Proxy)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
config := engine.SessionConfig{
|
|
KVStore: kvstore,
|
|
Logger: logger,
|
|
ProxyURL: proxyURL,
|
|
SoftwareName: r.settings.Options.SoftwareName,
|
|
SoftwareVersion: r.settings.Options.SoftwareVersion,
|
|
TempDir: r.settings.TempDir,
|
|
TunnelDir: r.settings.TunnelDir,
|
|
}
|
|
if r.settings.Options.ProbeServicesBaseURL != "" {
|
|
config.AvailableProbeServices = []model.OOAPIService{{
|
|
Type: "https",
|
|
Address: r.settings.Options.ProbeServicesBaseURL,
|
|
}}
|
|
}
|
|
return r.sessionBuilder.NewSession(ctx, config)
|
|
}
|
|
|
|
// contextForExperiment ensurs that for measuring we only use an
|
|
// interruptible context when we can interrupt the experiment
|
|
func (r *runnerForTask) contextForExperiment(
|
|
ctx context.Context, builder taskExperimentBuilder,
|
|
) context.Context {
|
|
if builder.Interruptible() {
|
|
return ctx
|
|
}
|
|
return context.Background()
|
|
}
|
|
|
|
type runnerCallbacks struct {
|
|
emitter taskEmitter
|
|
}
|
|
|
|
func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
|
|
cb.emitter.Emit(eventTypeStatusProgress, eventStatusProgress{
|
|
Percentage: 0.4 + (percentage * 0.6), // open report is 40%
|
|
Message: message,
|
|
})
|
|
}
|
|
|
|
// Run runs the runner until completion. The context argument controls
|
|
// when to stop when processing multiple inputs, as well as when to stop
|
|
// experiments explicitly marked as interruptible.
|
|
func (r *runnerForTask) Run(rootCtx context.Context) {
|
|
// Implementation note: this function uses these contexts:
|
|
//
|
|
// - rootCtx is the root context and is controlled by the user;
|
|
//
|
|
// - measCtx derives from rootCtx and is possibly tied to the
|
|
// maximum runtime and is used to choose when to stop measuring;
|
|
//
|
|
// - submitCtx is like measCtx but, in case we're using a max
|
|
// runtime, is given more time to finish submitting.
|
|
//
|
|
// See https://github.com/ooni/probe/issues/2037.
|
|
var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel)
|
|
r.emitter.Emit(eventTypeStatusQueued, eventEmpty{})
|
|
if r.hasUnsupportedSettings() {
|
|
// event failureStartup already emitted
|
|
return
|
|
}
|
|
r.emitter.Emit(eventTypeStatusStarted, eventEmpty{})
|
|
sess, err := r.newsession(rootCtx, logger)
|
|
if err != nil {
|
|
r.emitter.EmitFailureStartup(err.Error())
|
|
return
|
|
}
|
|
endEvent := new(eventStatusEnd)
|
|
defer func() {
|
|
sess.Close()
|
|
r.emitter.Emit(eventTypeStatusEnd, endEvent)
|
|
}()
|
|
|
|
builder, err := sess.NewExperimentBuilderByName(r.settings.Name)
|
|
if err != nil {
|
|
r.emitter.EmitFailureStartup(err.Error())
|
|
return
|
|
}
|
|
|
|
logger.Info("Looking up OONI backends... please, be patient")
|
|
if err := sess.MaybeLookupBackendsContext(rootCtx); err != nil {
|
|
r.emitter.EmitFailureStartup(err.Error())
|
|
return
|
|
}
|
|
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")
|
|
|
|
logger.Info("Looking up your location... please, be patient")
|
|
if err := sess.MaybeLookupLocationContext(rootCtx); err != nil {
|
|
r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error())
|
|
r.emitter.EmitFailureGeneric(eventTypeFailureASNLookup, err.Error())
|
|
r.emitter.EmitFailureGeneric(eventTypeFailureCCLookup, err.Error())
|
|
r.emitter.EmitFailureGeneric(eventTypeFailureResolverLookup, err.Error())
|
|
return
|
|
}
|
|
r.emitter.EmitStatusProgress(0.2, "geoip lookup")
|
|
r.emitter.EmitStatusProgress(0.3, "resolver lookup")
|
|
r.emitter.Emit(eventTypeStatusGeoIPLookup, eventStatusGeoIPLookup{
|
|
ProbeIP: sess.ProbeIP(),
|
|
ProbeASN: sess.ProbeASNString(),
|
|
ProbeCC: sess.ProbeCC(),
|
|
ProbeNetworkName: sess.ProbeNetworkName(),
|
|
})
|
|
r.emitter.Emit(eventTypeStatusResolverLookup, eventStatusResolverLookup{
|
|
ResolverASN: sess.ResolverASNString(),
|
|
ResolverIP: sess.ResolverIP(),
|
|
ResolverNetworkName: sess.ResolverNetworkName(),
|
|
})
|
|
|
|
builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter})
|
|
|
|
// TODO(bassosimone): replace the following code with an
|
|
// invocation of the InputLoader. Since I am making these
|
|
// changes before a release and I've already changed the
|
|
// code a lot, I'd rather avoid changing it even more,
|
|
// for the following reason:
|
|
//
|
|
// If we add an call InputLoader here, this code will
|
|
// magically invoke check-in for InputOrQueryBackend,
|
|
// which we need to make sure the app can handle. This is
|
|
// the main reason why now I don't fill like properly
|
|
// fixing this code and use InputLoader: too much work
|
|
// in too little time, so mistakes more likely.
|
|
//
|
|
// In fact, our current app assumes that it's its
|
|
// responsibility to load the inputs, not oonimkall's.
|
|
switch builder.InputPolicy() {
|
|
case model.InputOrQueryBackend, model.InputStrictlyRequired:
|
|
if len(r.settings.Inputs) <= 0 {
|
|
r.emitter.EmitFailureStartup("no input provided")
|
|
return
|
|
}
|
|
case model.InputOrStaticDefault:
|
|
if len(r.settings.Inputs) <= 0 {
|
|
inputs, err := engine.StaticBareInputForExperiment(r.settings.Name)
|
|
if err != nil {
|
|
r.emitter.EmitFailureStartup("no default static input for this experiment")
|
|
return
|
|
}
|
|
r.settings.Inputs = inputs
|
|
}
|
|
case model.InputOptional:
|
|
if len(r.settings.Inputs) <= 0 {
|
|
r.settings.Inputs = append(r.settings.Inputs, "")
|
|
}
|
|
default: // treat this case as engine.InputNone.
|
|
if len(r.settings.Inputs) > 0 {
|
|
r.emitter.EmitFailureStartup("experiment does not accept input")
|
|
return
|
|
}
|
|
r.settings.Inputs = append(r.settings.Inputs, "")
|
|
}
|
|
experiment := builder.NewExperimentInstance()
|
|
defer func() {
|
|
endEvent.DownloadedKB = experiment.KibiBytesReceived()
|
|
endEvent.UploadedKB = experiment.KibiBytesSent()
|
|
}()
|
|
if !r.settings.Options.NoCollector {
|
|
logger.Info("Opening report... please, be patient")
|
|
if err := experiment.OpenReportContext(rootCtx); err != nil {
|
|
r.emitter.EmitFailureGeneric(eventTypeFailureReportCreate, err.Error())
|
|
return
|
|
}
|
|
r.emitter.EmitStatusProgress(0.4, "open report")
|
|
r.emitter.Emit(eventTypeStatusReportCreate, eventStatusReportGeneric{
|
|
ReportID: experiment.ReportID(),
|
|
})
|
|
}
|
|
measCtx, measCancel := context.WithCancel(rootCtx)
|
|
defer measCancel()
|
|
submitCtx, submitCancel := context.WithCancel(rootCtx)
|
|
defer submitCancel()
|
|
// This deviates a little bit from measurement-kit, for which
|
|
// a zero timeout is actually valid. Since it does not make much
|
|
// sense, here we're changing the behaviour.
|
|
//
|
|
// See https://github.com/measurement-kit/measurement-kit/issues/1922
|
|
if r.settings.Options.MaxRuntime > 0 {
|
|
// We want to honour max_runtime only when we're running an
|
|
// experiment that clearly wants specific input. We could refine
|
|
// this policy in the future, but for now this covers in a
|
|
// reasonable way web connectivity, so we should be ok.
|
|
switch builder.InputPolicy() {
|
|
case model.InputOrQueryBackend, model.InputStrictlyRequired:
|
|
var (
|
|
cancelMeas context.CancelFunc
|
|
cancelSubmit context.CancelFunc
|
|
)
|
|
// We give the context used for submitting extra time so that
|
|
// it's possible to submit the last measurement.
|
|
//
|
|
// See https://github.com/ooni/probe/issues/2037 for more info.
|
|
maxRuntime := time.Duration(r.settings.Options.MaxRuntime) * time.Second
|
|
measCtx, cancelMeas = context.WithTimeout(measCtx, maxRuntime)
|
|
defer cancelMeas()
|
|
maxRuntime += 30 * time.Second
|
|
submitCtx, cancelSubmit = context.WithTimeout(submitCtx, maxRuntime)
|
|
defer cancelSubmit()
|
|
}
|
|
}
|
|
inputCount := len(r.settings.Inputs)
|
|
start := time.Now()
|
|
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
|
|
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)
|
|
for idx, input := range r.settings.Inputs {
|
|
if measCtx.Err() != nil {
|
|
break
|
|
}
|
|
logger.Infof("Starting measurement with index %d", idx)
|
|
r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{
|
|
Idx: int64(idx),
|
|
Input: input,
|
|
})
|
|
if input != "" && inputCount > 0 {
|
|
var percentage float64
|
|
if r.settings.Options.MaxRuntime > 0 {
|
|
now := time.Now()
|
|
percentage = (now.Sub(start).Seconds()/eta.Sub(start).Seconds())*0.6 + 0.4
|
|
} else {
|
|
percentage = (float64(idx)/float64(inputCount))*0.6 + 0.4
|
|
}
|
|
r.emitter.EmitStatusProgress(percentage, fmt.Sprintf(
|
|
"processing %s", input,
|
|
))
|
|
}
|
|
m, err := experiment.MeasureWithContext(
|
|
r.contextForExperiment(measCtx, builder),
|
|
input,
|
|
)
|
|
if builder.Interruptible() && measCtx.Err() != nil {
|
|
// We want to stop here only if interruptible otherwise we want to
|
|
// submit measurement and stop at beginning of next iteration
|
|
break
|
|
}
|
|
if err != nil {
|
|
r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{
|
|
Failure: err.Error(),
|
|
Idx: int64(idx),
|
|
Input: input,
|
|
})
|
|
// Historical note: here we used to fallthrough but, since we have
|
|
// implemented async measurements, the case where there is an error
|
|
// and we also have a valid measurement cant't happen anymore. So,
|
|
// now the only valid strategy here is to continue.
|
|
continue
|
|
}
|
|
m.AddAnnotations(r.settings.Annotations)
|
|
data, err := json.Marshal(m)
|
|
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
|
|
r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{
|
|
Idx: int64(idx),
|
|
Input: input,
|
|
JSONStr: string(data),
|
|
})
|
|
if !r.settings.Options.NoCollector {
|
|
logger.Info("Submitting measurement... please, be patient")
|
|
err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m)
|
|
warnOnFailure(logger, "cannot submit measurement", err)
|
|
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
|
|
Idx: int64(idx),
|
|
Input: input,
|
|
JSONStr: string(data),
|
|
Failure: measurementSubmissionFailure(err),
|
|
})
|
|
}
|
|
r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{
|
|
Idx: int64(idx),
|
|
Input: input,
|
|
})
|
|
}
|
|
}
|
|
|
|
func warnOnFailure(logger model.Logger, message string, err error) {
|
|
if err != nil {
|
|
logger.Warnf("%s: %s (%+v)", message, err.Error(), err)
|
|
}
|
|
}
|
|
|
|
func measurementSubmissionEventName(err error) string {
|
|
if err != nil {
|
|
return eventTypeFailureMeasurementSubmission
|
|
}
|
|
return eventTypeStatusMeasurementSubmission
|
|
}
|
|
|
|
func measurementSubmissionFailure(err error) string {
|
|
if err != nil {
|
|
return err.Error()
|
|
}
|
|
return ""
|
|
}
|