This diff forward ports 90bf0629b957c912a5a6f3bb6c98ad3abb5a2ff6 to `master`. If we close the channel to signal the end of a task we may panic when some background goroutine tries to post on the channel. This bug is rare but may happen. See for example https://github.com/ooni/probe/issues/1438. How can we improve? First, let us add a timeout when sending to the channel. Given that the channel is buffered and we have a generous timeout (1/4 of a second), it's unlikely we will really block. But, in the event in which a late message appears, we'll eventually _unblock_ when sending with a timeout. So, now we don't have to worry anymore about leaking forever a goroutine. Then, let us change the protocol with which we signal that a task is done. We used to close the channel. Now, instead we just synchronously post a nil on the channel when done. In turn, we interpret this nil to mean that the task is done when we receive messages. The _main_ different with respect to before is that now we are asking the consumer of our API to drain the channel. Because before we had a blocking channel, it seems to me we were already requiring the consumer of the API to do that. Which means, I think in practical terms it did not change much. Finally, acknowledge that we don't need a specific state variable to tell us we're done and simplify a little bit the API by just making isRunning private and using the "we're done" signal to determine whether we've stopped running the task. All these changes should be enough to close https://github.com/ooni/probe/issues/1438.
This commit is contained in:
parent
ee5be24900
commit
c4eb682606
|
@ -1,5 +1,7 @@
|
|||
package oonimkall
|
||||
|
||||
import "time"
|
||||
|
||||
// eventEmitter emits event on a channel
|
||||
type eventEmitter struct {
|
||||
disabled map[string]bool
|
||||
|
@ -36,5 +38,13 @@ func (ee *eventEmitter) Emit(key string, value interface{}) {
|
|||
if ee.disabled[key] {
|
||||
return
|
||||
}
|
||||
ee.out <- &event{Key: key, Value: value}
|
||||
const maxSendTimeout = 250 * time.Millisecond
|
||||
timer := time.NewTimer(maxSendTimeout)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case ee.out <- &event{Key: key, Value: value}:
|
||||
// good, we've been able to send the new event
|
||||
case <-timer.C:
|
||||
// oops, we've timed out sending
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,10 +59,9 @@ import (
|
|||
// running as subsequent Tasks to reuse the Session connections
|
||||
// created with the OONI probe services backends.
|
||||
type Task struct {
|
||||
cancel context.CancelFunc
|
||||
isdone *atomicx.Int64
|
||||
isstopped *atomicx.Int64
|
||||
out chan *event
|
||||
cancel context.CancelFunc
|
||||
isdone *atomicx.Int64
|
||||
out chan *event
|
||||
}
|
||||
|
||||
// StartTask starts an asynchronous task. The input argument is a
|
||||
|
@ -75,15 +74,13 @@ func StartTask(input string) (*Task, error) {
|
|||
const bufsiz = 128 // common case: we don't want runner to block
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
task := &Task{
|
||||
cancel: cancel,
|
||||
isdone: &atomicx.Int64{},
|
||||
isstopped: &atomicx.Int64{},
|
||||
out: make(chan *event, bufsiz),
|
||||
cancel: cancel,
|
||||
isdone: &atomicx.Int64{},
|
||||
out: make(chan *event, bufsiz),
|
||||
}
|
||||
go func() {
|
||||
defer close(task.out)
|
||||
defer task.isstopped.Add(1)
|
||||
run(ctx, &settings, task.out)
|
||||
task.out <- nil // signal that we're done w/o closing the channel
|
||||
}()
|
||||
return task, nil
|
||||
}
|
||||
|
@ -92,6 +89,9 @@ func StartTask(input string) (*Task, error) {
|
|||
// string is a serialized JSON following MK v0.10.9's API.
|
||||
func (t *Task) WaitForNextEvent() string {
|
||||
const terminated = `{"key":"task_terminated","value":{}}` // like MK
|
||||
if t.isdone.Load() != 0 {
|
||||
return terminated
|
||||
}
|
||||
evp := <-t.out
|
||||
if evp == nil {
|
||||
t.isdone.Add(1)
|
||||
|
@ -107,6 +107,10 @@ func (t *Task) IsDone() bool {
|
|||
return t.isdone.Load() != 0
|
||||
}
|
||||
|
||||
func (t *Task) isRunning() bool {
|
||||
return !t.IsDone()
|
||||
}
|
||||
|
||||
// Interrupt interrupts the task.
|
||||
func (t *Task) Interrupt() {
|
||||
t.cancel()
|
||||
|
|
|
@ -511,13 +511,13 @@ func TestNonblock(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !task.IsRunning() {
|
||||
if !task.isRunning() {
|
||||
t.Fatal("The runner should be running at this point")
|
||||
}
|
||||
// If the task blocks because it emits too much events, this test
|
||||
// will run forever and will be killed. Because we have room for up
|
||||
// to 128 events in the buffer, we should hopefully be fine.
|
||||
for task.IsRunning() {
|
||||
for task.isRunning() {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
for !task.IsDone() {
|
||||
|
|
|
@ -1,5 +0,0 @@
|
|||
package oonimkall
|
||||
|
||||
func (t *Task) IsRunning() bool {
|
||||
return t.isstopped.Load() == 0
|
||||
}
|
Loading…
Reference in New Issue
Block a user