fix(oonimkall): improve channel management pattern (#621)

1. add eof channel to event emitter and use this channel as signal
that we shouldn't be sending anymore instead of using a pattern where
we use a timer to decide sending has timed out (because we're using
a buffered channel, it is still possible for some evetns to end up
in the channel if there is space, but this is not a concern, because
the events will be deleted when the channel itself is gone);

2. refactor all tests where we assumed the output channel was closed
to actually use a parallel "eof" channel and use it as signal we
should not be sending anymore (not strictly required but still the
right thing to do in terms of using consistent patterns);

3. modify how we construct a runner so that it passes to the
event emitter an "eof" channel and closes this channel when the
main goroutine running the task is terminating;

4. modify the task to signal events such as "task goroutine started"
and "task goroutine stopped" using channels, which helps to write
much more correct tests;

5. take advantage of the previous change to improve the test that
ensures we're not blocking for a small number of events and also
improve the name of such a test to reflect what it's testing.

The related issue in term of fixing the channel usage pattern is
https://github.com/ooni/probe/issues/1438.

Regarding improving testability, instead, the correct reference
issue is https://github.com/ooni/probe/issues/1903.

There are possibly more changes to apply here to improve this package
and its testability, but let's land this diff first and then see
how much time is left for further improvements.

I've run unit and integration tests with `-race` locally.

This diff will need to be backported to `release/3.11`.
This commit is contained in:
Simone Basso 2021-12-01 15:40:25 +01:00 committed by GitHub
parent c4eb682606
commit 120f2b9fbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 98 additions and 67 deletions

View File

@ -1,16 +1,16 @@
package oonimkall package oonimkall
import "time"
// eventEmitter emits event on a channel // eventEmitter emits event on a channel
type eventEmitter struct { type eventEmitter struct {
disabled map[string]bool disabled map[string]bool
eof <-chan interface{}
out chan<- *event out chan<- *event
} }
// newEventEmitter creates a new Emitter // newEventEmitter creates a new Emitter
func newEventEmitter(disabledEvents []string, out chan<- *event) *eventEmitter { func newEventEmitter(disabledEvents []string, out chan<- *event,
ee := &eventEmitter{out: out} eof <-chan interface{}) *eventEmitter {
ee := &eventEmitter{eof: eof, out: out}
ee.disabled = make(map[string]bool) ee.disabled = make(map[string]bool)
for _, eventname := range disabledEvents { for _, eventname := range disabledEvents {
ee.disabled[eventname] = true ee.disabled[eventname] = true
@ -38,13 +38,10 @@ func (ee *eventEmitter) Emit(key string, value interface{}) {
if ee.disabled[key] { if ee.disabled[key] {
return return
} }
const maxSendTimeout = 250 * time.Millisecond // Prevent this goroutine from blocking on `ee.out` if the caller
timer := time.NewTimer(maxSendTimeout) // has already told us it's not going to accept more events.
defer timer.Stop()
select { select {
case ee.out <- &event{Key: key, Value: value}: case ee.out <- &event{Key: key, Value: value}:
// good, we've been able to send the new event case <-ee.eof:
case <-timer.C:
// oops, we've timed out sending
} }
} }

View File

@ -4,15 +4,22 @@ import "testing"
func TestDisabledEvents(t *testing.T) { func TestDisabledEvents(t *testing.T) {
out := make(chan *event) out := make(chan *event)
emitter := newEventEmitter([]string{"log"}, out) eof := make(chan interface{})
emitter := newEventEmitter([]string{"log"}, out, eof)
go func() { go func() {
emitter.Emit("log", eventLog{Message: "foo"}) emitter.Emit("log", eventLog{Message: "foo"})
close(out) close(eof)
}() }()
var count int64 var count int64
for ev := range out { Loop:
if ev.Key == "log" { for {
count++ select {
case ev := <-out:
if ev.Key == "log" {
count++
}
case <-eof:
break Loop
} }
} }
if count > 0 { if count > 0 {
@ -22,18 +29,25 @@ func TestDisabledEvents(t *testing.T) {
func TestEmitFailureStartup(t *testing.T) { func TestEmitFailureStartup(t *testing.T) {
out := make(chan *event) out := make(chan *event)
emitter := newEventEmitter([]string{}, out) eof := make(chan interface{})
emitter := newEventEmitter([]string{}, out, eof)
go func() { go func() {
emitter.EmitFailureStartup("mocked error") emitter.EmitFailureStartup("mocked error")
close(out) close(eof)
}() }()
var found bool var found bool
for ev := range out { Loop:
if ev.Key == "failure.startup" { for {
evv := ev.Value.(eventFailure) // panic if not castable select {
if evv.Failure == "mocked error" { case ev := <-out:
found = true if ev.Key == "failure.startup" {
evv := ev.Value.(eventFailure) // panic if not castable
if evv.Failure == "mocked error" {
found = true
}
} }
case <-eof:
break Loop
} }
} }
if !found { if !found {
@ -43,18 +57,25 @@ func TestEmitFailureStartup(t *testing.T) {
func TestEmitStatusProgress(t *testing.T) { func TestEmitStatusProgress(t *testing.T) {
out := make(chan *event) out := make(chan *event)
emitter := newEventEmitter([]string{}, out) eof := make(chan interface{})
emitter := newEventEmitter([]string{}, out, eof)
go func() { go func() {
emitter.EmitStatusProgress(0.7, "foo") emitter.EmitStatusProgress(0.7, "foo")
close(out) close(eof)
}() }()
var found bool var found bool
for ev := range out { Loop:
if ev.Key == "status.progress" { for {
evv := ev.Value.(eventStatusProgress) // panic if not castable select {
if evv.Message == "foo" && evv.Percentage == 0.7 { case ev := <-out:
found = true if ev.Key == "status.progress" {
evv := ev.Value.(eventStatusProgress) // panic if not castable
if evv.Message == "foo" && evv.Percentage == 0.7 {
found = true
}
} }
case <-eof:
break Loop
} }
} }
if !found { if !found {

View File

@ -38,7 +38,9 @@ const (
// run runs the task specified by settings.Name until completion. This is the // run runs the task specified by settings.Name until completion. This is the
// top-level API that should be called by oonimkall. // top-level API that should be called by oonimkall.
func run(ctx context.Context, settings *settings, out chan<- *event) { func run(ctx context.Context, settings *settings, out chan<- *event) {
r := newRunner(settings, out) eof := make(chan interface{})
defer close(eof) // tell the emitter to not emit anymore.
r := newRunner(settings, out, eof)
r.Run(ctx) r.Run(ctx)
} }
@ -51,9 +53,9 @@ type runner struct {
} }
// newRunner creates a new task runner // newRunner creates a new task runner
func newRunner(settings *settings, out chan<- *event) *runner { func newRunner(settings *settings, out chan<- *event, eof <-chan interface{}) *runner {
return &runner{ return &runner{
emitter: newEventEmitter(settings.DisabledEvents, out), emitter: newEventEmitter(settings.DisabledEvents, out, eof),
out: out, out: out,
settings: settings, settings: settings,
} }

View File

@ -44,32 +44,39 @@ func TestRunnerMaybeLookupLocationFailure(t *testing.T) {
Version: 1, Version: 1,
} }
seench := make(chan int64) seench := make(chan int64)
eof := make(chan interface{})
go func() { go func() {
var seen int64 var seen int64
for ev := range out { Loop:
switch ev.Key { for {
case "failure.ip_lookup", "failure.asn_lookup", select {
"failure.cc_lookup", "failure.resolver_lookup": case ev := <-out:
seen++ switch ev.Key {
case "status.progress": case "failure.ip_lookup", "failure.asn_lookup",
evv := ev.Value.(eventStatusProgress) "failure.cc_lookup", "failure.resolver_lookup":
if evv.Percentage >= 0.2 { seen++
panic(fmt.Sprintf("too much progress: %+v", ev)) case "status.progress":
evv := ev.Value.(eventStatusProgress)
if evv.Percentage >= 0.2 {
panic(fmt.Sprintf("too much progress: %+v", ev))
}
case "status.queued", "status.started", "status.end":
default:
panic(fmt.Sprintf("unexpected key: %s - %+v", ev.Key, ev.Value))
} }
case "status.queued", "status.started", "status.end": case <-eof:
default: break Loop
panic(fmt.Sprintf("unexpected key: %s - %+v", ev.Key, ev.Value))
} }
} }
seench <- seen seench <- seen
}() }()
expected := errors.New("mocked error") expected := errors.New("mocked error")
r := newRunner(settings, out) r := newRunner(settings, out, eof)
r.maybeLookupLocation = func(*engine.Session) error { r.maybeLookupLocation = func(*engine.Session) error {
return expected return expected
} }
r.Run(context.Background()) r.Run(context.Background())
close(out) close(eof)
if n := <-seench; n != 4 { if n := <-seench; n != 4 {
t.Fatal("unexpected number of events") t.Fatal("unexpected number of events")
} }

View File

@ -59,9 +59,11 @@ import (
// running as subsequent Tasks to reuse the Session connections // running as subsequent Tasks to reuse the Session connections
// created with the OONI probe services backends. // created with the OONI probe services backends.
type Task struct { type Task struct {
cancel context.CancelFunc cancel context.CancelFunc
isdone *atomicx.Int64 isdone *atomicx.Int64
out chan *event isstarted chan interface{} // for testing
isstopped chan interface{} // for testing
out chan *event
} }
// StartTask starts an asynchronous task. The input argument is a // StartTask starts an asynchronous task. The input argument is a
@ -74,13 +76,17 @@ func StartTask(input string) (*Task, error) {
const bufsiz = 128 // common case: we don't want runner to block const bufsiz = 128 // common case: we don't want runner to block
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
task := &Task{ task := &Task{
cancel: cancel, cancel: cancel,
isdone: &atomicx.Int64{}, isdone: &atomicx.Int64{},
out: make(chan *event, bufsiz), isstarted: make(chan interface{}),
isstopped: make(chan interface{}),
out: make(chan *event, bufsiz),
} }
go func() { go func() {
close(task.isstarted)
run(ctx, &settings, task.out) run(ctx, &settings, task.out)
task.out <- nil // signal that we're done w/o closing the channel task.out <- nil // signal that we're done w/o closing the channel
close(task.isstopped)
}() }()
return task, nil return task, nil
} }
@ -107,10 +113,6 @@ func (t *Task) IsDone() bool {
return t.isdone.Load() != 0 return t.isdone.Load() != 0
} }
func (t *Task) isRunning() bool {
return !t.IsDone()
}
// Interrupt interrupts the task. // Interrupt interrupts the task.
func (t *Task) Interrupt() { func (t *Task) Interrupt() {
t.cancel() t.cancel()

View File

@ -290,7 +290,7 @@ func TestInterruptExampleWithInput(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
t.Skip("Skipping broken test; see https://github.com/ooni/probe-cli/v3/internal/engine/issues/992") t.Skip("Skipping broken test; see https://github.com/ooni/probe-engine/issues/992")
task, err := StartTask(`{ task, err := StartTask(`{
"assets_dir": "../testdata/oonimkall/assets", "assets_dir": "../testdata/oonimkall/assets",
"inputs": [ "inputs": [
@ -494,7 +494,9 @@ func TestPrivacyAndScrubbing(t *testing.T) {
} }
} }
func TestNonblock(t *testing.T) { func TestNonblockWithFewEvents(t *testing.T) {
// This test tests whether we won't block for a small
// number of events emitted by the task
if testing.Short() { if testing.Short() {
t.Skip("skip test in short mode") t.Skip("skip test in short mode")
} }
@ -511,16 +513,16 @@ func TestNonblock(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if !task.isRunning() { // Wait for the task thread to start
t.Fatal("The runner should be running at this point") <-task.isstarted
} // Wait for the task thread to complete
// If the task blocks because it emits too much events, this test <-task.isstopped
// will run forever and will be killed. Because we have room for up var count int
// to 128 events in the buffer, we should hopefully be fine.
for task.isRunning() {
time.Sleep(time.Second)
}
for !task.IsDone() { for !task.IsDone() {
task.WaitForNextEvent() task.WaitForNextEvent()
count++
}
if count < 5 {
t.Fatal("too few events")
} }
} }