diff --git a/internal/experiment/webconnectivity/cleartextflow.go b/internal/experiment/webconnectivity/cleartextflow.go index bae9859..97b9d00 100644 --- a/internal/experiment/webconnectivity/cleartextflow.go +++ b/internal/experiment/webconnectivity/cleartextflow.go @@ -38,10 +38,6 @@ type CleartextFlow struct { // Logger is the MANDATORY logger to use. Logger model.Logger - // Sema is the MANDATORY semaphore to allow just a single - // connection to perform the HTTP transaction. - Sema <-chan any - // TestKeys is MANDATORY and contains the TestKeys. TestKeys *TestKeys @@ -61,6 +57,10 @@ type CleartextFlow struct { // HostHeader is the OPTIONAL host header to use. HostHeader string + // PrioSelector is the OPTIONAL priority selector to use to determine + // whether this flow is allowed to fetch the webpage. + PrioSelector *prioritySelector + // Referer contains the OPTIONAL referer, used for redirects. Referer string @@ -113,11 +113,9 @@ func (t *CleartextFlow) Run(parentCtx context.Context, index int64) { alpn := "" // no ALPN because we're not using TLS - // Only allow N flows to _use_ the connection - select { - case <-t.Sema: - default: - ol.Stop(nil) + // Determine whether we're allowed to fetch the webpage + if t.PrioSelector == nil || !t.PrioSelector.permissionToFetch(t.Address) { + ol.Stop("stop after TCP connect") return } diff --git a/internal/experiment/webconnectivity/control.go b/internal/experiment/webconnectivity/control.go index 994fcbc..8824c96 100644 --- a/internal/experiment/webconnectivity/control.go +++ b/internal/experiment/webconnectivity/control.go @@ -17,17 +17,12 @@ import ( // EndpointMeasurementsStarter is used by Control to start extra // measurements using new IP addrs discovered by the TH. type EndpointMeasurementsStarter interface { - // startCleartextFlowsWithSema starts a TCP measurement flow for each IP addr. The [sema] - // argument allows to control how many flows are allowed to perform HTTP measurements. 
Every - // flow will attempt to read from [sema] and won't perform HTTP measurements if a - // nonblocking read fails. Hence, you must create a [sema] channel with buffer equal - // to N and N elements inside it to allow N flows to perform HTTP measurements. Passing - // a nil [sema] causes no flow to attempt HTTP measurements. - startCleartextFlowsWithSema(ctx context.Context, sema <-chan any, addresses []DNSEntry) + // startCleartextFlows starts a TCP measurement flow for each IP addr. The [ps] + // argument determines whether this flow will be allowed to fetch the webpage. + startCleartextFlows(ctx context.Context, ps *prioritySelector, addresses []DNSEntry) - // startSecureFlowsWithSema starts a TCP+TLS measurement flow for each IP addr. See - // the docs of startCleartextFlowsWithSema for more info on the [sema] arg. - startSecureFlowsWithSema(ctx context.Context, sema <-chan any, addresses []DNSEntry) + // startSecureFlows is like startCleartextFlows but for HTTPS. + startSecureFlows(ctx context.Context, ps *prioritySelector, addresses []DNSEntry) } // Control issues a Control request and saves the results @@ -46,6 +41,10 @@ type Control struct { // Logger is the MANDATORY logger to use. Logger model.Logger + // PrioSelector is the OPTIONAL priority selector to use to determine + // whether we will be allowed to fetch the webpage. + PrioSelector *prioritySelector + // TestKeys is MANDATORY and contains the TestKeys. 
TestKeys *TestKeys @@ -131,13 +130,13 @@ func (c *Control) Run(parentCtx context.Context) { return } - // if the TH returned us addresses we did not previously were - // aware of, make sure we also measure them - c.maybeStartExtraMeasurements(parentCtx, cresp.DNS.Addrs) - // on success, save the control response c.TestKeys.SetControl(&cresp) ol.Stop(nil) + + // if the TH returned us addresses we were not previously + // aware of, make sure we also measure them + c.maybeStartExtraMeasurements(parentCtx, cresp.DNS.Addrs) } // This function determines whether we should start new @@ -175,14 +174,7 @@ func (c *Control) maybeStartExtraMeasurements(ctx context.Context, thAddrs []str }) } - // Start extra measurements for TH-only addresses. Because we already - // measured HTTP(S) using IP addrs discovered by the resolvers, we are not - // going to do that again now. I am not sure this is the right policy - // but I think we can just try it and then change if needed... - // - // Also, let's remember that reading from a nil chan blocks forever, so - // we're basically forcing the goroutines to avoid HTTP(S). - var nohttp chan any = nil - c.ExtraMeasurementsStarter.startCleartextFlowsWithSema(ctx, nohttp, thOnly) - c.ExtraMeasurementsStarter.startSecureFlowsWithSema(ctx, nohttp, thOnly) + // Start extra measurements for TH-only addresses. 
+ c.ExtraMeasurementsStarter.startCleartextFlows(ctx, c.PrioSelector, thOnly) + c.ExtraMeasurementsStarter.startSecureFlows(ctx, c.PrioSelector, thOnly) } diff --git a/internal/experiment/webconnectivity/dnsresolvers.go b/internal/experiment/webconnectivity/dnsresolvers.go index 58a8871..856b4fa 100644 --- a/internal/experiment/webconnectivity/dnsresolvers.go +++ b/internal/experiment/webconnectivity/dnsresolvers.go @@ -171,10 +171,13 @@ func (t *DNSResolvers) Run(parentCtx context.Context) { log.Infof("using previously-cached addrs: %+v", addresses) } + // create priority selector + ps := newPrioritySelector(parentCtx, t.ZeroTime, t.TestKeys, t.Logger, t.WaitGroup, addresses) + // fan out a number of child async tasks to use the IP addrs - t.startCleartextFlows(parentCtx, addresses) - t.startSecureFlows(parentCtx, addresses) - t.maybeStartControlFlow(parentCtx, addresses) + t.startCleartextFlows(parentCtx, ps, addresses) + t.startSecureFlows(parentCtx, ps, addresses) + t.maybeStartControlFlow(parentCtx, ps, addresses) } // whoamiSystemV4 performs a DNS whoami lookup for the system resolver. This function must @@ -414,14 +417,11 @@ func (t *DNSResolvers) dohSplitQueries( } // startCleartextFlows starts a TCP measurement flow for each IP addr. -func (t *DNSResolvers) startCleartextFlows(ctx context.Context, addresses []DNSEntry) { - sema := make(chan any, 1) - sema <- true // allow a single flow to fetch the HTTP body - t.startCleartextFlowsWithSema(ctx, sema, addresses) -} - -// startCleartextFlowsWithSema implements EndpointMeasurementsStarter. -func (t *DNSResolvers) startCleartextFlowsWithSema(ctx context.Context, sema <-chan any, addresses []DNSEntry) { +func (t *DNSResolvers) startCleartextFlows( + ctx context.Context, + ps *prioritySelector, + addresses []DNSEntry, +) { if t.URL.Scheme != "http" { // Do not bother with measuring HTTP when the user // has asked us to measure an HTTPS URL. 
@@ -432,22 +432,18 @@ func (t *DNSResolvers) startCleartextFlowsWithSema(ctx context.Context, sema <-c port = urlPort } for _, addr := range addresses { - maybeNilSema := sema - if (addr.Flags & DNSAddrFlagSystemResolver) == 0 { - maybeNilSema = nil // see https://github.com/ooni/probe/issues/2258 - } task := &CleartextFlow{ Address: net.JoinHostPort(addr.Addr, port), DNSCache: t.DNSCache, IDGenerator: t.IDGenerator, Logger: t.Logger, - Sema: maybeNilSema, TestKeys: t.TestKeys, ZeroTime: t.ZeroTime, WaitGroup: t.WaitGroup, CookieJar: t.CookieJar, FollowRedirects: t.URL.Scheme == "http", HostHeader: t.URL.Host, + PrioSelector: ps, Referer: t.Referer, UDPAddress: t.UDPAddress, URLPath: t.URL.Path, @@ -458,19 +454,15 @@ func (t *DNSResolvers) startCleartextFlowsWithSema(ctx context.Context, sema <-c } // startSecureFlows starts a TCP+TLS measurement flow for each IP addr. -func (t *DNSResolvers) startSecureFlows(ctx context.Context, addresses []DNSEntry) { - sema := make(chan any, 1) - if t.URL.Scheme == "https" { - // Allows just a single worker to fetch the response body but do that - // only if the test-lists URL uses "https" as the scheme. Otherwise, just - // validate IPs by performing a TLS handshake. - sema <- true +func (t *DNSResolvers) startSecureFlows( + ctx context.Context, + ps *prioritySelector, + addresses []DNSEntry, +) { + if t.URL.Scheme != "https" { + // When the scheme is not HTTPS we fetch using HTTP + ps = nil } - t.startSecureFlowsWithSema(ctx, sema, addresses) -} - -// startSecureFlowsWithSema implements EndpointMeasurementsStarter. 
-func (t *DNSResolvers) startSecureFlowsWithSema(ctx context.Context, sema <-chan any, addresses []DNSEntry) { port := "443" if urlPort := t.URL.Port(); urlPort != "" { if t.URL.Scheme != "https" { @@ -481,16 +473,11 @@ func (t *DNSResolvers) startSecureFlowsWithSema(ctx context.Context, sema <-chan port = urlPort } for _, addr := range addresses { - maybeNilSema := sema - if (addr.Flags & DNSAddrFlagSystemResolver) == 0 { - maybeNilSema = nil // see https://github.com/ooni/probe/issues/2258 - } task := &SecureFlow{ Address: net.JoinHostPort(addr.Addr, port), DNSCache: t.DNSCache, IDGenerator: t.IDGenerator, Logger: t.Logger, - Sema: maybeNilSema, TestKeys: t.TestKeys, ZeroTime: t.ZeroTime, WaitGroup: t.WaitGroup, @@ -499,6 +486,7 @@ func (t *DNSResolvers) startSecureFlowsWithSema(ctx context.Context, sema <-chan FollowRedirects: t.URL.Scheme == "https", SNI: t.URL.Hostname(), HostHeader: t.URL.Host, + PrioSelector: ps, Referer: t.Referer, UDPAddress: t.UDPAddress, URLPath: t.URL.Path, @@ -509,7 +497,11 @@ func (t *DNSResolvers) startSecureFlowsWithSema(ctx context.Context, sema <-chan } // maybeStartControlFlow starts the control flow iff .Session and .THAddr are set. 
-func (t *DNSResolvers) maybeStartControlFlow(ctx context.Context, addresses []DNSEntry) { +func (t *DNSResolvers) maybeStartControlFlow( + ctx context.Context, + ps *prioritySelector, + addresses []DNSEntry, +) { // note: for subsequent requests we don't set .Session and .THAddr hence // we are not going to query the test helper more than once if t.Session != nil && t.THAddr != "" { @@ -521,6 +513,7 @@ func (t *DNSResolvers) maybeStartControlFlow(ctx context.Context, addresses []DN Addresses: addrs, ExtraMeasurementsStarter: t, // allows starting follow-up measurement flows Logger: t.Logger, + PrioSelector: ps, TestKeys: t.TestKeys, Session: t.Session, THAddr: t.THAddr, diff --git a/internal/experiment/webconnectivity/measurer.go b/internal/experiment/webconnectivity/measurer.go index ce41157..207dd1e 100644 --- a/internal/experiment/webconnectivity/measurer.go +++ b/internal/experiment/webconnectivity/measurer.go @@ -36,7 +36,7 @@ func (m *Measurer) ExperimentName() string { // ExperimentVersion implements model.ExperimentMeasurer. func (m *Measurer) ExperimentVersion() string { - return "0.5.6" + return "0.5.8" } // Run implements model.ExperimentMeasurer. @@ -46,6 +46,11 @@ func (m *Measurer) Run(ctx context.Context, sess model.ExperimentSession, // WILL NOT be submitted to the OONI backend. You SHOULD only return an error // for fundamental errors (e.g., the input is invalid or missing). 
+ // make sure we have a cancellable context such that we can stop any + // goroutine running in the background (e.g., priority.go's ones) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // honour InputOrQueryBackend input := measurement.Input if input == "" { diff --git a/internal/experiment/webconnectivity/priority.go b/internal/experiment/webconnectivity/priority.go new file mode 100644 index 0000000..4f84af1 --- /dev/null +++ b/internal/experiment/webconnectivity/priority.go @@ -0,0 +1,248 @@ +package webconnectivity + +// +// Determine which connection(s) are allowed to fetch the webpage +// by giving higher priority to the system resolver, then to the +// UDP resolver, then to the DoH resolver, then to the TH. +// +// This sorting reflects the likelihood that we will see a blockpage +// because the system resolver is the most likely to be blocked +// (e.g., in Italy). The UDP resolver is also blocked in countries +// with more censorship (e.g., in China). The DoH resolver and +// the TH have more or less the same priority here, but we needed +// to choose one of them to have higher priority. +// +// Note that this functionality is where Web Connectivity LTE +// diverges from websteps, which will instead fetch all the available +// webpages. To adhere to the Web Connectivity model, we need to +// have a single fetch per redirect. However, by allowing all the +// resolvers plus the TH to provide us with addresses, we increase +// our chances of detecting more kinds of censorship. +// +// Also note that this implementation basically makes the +// https://github.com/ooni/probe/issues/2258 issue obsolete, +// since now we're considering all resolvers. +// +// See https://github.com/ooni/probe/issues/2276. +// + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +// prioritySelector selects the connection with the highest priority. 
+type prioritySelector struct { + // ch is the channel used to ask for priority + ch chan *priorityRequest + + // logger is the logger to use + logger model.Logger + + // m contains a map from known addresses to their flags + m map[string]int64 + + // nhttps is the number of addrs resolved using DoH + nhttps int + + // nsystem is the number of addrs resolved using the system resolver + nsystem int + + // nudp is the number of addrs resolved using UDP + nudp int + + // tk contains the TestKeys. + tk *TestKeys + + // zeroTime is the zero time of the current measurement + zeroTime time.Time +} + +// priorityRequest is a request to get priority for fetching the webpage +// over other concurrent connections that are doing the same. +type priorityRequest struct { + // addr is the address we're using + addr string + + // resp is the buffered channel where the response will appear + resp chan bool +} + +// newPrioritySelector creates a new prioritySelector instance. +func newPrioritySelector( + ctx context.Context, + zeroTime time.Time, + tk *TestKeys, + logger model.Logger, + wg *sync.WaitGroup, + addrs []DNSEntry, +) *prioritySelector { + ps := &prioritySelector{ + ch: make(chan *priorityRequest), + logger: logger, + m: map[string]int64{}, + nhttps: 0, + nsystem: 0, + nudp: 0, + tk: tk, + zeroTime: zeroTime, + } + ps.log("create with %+v", addrs) + for _, addr := range addrs { + flags := addr.Flags + ps.m[addr.Addr] = flags + if (flags & DNSAddrFlagSystemResolver) != 0 { + ps.nsystem++ + } + if (flags & DNSAddrFlagUDP) != 0 { + ps.nudp++ + } + if (flags & DNSAddrFlagHTTPS) != 0 { + ps.nhttps++ + } + } + wg.Add(1) + go ps.selector(ctx, wg) + return ps +} + +// log formats and emits a ConnPriorityLogEntry +func (ps *prioritySelector) log(format string, v ...any) { + ps.tk.AppendConnPriorityLogEntry(&ConnPriorityLogEntry{ + Msg: fmt.Sprintf(format, v...), + T: time.Since(ps.zeroTime).Seconds(), + }) + ps.logger.Infof("prioritySelector: "+format, v...) 
+} + +// permissionToFetch returns whether this ready-to-use connection +// is permitted to perform a round trip and fetch the webpage. +func (ps *prioritySelector) permissionToFetch(address string) bool { + ipAddr, _, err := net.SplitHostPort(address) + runtimex.PanicOnError(err, "net.SplitHostPort failed") + r := &priorityRequest{ + addr: ipAddr, + resp: make(chan bool, 1), // buffer to simplify selector() implementation + } + select { + case <-time.After(10 * time.Millisecond): + ps.log("conn %s: denied permission: timed out sending", address) + return false + case ps.ch <- r: + select { + case <-time.After(time.Second): + ps.log("conn %s: denied permission: timed out receiving", address) + return false + case v := <-r.resp: + ps.log("conn %s: granted permission: %+v", address, v) + return v + } + } +} + +// selector grants permission to the highest priority request that +// arrives within a reasonable time frame. This function runs in a +// background goroutine and terminates when [ctx] is done. +// +// This function implements https://github.com/ooni/probe/issues/2276. +func (ps *prioritySelector) selector(ctx context.Context, wg *sync.WaitGroup) { + // synchronize with the parent + defer wg.Done() + + // Implementation note: setting an arbitrary timeout here would + // be ~an issue because we want this goroutine to be available in + // case the only connections from which we could fetch a webpage + // are the ones using TH addresses. However, we know the TH could + // require a long time to complete due to timeouts caused by IP + // addresses provided by the probe. + // + // See https://explorer.ooni.org/measurement/20220911T105037Z_webconnectivity_IT_30722_n1_ruzuQ219SmIO9SrT?input=http%3A%2F%2Festrenosli.org%2F + // for a measurement where a too-short timeout prevented us from + // attempting to fetch a webpage from TH-resolved addrs. 
+ // + // See https://explorer.ooni.org/measurement/20220911T194527Z_webconnectivity_IT_30722_n1_jufRZGay0Db9Ge4v?input=http%3A%2F%2Festrenosli.org%2F + // for a measurement where this issue was fixed. + + // await the first priority request + var first *priorityRequest + select { + case <-ctx.Done(): + return + case first = <-ps.ch: + } + + // if this request is highest priority, grant permission + if ps.isHighestPriority(first) { + first.resp <- true // buffered channel + return + } + + // collect additional requests for up to extraTime, thus giving + // a possibly higher priority connection time to establish + const extraTime = 500 * time.Millisecond + expired := time.NewTimer(extraTime) + defer expired.Stop() + requests := []*priorityRequest{first} +Loop: + for { + select { + case <-expired.C: + break Loop + case r := <-ps.ch: + requests = append(requests, r) + case <-ctx.Done(): + return + } + } + + // grant permission to the highest priority request + highPrio := ps.findHighestPriority(requests) + highPrio.resp <- true // buffered channel + + // deny permission to all the other inflight requests + for _, r := range requests { + if highPrio != r { + r.resp <- false // buffered channel + } + } +} + +// findHighestPriority returns the highest priority request +func (ps *prioritySelector) findHighestPriority(reqs []*priorityRequest) *priorityRequest { + runtimex.Assert(len(reqs) > 0, "findHighestPriority wants a non-empty reqs slice") + for _, r := range reqs { + if ps.isHighestPriority(r) { + return r + } + } + return reqs[0] +} + +// isHighestPriority returns whether this request is highest priority +func (ps *prioritySelector) isHighestPriority(r *priorityRequest) bool { + // See https://github.com/ooni/probe/issues/2276 + flags := ps.m[r.addr] + if ps.nsystem > 0 { + if (flags & DNSAddrFlagSystemResolver) != 0 { + return true + } + } else if ps.nudp > 0 { + if (flags & DNSAddrFlagUDP) != 0 { + return true + } + } else if ps.nhttps > 0 { + if (flags & 
DNSAddrFlagHTTPS) != 0 { + return true + } + } else { + // Happens when we only have addresses from the TH + return true + } + return false +} diff --git a/internal/experiment/webconnectivity/secureflow.go b/internal/experiment/webconnectivity/secureflow.go index ee19242..a1ab704 100644 --- a/internal/experiment/webconnectivity/secureflow.go +++ b/internal/experiment/webconnectivity/secureflow.go @@ -39,10 +39,6 @@ type SecureFlow struct { // Logger is the MANDATORY logger to use. Logger model.Logger - // Sema is the MANDATORY semaphore to allow just a single - // connection to perform the HTTP transaction. - Sema <-chan any - // TestKeys is MANDATORY and contains the TestKeys. TestKeys *TestKeys @@ -65,6 +61,10 @@ type SecureFlow struct { // HostHeader is the OPTIONAL host header to use. HostHeader string + // PrioSelector is the OPTIONAL priority selector to use to determine + // whether this flow is allowed to fetch the webpage. + PrioSelector *prioritySelector + // Referer contains the OPTIONAL referer, used for redirects. Referer string @@ -144,11 +144,9 @@ func (t *SecureFlow) Run(parentCtx context.Context, index int64) { alpn := tlsConnState.NegotiatedProtocol - // Only allow N flows to _use_ the connection - select { - case <-t.Sema: - default: - ol.Stop(nil) + // Determine whether we're allowed to fetch the webpage + if t.PrioSelector == nil || !t.PrioSelector.permissionToFetch(t.Address) { + ol.Stop("stop after TLS handshake") return } diff --git a/internal/experiment/webconnectivity/testkeys.go b/internal/experiment/webconnectivity/testkeys.go index 4db7f3e..94d6282 100644 --- a/internal/experiment/webconnectivity/testkeys.go +++ b/internal/experiment/webconnectivity/testkeys.go @@ -65,6 +65,10 @@ type TestKeys struct { // Control contains the TH's response. Control *webconnectivity.ControlResponse `json:"control"` + // ConnPriorityLog explains why Web Connectivity chose to use a given + // ready-to-use HTTP(S) connection among many. 
+ ConnPriorityLog []*ConnPriorityLogEntry `json:"x_conn_priority_log"` + // ControlFailure contains the failure of the control experiment. ControlFailure *string `json:"control_failure"` @@ -128,6 +132,15 @@ type TestKeys struct { mu *sync.Mutex } +// ConnPriorityLogEntry is an entry in the TestKeys.ConnPriorityLog slice. +type ConnPriorityLogEntry struct { + // Msg is the specific log entry + Msg string `json:"msg"` + + // T is when this entry was generated + T float64 `json:"t"` +} + // DNSWhoamiInfoEntry contains an entry for DNSWhoamiInfo. type DNSWhoamiInfoEntry struct { // Address is the IP address @@ -278,6 +291,13 @@ func (tk *TestKeys) SetClientResolver(value string) { tk.mu.Unlock() } +// AppendConnPriorityLogEntry appends an entry to ConnPriorityLog. +func (tk *TestKeys) AppendConnPriorityLogEntry(entry *ConnPriorityLogEntry) { + tk.mu.Lock() + tk.ConnPriorityLog = append(tk.ConnPriorityLog, entry) + tk.mu.Unlock() +} + // NewTestKeys creates a new instance of TestKeys. func NewTestKeys() *TestKeys { return &TestKeys{ @@ -307,6 +327,7 @@ func NewTestKeys() *TestKeys { TCPConnect: []*model.ArchivalTCPConnectResult{}, TLSHandshakes: []*model.ArchivalTLSOrQUICHandshakeResult{}, Control: nil, + ConnPriorityLog: []*ConnPriorityLogEntry{}, ControlFailure: nil, DNSFlags: 0, DNSExperimentFailure: nil, diff --git a/internal/measurexlite/logger.go b/internal/measurexlite/logger.go index 53cecf4..aac482a 100644 --- a/internal/measurexlite/logger.go +++ b/internal/measurexlite/logger.go @@ -58,17 +58,22 @@ func (ol *OperationLogger) maybeEmitProgress() { } } -// Stop must be called when the operation is done. The [err] argument +// Stop must be called when the operation is done. The [value] argument // is the result of the operation, which may be nil. This method ensures // that we log the final result of the now-completed operation. 
-func (ol *OperationLogger) Stop(err error) { +func (ol *OperationLogger) Stop(value any) { ol.once.Do(func() { close(ol.sighup) ol.wg.Wait() - if err != nil { - ol.logger.Infof("%s... %s", ol.message, err.Error()) - return + if value != nil { + if err, okay := value.(error); okay { + ol.logger.Infof("%s... %s", ol.message, err.Error()) + return + } + // fallthrough + } else { + value = "ok" } - ol.logger.Infof("%s... ok", ol.message) + ol.logger.Infof("%s... %+v", ol.message, value) }) }