feat(webconnectivity@v0.5): get a webpage whenever possible (#950)

Implements https://github.com/ooni/probe/issues/2276 and supersedes https://github.com/ooni/probe-cli/pull/949.
This commit is contained in:
Simone Basso 2022-09-11 22:12:48 +02:00 committed by GitHub
parent 6b8b13344a
commit 5e75512396
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 343 additions and 83 deletions

View File

@ -38,10 +38,6 @@ type CleartextFlow struct {
// Logger is the MANDATORY logger to use.
Logger model.Logger
// Sema is the MANDATORY semaphore to allow just a single
// connection to perform the HTTP transaction.
Sema <-chan any
// TestKeys is MANDATORY and contains the TestKeys.
TestKeys *TestKeys
@ -61,6 +57,10 @@ type CleartextFlow struct {
// HostHeader is the OPTIONAL host header to use.
HostHeader string
// PrioSelector is the OPTIONAL priority selector to use to determine
// whether this flow is allowed to fetch the webpage.
PrioSelector *prioritySelector
// Referer contains the OPTIONAL referer, used for redirects.
Referer string
@ -113,11 +113,9 @@ func (t *CleartextFlow) Run(parentCtx context.Context, index int64) {
alpn := "" // no ALPN because we're not using TLS
// Only allow N flows to _use_ the connection
select {
case <-t.Sema:
default:
ol.Stop(nil)
// Determine whether we're allowed to fetch the webpage
if t.PrioSelector == nil || !t.PrioSelector.permissionToFetch(t.Address) {
ol.Stop("stop after TCP connect")
return
}

View File

@ -17,17 +17,12 @@ import (
// EndpointMeasurementsStarter is used by Control to start extra
// measurements using new IP addrs discovered by the TH.
type EndpointMeasurementsStarter interface {
// startCleartextFlowsWithSema starts a TCP measurement flow for each IP addr. The [sema]
// argument allows to control how many flows are allowed to perform HTTP measurements. Every
// flow will attempt to read from [sema] and won't perform HTTP measurements if a
// nonblocking read fails. Hence, you must create a [sema] channel with buffer equal
// to N and N elements inside it to allow N flows to perform HTTP measurements. Passing
// a nil [sema] causes no flow to attempt HTTP measurements.
startCleartextFlowsWithSema(ctx context.Context, sema <-chan any, addresses []DNSEntry)
// startCleartextFlows starts a TCP measurement flow for each IP addr. The [ps]
// argument determines whether this flow will be allowed to fetch the webpage.
startCleartextFlows(ctx context.Context, ps *prioritySelector, addresses []DNSEntry)
// startSecureFlowsWithSema starts a TCP+TLS measurement flow for each IP addr. See
// the docs of startCleartextFlowsWithSema for more info on the [sema] arg.
startSecureFlowsWithSema(ctx context.Context, sema <-chan any, addresses []DNSEntry)
// startSecureFlows is like startCleartextFlows but for HTTPS.
startSecureFlows(ctx context.Context, ps *prioritySelector, addresses []DNSEntry)
}
// Control issues a Control request and saves the results
@ -46,6 +41,10 @@ type Control struct {
// Logger is the MANDATORY logger to use.
Logger model.Logger
// PrioSelector is the OPTIONAL priority selector to use to determine
// whether we will be allowed to fetch the webpage.
PrioSelector *prioritySelector
// TestKeys is MANDATORY and contains the TestKeys.
TestKeys *TestKeys
@ -131,13 +130,13 @@ func (c *Control) Run(parentCtx context.Context) {
return
}
// if the TH returned us addresses we did not previously were
// aware of, make sure we also measure them
c.maybeStartExtraMeasurements(parentCtx, cresp.DNS.Addrs)
// on success, save the control response
c.TestKeys.SetControl(&cresp)
ol.Stop(nil)
// if the TH returned us addresses we did not previously were
// aware of, make sure we also measure them
c.maybeStartExtraMeasurements(parentCtx, cresp.DNS.Addrs)
}
// This function determines whether we should start new
@ -175,14 +174,7 @@ func (c *Control) maybeStartExtraMeasurements(ctx context.Context, thAddrs []str
})
}
// Start extra measurements for TH-only addresses. Because we already
// measured HTTP(S) using IP addrs discovered by the resolvers, we are not
// going to do that again now. I am not sure this is the right policy
// but I think we can just try it and then change if needed...
//
// Also, let's remember that reading from a nil chan blocks forever, so
// we're basically forcing the goroutines to avoid HTTP(S).
var nohttp chan any = nil
c.ExtraMeasurementsStarter.startCleartextFlowsWithSema(ctx, nohttp, thOnly)
c.ExtraMeasurementsStarter.startSecureFlowsWithSema(ctx, nohttp, thOnly)
// Start extra measurements for TH-only addresses.
c.ExtraMeasurementsStarter.startCleartextFlows(ctx, c.PrioSelector, thOnly)
c.ExtraMeasurementsStarter.startSecureFlows(ctx, c.PrioSelector, thOnly)
}

View File

@ -171,10 +171,13 @@ func (t *DNSResolvers) Run(parentCtx context.Context) {
log.Infof("using previously-cached addrs: %+v", addresses)
}
// create priority selector
ps := newPrioritySelector(parentCtx, t.ZeroTime, t.TestKeys, t.Logger, t.WaitGroup, addresses)
// fan out a number of child async tasks to use the IP addrs
t.startCleartextFlows(parentCtx, addresses)
t.startSecureFlows(parentCtx, addresses)
t.maybeStartControlFlow(parentCtx, addresses)
t.startCleartextFlows(parentCtx, ps, addresses)
t.startSecureFlows(parentCtx, ps, addresses)
t.maybeStartControlFlow(parentCtx, ps, addresses)
}
// whoamiSystemV4 performs a DNS whoami lookup for the system resolver. This function must
@ -414,14 +417,11 @@ func (t *DNSResolvers) dohSplitQueries(
}
// startCleartextFlows starts a TCP measurement flow for each IP addr.
func (t *DNSResolvers) startCleartextFlows(ctx context.Context, addresses []DNSEntry) {
sema := make(chan any, 1)
sema <- true // allow a single flow to fetch the HTTP body
t.startCleartextFlowsWithSema(ctx, sema, addresses)
}
// startCleartextFlowsWithSema implements EndpointMeasurementsStarter.
func (t *DNSResolvers) startCleartextFlowsWithSema(ctx context.Context, sema <-chan any, addresses []DNSEntry) {
func (t *DNSResolvers) startCleartextFlows(
ctx context.Context,
ps *prioritySelector,
addresses []DNSEntry,
) {
if t.URL.Scheme != "http" {
// Do not bother with measuring HTTP when the user
// has asked us to measure an HTTPS URL.
@ -432,22 +432,18 @@ func (t *DNSResolvers) startCleartextFlowsWithSema(ctx context.Context, sema <-c
port = urlPort
}
for _, addr := range addresses {
maybeNilSema := sema
if (addr.Flags & DNSAddrFlagSystemResolver) == 0 {
maybeNilSema = nil // see https://github.com/ooni/probe/issues/2258
}
task := &CleartextFlow{
Address: net.JoinHostPort(addr.Addr, port),
DNSCache: t.DNSCache,
IDGenerator: t.IDGenerator,
Logger: t.Logger,
Sema: maybeNilSema,
TestKeys: t.TestKeys,
ZeroTime: t.ZeroTime,
WaitGroup: t.WaitGroup,
CookieJar: t.CookieJar,
FollowRedirects: t.URL.Scheme == "http",
HostHeader: t.URL.Host,
PrioSelector: ps,
Referer: t.Referer,
UDPAddress: t.UDPAddress,
URLPath: t.URL.Path,
@ -458,19 +454,15 @@ func (t *DNSResolvers) startCleartextFlowsWithSema(ctx context.Context, sema <-c
}
// startSecureFlows starts a TCP+TLS measurement flow for each IP addr.
func (t *DNSResolvers) startSecureFlows(ctx context.Context, addresses []DNSEntry) {
sema := make(chan any, 1)
if t.URL.Scheme == "https" {
// Allows just a single worker to fetch the response body but do that
// only if the test-lists URL uses "https" as the scheme. Otherwise, just
// validate IPs by performing a TLS handshake.
sema <- true
func (t *DNSResolvers) startSecureFlows(
ctx context.Context,
ps *prioritySelector,
addresses []DNSEntry,
) {
if t.URL.Scheme != "https" {
// When the scheme is not HTTPS we fetch using HTTP
ps = nil
}
t.startSecureFlowsWithSema(ctx, sema, addresses)
}
// startSecureFlowsWithSema implements EndpointMeasurementsStarter.
func (t *DNSResolvers) startSecureFlowsWithSema(ctx context.Context, sema <-chan any, addresses []DNSEntry) {
port := "443"
if urlPort := t.URL.Port(); urlPort != "" {
if t.URL.Scheme != "https" {
@ -481,16 +473,11 @@ func (t *DNSResolvers) startSecureFlowsWithSema(ctx context.Context, sema <-chan
port = urlPort
}
for _, addr := range addresses {
maybeNilSema := sema
if (addr.Flags & DNSAddrFlagSystemResolver) == 0 {
maybeNilSema = nil // see https://github.com/ooni/probe/issues/2258
}
task := &SecureFlow{
Address: net.JoinHostPort(addr.Addr, port),
DNSCache: t.DNSCache,
IDGenerator: t.IDGenerator,
Logger: t.Logger,
Sema: maybeNilSema,
TestKeys: t.TestKeys,
ZeroTime: t.ZeroTime,
WaitGroup: t.WaitGroup,
@ -499,6 +486,7 @@ func (t *DNSResolvers) startSecureFlowsWithSema(ctx context.Context, sema <-chan
FollowRedirects: t.URL.Scheme == "https",
SNI: t.URL.Hostname(),
HostHeader: t.URL.Host,
PrioSelector: ps,
Referer: t.Referer,
UDPAddress: t.UDPAddress,
URLPath: t.URL.Path,
@ -509,7 +497,11 @@ func (t *DNSResolvers) startSecureFlowsWithSema(ctx context.Context, sema <-chan
}
// maybeStartControlFlow starts the control flow iff .Session and .THAddr are set.
func (t *DNSResolvers) maybeStartControlFlow(ctx context.Context, addresses []DNSEntry) {
func (t *DNSResolvers) maybeStartControlFlow(
ctx context.Context,
ps *prioritySelector,
addresses []DNSEntry,
) {
// note: for subsequent requests we don't set .Session and .THAddr hence
// we are not going to query the test helper more than once
if t.Session != nil && t.THAddr != "" {
@ -521,6 +513,7 @@ func (t *DNSResolvers) maybeStartControlFlow(ctx context.Context, addresses []DN
Addresses: addrs,
ExtraMeasurementsStarter: t, // allows starting follow-up measurement flows
Logger: t.Logger,
PrioSelector: ps,
TestKeys: t.TestKeys,
Session: t.Session,
THAddr: t.THAddr,

View File

@ -36,7 +36,7 @@ func (m *Measurer) ExperimentName() string {
// ExperimentVersion implements model.ExperimentMeasurer.
func (m *Measurer) ExperimentVersion() string {
return "0.5.6"
return "0.5.8"
}
// Run implements model.ExperimentMeasurer.
@ -46,6 +46,11 @@ func (m *Measurer) Run(ctx context.Context, sess model.ExperimentSession,
// WILL NOT be submitted to the OONI backend. You SHOULD only return an error
// for fundamental errors (e.g., the input is invalid or missing).
// make sure we have a cancellable context such that we can stop any
// goroutine running in the background (e.g., priority.go's ones)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// honour InputOrQueryBackend
input := measurement.Input
if input == "" {

View File

@ -0,0 +1,248 @@
package webconnectivity
//
// Determine which connection(s) are allowed to fetch the webpage
// by giving higher priority to the system resolver, then to the
// UDP resolver, then to the DoH resolver, then to the TH.
//
// This sorting reflects the likelyhood that we will se a blockpage
// because the system resolver is the most likely to be blocked
// (e.g., in Italy). The UDP resolver is also blocked in countries
// with more censorship (e.g., in China). The DoH resolver and
// the TH have more or less the same priority here, but we needed
// to choose one of them to have higher priority.
//
// Note that this functionality is where Web Connectivity LTE
// diverges from websteps, which will instead fetch all the available
// webpages. To adhere to the Web Connectivity model, we need to
// have a single fetch per redirect. However, by allowing all the
// resolvers plus the TH to provide us with addresses, we increase
// our chances of detecting more kinds of censorship.
//
// Also note that this implementation basically makes the
// https://github.com/ooni/probe/issues/2258 issue obsolete,
// since now we're considering all resolvers.
//
// See https://github.com/ooni/probe/issues/2276.
//
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
// prioritySelector selects the connection with the highest priority.
type prioritySelector struct {
// ch is the channel used to ask for priority
ch chan *priorityRequest
// logger is the logger to use
logger model.Logger
// m contains a map from known addresses to their flags
m map[string]int64
// nhttps is the number of addrs resolved using DoH
nhttps int
// nsystem is the number of addrs resolved using the system resolver
nsystem int
// nudp is the nunber of addrs resolver using UDP
nudp int
// tk contains the TestKeys.
tk *TestKeys
// zeroTime is the zero time of the current measurement
zeroTime time.Time
}
// priorityRequest is a request to get priority for fetching the webpage
// over other concurrent connections that are doing the same.
type priorityRequest struct {
// addr is the address we're using
addr string
// resp is the buffered channel where the response will appear
resp chan bool
}
// newPrioritySelector creates a new prioritySelector instance.
func newPrioritySelector(
ctx context.Context,
zeroTime time.Time,
tk *TestKeys,
logger model.Logger,
wg *sync.WaitGroup,
addrs []DNSEntry,
) *prioritySelector {
ps := &prioritySelector{
ch: make(chan *priorityRequest),
logger: logger,
m: map[string]int64{},
nhttps: 0,
nsystem: 0,
nudp: 0,
tk: tk,
zeroTime: zeroTime,
}
ps.log("create with %+v", addrs)
for _, addr := range addrs {
flags := addr.Flags
ps.m[addr.Addr] = flags
if (flags & DNSAddrFlagSystemResolver) != 0 {
ps.nsystem++
}
if (flags & DNSAddrFlagUDP) != 0 {
ps.nudp++
}
if (flags & DNSAddrFlagHTTPS) != 0 {
ps.nhttps++
}
}
wg.Add(1)
go ps.selector(ctx, wg)
return ps
}
// log formats and emits a ConnPriorityLogEntry
func (ps *prioritySelector) log(format string, v ...any) {
ps.tk.AppendConnPriorityLogEntry(&ConnPriorityLogEntry{
Msg: fmt.Sprintf(format, v...),
T: time.Since(ps.zeroTime).Seconds(),
})
ps.logger.Infof("prioritySelector: "+format, v...)
}
// permissionToFetch returns whether this ready-to-use connection
// is permitted to perform a round trip and fetch the webpage.
func (ps *prioritySelector) permissionToFetch(address string) bool {
ipAddr, _, err := net.SplitHostPort(address)
runtimex.PanicOnError(err, "net.SplitHostPort failed")
r := &priorityRequest{
addr: ipAddr,
resp: make(chan bool, 1), // buffer to simplify selector() implementation
}
select {
case <-time.After(10 * time.Millisecond):
ps.log("conn %s: denied permission: timed out sending", address)
return false
case ps.ch <- r:
select {
case <-time.After(time.Second):
ps.log("conn %s: denied permission: timed out receiving", address)
return false
case v := <-r.resp:
ps.log("conn %s: granted permission: %+v", address, v)
return v
}
}
}
// selector grants permission to the highest priority request that
// arrives within a reasonable time frame. This function runs into the
// background goroutine and terminates when [ctx] is done.
//
// This function implements https://github.com/ooni/probe/issues/2276.
func (ps *prioritySelector) selector(ctx context.Context, wg *sync.WaitGroup) {
// synchronize with the parent
defer wg.Done()
// Implementation note: setting an arbitrary timeout here would
// be ~an issue because we want this goroutine to be available in
// case the only connections from which we could fetch a webpage
// are the ones using TH addresses. However, we know the TH could
// require a long time to complete due to timeouts caused by IP
// addresses provided by the probe.
//
// See https://explorer.ooni.org/measurement/20220911T105037Z_webconnectivity_IT_30722_n1_ruzuQ219SmIO9SrT?input=http%3A%2F%2Festrenosli.org%2F
// for a measurement where a too-short timeout prevented us from
// attempting to fetch a webpage from TH-resolved addrs.
//
// See https://explorer.ooni.org/measurement/20220911T194527Z_webconnectivity_IT_30722_n1_jufRZGay0Db9Ge4v?input=http%3A%2F%2Festrenosli.org%2F
// for a measurement where this issue was fixed.
// await the first priority request
var first *priorityRequest
select {
case <-ctx.Done():
return
case first = <-ps.ch:
}
// if this request is highest priority, grant permission
if ps.isHighestPriority(first) {
first.resp <- true // buffered channel
return
}
// collect additional requests for up to extraTime, thus giving
// a possibly higher priority connection time to establish
const extraTime = 500 * time.Millisecond
expired := time.NewTimer(extraTime)
defer expired.Stop()
requests := []*priorityRequest{first}
Loop:
for {
select {
case <-expired.C:
break Loop
case r := <-ps.ch:
requests = append(requests, r)
case <-ctx.Done():
return
}
}
// grant permission to the highest priority request
highPrio := ps.findHighestPriority(requests)
highPrio.resp <- true // buffered channel
// deny permission to all the other inflight requests
for _, r := range requests {
if highPrio != r {
r.resp <- false // buffered channel
}
}
}
// findHighestPriority returns the highest priority request
func (ps *prioritySelector) findHighestPriority(reqs []*priorityRequest) *priorityRequest {
runtimex.Assert(len(reqs) > 0, "findHighestPriority wants a non-empty reqs slice")
for _, r := range reqs {
if ps.isHighestPriority(r) {
return r
}
}
return reqs[0]
}
// isHighestPriority returns whether this request is highest priority
func (ps *prioritySelector) isHighestPriority(r *priorityRequest) bool {
// See https://github.com/ooni/probe/issues/2276
flags := ps.m[r.addr]
if ps.nsystem > 0 {
if (flags & DNSAddrFlagSystemResolver) != 0 {
return true
}
} else if ps.nudp > 0 {
if (flags & DNSAddrFlagUDP) != 0 {
return true
}
} else if ps.nhttps > 0 {
if (flags & DNSAddrFlagHTTPS) != 0 {
return true
}
} else {
// Happens when we only have addresses from the TH
return true
}
return false
}

View File

@ -39,10 +39,6 @@ type SecureFlow struct {
// Logger is the MANDATORY logger to use.
Logger model.Logger
// Sema is the MANDATORY semaphore to allow just a single
// connection to perform the HTTP transaction.
Sema <-chan any
// TestKeys is MANDATORY and contains the TestKeys.
TestKeys *TestKeys
@ -65,6 +61,10 @@ type SecureFlow struct {
// HostHeader is the OPTIONAL host header to use.
HostHeader string
// PrioSelector is the OPTIONAL priority selector to use to determine
// whether this flow is allowed to fetch the webpage.
PrioSelector *prioritySelector
// Referer contains the OPTIONAL referer, used for redirects.
Referer string
@ -144,11 +144,9 @@ func (t *SecureFlow) Run(parentCtx context.Context, index int64) {
alpn := tlsConnState.NegotiatedProtocol
// Only allow N flows to _use_ the connection
select {
case <-t.Sema:
default:
ol.Stop(nil)
// Determine whether we're allowed to fetch the webpage
if t.PrioSelector == nil || !t.PrioSelector.permissionToFetch(t.Address) {
ol.Stop("stop after TLS handshake")
return
}

View File

@ -65,6 +65,10 @@ type TestKeys struct {
// Control contains the TH's response.
Control *webconnectivity.ControlResponse `json:"control"`
// ConnPriorityLog explains why Web Connectivity chose to use a given
// ready-to-use HTTP(S) connection among many.
ConnPriorityLog []*ConnPriorityLogEntry `json:"x_conn_priority_log"`
// ControlFailure contains the failure of the control experiment.
ControlFailure *string `json:"control_failure"`
@ -128,6 +132,15 @@ type TestKeys struct {
mu *sync.Mutex
}
// ConnPriorityLogEntry is an entry in the TestKeys.ConnPriorityLog slice.
type ConnPriorityLogEntry struct {
// Msg is the specific log entry
Msg string `json:"msg"`
// T is when this entry was generated
T float64 `json:"t"`
}
// DNSWhoamiInfoEntry contains an entry for DNSWhoamiInfo.
type DNSWhoamiInfoEntry struct {
// Address is the IP address
@ -278,6 +291,13 @@ func (tk *TestKeys) SetClientResolver(value string) {
tk.mu.Unlock()
}
// AppendConnPriorityLogEntry appends an entry to ConnPriorityLog.
func (tk *TestKeys) AppendConnPriorityLogEntry(entry *ConnPriorityLogEntry) {
tk.mu.Lock()
tk.ConnPriorityLog = append(tk.ConnPriorityLog, entry)
tk.mu.Unlock()
}
// NewTestKeys creates a new instance of TestKeys.
func NewTestKeys() *TestKeys {
return &TestKeys{
@ -307,6 +327,7 @@ func NewTestKeys() *TestKeys {
TCPConnect: []*model.ArchivalTCPConnectResult{},
TLSHandshakes: []*model.ArchivalTLSOrQUICHandshakeResult{},
Control: nil,
ConnPriorityLog: []*ConnPriorityLogEntry{},
ControlFailure: nil,
DNSFlags: 0,
DNSExperimentFailure: nil,

View File

@ -58,17 +58,22 @@ func (ol *OperationLogger) maybeEmitProgress() {
}
}
// Stop must be called when the operation is done. The [err] argument
// Stop must be called when the operation is done. The [value] argument
// is the result of the operation, which may be nil. This method ensures
// that we log the final result of the now-completed operation.
func (ol *OperationLogger) Stop(err error) {
func (ol *OperationLogger) Stop(value any) {
ol.once.Do(func() {
close(ol.sighup)
ol.wg.Wait()
if err != nil {
if value != nil {
if err, okay := value.(error); okay {
ol.logger.Infof("%s... %s", ol.message, err.Error())
return
}
ol.logger.Infof("%s... ok", ol.message)
// fallthrough
} else {
value = "ok"
}
ol.logger.Infof("%s... %+v", ol.message, value)
})
}