From c4eb6826066c2449cddc84474f136430cccc91b5 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Fri, 26 Nov 2021 22:18:45 +0100 Subject: [PATCH] [forwardport] fix(oonimkall): don't close channel to signal end of task (#619) (#620) This diff forward ports 90bf0629b957c912a5a6f3bb6c98ad3abb5a2ff6 to `master`. If we close the channel to signal the end of a task we may panic when some background goroutine tries to post on the channel. This bug is rare but may happen. See for example https://github.com/ooni/probe/issues/1438. How can we improve? First, let us add a timeout when sending to the channel. Given that the channel is buffered and we have a generous timeout (1/4 of a second), it's unlikely we will really block. But, in the event in which a late message appears, we'll eventually _unblock_ when sending with a timeout. So, now we don't have to worry anymore about leaking forever a goroutine. Then, let us change the protocol with which we signal that a task is done. We used to close the channel. Now, instead we just synchronously post a nil on the channel when done. In turn, we interpret this nil to mean that the task is done when we receive messages. The _main_ different with respect to before is that now we are asking the consumer of our API to drain the channel. Because before we had a blocking channel, it seems to me we were already requiring the consumer of the API to do that. Which means, I think in practical terms it did not change much. Finally, acknowledge that we don't need a specific state variable to tell us we're done and simplify a little bit the API by just making isRunning private and using the "we're done" signal to determine whether we've stopped running the task. All these changes should be enough to close https://github.com/ooni/probe/issues/1438. --- pkg/oonimkall/eventemitter.go | 12 +++++++++++- pkg/oonimkall/task.go | 24 ++++++++++++++---------- pkg/oonimkall/task_integration_test.go | 4 ++-- pkg/oonimkall/task_internal_test.go | 5 ----- 4 files changed, 27 insertions(+), 18 deletions(-) delete mode 100644 pkg/oonimkall/task_internal_test.go diff --git a/pkg/oonimkall/eventemitter.go b/pkg/oonimkall/eventemitter.go index d6aec3d..17b9366 100644 --- a/pkg/oonimkall/eventemitter.go +++ b/pkg/oonimkall/eventemitter.go @@ -1,5 +1,7 @@ package oonimkall +import "time" + // eventEmitter emits event on a channel type eventEmitter struct { disabled map[string]bool @@ -36,5 +38,13 @@ func (ee *eventEmitter) Emit(key string, value interface{}) { if ee.disabled[key] { return } - ee.out <- &event{Key: key, Value: value} + const maxSendTimeout = 250 * time.Millisecond + timer := time.NewTimer(maxSendTimeout) + defer timer.Stop() + select { + case ee.out <- &event{Key: key, Value: value}: + // good, we've been able to send the new event + case <-timer.C: + // oops, we've timed out sending + } } diff --git a/pkg/oonimkall/task.go b/pkg/oonimkall/task.go index 1d548b3..de59b3b 100644 --- a/pkg/oonimkall/task.go +++ b/pkg/oonimkall/task.go @@ -59,10 +59,9 @@ import ( // running as subsequent Tasks to reuse the Session connections // created with the OONI probe services backends. type Task struct { - cancel context.CancelFunc - isdone *atomicx.Int64 - isstopped *atomicx.Int64 - out chan *event + cancel context.CancelFunc + isdone *atomicx.Int64 + out chan *event } // StartTask starts an asynchronous task. The input argument is a @@ -75,15 +74,13 @@ func StartTask(input string) (*Task, error) { const bufsiz = 128 // common case: we don't want runner to block ctx, cancel := context.WithCancel(context.Background()) task := &Task{ - cancel: cancel, - isdone: &atomicx.Int64{}, - isstopped: &atomicx.Int64{}, - out: make(chan *event, bufsiz), + cancel: cancel, + isdone: &atomicx.Int64{}, + out: make(chan *event, bufsiz), } go func() { - defer close(task.out) - defer task.isstopped.Add(1) run(ctx, &settings, task.out) + task.out <- nil // signal that we're done w/o closing the channel }() return task, nil } @@ -92,6 +89,9 @@ func StartTask(input string) (*Task, error) { // string is a serialized JSON following MK v0.10.9's API. func (t *Task) WaitForNextEvent() string { const terminated = `{"key":"task_terminated","value":{}}` // like MK + if t.isdone.Load() != 0 { + return terminated + } evp := <-t.out if evp == nil { t.isdone.Add(1) @@ -107,6 +107,10 @@ func (t *Task) IsDone() bool { return t.isdone.Load() != 0 } +func (t *Task) isRunning() bool { + return !t.IsDone() +} + // Interrupt interrupts the task. func (t *Task) Interrupt() { t.cancel() diff --git a/pkg/oonimkall/task_integration_test.go b/pkg/oonimkall/task_integration_test.go index 194ac2d..4a6b1bf 100644 --- a/pkg/oonimkall/task_integration_test.go +++ b/pkg/oonimkall/task_integration_test.go @@ -511,13 +511,13 @@ func TestNonblock(t *testing.T) { if err != nil { t.Fatal(err) } - if !task.IsRunning() { + if !task.isRunning() { t.Fatal("The runner should be running at this point") } // If the task blocks because it emits too much events, this test // will run forever and will be killed. Because we have room for up // to 128 events in the buffer, we should hopefully be fine. - for task.IsRunning() { + for task.isRunning() { time.Sleep(time.Second) } for !task.IsDone() { diff --git a/pkg/oonimkall/task_internal_test.go b/pkg/oonimkall/task_internal_test.go deleted file mode 100644 index 71fcbd0..0000000 --- a/pkg/oonimkall/task_internal_test.go +++ /dev/null @@ -1,5 +0,0 @@ -package oonimkall - -func (t *Task) IsRunning() bool { - return t.isstopped.Load() == 0 -}