ac2e0d718f
This diff forward ports 018b5de8ce10040b553f0923f70543c1071b954c, whose original commit message follows: - - - The underlying issue causing https://github.com/ooni/probe/issues/2037 is that the final measurement of a web_connectivity run is not submitted because, in most cases, the context expires while we're submitting it. In turn, this happens because a web_connectivity measurement is not interrupted midway, since it's not interruptible. This choice is sound in that we want to finish an in-progress measurement. And this is also why the max_runtime is never 100% accurate. Yet, once the context is expired, the subsequent submission fails. Fix the issue by using three contexts. The root context is the one that the user controls. The measurement context is the one tied to the max runtime. The submit context is tied to the max runtime plus extra slack time to ensure we submit the measurement. With this diff applied, I ran the mobile app a couple of times and did not notice any unsubmitted measurements. Still, more testing is probably required to further ensure we've properly fixed the issue. I'm committing this diff in the release/3.14 branch but we WILL also need to forward port it into the master branch. While there, since pkg/oonimkall is a large package, let us create a doc.go file for keeping the docs. Conflicts: pkg/oonimkall/task.go
84 lines
2.3 KiB
Go
84 lines
2.3 KiB
Go
package oonimkall
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
|
|
"github.com/ooni/probe-cli/v3/internal/atomicx"
|
|
"github.com/ooni/probe-cli/v3/internal/runtimex"
|
|
)
|
|
|
|
// Task is an asynchronous task running an experiment. It mimics the
|
|
// namesake concept initially implemented in Measurement Kit.
|
|
//
|
|
// Future directions
|
|
//
|
|
// Currently Task and Session are two unrelated APIs. As part of
|
|
// evolving the APIs with which apps interact with the engine, we
|
|
// will modify Task to run in the context of a Session. We will
|
|
// do that to save extra lookups and to allow several experiments
|
|
// running as subsequent Tasks to reuse the Session connections
|
|
// created with the OONI probe services backends.
|
|
type Task struct {
|
|
cancel context.CancelFunc
|
|
isdone *atomicx.Int64
|
|
isstarted chan interface{} // for testing
|
|
isstopped chan interface{} // for testing
|
|
out chan *event
|
|
}
|
|
|
|
// StartTask starts an asynchronous task. The input argument is a
|
|
// serialized JSON conforming to MK v0.10.9's API.
|
|
func StartTask(input string) (*Task, error) {
|
|
var settings settings
|
|
if err := json.Unmarshal([]byte(input), &settings); err != nil {
|
|
return nil, err
|
|
}
|
|
const bufsiz = 128 // common case: we don't want runner to block
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
task := &Task{
|
|
cancel: cancel,
|
|
isdone: &atomicx.Int64{},
|
|
isstarted: make(chan interface{}),
|
|
isstopped: make(chan interface{}),
|
|
out: make(chan *event, bufsiz),
|
|
}
|
|
go func() {
|
|
close(task.isstarted)
|
|
emitter := newTaskEmitterUsingChan(task.out)
|
|
r := newRunner(&settings, emitter)
|
|
r.Run(ctx)
|
|
task.out <- nil // signal that we're done w/o closing the channel
|
|
emitter.Close()
|
|
close(task.isstopped)
|
|
}()
|
|
return task, nil
|
|
}
|
|
|
|
// WaitForNextEvent blocks until the next event occurs. The returned
|
|
// string is a serialized JSON following MK v0.10.9's API.
|
|
func (t *Task) WaitForNextEvent() string {
|
|
const terminated = `{"key":"task_terminated","value":{}}` // like MK
|
|
if t.isdone.Load() != 0 {
|
|
return terminated
|
|
}
|
|
evp := <-t.out
|
|
if evp == nil {
|
|
t.isdone.Add(1)
|
|
return terminated
|
|
}
|
|
data, err := json.Marshal(evp)
|
|
runtimex.PanicOnError(err, "json.Marshal failed")
|
|
return string(data)
|
|
}
|
|
|
|
// IsDone returns true if the task is done.
|
|
func (t *Task) IsDone() bool {
|
|
return t.isdone.Load() != 0
|
|
}
|
|
|
|
// Interrupt interrupts the task.
|
|
func (t *Task) Interrupt() {
|
|
t.cancel()
|
|
}
|