ooni-probe-cli/pkg/oonimkall/task.go
Simone Basso ac2e0d718f
[forwardport] fix(oonimkall): ensure we can submit last measurement (#699)
This diff forward ports 018b5de8ce10040b553f0923f70543c1071b954c, whose
original commit message follows:

- - -

The underlying issue causing https://github.com/ooni/probe/issues/2037
is that, in most cases, the final measurement of a web_connectivity run
is not submitted because the context expires while we're submitting it.

In turn, this happens because a web_connectivity measurement is not
interruptible: once started, it is never stopped midway. This choice is
sound in that we want to finish an in-progress measurement. It is also
why the max_runtime is never 100% accurate.

Yet, once the context is expired, the subsequent submission fails.

Fix the issue by using three contexts. The root context is the one that
the user controls. The measurement context is the one tied to the max
runtime. The submit context is tied to the max runtime plus extra slack
time to ensure we submit the measurement.

With this diff applied, I ran the mobile app a couple of times and did
not notice any unsubmitted measurements. Still, more testing is
probably required to further ensure we've properly fixed the issue.

I'm committing this diff in the release/3.14 branch but we WILL also
need to forward port it into the master branch.

While there, since pkg/oonimkall is a large package, let us create
a doc.go file for keeping the docs.

 Conflicts:
	pkg/oonimkall/task.go
2022-02-23 12:38:58 +01:00


package oonimkall

import (
	"context"
	"encoding/json"

	"github.com/ooni/probe-cli/v3/internal/atomicx"
	"github.com/ooni/probe-cli/v3/internal/runtimex"
)

// Task is an asynchronous task running an experiment. It mimics the
// namesake concept initially implemented in Measurement Kit.
//
// Future directions
//
// Currently Task and Session are two unrelated APIs. As part of
// evolving the APIs with which apps interact with the engine, we
// will modify Task to run in the context of a Session. We will
// do that to save extra lookups and to allow several experiments
// running as subsequent Tasks to reuse the Session connections
// created with the OONI probe services backends.
type Task struct {
	cancel    context.CancelFunc
	isdone    *atomicx.Int64
	isstarted chan interface{} // for testing
	isstopped chan interface{} // for testing
	out       chan *event
}

// StartTask starts an asynchronous task. The input argument is a
// serialized JSON conforming to MK v0.10.9's API.
func StartTask(input string) (*Task, error) {
	var settings settings
	if err := json.Unmarshal([]byte(input), &settings); err != nil {
		return nil, err
	}
	const bufsiz = 128 // common case: we don't want runner to block
	ctx, cancel := context.WithCancel(context.Background())
	task := &Task{
		cancel:    cancel,
		isdone:    &atomicx.Int64{},
		isstarted: make(chan interface{}),
		isstopped: make(chan interface{}),
		out:       make(chan *event, bufsiz),
	}
	go func() {
		close(task.isstarted)
		emitter := newTaskEmitterUsingChan(task.out)
		r := newRunner(&settings, emitter)
		r.Run(ctx)
		task.out <- nil // signal that we're done w/o closing the channel
		emitter.Close()
		close(task.isstopped)
	}()
	return task, nil
}

// WaitForNextEvent blocks until the next event occurs. The returned
// string is a serialized JSON following MK v0.10.9's API.
func (t *Task) WaitForNextEvent() string {
	const terminated = `{"key":"task_terminated","value":{}}` // like MK
	if t.isdone.Load() != 0 {
		return terminated
	}
	evp := <-t.out
	if evp == nil {
		t.isdone.Add(1)
		return terminated
	}
	data, err := json.Marshal(evp)
	runtimex.PanicOnError(err, "json.Marshal failed")
	return string(data)
}

// IsDone returns true if the task is done.
func (t *Task) IsDone() bool {
	return t.isdone.Load() != 0
}

// Interrupt interrupts the task.
func (t *Task) Interrupt() {
	t.cancel()
}
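
The nil-sentinel pattern used by WaitForNextEvent (the runner sends nil on the channel instead of closing it, and the reader latches a done flag) can be tried in isolation with the sketch below. It is a self-contained approximation, not the package's code: the event struct, the nextEvent function, and the "status.started" key in main are stand-ins assumed for illustration, and a plain bool replaces the atomic counter because the example is single-threaded.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical stand-in for the package's unexported event type.
type event struct {
	Key   string      `json:"key"`
	Value interface{} `json:"value"`
}

// terminated is the message returned once the task is done.
const terminated = `{"key":"task_terminated","value":{}}`

// nextEvent mirrors the control flow of WaitForNextEvent: once the nil
// sentinel has been read, every later call returns terminated without
// touching the channel again.
func nextEvent(out <-chan *event, done *bool) string {
	if *done {
		return terminated
	}
	evp := <-out
	if evp == nil {
		*done = true
		return terminated
	}
	data, err := json.Marshal(evp)
	if err != nil {
		panic(err)
	}
	return string(data)
}

func main() {
	out := make(chan *event, 4)
	out <- &event{Key: "status.started", Value: map[string]interface{}{}}
	out <- nil // the producer signals completion without closing the channel

	done := false
	for !done {
		fmt.Println(nextEvent(out, &done))
	}
}
```

A caller of the real API would typically loop in the same way, calling WaitForNextEvent until IsDone reports true.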