[forwardport] fix(oonimkall): ensure we can submit last measurement (#699)

This diff forward ports 018b5de8ce10040b553f0923f70543c1071b954c, whose
original commit message follows:

- - -

The underlying issue causing https://github.com/ooni/probe/issues/2037
is that the final measurement of a web_connectivity run is not
submitted because the context expires while we're submitting it
in most cases.

In turn, this happens because a web_connectivity measurement is not
interrupted midway, since it's not interruptible. This choice is sound
in that we want to finish an in progress measurement. And this is
also why the max_runtime is never 100% accurate.

Yet, once the context is expired, the subsequent submission fails.

Fix the issue by using three contexts. The root context is the one that
the user controls. The measurement context is the one tied to the max
runtime. The submit context is tied to the max runtime plus extra slack
time to ensure we submit the measurement.

With this diff applied, I run the mobile app a couple of times and did
not notice any unsubmitted measurements. Still, more testing is also
probably required to further ensure we've properly fixed.

I'm committing this diff in the release/3.14 branch but we WILL also
need to forward port it into the master branch.

While there, since pkg/oonimkall is a large package, let us create
a doc.go file for keeping the docs.

 Conflicts:
	pkg/oonimkall/task.go
This commit is contained in:
Simone Basso 2022-02-23 12:38:58 +01:00 committed by GitHub
parent ce401272e8
commit ac2e0d718f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 60 deletions

48
pkg/oonimkall/doc.go Normal file
View File

@ -0,0 +1,48 @@
// Package oonimkall implements APIs used by OONI mobile apps. We
// expose these APIs to mobile apps using gomobile.
//
// We expose two APIs: the task API, which is derived from the
// API originally exposed by Measurement Kit, and the session API,
// which is a Go API that mobile apps can use via `gomobile`.
//
// This package is named oonimkall because it contains a partial
// reimplementation of the mkall API implemented by Measurement Kit
// in, e.g., https://github.com/measurement-kit/mkall-ios.
//
// Semantic versioning policy
//
// This package is public for technical reasons. We cannot use `go
// mobile` on a private package. Yet, we are not going to bump this
// repository's major number in case we change oonimkall's API. We
// consider this package our private API for interfacing with our
// mobile applications for Android and iOS.
//
// Task API
//
// The basic tenet of the task API is that you define an experiment
// task you wanna run using a JSON, then you start a task for it, and
// you receive events as serialized JSONs. In addition to this
// functionality, we also include extra APIs used by OONI mobile.
//
// The task API was first defined in Measurement Kit v0.9.0. In this
// context, it was called "the FFI API". The API we expose here is not
// strictly an FFI API, but is close enough for the purpose of using
// OONI from Android and iOS. See https://git.io/Jv4Rv
// (measurement-kit/measurement-kit@v0.10.9) for a comprehensive
// description of MK's FFI API.
//
// See also https://github.com/ooni/probe-engine/pull/347 for the
// design document describing the task API.
//
// See also https://github.com/ooni/probe-cli/v3/internal/engine/blob/master/DESIGN.md,
// which explains why we implemented the oonimkall API.
//
// Session API
//
// The Session API is a Go API that can be exported to mobile apps
// using the gomobile tool. The latest design document for this API is
// at https://github.com/ooni/probe-engine/pull/954.
//
// The basic tenet of the session API is that you create an instance
// of `Session` and use it to perform the operations you need.
package oonimkall

View File

@ -1,50 +1,3 @@
// Package oonimkall implements APIs used by OONI mobile apps. We
// expose these APIs to mobile apps using gomobile.
//
// We expose two APIs: the task API, which is derived from the
// API originally exposed by Measurement Kit, and the session API,
// which is a Go API that mobile apps can use via `gomobile`.
//
// This package is named oonimkall because it contains a partial
// reimplementation of the mkall API implemented by Measurement Kit
// in, e.g., https://github.com/measurement-kit/mkall-ios.
//
// Semantic versioning policy
//
// This package is public for technical reasons. We cannot use `go
// mobile` on a private package. Yet, we are not going to bump this
// repository's major number in case we change oonimkall's API. We
// consider this package our private API for interfacing with our
// mobile applications for Android and iOS.
//
// Task API
//
// The basic tenet of the task API is that you define an experiment
// task you wanna run using a JSON, then you start a task for it, and
// you receive events as serialized JSONs. In addition to this
// functionality, we also include extra APIs used by OONI mobile.
//
// The task API was first defined in Measurement Kit v0.9.0. In this
// context, it was called "the FFI API". The API we expose here is not
// strictly an FFI API, but is close enough for the purpose of using
// OONI from Android and iOS. See https://git.io/Jv4Rv
// (measurement-kit/measurement-kit@v0.10.9) for a comprehensive
// description of MK's FFI API.
//
// See also https://github.com/ooni/probe-engine/pull/347 for the
// design document describing the task API.
//
// See also https://github.com/ooni/probe-cli/v3/internal/engine/blob/master/DESIGN.md,
// which explains why we implemented the oonimkall API.
//
// Session API
//
// The Session API is a Go API that can be exported to mobile apps
// using the gomobile tool. The latest design document for this API is
// at https://github.com/ooni/probe-engine/pull/954.
//
// The basic tenet of the session API is that you create an instance
// of `Session` and use it to perform the operations you need.
package oonimkall
import (

View File

@ -76,6 +76,8 @@ func (r *runnerForTask) newsession(ctx context.Context, logger model.Logger) (ta
return r.sessionBuilder.NewSession(ctx, config)
}
// contextForExperiment ensurs that for measuring we only use an
// interruptible context when we can interrupt the experiment
func (r *runnerForTask) contextForExperiment(
ctx context.Context, builder taskExperimentBuilder,
) context.Context {
@ -99,7 +101,18 @@ func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
// Run runs the runner until completion. The context argument controls
// when to stop when processing multiple inputs, as well as when to stop
// experiments explicitly marked as interruptible.
func (r *runnerForTask) Run(ctx context.Context) {
func (r *runnerForTask) Run(rootCtx context.Context) {
// Implementation note: this function uses these contexts:
//
// - rootCtx is the root context and is controlled by the user;
//
// - measCtx derives from rootCtx and is possibly tied to the
// maximum runtime and is used to choose when to stop measuring;
//
// - submitCtx is like measCtx but, in case we're using a max
// runtime, is given more time to finish submitting.
//
// See https://github.com/ooni/probe/issues/2037.
var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel)
r.emitter.Emit(eventTypeStatusQueued, eventEmpty{})
if r.hasUnsupportedSettings() {
@ -107,7 +120,7 @@ func (r *runnerForTask) Run(ctx context.Context) {
return
}
r.emitter.Emit(eventTypeStatusStarted, eventEmpty{})
sess, err := r.newsession(ctx, logger)
sess, err := r.newsession(rootCtx, logger)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
@ -125,14 +138,14 @@ func (r *runnerForTask) Run(ctx context.Context) {
}
logger.Info("Looking up OONI backends... please, be patient")
if err := sess.MaybeLookupBackendsContext(ctx); err != nil {
if err := sess.MaybeLookupBackendsContext(rootCtx); err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")
logger.Info("Looking up your location... please, be patient")
if err := sess.MaybeLookupLocationContext(ctx); err != nil {
if err := sess.MaybeLookupLocationContext(rootCtx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureASNLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureCCLookup, err.Error())
@ -203,7 +216,7 @@ func (r *runnerForTask) Run(ctx context.Context) {
}()
if !r.settings.Options.NoCollector {
logger.Info("Opening report... please, be patient")
if err := experiment.OpenReportContext(ctx); err != nil {
if err := experiment.OpenReportContext(rootCtx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureReportCreate, err.Error())
return
}
@ -212,6 +225,10 @@ func (r *runnerForTask) Run(ctx context.Context) {
ReportID: experiment.ReportID(),
})
}
measCtx, measCancel := context.WithCancel(rootCtx)
defer measCancel()
submitCtx, submitCancel := context.WithCancel(rootCtx)
defer submitCancel()
// This deviates a little bit from measurement-kit, for which
// a zero timeout is actually valid. Since it does not make much
// sense, here we're changing the behaviour.
@ -224,11 +241,20 @@ func (r *runnerForTask) Run(ctx context.Context) {
// reasonable way web connectivity, so we should be ok.
switch builder.InputPolicy() {
case engine.InputOrQueryBackend, engine.InputStrictlyRequired:
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(
ctx, time.Duration(r.settings.Options.MaxRuntime)*time.Second,
var (
cancelMeas context.CancelFunc
cancelSubmit context.CancelFunc
)
defer cancel()
// We give the context used for submitting extra time so that
// it's possible to submit the last measurement.
//
// See https://github.com/ooni/probe/issues/2037 for more info.
maxRuntime := time.Duration(r.settings.Options.MaxRuntime) * time.Second
measCtx, cancelMeas = context.WithTimeout(measCtx, maxRuntime)
defer cancelMeas()
maxRuntime += 30 * time.Second
submitCtx, cancelSubmit = context.WithTimeout(submitCtx, maxRuntime)
defer cancelSubmit()
}
}
inputCount := len(r.settings.Inputs)
@ -236,7 +262,7 @@ func (r *runnerForTask) Run(ctx context.Context) {
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)
for idx, input := range r.settings.Inputs {
if ctx.Err() != nil {
if measCtx.Err() != nil {
break
}
logger.Infof("Starting measurement with index %d", idx)
@ -257,10 +283,10 @@ func (r *runnerForTask) Run(ctx context.Context) {
))
}
m, err := experiment.MeasureWithContext(
r.contextForExperiment(ctx, builder),
r.contextForExperiment(measCtx, builder),
input,
)
if builder.Interruptible() && ctx.Err() != nil {
if builder.Interruptible() && measCtx.Err() != nil {
// We want to stop here only if interruptible otherwise we want to
// submit measurement and stop at beginning of next iteration
break
@ -287,7 +313,8 @@ func (r *runnerForTask) Run(ctx context.Context) {
})
if !r.settings.Options.NoCollector {
logger.Info("Submitting measurement... please, be patient")
err := experiment.SubmitAndUpdateMeasurementContext(ctx, m)
err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m)
warnOnFailure(logger, "cannot submit measurement", err)
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
@ -302,6 +329,12 @@ func (r *runnerForTask) Run(ctx context.Context) {
}
}
func warnOnFailure(logger model.Logger, message string, err error) {
if err != nil {
logger.Warnf("%s: %s (%+v)", message, err.Error(), err)
}
}
func measurementSubmissionEventName(err error) string {
if err != nil {
return eventTypeFailureMeasurementSubmission