From ba7b981fcbebfed5f8b2d4f39e9755d39fb77a20 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Fri, 5 Nov 2021 14:37:03 +0100 Subject: [PATCH] fix(measurex): allow API user to choose parallelism (#581) Closes https://github.com/ooni/probe/issues/1818 --- .../engine/experiment/webstepsx/measurer.go | 3 +- internal/engine/experiment/webstepsx/th.go | 6 +- internal/measurex/measurer.go | 61 ++++++++++++------- .../tutorial/measurex/chapter09/README.md | 7 ++- internal/tutorial/measurex/chapter09/main.go | 7 ++- .../tutorial/measurex/chapter10/README.md | 5 +- internal/tutorial/measurex/chapter10/main.go | 5 +- .../tutorial/measurex/chapter11/README.md | 7 ++- internal/tutorial/measurex/chapter11/main.go | 7 ++- .../tutorial/measurex/chapter12/README.md | 7 ++- internal/tutorial/measurex/chapter12/main.go | 7 ++- 11 files changed, 88 insertions(+), 34 deletions(-) diff --git a/internal/engine/experiment/webstepsx/measurer.go b/internal/engine/experiment/webstepsx/measurer.go index dfb918d..f31ec00 100644 --- a/internal/engine/experiment/webstepsx/measurer.go +++ b/internal/engine/experiment/webstepsx/measurer.go @@ -126,8 +126,9 @@ func (mx *Measurer) runAsync(ctx context.Context, sess model.ExperimentSession, TLSHandshaker: netxlite.NewTLSHandshakerStdlib(sess.Logger()), } cookies := measurex.NewCookieJar() + const parallelism = 3 in := mmx.MeasureURLAndFollowRedirections( - ctx, URL, measurex.NewHTTPRequestHeaderForMeasuring(), cookies) + ctx, parallelism, URL, measurex.NewHTTPRequestHeaderForMeasuring(), cookies) for m := range in { out <- &model.ExperimentAsyncTestKeys{ Extensions: map[string]int64{ diff --git a/internal/engine/experiment/webstepsx/th.go b/internal/engine/experiment/webstepsx/th.go index ee1fc3f..5405f36 100644 --- a/internal/engine/experiment/webstepsx/th.go +++ b/internal/engine/experiment/webstepsx/th.go @@ -87,7 +87,8 @@ func (c *THClient) Run(ctx context.Context, URL string) (*THServerResponse, erro } mx := measurex.NewMeasurerWithDefaultSettings() var dns []*measurex.DNSMeasurement - for m := range mx.LookupURLHostParallel(ctx, parsed, c.DNServers...) { + const parallelism = 3 + for m := range mx.LookupURLHostParallel(ctx, parallelism, parsed, c.DNServers...) { dns = append(dns, m) } endpoints, err := measurex.AllEndpointsForURL(parsed, dns...) @@ -251,7 +252,8 @@ func (h *THHandler) singleStep( ForeignResolver: thResolver, }} jar := measurex.NewCookieJar() - meas, err := mx.MeasureURL(ctx, req.URL, req.HTTPRequestHeaders, jar) + const parallelism = 3 + meas, err := mx.MeasureURL(ctx, parallelism, req.URL, req.HTTPRequestHeaders, jar) if err != nil { return nil, err } diff --git a/internal/measurex/measurer.go b/internal/measurex/measurer.go index ebd12b2..bccb7f5 100644 --- a/internal/measurex/measurer.go +++ b/internal/measurex/measurer.go @@ -527,9 +527,12 @@ func (mx *Measurer) httpClientDo(ctx context.Context, // HTTPEndpointGetParallel performs an HTTPEndpointGet for each // input endpoint using a pool of background goroutines. // +// You can choose the parallelism with the parallelism argument. If this +// argument is zero, or negative, we use a small default value. +// // This function returns to the caller a channel where to read // measurements from. The channel is closed when done. 
-func (mx *Measurer) HTTPEndpointGetParallel(ctx context.Context, +func (mx *Measurer) HTTPEndpointGetParallel(ctx context.Context, parallelism int, jar http.CookieJar, epnts ...*HTTPEndpoint) <-chan *HTTPEndpointMeasurement { var ( done = make(chan interface{}) @@ -542,7 +545,9 @@ func (mx *Measurer) HTTPEndpointGetParallel(ctx context.Context, input <- epnt } }() - const parallelism = 3 + if parallelism <= 0 { + parallelism = 3 + } for i := 0; i < parallelism; i++ { go func() { for epnt := range input { @@ -591,7 +596,10 @@ type ResolverInfo struct { // LookupURLHostParallel performs an LookupHost-like operation for each // resolver that you provide as argument using a pool of goroutines. -func (mx *Measurer) LookupURLHostParallel(ctx context.Context, +// +// You can choose the parallelism with the parallelism argument. If this +// argument is zero, or negative, we use a small default value. +func (mx *Measurer) LookupURLHostParallel(ctx context.Context, parallelism int, URL *url.URL, resos ...*ResolverInfo) <-chan *DNSMeasurement { var ( done = make(chan interface{}) @@ -604,7 +612,9 @@ func (mx *Measurer) LookupURLHostParallel(ctx context.Context, resolvers <- reso } }() - const parallelism = 3 + if parallelism <= 0 { + parallelism = 3 + } for i := 0; i < parallelism; i++ { go func() { for reso := range resolvers { @@ -654,8 +664,11 @@ func (mx *Measurer) lookupHostWithResolverInfo( // have in input an hostname rather than a URL. As such, we cannot // determine whether to perform HTTPSSvc lookups and so we aren't // going to perform this kind of lookups in this case. -func (mx *Measurer) LookupHostParallel( - ctx context.Context, hostname, port string) <-chan *DNSMeasurement { +// +// You can choose the parallelism with the parallelism argument. If this +// argument is zero, or negative, we use a small default value. +func (mx *Measurer) LookupHostParallel(ctx context.Context, + parallelism int, hostname, port string) <-chan *DNSMeasurement { out := make(chan *DNSMeasurement) go func() { defer close(out) @@ -663,7 +676,7 @@ func (mx *Measurer) LookupHostParallel( Scheme: "", // so we don't see https and we don't try HTTPSSvc Host: net.JoinHostPort(hostname, port), } - for m := range mx.LookupURLHostParallel(ctx, URL) { + for m := range mx.LookupURLHostParallel(ctx, parallelism, URL) { out <- &DNSMeasurement{Domain: hostname, Measurement: m.Measurement} } }() @@ -724,11 +737,17 @@ type MeasureURLHelper interface { // // Arguments: // -// - ctx is the context for timeout/cancellation +// - ctx is the context for timeout/cancellation. // -// - URL is the URL to measure +// - parallelism is the number of parallel background goroutines +// to use to perform parallelizable operations (i.e., operations for +// which `measurex` defines an `OpParallel` API where `Op` is the +// name of an operation implemented by `measurex`). If parallel's value +// is zero or negative, we use a reasonably small default. // -// - header contains the HTTP headers for the request +// - URL is the URL to measure. +// +// - header contains the HTTP headers for the request. // // - cookies contains the cookies we should use for measuring // this URL and possibly future redirections. @@ -742,7 +761,7 @@ type MeasureURLHelper interface { // redirect properly without cookies. This has been // documented at https://github.com/ooni/probe/issues/1727. 
func (mx *Measurer) MeasureURL( - ctx context.Context, URL string, headers http.Header, + ctx context.Context, parallelism int, URL string, headers http.Header, cookies http.CookieJar) (*URLMeasurement, error) { mx.Logger.Infof("MeasureURL url=%s", URL) m := &URLMeasurement{URL: URL} @@ -756,7 +775,7 @@ func (mx *Measurer) MeasureURL( return nil, errors.New("measurer: no configured resolver") } dnsBegin := time.Now() - for dns := range mx.LookupURLHostParallel(ctx, parsed, mx.Resolvers...) { + for dns := range mx.LookupURLHostParallel(ctx, parallelism, parsed, mx.Resolvers...) { m.DNS = append(m.DNS, dns) } m.DNSRuntime = time.Since(dnsBegin) @@ -774,12 +793,12 @@ func (mx *Measurer) MeasureURL( mx.enforceAllowedHeadersOnly(epnts) } epntRuntime := time.Now() - for epnt := range mx.HTTPEndpointGetParallel(ctx, cookies, epnts...) { + for epnt := range mx.HTTPEndpointGetParallel(ctx, parallelism, cookies, epnts...) { m.Endpoints = append(m.Endpoints, epnt) } switch parsed.Scheme { case "https": - mx.maybeQUICFollowUp(ctx, m, cookies, epnts...) + mx.maybeQUICFollowUp(ctx, parallelism, m, cookies, epnts...) default: // nothing to do } @@ -792,7 +811,7 @@ func (mx *Measurer) MeasureURL( // for QUIC. We query for HTTPSSvc but currently only Cloudflare // implements this proposed standard. So, this function is // where we take care of all the other servers implementing QUIC. -func (mx *Measurer) maybeQUICFollowUp(ctx context.Context, +func (mx *Measurer) maybeQUICFollowUp(ctx context.Context, parallelism int, m *URLMeasurement, cookies http.CookieJar, epnts ...*HTTPEndpoint) { altsvc := []string{} for _, epnt := range m.Endpoints { @@ -827,7 +846,7 @@ func (mx *Measurer) maybeQUICFollowUp(ctx context.Context, continue } if parts[0] == "h3=\":443\"" { - mx.doQUICFollowUp(ctx, m, cookies, epnts...) + mx.doQUICFollowUp(ctx, parallelism, m, cookies, epnts...) return } } @@ -835,7 +854,7 @@ func (mx *Measurer) maybeQUICFollowUp(ctx context.Context, } // doQUICFollowUp runs when we know there's QUIC support via Alt-Svc. -func (mx *Measurer) doQUICFollowUp(ctx context.Context, +func (mx *Measurer) doQUICFollowUp(ctx context.Context, parallelism int, m *URLMeasurement, cookies http.CookieJar, epnts ...*HTTPEndpoint) { quicEpnts := []*HTTPEndpoint{} // do not mutate the existing list rather create a new one @@ -850,7 +869,7 @@ func (mx *Measurer) doQUICFollowUp(ctx context.Context, Header: epnt.Header, }) } - for mquic := range mx.HTTPEndpointGetParallel(ctx, cookies, quicEpnts...) { + for mquic := range mx.HTTPEndpointGetParallel(ctx, parallelism, cookies, quicEpnts...) { m.Endpoints = append(m.Endpoints, mquic) } } @@ -904,12 +923,12 @@ func (r *redirectionQueue) redirectionsCount() int { // MeasureURLAndFollowRedirections is like MeasureURL except // that it _also_ follows all the HTTP redirections. 
-func (mx *Measurer) MeasureURLAndFollowRedirections(ctx context.Context, +func (mx *Measurer) MeasureURLAndFollowRedirections(ctx context.Context, parallelism int, URL string, headers http.Header, cookies http.CookieJar) <-chan *URLMeasurement { out := make(chan *URLMeasurement) go func() { defer close(out) - meas, err := mx.MeasureURL(ctx, URL, headers, cookies) + meas, err := mx.MeasureURL(ctx, parallelism, URL, headers, cookies) if err != nil { mx.Logger.Warnf("mx.MeasureURL failed: %s", err.Error()) return @@ -919,7 +938,7 @@ func (mx *Measurer) MeasureURLAndFollowRedirections(ctx context.Context, const maxRedirects = 7 for !rq.empty() && rq.redirectionsCount() < maxRedirects { URL = rq.popleft() - meas, err = mx.MeasureURL(ctx, URL, headers, cookies) + meas, err = mx.MeasureURL(ctx, parallelism, URL, headers, cookies) if err != nil { mx.Logger.Warnf("mx.MeasureURL failed: %s", err.Error()) return diff --git a/internal/tutorial/measurex/chapter09/README.md b/internal/tutorial/measurex/chapter09/README.md index 1a984d7..1f4a26f 100644 --- a/internal/tutorial/measurex/chapter09/README.md +++ b/internal/tutorial/measurex/chapter09/README.md @@ -87,9 +87,14 @@ Then, we call `HTTPEndpointGetParallel`. The arguments are: - all the endpoints to measure +The parallelism argument tells the code how many parallel goroutines +to use for parallelizable operations. If this value is zero or negative, +the code will use a reasonably small default. + ```Go cookies := measurex.NewCookieJar() - for epnt := range mx.HTTPEndpointGetParallel(ctx, cookies, httpEndpoints...) { + const parallelism = 3 + for epnt := range mx.HTTPEndpointGetParallel(ctx, parallelism, cookies, httpEndpoints...) { m.Endpoints = append(m.Endpoints, epnt) } ``` diff --git a/internal/tutorial/measurex/chapter09/main.go b/internal/tutorial/measurex/chapter09/main.go index 9b8f44f..94162f1 100644 --- a/internal/tutorial/measurex/chapter09/main.go +++ b/internal/tutorial/measurex/chapter09/main.go @@ -88,9 +88,14 @@ func main() { // // - all the endpoints to measure // + // The parallelism argument tells the code how many parallel goroutines + // to use for parallelizable operations. If this value is zero or negative, + // the code will use a reasonably small default. + // // ```Go cookies := measurex.NewCookieJar() - for epnt := range mx.HTTPEndpointGetParallel(ctx, cookies, httpEndpoints...) { + const parallelism = 3 + for epnt := range mx.HTTPEndpointGetParallel(ctx, parallelism, cookies, httpEndpoints...) { m.Endpoints = append(m.Endpoints, epnt) } // ``` diff --git a/internal/tutorial/measurex/chapter10/README.md b/internal/tutorial/measurex/chapter10/README.md index d2aaaaa..a7c4f42 100644 --- a/internal/tutorial/measurex/chapter10/README.md +++ b/internal/tutorial/measurex/chapter10/README.md @@ -90,7 +90,8 @@ scheme is "https". Otherwise, if it's just "http", it does not make sense to send this query. ```Go - for dns := range mx.LookupURLHostParallel(ctx, parsed, resolvers...) { + const parallelism = 3 + for dns := range mx.LookupURLHostParallel(ctx, parallelism, parsed, resolvers...) { m.DNS = append(m.DNS, dns) } ``` @@ -102,7 +103,7 @@ The rest of the program is exactly like in chapter09. httpEndpoints, err := measurex.AllHTTPEndpointsForURL(parsed, headers, m.DNS...) runtimex.PanicOnError(err, "cannot get all the HTTP endpoints") cookies := measurex.NewCookieJar() - for epnt := range mx.HTTPEndpointGetParallel(ctx, cookies, httpEndpoints...) 
{ + for epnt := range mx.HTTPEndpointGetParallel(ctx, parallelism, cookies, httpEndpoints...) { m.Endpoints = append(m.Endpoints, epnt) } print(m) diff --git a/internal/tutorial/measurex/chapter10/main.go b/internal/tutorial/measurex/chapter10/main.go index 712482f..7a72589 100644 --- a/internal/tutorial/measurex/chapter10/main.go +++ b/internal/tutorial/measurex/chapter10/main.go @@ -91,7 +91,8 @@ func main() { // does not make sense to send this query. // // ```Go - for dns := range mx.LookupURLHostParallel(ctx, parsed, resolvers...) { + const parallelism = 3 + for dns := range mx.LookupURLHostParallel(ctx, parallelism, parsed, resolvers...) { m.DNS = append(m.DNS, dns) } // ``` @@ -103,7 +104,7 @@ func main() { httpEndpoints, err := measurex.AllHTTPEndpointsForURL(parsed, headers, m.DNS...) runtimex.PanicOnError(err, "cannot get all the HTTP endpoints") cookies := measurex.NewCookieJar() - for epnt := range mx.HTTPEndpointGetParallel(ctx, cookies, httpEndpoints...) { + for epnt := range mx.HTTPEndpointGetParallel(ctx, parallelism, cookies, httpEndpoints...) { m.Endpoints = append(m.Endpoints, epnt) } print(m) diff --git a/internal/tutorial/measurex/chapter11/README.md b/internal/tutorial/measurex/chapter11/README.md index 457e7ee..0c0fec5 100644 --- a/internal/tutorial/measurex/chapter11/README.md +++ b/internal/tutorial/measurex/chapter11/README.md @@ -65,6 +65,10 @@ The arguments are: - the context as usual +- the number of parallel goroutines to use to perform parallelizable +operations (passing zero or negative will cause the code to use +a reasonably small default value) + - the unparsed URL to measure - the headers we want to use @@ -72,7 +76,8 @@ The arguments are: - a jar for cookies ```Go - m, err := mx.MeasureURL(ctx, *URL, headers, cookies) + const parallelism = 3 + m, err := mx.MeasureURL(ctx, parallelism, *URL, headers, cookies) ``` The return value is either an `URLMeasurement` or an error. The error happens, for example, if diff --git a/internal/tutorial/measurex/chapter11/main.go b/internal/tutorial/measurex/chapter11/main.go index 9cdf22d..9df5a8e 100644 --- a/internal/tutorial/measurex/chapter11/main.go +++ b/internal/tutorial/measurex/chapter11/main.go @@ -66,6 +66,10 @@ func main() { // // - the context as usual // + // - the number of parallel goroutines to use to perform parallelizable + // operations (passing zero or negative will cause the code to use + // a reasonably small default value) + // // - the unparsed URL to measure // // - the headers we want to use @@ -73,7 +77,8 @@ func main() { // - a jar for cookies // // ```Go - m, err := mx.MeasureURL(ctx, *URL, headers, cookies) + const parallelism = 3 + m, err := mx.MeasureURL(ctx, parallelism, *URL, headers, cookies) // ``` // The return value is either an `URLMeasurement` // or an error. The error happens, for example, if diff --git a/internal/tutorial/measurex/chapter12/README.md b/internal/tutorial/measurex/chapter12/README.md index cba87cf..07d4deb 100644 --- a/internal/tutorial/measurex/chapter12/README.md +++ b/internal/tutorial/measurex/chapter12/README.md @@ -62,11 +62,16 @@ returns a channel where it posts the result of measuring the original URL along with all its redirections. Internally, `MeasureURLAndFollowRedirections` calls `MeasureURL`. +The parallelism argument dictates how many parallel goroutine +to use for parallelizable operations. (A zero or negative +value implies that the code should use a sensible default value.) + We accumulate the results in `URLs` and print `m`. 
The channel is closed when done by `MeasureURLAndFollowRedirections`, so we leave the loop. ```Go - for m := range mx.MeasureURLAndFollowRedirections(ctx, *URL, headers, cookies) { + const parallelism = 3 + for m := range mx.MeasureURLAndFollowRedirections(ctx, parallelism, *URL, headers, cookies) { all.URLs = append(all.URLs, measurex.NewArchivalURLMeasurement(m)) } print(all) diff --git a/internal/tutorial/measurex/chapter12/main.go b/internal/tutorial/measurex/chapter12/main.go index 899c978..8c4a064 100644 --- a/internal/tutorial/measurex/chapter12/main.go +++ b/internal/tutorial/measurex/chapter12/main.go @@ -63,11 +63,16 @@ func main() { // the original URL along with all its redirections. Internally, // `MeasureURLAndFollowRedirections` calls `MeasureURL`. // + // The parallelism argument dictates how many parallel goroutine + // to use for parallelizable operations. (A zero or negative + // value implies that the code should use a sensible default value.) + // // We accumulate the results in `URLs` and print `m`. The channel // is closed when done by `MeasureURLAndFollowRedirections`, so we leave the loop. // // ```Go - for m := range mx.MeasureURLAndFollowRedirections(ctx, *URL, headers, cookies) { + const parallelism = 3 + for m := range mx.MeasureURLAndFollowRedirections(ctx, parallelism, *URL, headers, cookies) { all.URLs = append(all.URLs, measurex.NewArchivalURLMeasurement(m)) } print(all)
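
Taken together, the changes above make every caller of the parallel `measurex` APIs pass an explicit parallelism value, with zero or negative falling back to a small default (3). The sketch below is not part of the patch; it is a minimal, hypothetical caller illustrating the updated `MeasureURL` signature. It assumes it lives inside the probe-cli repository (the `internal/measurex` package path is not importable from outside the module) and uses only constructors that appear in the diff above (`NewMeasurerWithDefaultSettings`, `NewCookieJar`, `NewHTTPRequestHeaderForMeasuring`).

```Go
package main

import (
	"context"
	"fmt"
	"log"

	// Assumed import path: measurex is an internal package of ooni/probe-cli v3,
	// so this sketch only compiles from within that repository.
	"github.com/ooni/probe-cli/v3/internal/measurex"
)

func main() {
	ctx := context.Background()
	mx := measurex.NewMeasurerWithDefaultSettings()
	cookies := measurex.NewCookieJar()
	headers := measurex.NewHTTPRequestHeaderForMeasuring()
	// With this patch the caller chooses the parallelism explicitly; passing
	// zero or a negative value would make measurex fall back to its small default.
	const parallelism = 3
	m, err := mx.MeasureURL(ctx, parallelism, "https://example.com/", headers, cookies)
	if err != nil {
		log.Fatalf("mx.MeasureURL failed: %s", err.Error())
	}
	fmt.Printf("measured %d endpoints\n", len(m.Endpoints))
}
```

The design choice mirrored throughout the patch is that the previous hard-coded `const parallelism = 3` moves from the library into each call site, while `<= 0` keeps the old behavior, so existing callers that do not care about tuning can pass zero and lose nothing.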