diff --git a/pkg/oonimkall/eventemitter.go b/pkg/oonimkall/eventemitter.go index d6aec3d..17b9366 100644 --- a/pkg/oonimkall/eventemitter.go +++ b/pkg/oonimkall/eventemitter.go @@ -1,5 +1,7 @@ package oonimkall +import "time" + // eventEmitter emits event on a channel type eventEmitter struct { disabled map[string]bool @@ -36,5 +38,13 @@ func (ee *eventEmitter) Emit(key string, value interface{}) { if ee.disabled[key] { return } - ee.out <- &event{Key: key, Value: value} + const maxSendTimeout = 250 * time.Millisecond + timer := time.NewTimer(maxSendTimeout) + defer timer.Stop() + select { + case ee.out <- &event{Key: key, Value: value}: + // good, we've been able to send the new event + case <-timer.C: + // oops, we've timed out sending + } } diff --git a/pkg/oonimkall/task.go b/pkg/oonimkall/task.go index 1d548b3..de59b3b 100644 --- a/pkg/oonimkall/task.go +++ b/pkg/oonimkall/task.go @@ -59,10 +59,9 @@ import ( // running as subsequent Tasks to reuse the Session connections // created with the OONI probe services backends. type Task struct { - cancel context.CancelFunc - isdone *atomicx.Int64 - isstopped *atomicx.Int64 - out chan *event + cancel context.CancelFunc + isdone *atomicx.Int64 + out chan *event } // StartTask starts an asynchronous task. The input argument is a @@ -75,15 +74,13 @@ func StartTask(input string) (*Task, error) { const bufsiz = 128 // common case: we don't want runner to block ctx, cancel := context.WithCancel(context.Background()) task := &Task{ - cancel: cancel, - isdone: &atomicx.Int64{}, - isstopped: &atomicx.Int64{}, - out: make(chan *event, bufsiz), + cancel: cancel, + isdone: &atomicx.Int64{}, + out: make(chan *event, bufsiz), } go func() { - defer close(task.out) - defer task.isstopped.Add(1) run(ctx, &settings, task.out) + task.out <- nil // signal that we're done w/o closing the channel }() return task, nil } @@ -92,6 +89,9 @@ func StartTask(input string) (*Task, error) { // string is a serialized JSON following MK v0.10.9's API. func (t *Task) WaitForNextEvent() string { const terminated = `{"key":"task_terminated","value":{}}` // like MK + if t.isdone.Load() != 0 { + return terminated + } evp := <-t.out if evp == nil { t.isdone.Add(1) @@ -107,6 +107,10 @@ func (t *Task) IsDone() bool { return t.isdone.Load() != 0 } +func (t *Task) isRunning() bool { + return !t.IsDone() +} + // Interrupt interrupts the task. func (t *Task) Interrupt() { t.cancel() diff --git a/pkg/oonimkall/task_integration_test.go b/pkg/oonimkall/task_integration_test.go index 194ac2d..4a6b1bf 100644 --- a/pkg/oonimkall/task_integration_test.go +++ b/pkg/oonimkall/task_integration_test.go @@ -511,13 +511,13 @@ func TestNonblock(t *testing.T) { if err != nil { t.Fatal(err) } - if !task.IsRunning() { + if !task.isRunning() { t.Fatal("The runner should be running at this point") } // If the task blocks because it emits too much events, this test // will run forever and will be killed. Because we have room for up // to 128 events in the buffer, we should hopefully be fine. - for task.IsRunning() { + for task.isRunning() { time.Sleep(time.Second) } for !task.IsDone() { diff --git a/pkg/oonimkall/task_internal_test.go b/pkg/oonimkall/task_internal_test.go deleted file mode 100644 index 71fcbd0..0000000 --- a/pkg/oonimkall/task_internal_test.go +++ /dev/null @@ -1,5 +0,0 @@ -package oonimkall - -func (t *Task) IsRunning() bool { - return t.isstopped.Load() == 0 -}