ooni-probe-cli/pkg/oonimkall/taskrunner.go
Simone Basso ac2e0d718f
[forwardport] fix(oonimkall): ensure we can submit last measurement (#699)
This diff forward ports 018b5de8ce10040b553f0923f70543c1071b954c, whose
original commit message follows:

- - -

The underlying issue causing https://github.com/ooni/probe/issues/2037
is that the final measurement of a web_connectivity run is not
submitted because the context expires while we're submitting it
in most cases.

In turn, this happens because a web_connectivity measurement is not
interrupted midway, since it's not interruptible. This choice is sound
in that we want to finish an in progress measurement. And this is
also why the max_runtime is never 100% accurate.

Yet, once the context is expired, the subsequent submission fails.

Fix the issue by using three contexts. The root context is the one that
the user controls. The measurement context is the one tied to the max
runtime. The submit context is tied to the max runtime plus extra slack
time to ensure we submit the measurement.

With this diff applied, I run the mobile app a couple of times and did
not notice any unsubmitted measurements. Still, more testing is also
probably required to further ensure we've properly fixed.

I'm committing this diff in the release/3.14 branch but we WILL also
need to forward port it into the master branch.

While there, since pkg/oonimkall is a large package, let us create
a doc.go file for keeping the docs.

 Conflicts:
	pkg/oonimkall/task.go
2022-02-23 12:38:58 +01:00

351 lines
11 KiB
Go

package oonimkall
import (
"context"
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
// runnerForTask runs a specific task
type runnerForTask struct {
emitter *taskEmitterWrapper
kvStoreBuilder taskKVStoreFSBuilder
sessionBuilder taskSessionBuilder
settings *settings
}
var _ taskRunner = &runnerForTask{}
// newRunner creates a new task runner
func newRunner(settings *settings, emitter taskEmitter) *runnerForTask {
return &runnerForTask{
emitter: &taskEmitterWrapper{emitter},
kvStoreBuilder: &taskKVStoreFSBuilderEngine{},
sessionBuilder: &taskSessionBuilderEngine{},
settings: settings,
}
}
// failureInvalidVersion is the failure returned when Version is invalid
const failureInvalidVersion = "invalid Settings.Version number"
func (r *runnerForTask) hasUnsupportedSettings() bool {
if r.settings.Version < taskABIVersion {
r.emitter.EmitFailureStartup(failureInvalidVersion)
return true
}
return false
}
func (r *runnerForTask) newsession(ctx context.Context, logger model.Logger) (taskSession, error) {
kvstore, err := r.kvStoreBuilder.NewFS(r.settings.StateDir)
if err != nil {
return nil, err
}
var proxyURL *url.URL
if r.settings.Proxy != "" {
var err error
proxyURL, err = url.Parse(r.settings.Proxy)
if err != nil {
return nil, err
}
}
config := engine.SessionConfig{
KVStore: kvstore,
Logger: logger,
ProxyURL: proxyURL,
SoftwareName: r.settings.Options.SoftwareName,
SoftwareVersion: r.settings.Options.SoftwareVersion,
TempDir: r.settings.TempDir,
TunnelDir: r.settings.TunnelDir,
}
if r.settings.Options.ProbeServicesBaseURL != "" {
config.AvailableProbeServices = []model.OOAPIService{{
Type: "https",
Address: r.settings.Options.ProbeServicesBaseURL,
}}
}
return r.sessionBuilder.NewSession(ctx, config)
}
// contextForExperiment ensurs that for measuring we only use an
// interruptible context when we can interrupt the experiment
func (r *runnerForTask) contextForExperiment(
ctx context.Context, builder taskExperimentBuilder,
) context.Context {
if builder.Interruptible() {
return ctx
}
return context.Background()
}
type runnerCallbacks struct {
emitter taskEmitter
}
func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
cb.emitter.Emit(eventTypeStatusProgress, eventStatusProgress{
Percentage: 0.4 + (percentage * 0.6), // open report is 40%
Message: message,
})
}
// Run runs the runner until completion. The context argument controls
// when to stop when processing multiple inputs, as well as when to stop
// experiments explicitly marked as interruptible.
func (r *runnerForTask) Run(rootCtx context.Context) {
// Implementation note: this function uses these contexts:
//
// - rootCtx is the root context and is controlled by the user;
//
// - measCtx derives from rootCtx and is possibly tied to the
// maximum runtime and is used to choose when to stop measuring;
//
// - submitCtx is like measCtx but, in case we're using a max
// runtime, is given more time to finish submitting.
//
// See https://github.com/ooni/probe/issues/2037.
var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel)
r.emitter.Emit(eventTypeStatusQueued, eventEmpty{})
if r.hasUnsupportedSettings() {
// event failureStartup already emitted
return
}
r.emitter.Emit(eventTypeStatusStarted, eventEmpty{})
sess, err := r.newsession(rootCtx, logger)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
endEvent := new(eventStatusEnd)
defer func() {
sess.Close()
r.emitter.Emit(eventTypeStatusEnd, endEvent)
}()
builder, err := sess.NewExperimentBuilderByName(r.settings.Name)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
logger.Info("Looking up OONI backends... please, be patient")
if err := sess.MaybeLookupBackendsContext(rootCtx); err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")
logger.Info("Looking up your location... please, be patient")
if err := sess.MaybeLookupLocationContext(rootCtx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureASNLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureCCLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureResolverLookup, err.Error())
return
}
r.emitter.EmitStatusProgress(0.2, "geoip lookup")
r.emitter.EmitStatusProgress(0.3, "resolver lookup")
r.emitter.Emit(eventTypeStatusGeoIPLookup, eventStatusGeoIPLookup{
ProbeIP: sess.ProbeIP(),
ProbeASN: sess.ProbeASNString(),
ProbeCC: sess.ProbeCC(),
ProbeNetworkName: sess.ProbeNetworkName(),
})
r.emitter.Emit(eventTypeStatusResolverLookup, eventStatusResolverLookup{
ResolverASN: sess.ResolverASNString(),
ResolverIP: sess.ResolverIP(),
ResolverNetworkName: sess.ResolverNetworkName(),
})
builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter})
// TODO(bassosimone): replace the following code with an
// invocation of the InputLoader. Since I am making these
// changes before a release and I've already changed the
// code a lot, I'd rather avoid changing it even more,
// for the following reason:
//
// If we add an call InputLoader here, this code will
// magically invoke check-in for InputOrQueryBackend,
// which we need to make sure the app can handle. This is
// the main reason why now I don't fill like properly
// fixing this code and use InputLoader: too much work
// in too little time, so mistakes more likely.
//
// In fact, our current app assumes that it's its
// responsibility to load the inputs, not oonimkall's.
switch builder.InputPolicy() {
case engine.InputOrQueryBackend, engine.InputStrictlyRequired:
if len(r.settings.Inputs) <= 0 {
r.emitter.EmitFailureStartup("no input provided")
return
}
case engine.InputOrStaticDefault:
if len(r.settings.Inputs) <= 0 {
inputs, err := engine.StaticBareInputForExperiment(r.settings.Name)
if err != nil {
r.emitter.EmitFailureStartup("no default static input for this experiment")
return
}
r.settings.Inputs = inputs
}
case engine.InputOptional:
if len(r.settings.Inputs) <= 0 {
r.settings.Inputs = append(r.settings.Inputs, "")
}
default: // treat this case as engine.InputNone.
if len(r.settings.Inputs) > 0 {
r.emitter.EmitFailureStartup("experiment does not accept input")
return
}
r.settings.Inputs = append(r.settings.Inputs, "")
}
experiment := builder.NewExperimentInstance()
defer func() {
endEvent.DownloadedKB = experiment.KibiBytesReceived()
endEvent.UploadedKB = experiment.KibiBytesSent()
}()
if !r.settings.Options.NoCollector {
logger.Info("Opening report... please, be patient")
if err := experiment.OpenReportContext(rootCtx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureReportCreate, err.Error())
return
}
r.emitter.EmitStatusProgress(0.4, "open report")
r.emitter.Emit(eventTypeStatusReportCreate, eventStatusReportGeneric{
ReportID: experiment.ReportID(),
})
}
measCtx, measCancel := context.WithCancel(rootCtx)
defer measCancel()
submitCtx, submitCancel := context.WithCancel(rootCtx)
defer submitCancel()
// This deviates a little bit from measurement-kit, for which
// a zero timeout is actually valid. Since it does not make much
// sense, here we're changing the behaviour.
//
// See https://github.com/measurement-kit/measurement-kit/issues/1922
if r.settings.Options.MaxRuntime > 0 {
// We want to honour max_runtime only when we're running an
// experiment that clearly wants specific input. We could refine
// this policy in the future, but for now this covers in a
// reasonable way web connectivity, so we should be ok.
switch builder.InputPolicy() {
case engine.InputOrQueryBackend, engine.InputStrictlyRequired:
var (
cancelMeas context.CancelFunc
cancelSubmit context.CancelFunc
)
// We give the context used for submitting extra time so that
// it's possible to submit the last measurement.
//
// See https://github.com/ooni/probe/issues/2037 for more info.
maxRuntime := time.Duration(r.settings.Options.MaxRuntime) * time.Second
measCtx, cancelMeas = context.WithTimeout(measCtx, maxRuntime)
defer cancelMeas()
maxRuntime += 30 * time.Second
submitCtx, cancelSubmit = context.WithTimeout(submitCtx, maxRuntime)
defer cancelSubmit()
}
}
inputCount := len(r.settings.Inputs)
start := time.Now()
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)
for idx, input := range r.settings.Inputs {
if measCtx.Err() != nil {
break
}
logger.Infof("Starting measurement with index %d", idx)
r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
})
if input != "" && inputCount > 0 {
var percentage float64
if r.settings.Options.MaxRuntime > 0 {
now := time.Now()
percentage = (now.Sub(start).Seconds()/eta.Sub(start).Seconds())*0.6 + 0.4
} else {
percentage = (float64(idx)/float64(inputCount))*0.6 + 0.4
}
r.emitter.EmitStatusProgress(percentage, fmt.Sprintf(
"processing %s", input,
))
}
m, err := experiment.MeasureWithContext(
r.contextForExperiment(measCtx, builder),
input,
)
if builder.Interruptible() && measCtx.Err() != nil {
// We want to stop here only if interruptible otherwise we want to
// submit measurement and stop at beginning of next iteration
break
}
m.AddAnnotations(r.settings.Annotations)
if err != nil {
r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{
Failure: err.Error(),
Idx: int64(idx),
Input: input,
})
// Historical note: here we used to fallthrough but, since we have
// implemented async measurements, the case where there is an error
// and we also have a valid measurement cant't happen anymore. So,
// now the only valid strategy here is to continue.
continue
}
data, err := json.Marshal(m)
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
JSONStr: string(data),
})
if !r.settings.Options.NoCollector {
logger.Info("Submitting measurement... please, be patient")
err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m)
warnOnFailure(logger, "cannot submit measurement", err)
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
JSONStr: string(data),
Failure: measurementSubmissionFailure(err),
})
}
r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
})
}
}
func warnOnFailure(logger model.Logger, message string, err error) {
if err != nil {
logger.Warnf("%s: %s (%+v)", message, err.Error(), err)
}
}
func measurementSubmissionEventName(err error) string {
if err != nil {
return eventTypeFailureMeasurementSubmission
}
return eventTypeStatusMeasurementSubmission
}
func measurementSubmissionFailure(err error) string {
if err != nil {
return err.Error()
}
return ""
}