fix(measurex): allow API user to choose parallelism (#581)

Closes https://github.com/ooni/probe/issues/1818
This commit is contained in:
Simone Basso
2021-11-05 14:37:03 +01:00
committed by GitHub
parent 3b27780836
commit ba7b981fcb
11 changed files with 88 additions and 34 deletions
+40 -21
View File
@@ -527,9 +527,12 @@ func (mx *Measurer) httpClientDo(ctx context.Context,
// HTTPEndpointGetParallel performs an HTTPEndpointGet for each
// input endpoint using a pool of background goroutines.
//
// You can choose the parallelism with the parallelism argument. If this
// argument is zero, or negative, we use a small default value.
//
// This function returns to the caller a channel where to read
// measurements from. The channel is closed when done.
func (mx *Measurer) HTTPEndpointGetParallel(ctx context.Context,
func (mx *Measurer) HTTPEndpointGetParallel(ctx context.Context, parallelism int,
jar http.CookieJar, epnts ...*HTTPEndpoint) <-chan *HTTPEndpointMeasurement {
var (
done = make(chan interface{})
@@ -542,7 +545,9 @@ func (mx *Measurer) HTTPEndpointGetParallel(ctx context.Context,
input <- epnt
}
}()
const parallelism = 3
if parallelism <= 0 {
parallelism = 3
}
for i := 0; i < parallelism; i++ {
go func() {
for epnt := range input {
@@ -591,7 +596,10 @@ type ResolverInfo struct {
// LookupURLHostParallel performs an LookupHost-like operation for each
// resolver that you provide as argument using a pool of goroutines.
func (mx *Measurer) LookupURLHostParallel(ctx context.Context,
//
// You can choose the parallelism with the parallelism argument. If this
// argument is zero, or negative, we use a small default value.
func (mx *Measurer) LookupURLHostParallel(ctx context.Context, parallelism int,
URL *url.URL, resos ...*ResolverInfo) <-chan *DNSMeasurement {
var (
done = make(chan interface{})
@@ -604,7 +612,9 @@ func (mx *Measurer) LookupURLHostParallel(ctx context.Context,
resolvers <- reso
}
}()
const parallelism = 3
if parallelism <= 0 {
parallelism = 3
}
for i := 0; i < parallelism; i++ {
go func() {
for reso := range resolvers {
@@ -654,8 +664,11 @@ func (mx *Measurer) lookupHostWithResolverInfo(
// have in input an hostname rather than a URL. As such, we cannot
// determine whether to perform HTTPSSvc lookups and so we aren't
// going to perform this kind of lookups in this case.
func (mx *Measurer) LookupHostParallel(
ctx context.Context, hostname, port string) <-chan *DNSMeasurement {
//
// You can choose the parallelism with the parallelism argument. If this
// argument is zero, or negative, we use a small default value.
func (mx *Measurer) LookupHostParallel(ctx context.Context,
parallelism int, hostname, port string) <-chan *DNSMeasurement {
out := make(chan *DNSMeasurement)
go func() {
defer close(out)
@@ -663,7 +676,7 @@ func (mx *Measurer) LookupHostParallel(
Scheme: "", // so we don't see https and we don't try HTTPSSvc
Host: net.JoinHostPort(hostname, port),
}
for m := range mx.LookupURLHostParallel(ctx, URL) {
for m := range mx.LookupURLHostParallel(ctx, parallelism, URL) {
out <- &DNSMeasurement{Domain: hostname, Measurement: m.Measurement}
}
}()
@@ -724,11 +737,17 @@ type MeasureURLHelper interface {
//
// Arguments:
//
// - ctx is the context for timeout/cancellation
// - ctx is the context for timeout/cancellation.
//
// - URL is the URL to measure
// - parallelism is the number of parallel background goroutines
// to use to perform parallelizable operations (i.e., operations for
// which `measurex` defines an `OpParallel` API where `Op` is the
// name of an operation implemented by `measurex`). If parallel's value
// is zero or negative, we use a reasonably small default.
//
// - header contains the HTTP headers for the request
// - URL is the URL to measure.
//
// - header contains the HTTP headers for the request.
//
// - cookies contains the cookies we should use for measuring
// this URL and possibly future redirections.
@@ -742,7 +761,7 @@ type MeasureURLHelper interface {
// redirect properly without cookies. This has been
// documented at https://github.com/ooni/probe/issues/1727.
func (mx *Measurer) MeasureURL(
ctx context.Context, URL string, headers http.Header,
ctx context.Context, parallelism int, URL string, headers http.Header,
cookies http.CookieJar) (*URLMeasurement, error) {
mx.Logger.Infof("MeasureURL url=%s", URL)
m := &URLMeasurement{URL: URL}
@@ -756,7 +775,7 @@ func (mx *Measurer) MeasureURL(
return nil, errors.New("measurer: no configured resolver")
}
dnsBegin := time.Now()
for dns := range mx.LookupURLHostParallel(ctx, parsed, mx.Resolvers...) {
for dns := range mx.LookupURLHostParallel(ctx, parallelism, parsed, mx.Resolvers...) {
m.DNS = append(m.DNS, dns)
}
m.DNSRuntime = time.Since(dnsBegin)
@@ -774,12 +793,12 @@ func (mx *Measurer) MeasureURL(
mx.enforceAllowedHeadersOnly(epnts)
}
epntRuntime := time.Now()
for epnt := range mx.HTTPEndpointGetParallel(ctx, cookies, epnts...) {
for epnt := range mx.HTTPEndpointGetParallel(ctx, parallelism, cookies, epnts...) {
m.Endpoints = append(m.Endpoints, epnt)
}
switch parsed.Scheme {
case "https":
mx.maybeQUICFollowUp(ctx, m, cookies, epnts...)
mx.maybeQUICFollowUp(ctx, parallelism, m, cookies, epnts...)
default:
// nothing to do
}
@@ -792,7 +811,7 @@ func (mx *Measurer) MeasureURL(
// for QUIC. We query for HTTPSSvc but currently only Cloudflare
// implements this proposed standard. So, this function is
// where we take care of all the other servers implementing QUIC.
func (mx *Measurer) maybeQUICFollowUp(ctx context.Context,
func (mx *Measurer) maybeQUICFollowUp(ctx context.Context, parallelism int,
m *URLMeasurement, cookies http.CookieJar, epnts ...*HTTPEndpoint) {
altsvc := []string{}
for _, epnt := range m.Endpoints {
@@ -827,7 +846,7 @@ func (mx *Measurer) maybeQUICFollowUp(ctx context.Context,
continue
}
if parts[0] == "h3=\":443\"" {
mx.doQUICFollowUp(ctx, m, cookies, epnts...)
mx.doQUICFollowUp(ctx, parallelism, m, cookies, epnts...)
return
}
}
@@ -835,7 +854,7 @@ func (mx *Measurer) maybeQUICFollowUp(ctx context.Context,
}
// doQUICFollowUp runs when we know there's QUIC support via Alt-Svc.
func (mx *Measurer) doQUICFollowUp(ctx context.Context,
func (mx *Measurer) doQUICFollowUp(ctx context.Context, parallelism int,
m *URLMeasurement, cookies http.CookieJar, epnts ...*HTTPEndpoint) {
quicEpnts := []*HTTPEndpoint{}
// do not mutate the existing list rather create a new one
@@ -850,7 +869,7 @@ func (mx *Measurer) doQUICFollowUp(ctx context.Context,
Header: epnt.Header,
})
}
for mquic := range mx.HTTPEndpointGetParallel(ctx, cookies, quicEpnts...) {
for mquic := range mx.HTTPEndpointGetParallel(ctx, parallelism, cookies, quicEpnts...) {
m.Endpoints = append(m.Endpoints, mquic)
}
}
@@ -904,12 +923,12 @@ func (r *redirectionQueue) redirectionsCount() int {
// MeasureURLAndFollowRedirections is like MeasureURL except
// that it _also_ follows all the HTTP redirections.
func (mx *Measurer) MeasureURLAndFollowRedirections(ctx context.Context,
func (mx *Measurer) MeasureURLAndFollowRedirections(ctx context.Context, parallelism int,
URL string, headers http.Header, cookies http.CookieJar) <-chan *URLMeasurement {
out := make(chan *URLMeasurement)
go func() {
defer close(out)
meas, err := mx.MeasureURL(ctx, URL, headers, cookies)
meas, err := mx.MeasureURL(ctx, parallelism, URL, headers, cookies)
if err != nil {
mx.Logger.Warnf("mx.MeasureURL failed: %s", err.Error())
return
@@ -919,7 +938,7 @@ func (mx *Measurer) MeasureURLAndFollowRedirections(ctx context.Context,
const maxRedirects = 7
for !rq.empty() && rq.redirectionsCount() < maxRedirects {
URL = rq.popleft()
meas, err = mx.MeasureURL(ctx, URL, headers, cookies)
meas, err = mx.MeasureURL(ctx, parallelism, URL, headers, cookies)
if err != nil {
mx.Logger.Warnf("mx.MeasureURL failed: %s", err.Error())
return