From d289b80386a917a496d02427b5f06b3c16b2d5f3 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Tue, 13 Sep 2022 21:54:40 +0200 Subject: [PATCH] feat(webconnectivity@v0.5): stream the response body (#959) Reference issue: https://github.com/ooni/probe/issues/2295 --- .../webconnectivity/cleartextflow.go | 2 +- internal/experiment/webconnectivity/iox.go | 79 +++++++++++++++++++ .../experiment/webconnectivity/measurer.go | 2 +- .../experiment/webconnectivity/secureflow.go | 2 +- internal/netxlite/iox.go | 3 + 5 files changed, 85 insertions(+), 3 deletions(-) create mode 100644 internal/experiment/webconnectivity/iox.go diff --git a/internal/experiment/webconnectivity/cleartextflow.go b/internal/experiment/webconnectivity/cleartextflow.go index 97b9d00..94d56be 100644 --- a/internal/experiment/webconnectivity/cleartextflow.go +++ b/internal/experiment/webconnectivity/cleartextflow.go @@ -233,7 +233,7 @@ func (t *CleartextFlow) httpTransaction(ctx context.Context, network, address, a t.CookieJar.SetCookies(req.URL, cookies) } reader := io.LimitReader(resp.Body, maxbody) - body, err = netxlite.ReadAllContext(ctx, reader) + body, err = StreamAllContext(ctx, reader) } finished := trace.TimeSince(trace.ZeroTime) t.TestKeys.AppendNetworkEvents(measurexlite.NewAnnotationArchivalNetworkEvent( diff --git a/internal/experiment/webconnectivity/iox.go b/internal/experiment/webconnectivity/iox.go new file mode 100644 index 0000000..b494b30 --- /dev/null +++ b/internal/experiment/webconnectivity/iox.go @@ -0,0 +1,79 @@ +package webconnectivity + +// +// Extensions to incrementally stream-reading a response body. +// + +import ( + "context" + "errors" + "io" + + "github.com/ooni/probe-cli/v3/internal/netxlite" +) + +// StreamAllContext streams from the given reader [r] until +// interrupted by [ctx] or when [r] hits the EOF. +// +// This function runs a background goroutine that should exit as soon +// as [ctx] is done or when [reader] is closed, if applicable. +// +// This function transforms an errors.Is(err, io.EOF) to a nil error +// such as the standard library's io.ReadAll does. +// +// This function might return a non-zero-length buffer along with +// an non-nil error in the case in which we could only read a portion +// of the body and then we were interrupted by the error. +func StreamAllContext(ctx context.Context, reader io.Reader) ([]byte, error) { + // TODO(bassosimone): consider merging into the ./internal/netxlite/iox.go file + // once this code has been used in testing for quite some time + datach, errch := make(chan []byte), make(chan error) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { + buffer := make([]byte, 1<<13) + for { + count, err := reader.Read(buffer) + if count > 0 { + data := buffer[:count] + select { + case datach <- data: + // fallthrough to check error + case <-ctx.Done(): + return + } + } + if err != nil { + select { + case errch <- err: + return + case <-ctx.Done(): + return + } + } + } + }() + + resultbuf := make([]byte, 0, 1<<17) + for { + select { + case data := <-datach: + // TODO(bassosimone): is there a more efficient way? + resultbuf = append(resultbuf, data...) + case err := <-errch: + if errors.Is(err, io.EOF) { + // see https://github.com/ooni/probe/issues/1965 + return resultbuf, nil + } + return resultbuf, netxlite.NewTopLevelGenericErrWrapper(err) + case <-ctx.Done(): + err := ctx.Err() + if errors.Is(err, context.DeadlineExceeded) { + return resultbuf, nil + } + return resultbuf, netxlite.NewTopLevelGenericErrWrapper(err) + } + } +} diff --git a/internal/experiment/webconnectivity/measurer.go b/internal/experiment/webconnectivity/measurer.go index 5da15c1..0c3406b 100644 --- a/internal/experiment/webconnectivity/measurer.go +++ b/internal/experiment/webconnectivity/measurer.go @@ -36,7 +36,7 @@ func (m *Measurer) ExperimentName() string { // ExperimentVersion implements model.ExperimentMeasurer. func (m *Measurer) ExperimentVersion() string { - return "0.5.13" + return "0.5.14" } // Run implements model.ExperimentMeasurer. diff --git a/internal/experiment/webconnectivity/secureflow.go b/internal/experiment/webconnectivity/secureflow.go index a1ab704..331a575 100644 --- a/internal/experiment/webconnectivity/secureflow.go +++ b/internal/experiment/webconnectivity/secureflow.go @@ -285,7 +285,7 @@ func (t *SecureFlow) httpTransaction(ctx context.Context, network, address, alpn t.CookieJar.SetCookies(req.URL, cookies) } reader := io.LimitReader(resp.Body, maxbody) - body, err = netxlite.ReadAllContext(ctx, reader) + body, err = StreamAllContext(ctx, reader) } finished := trace.TimeSince(trace.ZeroTime) t.TestKeys.AppendNetworkEvents(measurexlite.NewAnnotationArchivalNetworkEvent( diff --git a/internal/netxlite/iox.go b/internal/netxlite/iox.go index 04dde36..41b020d 100644 --- a/internal/netxlite/iox.go +++ b/internal/netxlite/iox.go @@ -10,6 +10,9 @@ import ( "io" ) +// TODO(bassosimone): consider integrating StreamAllContext from +// internal/experiment/webconnectivity/iox.go + // ReadAllContext is like io.ReadAll but reads r in a // background goroutine. This function will return // earlier if the context is cancelled. In which case