feat(measurex): refactored measurement library (#528)

This commit introduce a measurement library that consists of
refactored code from earlier websteps experiments.

I am not going to add tests for the time being, because this library
is still a bit in flux, as we finalize websteps.

I will soon though commit documentation explaining in detail how
to use it, which currrently is at https://github.com/ooni/probe-cli/pull/506
and adds a new directory to internal/tutorial.

The core idea of this measurement library is to allow two
measurement modes:

1. tracing, which is what we're currently doing now, and the
tutorial shows how we can rewrite the measurement part of web
connectivity with measurex using less code. Under a tracing
approach, we construct a normal http.Client that however has
tracing configured, we gather events for resolve, connect, TLS
handshake, QUIC handshake, HTTP round trip, etc. and then we
try to make sense of what happened from the events stream;

2. step-by-step, which is what websteps does, and basically
means that after each operation you immediately write into
a Measurement structure its results and immediately draw the
conclusions on what seems odd (which later may become an
anomaly if we see what the test helper measured).

This library is also such that it produces a data format
compatible with the current OONI spec.

This work is part of https://github.com/ooni/probe/issues/1733.
This commit is contained in:
Simone Basso 2021-09-30 01:24:08 +02:00 committed by GitHub
parent ff1c170562
commit 399d2f65da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 3008 additions and 0 deletions

View File

@ -0,0 +1,78 @@
package measurex
import (
"net/http"
"strings"
)
//
// Archival
//
// This file defines helpers to serialize to the OONI data format.
//
// ArchivalBinaryData is the archival format for binary data.
type ArchivalBinaryData struct {
Data []byte `json:"data"`
Format string `json:"format"`
}
// NewArchivalBinaryData builds a new ArchivalBinaryData
// from an array of bytes. If the array is nil, we return nil.
func NewArchivalBinaryData(data []byte) (out *ArchivalBinaryData) {
if len(data) > 0 {
out = &ArchivalBinaryData{
Data: data,
Format: "base64",
}
}
return
}
// ArchivalHeaders is a list of HTTP headers.
type ArchivalHeaders map[string]string
// Get searches for the first header with the named key
// and returns it. If not found, returns an empty string.
func (headers ArchivalHeaders) Get(key string) string {
return headers[strings.ToLower(key)]
}
// NewArchivalHeaders builds a new HeadersList from http.Header.
func NewArchivalHeaders(in http.Header) (out ArchivalHeaders) {
out = make(ArchivalHeaders)
for k, vv := range in {
for _, v := range vv {
// It breaks my hearth a little bit to ignore
// subsequent headers, but this does not happen
// very frequently, and I know the pipeline
// parses the map headers format only.
out[strings.ToLower(k)] = v
break
}
}
return
}
// NewArchivalTLSCertList builds a new []ArchivalBinaryData
// from a list of raw x509 certificates data.
func NewArchivalTLSCerts(in [][]byte) (out []*ArchivalBinaryData) {
for _, cert := range in {
out = append(out, &ArchivalBinaryData{
Data: cert,
Format: "base64",
})
}
return
}
// NewArchivalFailure creates an archival failure from an error. We
// cannot round trip an error using JSON, so we serialize to this
// intermediate format that is a sort of Optional<string>.
func NewArchivalFailure(err error) *string {
if err == nil {
return nil
}
s := err.Error()
return &s
}

View File

@ -0,0 +1,56 @@
package measurex
//
// Bogon
//
// This file helps us to decide if an IPAddr is a bogon.
//
// TODO(bassosimone): code in engine/netx should use this file.
import (
"net"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
// isBogon returns whether if an IP address is bogon. Passing to this
// function a non-IP address causes it to return true.
func isBogon(address string) bool {
ip := net.ParseIP(address)
return ip == nil || isPrivate(ip)
}
var privateIPBlocks []*net.IPNet
func init() {
for _, cidr := range []string{
"0.0.0.0/8", // "This" network (however, Linux...)
"10.0.0.0/8", // RFC1918
"100.64.0.0/10", // Carrier grade NAT
"127.0.0.0/8", // IPv4 loopback
"169.254.0.0/16", // RFC3927 link-local
"172.16.0.0/12", // RFC1918
"192.168.0.0/16", // RFC1918
"224.0.0.0/4", // Multicast
"::1/128", // IPv6 loopback
"fe80::/10", // IPv6 link-local
"fc00::/7", // IPv6 unique local addr
} {
_, block, err := net.ParseCIDR(cidr)
runtimex.PanicOnError(err, "net.ParseCIDR failed")
privateIPBlocks = append(privateIPBlocks, block)
}
}
func isPrivate(ip net.IP) bool {
if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
return true
}
for _, block := range privateIPBlocks {
if block.Contains(ip) {
return true
}
}
return false
}

239
internal/measurex/db.go Normal file
View File

@ -0,0 +1,239 @@
package measurex
//
// DB
//
// This file defines two types:
//
// - WritableDB is the interface allowing networking code
// (e.g., Dialer to save measurement events);
//
// - MeasurementDB implements WritableDB and allows high-level
// code to generate a Measurement from all the events.
//
import "sync"
// WritableDB is an events "database" in which networking code
// (e.g., Dialer) can save measurement events (e.g., the result
// of a connect, a TLS handshake, a read).
type WritableDB interface {
// InsertIntoDial saves a Dial event.
InsertIntoDial(ev *NetworkEvent)
// InsertIntoReadWrite saves an I/O event.
InsertIntoReadWrite(ev *NetworkEvent)
// InsertIntoClose saves a close event.
InsertIntoClose(ev *NetworkEvent)
// InsertIntoTLSHandshake saves a TLS handshake event.
InsertIntoTLSHandshake(ev *TLSHandshakeEvent)
// InsertIntoLookupHost saves a lookup host event.
InsertIntoLookupHost(ev *DNSLookupEvent)
// InsertIntoLookupHTTPSvc saves an HTTPSvc lookup event.
InsertIntoLookupHTTPSSvc(ev *DNSLookupEvent)
// InsertIntoDNSRoundTrip saves a DNS round trip event.
InsertIntoDNSRoundTrip(ev *DNSRoundTripEvent)
// InsertIntoHTTPRoundTrip saves an HTTP round trip event.
InsertIntoHTTPRoundTrip(ev *HTTPRoundTripEvent)
// InsertIntoHTTPRedirect saves an HTTP redirect event.
InsertIntoHTTPRedirect(ev *HTTPRedirectEvent)
// InsertIntoQUICHandshake saves a QUIC handshake event.
InsertIntoQUICHandshake(ev *QUICHandshakeEvent)
}
// MeasurementDB is a WritableDB that also allows high-level code
// to generate a Measurement from all the saved events.
type MeasurementDB struct {
// database "tables"
dialTable []*NetworkEvent
readWriteTable []*NetworkEvent
closeTable []*NetworkEvent
tlsHandshakeTable []*TLSHandshakeEvent
lookupHostTable []*DNSLookupEvent
lookupHTTPSvcTable []*DNSLookupEvent
dnsRoundTripTable []*DNSRoundTripEvent
httpRoundTripTable []*HTTPRoundTripEvent
httpRedirectTable []*HTTPRedirectEvent
quicHandshakeTable []*QUICHandshakeEvent
// mu protects all the fields
mu sync.Mutex
}
var _ WritableDB = &MeasurementDB{}
// DeleteAll deletes all the content of the DB.
func (db *MeasurementDB) DeleteAll() {
db.mu.Lock()
db.dialTable = nil
db.readWriteTable = nil
db.closeTable = nil
db.tlsHandshakeTable = nil
db.lookupHostTable = nil
db.lookupHTTPSvcTable = nil
db.dnsRoundTripTable = nil
db.httpRoundTripTable = nil
db.httpRedirectTable = nil
db.quicHandshakeTable = nil
db.mu.Unlock()
}
// InsertIntoDial implements EventDB.InsertIntoDial.
func (db *MeasurementDB) InsertIntoDial(ev *NetworkEvent) {
db.mu.Lock()
db.dialTable = append(db.dialTable, ev)
db.mu.Unlock()
}
// selectAllFromDialUnlocked returns all dial events.
func (db *MeasurementDB) selectAllFromDialUnlocked() (out []*NetworkEvent) {
out = append(out, db.dialTable...)
return
}
// InsertIntoReadWrite implements EventDB.InsertIntoReadWrite.
func (db *MeasurementDB) InsertIntoReadWrite(ev *NetworkEvent) {
db.mu.Lock()
db.readWriteTable = append(db.readWriteTable, ev)
db.mu.Unlock()
}
// selectAllFromReadWriteUnlocked returns all I/O events.
func (db *MeasurementDB) selectAllFromReadWriteUnlocked() (out []*NetworkEvent) {
out = append(out, db.readWriteTable...)
return
}
// InsertIntoClose implements EventDB.InsertIntoClose.
func (db *MeasurementDB) InsertIntoClose(ev *NetworkEvent) {
db.mu.Lock()
db.closeTable = append(db.closeTable, ev)
db.mu.Unlock()
}
// selectAllFromCloseUnlocked returns all close events.
func (db *MeasurementDB) selectAllFromCloseUnlocked() (out []*NetworkEvent) {
out = append(out, db.closeTable...)
return
}
// InsertIntoTLSHandshake implements EventDB.InsertIntoTLSHandshake.
func (db *MeasurementDB) InsertIntoTLSHandshake(ev *TLSHandshakeEvent) {
db.mu.Lock()
db.tlsHandshakeTable = append(db.tlsHandshakeTable, ev)
db.mu.Unlock()
}
// selectAllFromTLSHandshakeUnlocked returns all TLS handshake events.
func (db *MeasurementDB) selectAllFromTLSHandshakeUnlocked() (out []*TLSHandshakeEvent) {
out = append(out, db.tlsHandshakeTable...)
return
}
// InsertIntoLookupHost implements EventDB.InsertIntoLookupHost.
func (db *MeasurementDB) InsertIntoLookupHost(ev *DNSLookupEvent) {
db.mu.Lock()
db.lookupHostTable = append(db.lookupHostTable, ev)
db.mu.Unlock()
}
// selectAllFromLookupHostUnlocked returns all the lookup host events.
func (db *MeasurementDB) selectAllFromLookupHostUnlocked() (out []*DNSLookupEvent) {
out = append(out, db.lookupHostTable...)
return
}
// InsertIntoHTTPSSvc implements EventDB.InsertIntoHTTPSSvc
func (db *MeasurementDB) InsertIntoLookupHTTPSSvc(ev *DNSLookupEvent) {
db.mu.Lock()
db.lookupHTTPSvcTable = append(db.lookupHTTPSvcTable, ev)
db.mu.Unlock()
}
// selectAllFromLookupHTTPSSvcUnlocked returns all HTTPSSvc lookup events.
func (db *MeasurementDB) selectAllFromLookupHTTPSSvcUnlocked() (out []*DNSLookupEvent) {
out = append(out, db.lookupHTTPSvcTable...)
return
}
// InsertIntoDNSRoundTrip implements EventDB.InsertIntoDNSRoundTrip.
func (db *MeasurementDB) InsertIntoDNSRoundTrip(ev *DNSRoundTripEvent) {
db.mu.Lock()
db.dnsRoundTripTable = append(db.dnsRoundTripTable, ev)
db.mu.Unlock()
}
// selectAllFromDNSRoundTripUnlocked returns all DNS round trip events.
func (db *MeasurementDB) selectAllFromDNSRoundTripUnlocked() (out []*DNSRoundTripEvent) {
out = append(out, db.dnsRoundTripTable...)
return
}
// InsertIntoHTTPRoundTrip implements EventDB.InsertIntoHTTPRoundTrip.
func (db *MeasurementDB) InsertIntoHTTPRoundTrip(ev *HTTPRoundTripEvent) {
db.mu.Lock()
db.httpRoundTripTable = append(db.httpRoundTripTable, ev)
db.mu.Unlock()
}
// selectAllFromHTTPRoundTripUnlocked returns all HTTP round trip events.
func (db *MeasurementDB) selectAllFromHTTPRoundTripUnlocked() (out []*HTTPRoundTripEvent) {
out = append(out, db.httpRoundTripTable...)
return
}
// InsertIntoHTTPRedirect implements EventDB.InsertIntoHTTPRedirect.
func (db *MeasurementDB) InsertIntoHTTPRedirect(ev *HTTPRedirectEvent) {
db.mu.Lock()
db.httpRedirectTable = append(db.httpRedirectTable, ev)
db.mu.Unlock()
}
// selectAllFromHTTPRedirectUnlocked returns all HTTP redirections.
func (db *MeasurementDB) selectAllFromHTTPRedirectUnlocked() (out []*HTTPRedirectEvent) {
out = append(out, db.httpRedirectTable...)
return
}
// InsertIntoQUICHandshake implements EventDB.InsertIntoQUICHandshake.
func (db *MeasurementDB) InsertIntoQUICHandshake(ev *QUICHandshakeEvent) {
db.mu.Lock()
db.quicHandshakeTable = append(db.quicHandshakeTable, ev)
db.mu.Unlock()
}
// selectAllFromQUICHandshakeUnlocked returns all QUIC handshake events.
func (db *MeasurementDB) selectAllFromQUICHandshakeUnlocked() (out []*QUICHandshakeEvent) {
out = append(out, db.quicHandshakeTable...)
return
}
// AsMeasurement converts the current state of the database into
// a finalized Measurement structure. The original events will remain
// into the database. To start a new measurement cycle, just create
// a new MeasurementDB instance and use that.
func (db *MeasurementDB) AsMeasurement() *Measurement {
db.mu.Lock()
meas := &Measurement{
Connect: db.selectAllFromDialUnlocked(),
ReadWrite: db.selectAllFromReadWriteUnlocked(),
Close: db.selectAllFromCloseUnlocked(),
TLSHandshake: db.selectAllFromTLSHandshakeUnlocked(),
QUICHandshake: db.selectAllFromQUICHandshakeUnlocked(),
LookupHost: db.selectAllFromLookupHostUnlocked(),
LookupHTTPSSvc: db.selectAllFromLookupHTTPSSvcUnlocked(),
DNSRoundTrip: db.selectAllFromDNSRoundTripUnlocked(),
HTTPRoundTrip: db.selectAllFromHTTPRoundTripUnlocked(),
HTTPRedirect: db.selectAllFromHTTPRedirectUnlocked(),
}
db.mu.Unlock()
return meas
}

167
internal/measurex/dialer.go Normal file
View File

@ -0,0 +1,167 @@
package measurex
//
// Dialer
//
// Wrappers for Dialer and Conn to store events into a WritableDB.
//
import (
"context"
"net"
"time"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// Conn is a network connection.
type Conn = net.Conn
// Dialer dials network connections.
type Dialer = netxlite.Dialer
// WrapDialer creates a new dialer that writes events
// into the given WritableDB. The net.Conns created by
// a wrapped dialer also write into the WritableDB.
func (mx *Measurer) WrapDialer(db WritableDB, dialer netxlite.Dialer) Dialer {
return WrapDialer(mx.Begin, db, dialer)
}
// WrapDialer wraps a dialer.
func WrapDialer(begin time.Time, db WritableDB, dialer netxlite.Dialer) Dialer {
return &dialerDB{Dialer: dialer, db: db, begin: begin}
}
// NewDialerWithSystemResolver creates a
func (mx *Measurer) NewDialerWithSystemResolver(db WritableDB, logger Logger) Dialer {
r := mx.NewResolverSystem(db, logger)
return mx.WrapDialer(db, netxlite.NewDialerWithResolver(logger, r))
}
// NewDialerWithoutResolver is a convenience factory for creating
// a dialer that saves measurements into the DB and that is not attached
// to any resolver (hence only works when passed IP addresses).
func (mx *Measurer) NewDialerWithoutResolver(db WritableDB, logger Logger) Dialer {
return mx.WrapDialer(db, netxlite.NewDialerWithoutResolver(logger))
}
type dialerDB struct {
netxlite.Dialer
begin time.Time
db WritableDB
}
// NetworkEvent contains a network event. This kind of events
// are generated by Dialer, QUICDialer, Conn, QUICConn.
type NetworkEvent struct {
// JSON names compatible with df-008-netevents
RemoteAddr string `json:"address"`
Failure *string `json:"failure"`
Count int `json:"num_bytes,omitempty"`
Operation string `json:"operation"`
Network string `json:"proto"`
Finished float64 `json:"t"`
Started float64 `json:"started"`
// Names that are not part of the spec.
Oddity Oddity `json:"oddity"`
}
func (d *dialerDB) DialContext(
ctx context.Context, network, address string) (Conn, error) {
started := time.Since(d.begin).Seconds()
conn, err := d.Dialer.DialContext(ctx, network, address)
finished := time.Since(d.begin).Seconds()
d.db.InsertIntoDial(&NetworkEvent{
Operation: "connect",
Network: network,
RemoteAddr: address,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Oddity: d.computeOddity(err),
Count: 0,
})
if err != nil {
return nil, err
}
return &connDB{
Conn: conn,
begin: d.begin,
db: d.db,
network: network,
remoteAddr: address,
}, nil
}
func (c *dialerDB) computeOddity(err error) Oddity {
if err == nil {
return ""
}
switch err.Error() {
case netxlite.FailureGenericTimeoutError:
return OddityTCPConnectTimeout
case netxlite.FailureConnectionRefused:
return OddityTCPConnectRefused
case netxlite.FailureHostUnreachable:
return OddityTCPConnectHostUnreachable
default:
return OddityTCPConnectOher
}
}
type connDB struct {
net.Conn
begin time.Time
db WritableDB
network string
remoteAddr string
}
func (c *connDB) Read(b []byte) (int, error) {
started := time.Since(c.begin).Seconds()
count, err := c.Conn.Read(b)
finished := time.Since(c.begin).Seconds()
c.db.InsertIntoReadWrite(&NetworkEvent{
Operation: "read",
Network: c.network,
RemoteAddr: c.remoteAddr,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Count: count,
})
return count, err
}
func (c *connDB) Write(b []byte) (int, error) {
started := time.Since(c.begin).Seconds()
count, err := c.Conn.Write(b)
finished := time.Since(c.begin).Seconds()
c.db.InsertIntoReadWrite(&NetworkEvent{
Operation: "write",
Network: c.network,
RemoteAddr: c.remoteAddr,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Count: count,
})
return count, err
}
func (c *connDB) Close() error {
started := time.Since(c.begin).Seconds()
err := c.Conn.Close()
finished := time.Since(c.begin).Seconds()
c.db.InsertIntoClose(&NetworkEvent{
Operation: "close",
Network: c.network,
RemoteAddr: c.remoteAddr,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Count: 0,
})
return err
}

60
internal/measurex/dnsx.go Normal file
View File

@ -0,0 +1,60 @@
package measurex
//
// DNSX (DNS eXtensions)
//
// We wrap dnsx.RoundTripper to store events into a WritableDB.
//
import (
"context"
"time"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// DNSXRoundTripper is a transport for sending raw DNS queries
// and receiving raw DNS replies. The internal/netxlite/dnsx
// package implements a bunch of these transports.
type DNSTransport = netxlite.DNSTransport
// WrapDNSXRoundTripper creates a new DNSXRoundTripper that
// saves events into the given WritableDB.
func (mx *Measurer) WrapDNSXRoundTripper(db WritableDB, rtx netxlite.DNSTransport) DNSTransport {
return &dnsxRoundTripperDB{db: db, DNSTransport: rtx, begin: mx.Begin}
}
type dnsxRoundTripperDB struct {
netxlite.DNSTransport
begin time.Time
db WritableDB
}
// DNSRoundTripEvent contains the result of a DNS round trip.
type DNSRoundTripEvent struct {
// This data structure is not in df-002-dns but the names and
// semantics try to be consistent with such a spec.
Network string `json:"engine"`
Address string `json:"resolver_address"`
Query *ArchivalBinaryData `json:"raw_query"`
Started float64 `json:"started"`
Finished float64 `json:"t"`
Failure *string `json:"failure"`
Reply *ArchivalBinaryData `json:"raw_reply"`
}
func (txp *dnsxRoundTripperDB) RoundTrip(ctx context.Context, query []byte) ([]byte, error) {
started := time.Since(txp.begin).Seconds()
reply, err := txp.DNSTransport.RoundTrip(ctx, query)
finished := time.Since(txp.begin).Seconds()
txp.db.InsertIntoDNSRoundTrip(&DNSRoundTripEvent{
Network: txp.DNSTransport.Network(),
Address: txp.DNSTransport.Address(),
Query: NewArchivalBinaryData(query),
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Reply: NewArchivalBinaryData(reply),
})
return reply, err
}

2
internal/measurex/doc.go Normal file
View File

@ -0,0 +1,2 @@
// Package measurex contains measurement extensions.
package measurex

View File

@ -0,0 +1,67 @@
package measurex
import (
"fmt"
"net/http"
"net/url"
)
//
// Endpoint
//
// This file contains the definition of Endpoint and HTTPEndpoint
//
// EndpointNetwork is the network of an endpoint.
type EndpointNetwork string
const (
// NetworkTCP identifies endpoints using TCP.
NetworkTCP = EndpointNetwork("tcp")
// NetworkQUIC identifies endpoints using QUIC.
NetworkQUIC = EndpointNetwork("quic")
)
// Endpoint is an endpoint for a domain.
type Endpoint struct {
// Network is the network (e.g., "tcp", "quic")
Network EndpointNetwork
// Address is the endpoint address (e.g., "8.8.8.8:443")
Address string
}
// String converts an endpoint to a string (e.g., "8.8.8.8:443/tcp")
func (e *Endpoint) String() string {
return fmt.Sprintf("%s/%s", e.Address, e.Network)
}
// HTTPEndpoint is an HTTP/HTTPS/HTTP3 endpoint.
type HTTPEndpoint struct {
// Domain is the endpoint domain (e.g., "dns.google").
Domain string
// Network is the network (e.g., "tcp" or "quic").
Network EndpointNetwork
// Address is the endpoint address (e.g., "8.8.8.8:443").
Address string
// SNI is the SNI to use (only used with URL.scheme == "https").
SNI string
// ALPN is the ALPN to use (only used with URL.scheme == "https").
ALPN []string
// URL is the endpoint URL.
URL *url.URL
// Header contains request headers.
Header http.Header
}
// String converts an HTTP endpoint to a string (e.g., "8.8.8.8:443/tcp")
func (e *HTTPEndpoint) String() string {
return fmt.Sprintf("%s/%s", e.Address, e.Network)
}

318
internal/measurex/http.go Normal file
View File

@ -0,0 +1,318 @@
package measurex
//
// HTTP
//
// This file contains basic networking code. We provide:
//
// - a wrapper for netxlite.HTTPTransport that stores
// round trip events into an EventDB
//
// - an interface that is http.Client like and one internal
// implementation of such an interface that helps us to
// store HTTP redirections info into an EventDB
//
import (
"bytes"
"context"
"crypto/tls"
"errors"
"io"
"net/http"
"net/http/cookiejar"
"net/url"
"time"
"unicode/utf8"
"github.com/lucas-clemente/quic-go"
"github.com/ooni/probe-cli/v3/internal/engine/httpheader"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
"golang.org/x/net/publicsuffix"
)
// HTTPTransport is the HTTP transport type we use.
type HTTPTransport = netxlite.HTTPTransport
// WrapHTTPTransport creates a new transport that saves
// HTTP events into the WritableDB.
func (mx *Measurer) WrapHTTPTransport(
db WritableDB, txp HTTPTransport) *HTTPTransportDB {
return WrapHTTPTransport(mx.Begin, db, txp)
}
// We only read a small snapshot of the body to keep measurements
// lean, since we're mostly interested in TLS interference nowadays
// but we'll also allow for reading more bytes from the conn.
const httpMaxBodySnapshot = 1 << 11
func WrapHTTPTransport(
begin time.Time, db WritableDB, txp HTTPTransport) *HTTPTransportDB {
return &HTTPTransportDB{
HTTPTransport: txp,
Begin: begin,
DB: db,
MaxBodySnapshotSize: httpMaxBodySnapshot,
}
}
// NewHTTPTransportWithConn creates and wraps an HTTPTransport that
// does not dial and only uses the given conn.
func (mx *Measurer) NewHTTPTransportWithConn(
logger Logger, db WritableDB, conn Conn) *HTTPTransportDB {
return mx.WrapHTTPTransport(db, netxlite.NewHTTPTransport(
logger, netxlite.NewSingleUseDialer(conn), netxlite.NewNullTLSDialer()))
}
// NewHTTPTransportWithTLSConn creates and wraps an HTTPTransport that
// does not dial and only uses the given conn.
func (mx *Measurer) NewHTTPTransportWithTLSConn(
logger Logger, db WritableDB, conn netxlite.TLSConn) *HTTPTransportDB {
return mx.WrapHTTPTransport(db, netxlite.NewHTTPTransport(
logger, netxlite.NewNullDialer(), netxlite.NewSingleUseTLSDialer(conn)))
}
// NewHTTPTransportWithQUICSess creates and wraps an HTTPTransport that
// does not dial and only uses the given QUIC session.
func (mx *Measurer) NewHTTPTransportWithQUICSess(
logger Logger, db WritableDB, sess quic.EarlySession) *HTTPTransportDB {
return mx.WrapHTTPTransport(db, netxlite.NewHTTP3Transport(
logger, netxlite.NewSingleUseQUICDialer(sess), &tls.Config{}))
}
// HTTPTransportDB is an implementation of HTTPTransport that
// writes measurement events into a WritableDB.
//
// There are many factories to construct this data type. Otherwise,
// you can construct it manually. In which case, do not modify
// public fields during usage, since this may cause a data race.
type HTTPTransportDB struct {
netxlite.HTTPTransport
// Begin is when we started measuring.
Begin time.Time
// DB is where to write events.
DB WritableDB
// MaxBodySnapshotSize is the maximum size of the body
// snapshot that we take during a round trip.
MaxBodySnapshotSize int64
}
// HTTPRequest is the HTTP request.
type HTTPRequest struct {
// Names consistent with df-001-http.md
Method string `json:"method"`
URL string `json:"url"`
Headers ArchivalHeaders `json:"headers"`
}
// HTTPResponse is the HTTP response.
type HTTPResponse struct {
// Names consistent with df-001-http.md
Code int64 `json:"code"`
Headers ArchivalHeaders `json:"headers"`
Body *ArchivalBinaryData `json:"body"`
BodyIsTruncated bool `json:"body_is_truncated"`
// Fields not part of the spec
BodyLength int64 `json:"x_body_length"`
BodyIsUTF8 bool `json:"x_body_is_utf8"`
}
// HTTPRoundTripEvent contains information about an HTTP round trip.
type HTTPRoundTripEvent struct {
// JSON names following the df-001-httpt data format.
Failure *string `json:"failure"`
Request *HTTPRequest `json:"request"`
Response *HTTPResponse `json:"response"`
Finished float64 `json:"t"`
Started float64 `json:"started"`
// Names not in the specification
Oddity Oddity `json:"oddity"`
}
func (txp *HTTPTransportDB) RoundTrip(req *http.Request) (*http.Response, error) {
started := time.Since(txp.Begin).Seconds()
resp, err := txp.HTTPTransport.RoundTrip(req)
rt := &HTTPRoundTripEvent{
Request: &HTTPRequest{
Method: req.Method,
URL: req.URL.String(),
Headers: NewArchivalHeaders(req.Header),
},
Started: started,
}
if err != nil {
rt.Finished = time.Since(txp.Begin).Seconds()
rt.Failure = NewArchivalFailure(err)
txp.DB.InsertIntoHTTPRoundTrip(rt)
return nil, err
}
switch {
case resp.StatusCode == 403:
rt.Oddity = OddityStatus403
case resp.StatusCode == 404:
rt.Oddity = OddityStatus404
case resp.StatusCode == 503:
rt.Oddity = OddityStatus503
case resp.StatusCode >= 400:
rt.Oddity = OddityStatusOther
}
rt.Response = &HTTPResponse{
Code: int64(resp.StatusCode),
Headers: NewArchivalHeaders(resp.Header),
}
r := io.LimitReader(resp.Body, txp.MaxBodySnapshotSize)
body, err := netxlite.ReadAllContext(req.Context(), r)
if errors.Is(err, io.EOF) && resp.Close {
err = nil // we expected to see an EOF here, so no real error
}
if err != nil {
rt.Finished = time.Since(txp.Begin).Seconds()
rt.Failure = NewArchivalFailure(err)
txp.DB.InsertIntoHTTPRoundTrip(rt)
return nil, err
}
resp.Body = &httpTransportBody{ // allow for reading more if needed
Reader: io.MultiReader(bytes.NewReader(body), resp.Body),
Closer: resp.Body,
}
rt.Response.Body = NewArchivalBinaryData(body)
rt.Response.BodyLength = int64(len(body))
rt.Response.BodyIsTruncated = int64(len(body)) >= txp.MaxBodySnapshotSize
rt.Response.BodyIsUTF8 = utf8.Valid(body)
rt.Finished = time.Since(txp.Begin).Seconds()
txp.DB.InsertIntoHTTPRoundTrip(rt)
return resp, nil
}
type httpTransportBody struct {
io.Reader
io.Closer
}
// HTTPClient is the HTTP client type we use. This interface is
// compatible with http.Client. What changes in this kind of clients
// is that we'll insert redirection events into the WritableDB.
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
CloseIdleConnections()
}
// NewHTTPClient creates a new HTTPClient instance that
// does not automatically perform redirects.
func NewHTTPClientWithoutRedirects(
db WritableDB, jar http.CookieJar, txp HTTPTransport) HTTPClient {
return newHTTPClient(db, jar, txp, http.ErrUseLastResponse)
}
// NewHTTPClientWithRedirects creates a new HTTPClient
// instance that automatically perform redirects.
func NewHTTPClientWithRedirects(
db WritableDB, jar http.CookieJar, txp HTTPTransport) HTTPClient {
return newHTTPClient(db, jar, txp, nil)
}
// HTTPRedirectEvent records an HTTP redirect.
type HTTPRedirectEvent struct {
// URL is the URL triggering the redirect.
URL *url.URL
// Location is the URL to which we're redirected.
Location *url.URL
// Cookies contains the cookies for Location.
Cookies []*http.Cookie
// The Error field can have three values:
//
// - nil if the redirect occurred;
//
// - ErrHTTPTooManyRedirects when we see too many redirections;
//
// - http.ErrUseLastResponse if redirections are disabled.
Error error
}
// ErrHTTPTooManyRedirects is the unexported error that the standard library
// would return when hitting too many redirects.
var ErrHTTPTooManyRedirects = errors.New("stopped after 10 redirects")
func newHTTPClient(db WritableDB, cookiejar http.CookieJar,
txp HTTPTransport, defaultErr error) HTTPClient {
return &httpClientErrWrapper{&http.Client{
Transport: txp,
Jar: cookiejar,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
err := defaultErr
if len(via) >= 10 {
err = ErrHTTPTooManyRedirects
}
db.InsertIntoHTTPRedirect(&HTTPRedirectEvent{
URL: via[0].URL, // bug in Go stdlib if we crash here
Location: req.URL,
Cookies: cookiejar.Cookies(req.URL),
Error: err,
})
return err
},
}}
}
type httpClientErrWrapper struct {
HTTPClient
}
func (c *httpClientErrWrapper) Do(req *http.Request) (*http.Response, error) {
resp, err := c.HTTPClient.Do(req)
if err != nil {
err = netxlite.NewTopLevelGenericErrWrapper(err)
}
return resp, err
}
// NewCookieJar is a convenience factory for creating an http.CookieJar
// that is aware of the effective TLS / public suffix list. This
// means that the jar won't allow a domain to set cookies for another
// unrelated domain (in the public-suffix-list sense).
func NewCookieJar() http.CookieJar {
jar, err := cookiejar.New(&cookiejar.Options{
PublicSuffixList: publicsuffix.List,
})
// Safe to PanicOnError here: cookiejar.New _always_ returns nil.
runtimex.PanicOnError(err, "cookiejar.New failed")
return jar
}
// NewHTTPRequestHeaderForMeasuring returns an http.Header where
// the headers are the ones we use for measuring.
func NewHTTPRequestHeaderForMeasuring() http.Header {
h := http.Header{}
h.Set("Accept", httpheader.Accept())
h.Set("Accept-Language", httpheader.AcceptLanguage())
h.Set("User-Agent", httpheader.UserAgent())
return h
}
// NewHTTPRequestWithContext is a convenience factory for creating
// a new HTTP request with the typical headers we use when performing
// measurements already set inside of req.Header.
func NewHTTPRequestWithContext(ctx context.Context,
method, URL string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, method, URL, body)
if err != nil {
return nil, err
}
req.Header = NewHTTPRequestHeaderForMeasuring()
return req, nil
}
// NewHTTPGetRequest is a convenience factory for creating a new
// http.Request using the GET method and the given URL.
func NewHTTPGetRequest(ctx context.Context, URL string) (*http.Request, error) {
return NewHTTPRequestWithContext(ctx, "GET", URL, nil)
}

View File

@ -0,0 +1,75 @@
package measurex
//
// Logger
//
// Code for logging
//
import (
"fmt"
"sync"
"time"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// Logger is the logger type we use. This type is compatible
// with the logger type of github.com/apex/log.
type Logger interface {
netxlite.Logger
Info(msg string)
Infof(format string, v ...interface{})
Warn(msg string)
Warnf(format string, v ...interface{})
}
// NewOperationLogger creates a new logger that logs
// about an in-progress operation.
func NewOperationLogger(logger Logger, format string, v ...interface{}) *OperationLogger {
ol := &OperationLogger{
sighup: make(chan interface{}),
logger: logger,
once: &sync.Once{},
message: fmt.Sprintf(format, v...),
wg: &sync.WaitGroup{},
}
ol.wg.Add(1)
go ol.logloop()
return ol
}
// OperationLogger logs about an in-progress operation
type OperationLogger struct {
logger Logger
message string
once *sync.Once
sighup chan interface{}
wg *sync.WaitGroup
}
func (ol *OperationLogger) logloop() {
defer ol.wg.Done()
timer := time.NewTimer(500 * time.Millisecond)
defer timer.Stop()
select {
case <-timer.C:
ol.logger.Infof("%s... in progress", ol.message)
case <-ol.sighup:
// we'll emit directly in stop
}
}
func (ol *OperationLogger) Stop(err error) {
ol.once.Do(func() {
close(ol.sighup)
ol.wg.Wait()
if err != nil {
ol.logger.Infof("%s... %s", ol.message, err.Error())
return
}
ol.logger.Infof("%s... ok", ol.message)
})
}

View File

@ -0,0 +1,264 @@
package measurex
import (
"net"
"net/http"
"net/url"
"time"
)
//
// Measurement
//
// Here we define the fundamental measurement types
// produced by this package.
//
// URLMeasurement is the measurement of a whole URL. It contains
// a bunch of measurements detailing each measurement step.
type URLMeasurement struct {
// URL is the URL we're measuring.
URL string `json:"url"`
// DNS contains all the DNS related measurements.
DNS []*DNSMeasurement `json:"dns"`
// Endpoints contains a measurement for each endpoint
// that we discovered via DNS or TH.
Endpoints []*HTTPEndpointMeasurement `json:"endpoints"`
// RedirectURLs contain the URLs to which we should fetch
// if we choose to follow redirections.
RedirectURLs []string `json:"-"`
// THMeasurement is the measurement collected by the TH.
TH interface{} `json:"th,omitempty"`
// TotalRuntime is the total time to measure this URL.
TotalRuntime time.Duration `json:"-"`
// DNSRuntime is the time to run all DNS checks.
DNSRuntime time.Duration `json:"x_dns_runtime"`
// THRuntime is the total time to invoke all test helpers.
THRuntime time.Duration `json:"x_th_runtime"`
// EpntsRuntime is the total time to check all the endpoints.
EpntsRuntime time.Duration `json:"x_epnts_runtime"`
}
// fillRedirects takes in input a complete URLMeasurement and fills
// the field named Redirects with all redirections.
func (m *URLMeasurement) fillRedirects() {
dups := make(map[string]bool)
for _, epnt := range m.Endpoints {
for _, redir := range epnt.HTTPRedirect {
loc := redir.Location.String()
if _, found := dups[loc]; found {
continue
}
dups[loc] = true
m.RedirectURLs = append(m.RedirectURLs, loc)
}
}
}
// Measurement groups all the events that have the same MeasurementID. This
// data format is not compatible with the OONI data format.
type Measurement struct {
// Connect contains all the connect operations.
Connect []*NetworkEvent `json:"connect,omitempty"`
// ReadWrite contains all the read and write operations.
ReadWrite []*NetworkEvent `json:"read_write,omitempty"`
// Close contains all the close operations.
Close []*NetworkEvent `json:"-"`
// TLSHandshake contains all the TLS handshakes.
TLSHandshake []*TLSHandshakeEvent `json:"tls_handshake,omitempty"`
// QUICHandshake contains all the QUIC handshakes.
QUICHandshake []*QUICHandshakeEvent `json:"quic_handshake,omitempty"`
// LookupHost contains all the host lookups.
LookupHost []*DNSLookupEvent `json:"lookup_host,omitempty"`
// LookupHTTPSSvc contains all the HTTPSSvc lookups.
LookupHTTPSSvc []*DNSLookupEvent `json:"lookup_httpssvc,omitempty"`
// DNSRoundTrip contains all the DNS round trips.
DNSRoundTrip []*DNSRoundTripEvent `json:"dns_round_trip,omitempty"`
// HTTPRoundTrip contains all the HTTP round trips.
HTTPRoundTrip []*HTTPRoundTripEvent `json:"http_round_trip,omitempty"`
// HTTPRedirect contains all the redirections.
HTTPRedirect []*HTTPRedirectEvent `json:"-"`
}
// DNSMeasurement is a DNS measurement.
type DNSMeasurement struct {
// Domain is the domain this measurement refers to.
Domain string `json:"domain"`
// A DNSMeasurement is a Measurement.
*Measurement
}
// allEndpointsForDomain returns all the endpoints for
// a specific domain contained in a measurement.
//
// Arguments:
//
// - domain is the domain we want to connect to;
//
// - port is the port for the endpoint.
func (m *DNSMeasurement) allEndpointsForDomain(domain, port string) (out []*Endpoint) {
out = append(out, m.allTCPEndpoints(domain, port)...)
out = append(out, m.allQUICEndpoints(domain, port)...)
return
}
// AllEndpointsForDomain gathers all the endpoints for a given domain from
// a list of DNSMeasurements, removes duplicates and returns the result.
func AllEndpointsForDomain(domain, port string, meas ...*DNSMeasurement) ([]*Endpoint, error) {
var out []*Endpoint
for _, m := range meas {
epnt := m.allEndpointsForDomain(domain, port)
out = append(out, epnt...)
}
return removeDuplicateEndpoints(out...), nil
}
func (m *DNSMeasurement) allTCPEndpoints(domain, port string) (out []*Endpoint) {
for _, entry := range m.LookupHost {
if domain != entry.Domain {
continue
}
for _, addr := range entry.Addrs() {
if net.ParseIP(addr) == nil {
continue // skip CNAME entries courtesy the WCTH
}
out = append(out, m.newEndpoint(addr, port, NetworkTCP))
}
}
return
}
func (m *DNSMeasurement) allQUICEndpoints(domain, port string) (out []*Endpoint) {
for _, entry := range m.LookupHTTPSSvc {
if domain != entry.Domain {
continue
}
if !entry.SupportsHTTP3() {
continue
}
for _, addr := range entry.Addrs() {
out = append(out, m.newEndpoint(addr, port, NetworkQUIC))
}
}
return
}
func (m *DNSMeasurement) newEndpoint(addr, port string, network EndpointNetwork) *Endpoint {
return &Endpoint{Network: network, Address: net.JoinHostPort(addr, port)}
}
// allHTTPEndpointsForURL returns all the HTTPEndpoints matching
// a specific URL's domain inside this measurement.
//
// Arguments:
//
// - URL is the URL for which we want endpoints;
//
// - headers are the headers to use.
//
// Returns a list of endpoints or an error.
func (m *DNSMeasurement) allHTTPEndpointsForURL(
URL *url.URL, headers http.Header) ([]*HTTPEndpoint, error) {
domain := URL.Hostname()
port, err := PortFromURL(URL)
if err != nil {
return nil, err
}
epnts := m.allEndpointsForDomain(domain, port)
var out []*HTTPEndpoint
for _, epnt := range epnts {
if URL.Scheme != "https" && epnt.Network == NetworkQUIC {
continue // we'll only use QUIC with HTTPS
}
out = append(out, &HTTPEndpoint{
Domain: domain,
Network: epnt.Network,
Address: epnt.Address,
SNI: domain,
ALPN: ALPNForHTTPEndpoint(epnt.Network),
URL: URL,
Header: headers,
})
}
return out, nil
}
// AllEndpointsForURL is like AllHTTPEndpointsForURL but return
// simple Endpoints rather than HTTPEndpoints.
func AllEndpointsForURL(URL *url.URL, meas ...*DNSMeasurement) ([]*Endpoint, error) {
all, err := AllHTTPEndpointsForURL(URL, http.Header{}, meas...)
if err != nil {
return nil, err
}
var out []*Endpoint
for _, epnt := range all {
out = append(out, &Endpoint{
Network: epnt.Network,
Address: epnt.Address,
})
}
return out, nil
}
// AllHTTPEndpointsForURL gathers all the HTTP endpoints for a given
// URL from a list of DNSMeasurements, removes duplicates and returns
// the result. This call may fail if we cannot determine the port
// from the URL, in which case we return an error. You MUST supply
// the headers you want to use for measuring.
func AllHTTPEndpointsForURL(URL *url.URL,
headers http.Header, meas ...*DNSMeasurement) ([]*HTTPEndpoint, error) {
var out []*HTTPEndpoint
for _, m := range meas {
epnt, err := m.allHTTPEndpointsForURL(URL, headers)
if err != nil {
return nil, err
}
out = append(out, epnt...)
}
return removeDuplicateHTTPEndpoints(out...), nil
}
// EndpointMeasurement is an endpoint measurement.
type EndpointMeasurement struct {
// Network is the network of this endpoint.
Network EndpointNetwork `json:"network"`
// Address is the address of this endpoint.
Address string `json:"address"`
// An EndpointMeasurement is a Measurement.
*Measurement
}
// HTTPEndpointMeasurement is an HTTP endpoint measurement.
type HTTPEndpointMeasurement struct {
// URL is the URL this measurement refers to.
URL string `json:"url"`
// Network is the network of this endpoint.
Network EndpointNetwork `json:"network"`
// Address is the address of this endpoint.
Address string `json:"address"`
// An HTTPEndpointMeasurement is a Measurement.
*Measurement
}

View File

@ -0,0 +1,931 @@
package measurex
//
// Measurer
//
// High-level API for running measurements. The code in here
// has been designed to easily implement the new websteps
// network experiment, which is quite complex. It should be
// possible to write most other experiments using a Measurer.
//
import (
"context"
"crypto/tls"
"errors"
stdlog "log"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/apex/log"
"github.com/lucas-clemente/quic-go"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// Measurer performs measurements. If you don't use a factory
// for creating this type, make sure you set all the MANDATORY fields.
type Measurer struct {
// Begin is when we started measuring (this field is MANDATORY).
Begin time.Time
// HTTPClient is the MANDATORY HTTP client for the WCTH.
HTTPClient HTTPClient
// Logger is the MANDATORY logger to use.
Logger Logger
// MeasureURLHelper is the OPTIONAL test helper to use when
// we're measuring using the MeasureURL function. If this field
// is not set, we'll not be using any helper.
MeasureURLHelper MeasureURLHelper
// Resolvers is the MANDATORY list of resolvers.
Resolvers []*ResolverInfo
// TLSHandshaker is the MANDATORY TLS handshaker.
TLSHandshaker netxlite.TLSHandshaker
}
// NewMeasurerWithDefaultSettings creates a new Measurer
// instance using the most default settings.
func NewMeasurerWithDefaultSettings() *Measurer {
return &Measurer{
Begin: time.Now(),
HTTPClient: &http.Client{},
Logger: log.Log,
Resolvers: []*ResolverInfo{{
Network: "system",
Address: "",
}, {
Network: "udp",
Address: "8.8.4.4:53",
}},
TLSHandshaker: netxlite.NewTLSHandshakerStdlib(log.Log),
}
}
// LookupHostSystem performs a LookupHost using the system resolver.
func (mx *Measurer) LookupHostSystem(ctx context.Context, domain string) *DNSMeasurement {
const timeout = 4 * time.Second
ol := NewOperationLogger(mx.Logger, "LookupHost %s with getaddrinfo", domain)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
db := &MeasurementDB{}
r := mx.NewResolverSystem(db, mx.Logger)
defer r.CloseIdleConnections()
_, err := r.LookupHost(ctx, domain)
ol.Stop(err)
return &DNSMeasurement{
Domain: domain,
Measurement: db.AsMeasurement(),
}
}
// lookupHostForeign performs a LookupHost using a "foreign" resolver.
func (mx *Measurer) lookupHostForeign(
ctx context.Context, domain string, r Resolver) *DNSMeasurement {
const timeout = 4 * time.Second
ol := NewOperationLogger(mx.Logger, "LookupHost %s with %s", domain, r.Network())
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
db := &MeasurementDB{}
_, err := mx.WrapResolver(db, r).LookupHost(ctx, domain)
ol.Stop(err)
return &DNSMeasurement{
Domain: domain,
Measurement: db.AsMeasurement(),
}
}
// LookupHostUDP is like LookupHostSystem but uses an UDP resolver.
//
// Arguments:
//
// - ctx is the context allowing to timeout the operation;
//
// - domain is the domain to resolve (e.g., "x.org");
//
// - address is the UDP resolver address (e.g., "dns.google:53").
//
// Returns a DNSMeasurement.
func (mx *Measurer) LookupHostUDP(
ctx context.Context, domain, address string) *DNSMeasurement {
const timeout = 4 * time.Second
ol := NewOperationLogger(mx.Logger, "LookupHost %s with %s/udp", domain, address)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
db := &MeasurementDB{}
r := mx.NewResolverUDP(db, mx.Logger, address)
defer r.CloseIdleConnections()
_, err := r.LookupHost(ctx, domain)
ol.Stop(err)
return &DNSMeasurement{
Domain: domain,
Measurement: db.AsMeasurement(),
}
}
// LookupHTTPSSvcUDP issues an HTTPSSvc query for the given domain.
//
// Arguments:
//
// - ctx is the context allowing to timeout the operation;
//
// - domain is the domain to resolve (e.g., "x.org");
//
// - address is the UDP resolver address (e.g., "dns.google:53").
//
// Returns a DNSMeasurement.
func (mx *Measurer) LookupHTTPSSvcUDP(
ctx context.Context, domain, address string) *DNSMeasurement {
const timeout = 4 * time.Second
ol := NewOperationLogger(mx.Logger, "LookupHTTPSvc %s with %s/udp", domain, address)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
db := &MeasurementDB{}
r := mx.NewResolverUDP(db, mx.Logger, address)
defer r.CloseIdleConnections()
_, err := r.LookupHTTPS(ctx, domain)
ol.Stop(err)
return &DNSMeasurement{
Domain: domain,
Measurement: db.AsMeasurement(),
}
}
// lookupHTTPSSvcUDPForeign is like LookupHTTPSSvcUDP
// except that it uses a "foreign" resolver.
func (mx *Measurer) lookupHTTPSSvcUDPForeign(
ctx context.Context, domain string, r Resolver) *DNSMeasurement {
const timeout = 4 * time.Second
ol := NewOperationLogger(mx.Logger, "LookupHTTPSvc %s with %s", domain, r.Address())
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
db := &MeasurementDB{}
_, err := mx.WrapResolver(db, r).LookupHTTPS(ctx, domain)
ol.Stop(err)
return &DNSMeasurement{
Domain: domain,
Measurement: db.AsMeasurement(),
}
}
// TCPConnect establishes a connection with a TCP endpoint.
//
// Arguments:
//
// - ctx is the context allowing to timeout the connect;
//
// - address is the TCP endpoint address (e.g., "8.8.4.4:443").
//
// Returns an EndpointMeasurement.
func (mx *Measurer) TCPConnect(ctx context.Context, address string) *EndpointMeasurement {
db := &MeasurementDB{}
conn, _ := mx.TCPConnectWithDB(ctx, db, address)
measurement := db.AsMeasurement()
if conn != nil {
conn.Close()
}
return &EndpointMeasurement{
Network: NetworkTCP,
Address: address,
Measurement: measurement,
}
}
// TCPConnectWithDB is like TCPConnect but does not create a new measurement,
// rather it just stores the events inside of the given DB.
func (mx *Measurer) TCPConnectWithDB(ctx context.Context, db WritableDB, address string) (Conn, error) {
const timeout = 10 * time.Second
ol := NewOperationLogger(mx.Logger, "TCPConnect %s", address)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
d := mx.NewDialerWithoutResolver(db, mx.Logger)
defer d.CloseIdleConnections()
conn, err := d.DialContext(ctx, "tcp", address)
ol.Stop(err)
return conn, err
}
// TLSConnectAndHandshake connects and TLS handshakes with a TCP endpoint.
//
// Arguments:
//
// - ctx is the context allowing to timeout the whole operation;
//
// - address is the endpoint address (e.g., "1.1.1.1:443");
//
// - config contains the TLS config (see below).
//
// You MUST set the following config fields:
//
// - ServerName to the desired SNI or InsecureSkipVerify to
// skip the certificate name verification;
//
// - RootCAs to nextlite.NewDefaultCertPool() output;
//
// - NextProtos to the desired ALPN ([]string{"h2", "http/1.1"} for
// HTTPS and []string{"dot"} for DNS-over-TLS).
//
// Caveats:
//
// The mx.TLSHandshaker field could point to a TLS handshaker using
// the Go stdlib or one using gitlab.com/yawning/utls.git.
//
// In the latter case, the content of the ClientHello message
// will not only depend on the config field but also on the
// utls.ClientHelloID thay you're using.
//
// Returns an EndpointMeasurement.
func (mx *Measurer) TLSConnectAndHandshake(ctx context.Context,
address string, config *tls.Config) *EndpointMeasurement {
db := &MeasurementDB{}
conn, _ := mx.TLSConnectAndHandshakeWithDB(ctx, db, address, config)
measurement := db.AsMeasurement()
if conn != nil {
conn.Close()
}
return &EndpointMeasurement{
Network: NetworkTCP,
Address: address,
Measurement: measurement,
}
}
// TLSConnectAndHandshakeWithDB is like TLSConnectAndHandshake but
// uses the given DB instead of creating a new Measurement.
func (mx *Measurer) TLSConnectAndHandshakeWithDB(ctx context.Context,
db WritableDB, address string, config *tls.Config) (netxlite.TLSConn, error) {
conn, err := mx.TCPConnectWithDB(ctx, db, address)
if err != nil {
return nil, err
}
const timeout = 10 * time.Second
ol := NewOperationLogger(mx.Logger,
"TLSHandshake %s with sni=%s", address, config.ServerName)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
th := mx.WrapTLSHandshaker(db, mx.TLSHandshaker)
tlsConn, _, err := th.Handshake(ctx, conn, config)
ol.Stop(err)
if err != nil {
return nil, err
}
// cast safe according to the docs of netxlite's handshaker
return tlsConn.(netxlite.TLSConn), nil
}
// QUICHandshake connects and TLS handshakes with a QUIC endpoint.
//
// Arguments:
//
// - ctx is the context allowing to timeout the whole operation;
//
// - address is the endpoint address (e.g., "1.1.1.1:443");
//
// - config contains the TLS config (see below).
//
// You MUST set the following config fields:
//
// - ServerName to the desired SNI or InsecureSkipVerify to
// skip the certificate name verification;
//
// - RootCAs to nextlite.NewDefaultCertPool() output;
//
// - NextProtos to the desired ALPN ([]string{"h2", "http/1.1"} for
// HTTPS and []string{"dot"} for DNS-over-TLS).
//
// Returns an EndpointMeasurement.
func (mx *Measurer) QUICHandshake(ctx context.Context, address string,
config *tls.Config) *EndpointMeasurement {
db := &MeasurementDB{}
sess, _ := mx.QUICHandshakeWithDB(ctx, db, address, config)
measurement := db.AsMeasurement()
if sess != nil {
// TODO(bassosimone): close session with correct message
sess.CloseWithError(0, "")
}
return &EndpointMeasurement{
Network: NetworkQUIC,
Address: address,
Measurement: measurement,
}
}
// QUICHandshakeWithDB is like QUICHandshake but uses the given
// db to store events rather than creating a temporary one and
// use it to generate a new Measuremet.
func (mx *Measurer) QUICHandshakeWithDB(ctx context.Context, db WritableDB,
address string, config *tls.Config) (quic.EarlySession, error) {
const timeout = 10 * time.Second
ol := NewOperationLogger(mx.Logger,
"QUICHandshake %s with sni=%s", address, config.ServerName)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
qd := mx.NewQUICDialerWithoutResolver(db, mx.Logger)
defer qd.CloseIdleConnections()
sess, err := qd.DialContext(ctx, "udp", address, config, &quic.Config{})
ol.Stop(err)
return sess, err
}
// HTTPEndpointGet performs a GET request for an HTTP endpoint.
//
// This function WILL NOT follow redirects. If there is a redirect
// you will see it inside the specific database table.
//
// Arguments:
//
// - ctx is the context allowing to timeout the operation;
//
// - epnt is the HTTP endpoint;
//
// - jar is the cookie jar to use.
//
// Returns a measurement. The returned measurement is empty if
// the endpoint is misconfigured or the URL has an unknown scheme.
func (mx *Measurer) HTTPEndpointGet(
ctx context.Context, epnt *HTTPEndpoint, jar http.CookieJar) *HTTPEndpointMeasurement {
resp, m, _ := mx.httpEndpointGet(ctx, epnt, jar)
if resp != nil {
resp.Body.Close()
}
return m
}
// HTTPEndpointGetWithoutCookies is like HTTPEndpointGet
// but does not require you to provide a CookieJar.
func (mx *Measurer) HTTPEndpointGetWithoutCookies(
ctx context.Context, epnt *HTTPEndpoint) *HTTPEndpointMeasurement {
return mx.HTTPEndpointGet(ctx, epnt, NewCookieJar())
}
var (
errUnknownHTTPEndpointURLScheme = errors.New("unknown HTTPEndpoint.URL.Scheme")
// ErrUnknownHTTPEndpointNetwork means that the given endpoint's
// network is of a type that we don't know how to handle.
ErrUnknownHTTPEndpointNetwork = errors.New("unknown HTTPEndpoint.Network")
)
// httpEndpointGet implements HTTPEndpointGet.
func (mx *Measurer) httpEndpointGet(ctx context.Context, epnt *HTTPEndpoint,
jar http.CookieJar) (*http.Response, *HTTPEndpointMeasurement, error) {
resp, m, err := mx.httpEndpointGetMeasurement(ctx, epnt, jar)
out := &HTTPEndpointMeasurement{
URL: epnt.URL.String(),
Network: epnt.Network,
Address: epnt.Address,
Measurement: m,
}
return resp, out, err
}
// httpEndpointGetMeasurement implements httpEndpointGet.
//
// This function returns a triple where:
//
// - the first element is a valid response on success a nil response on failure
//
// - the second element is always a valid Measurement
//
// - the third element is a nil error on success and an error on failure
func (mx *Measurer) httpEndpointGetMeasurement(ctx context.Context, epnt *HTTPEndpoint,
jar http.CookieJar) (resp *http.Response, m *Measurement, err error) {
db := &MeasurementDB{}
resp, err = mx.httpEndpointGetWithDB(ctx, epnt, db, jar)
m = db.AsMeasurement()
return
}
// HTTPEndpointGetWithDB is an HTTPEndpointGet that stores the
// events into the given WritableDB.
func (mx *Measurer) HTTPEndpointGetWithDB(ctx context.Context, epnt *HTTPEndpoint,
db WritableDB, jar http.CookieJar) (err error) {
switch epnt.Network {
case NetworkQUIC:
_, err = mx.httpEndpointGetQUIC(ctx, db, epnt, jar)
case NetworkTCP:
_, err = mx.httpEndpointGetTCP(ctx, db, epnt, jar)
default:
err = ErrUnknownHTTPEndpointNetwork
}
return
}
// httpEndpointGetWithDB is an HTTPEndpointGet that stores the
// events into the given WritableDB.
func (mx *Measurer) httpEndpointGetWithDB(ctx context.Context, epnt *HTTPEndpoint,
db WritableDB, jar http.CookieJar) (resp *http.Response, err error) {
switch epnt.Network {
case NetworkQUIC:
resp, err = mx.httpEndpointGetQUIC(ctx, db, epnt, jar)
case NetworkTCP:
resp, err = mx.httpEndpointGetTCP(ctx, db, epnt, jar)
default:
err = ErrUnknownHTTPEndpointNetwork
}
return
}
// httpEndpointGetTCP specializes HTTPSEndpointGet for HTTP and HTTPS.
func (mx *Measurer) httpEndpointGetTCP(ctx context.Context,
db WritableDB, epnt *HTTPEndpoint, jar http.CookieJar) (*http.Response, error) {
switch epnt.URL.Scheme {
case "http":
return mx.httpEndpointGetHTTP(ctx, db, epnt, jar)
case "https":
return mx.httpEndpointGetHTTPS(ctx, db, epnt, jar)
default:
return nil, errUnknownHTTPEndpointURLScheme
}
}
// httpEndpointGetHTTP specializes httpEndpointGetTCP for HTTP.
func (mx *Measurer) httpEndpointGetHTTP(ctx context.Context,
db WritableDB, epnt *HTTPEndpoint, jar http.CookieJar) (*http.Response, error) {
conn, err := mx.TCPConnectWithDB(ctx, db, epnt.Address)
if err != nil {
return nil, err
}
defer conn.Close() // we own it
clnt := NewHTTPClientWithoutRedirects(db, jar,
mx.NewHTTPTransportWithConn(mx.Logger, db, conn))
defer clnt.CloseIdleConnections()
return mx.httpClientDo(ctx, clnt, epnt)
}
// httpEndpointGetHTTPS specializes httpEndpointGetTCP for HTTPS.
func (mx *Measurer) httpEndpointGetHTTPS(ctx context.Context,
db WritableDB, epnt *HTTPEndpoint, jar http.CookieJar) (*http.Response, error) {
conn, err := mx.TLSConnectAndHandshakeWithDB(ctx, db, epnt.Address, &tls.Config{
ServerName: epnt.SNI,
NextProtos: epnt.ALPN,
RootCAs: netxlite.NewDefaultCertPool(),
})
if err != nil {
return nil, err
}
defer conn.Close() // we own it
clnt := NewHTTPClientWithoutRedirects(db, jar,
mx.NewHTTPTransportWithTLSConn(mx.Logger, db, conn))
defer clnt.CloseIdleConnections()
return mx.httpClientDo(ctx, clnt, epnt)
}
// httpEndpointGetQUIC specializes httpEndpointGetTCP for QUIC.
func (mx *Measurer) httpEndpointGetQUIC(ctx context.Context,
db WritableDB, epnt *HTTPEndpoint, jar http.CookieJar) (*http.Response, error) {
sess, err := mx.QUICHandshakeWithDB(ctx, db, epnt.Address, &tls.Config{
ServerName: epnt.SNI,
NextProtos: epnt.ALPN,
RootCAs: netxlite.NewDefaultCertPool(),
})
if err != nil {
return nil, err
}
// TODO(bassosimone): close session with correct message
defer sess.CloseWithError(0, "") // we own it
clnt := NewHTTPClientWithoutRedirects(db, jar,
mx.NewHTTPTransportWithQUICSess(mx.Logger, db, sess))
defer clnt.CloseIdleConnections()
return mx.httpClientDo(ctx, clnt, epnt)
}
func (mx *Measurer) HTTPClientGET(
ctx context.Context, clnt HTTPClient, URL *url.URL) (*http.Response, error) {
return mx.httpClientDo(ctx, clnt, &HTTPEndpoint{
Domain: URL.Hostname(),
Network: "tcp",
Address: URL.Hostname(),
SNI: "", // not needed
ALPN: []string{}, // not needed
URL: URL,
Header: NewHTTPRequestHeaderForMeasuring(),
})
}
func (mx *Measurer) httpClientDo(ctx context.Context,
clnt HTTPClient, epnt *HTTPEndpoint) (*http.Response, error) {
req, err := NewHTTPGetRequest(ctx, epnt.URL.String())
if err != nil {
return nil, err
}
req.Header = epnt.Header
const timeout = 15 * time.Second
ol := NewOperationLogger(mx.Logger,
"%s %s with %s/%s", req.Method, req.URL.String(), epnt.Address, epnt.Network)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
resp, err := clnt.Do(req.WithContext(ctx))
ol.Stop(err)
return resp, err
}
// HTTPEndpointGetParallel performs an HTTPEndpointGet for each
// input endpoint using a pool of background goroutines.
//
// This function returns to the caller a channel where to read
// measurements from. The channel is closed when done.
func (mx *Measurer) HTTPEndpointGetParallel(ctx context.Context,
jar http.CookieJar, epnts ...*HTTPEndpoint) <-chan *HTTPEndpointMeasurement {
var (
done = make(chan interface{})
input = make(chan *HTTPEndpoint)
output = make(chan *HTTPEndpointMeasurement)
)
go func() {
defer close(input)
for _, epnt := range epnts {
input <- epnt
}
}()
const parallelism = 3
for i := 0; i < parallelism; i++ {
go func() {
for epnt := range input {
output <- mx.HTTPEndpointGet(ctx, epnt, jar)
}
done <- true
}()
}
go func() {
for i := 0; i < parallelism; i++ {
<-done
}
close(output)
}()
return output
}
// ResolverNetwork identifies the network of a resolver.
type ResolverNetwork string
var (
// ResolverSystem is the system resolver (i.e., getaddrinfo)
ResolverSystem = ResolverNetwork("system")
// ResolverUDP is a resolver using DNS-over-UDP
ResolverUDP = ResolverNetwork("udp")
// ResolverForeign is a resolver that is not managed by
// this package. We can wrap it, but we don't be able to
// observe any event but Lookup{Host,HTTPSvc}
ResolverForeign = ResolverNetwork("foreign")
)
// ResolverInfo contains info about a DNS resolver.
type ResolverInfo struct {
// Network is the resolver's network (e.g., "doh", "udp")
Network ResolverNetwork
// Address is the address (e.g., "1.1.1.1:53", "https://1.1.1.1/dns-query")
Address string
// ForeignResolver is only used when Network's
// value equals the ResolverForeign constant.
ForeignResolver Resolver
}
// LookupURLHostParallel performs an LookupHost-like operation for each
// resolver that you provide as argument using a pool of goroutines.
func (mx *Measurer) LookupURLHostParallel(ctx context.Context,
URL *url.URL, resos ...*ResolverInfo) <-chan *DNSMeasurement {
var (
done = make(chan interface{})
resolvers = make(chan *ResolverInfo)
output = make(chan *DNSMeasurement)
)
go func() {
defer close(resolvers)
for _, reso := range resos {
resolvers <- reso
}
}()
const parallelism = 3
for i := 0; i < parallelism; i++ {
go func() {
for reso := range resolvers {
mx.lookupHostWithResolverInfo(ctx, reso, URL, output)
}
done <- true
}()
}
go func() {
for i := 0; i < parallelism; i++ {
<-done
}
close(output)
}()
return output
}
// lookupHostWithResolverInfo performs a LookupHost-like
// operation using the given ResolverInfo.
func (mx *Measurer) lookupHostWithResolverInfo(
ctx context.Context, reso *ResolverInfo, URL *url.URL,
output chan<- *DNSMeasurement) {
switch reso.Network {
case ResolverSystem:
output <- mx.LookupHostSystem(ctx, URL.Hostname())
case ResolverUDP:
output <- mx.LookupHostUDP(ctx, URL.Hostname(), reso.Address)
case ResolverForeign:
output <- mx.lookupHostForeign(ctx, URL.Hostname(), reso.ForeignResolver)
default:
return
}
switch URL.Scheme {
case "https":
default:
return
}
switch reso.Network {
case ResolverUDP:
output <- mx.LookupHTTPSSvcUDP(ctx, URL.Hostname(), reso.Address)
case ResolverForeign:
output <- mx.lookupHTTPSSvcUDPForeign(ctx, URL.Hostname(), reso.ForeignResolver)
}
}
// LookupHostParallel is like LookupURLHostParallel but we only
// have in input an hostname rather than a URL. As such, we cannot
// determine whether to perform HTTPSSvc lookups and so we aren't
// going to perform this kind of lookups in this case.
func (mx *Measurer) LookupHostParallel(
ctx context.Context, hostname, port string) <-chan *DNSMeasurement {
out := make(chan *DNSMeasurement)
go func() {
defer close(out)
URL := &url.URL{
Scheme: "", // so we don't see https and we don't try HTTPSSvc
Host: net.JoinHostPort(hostname, port),
}
for m := range mx.LookupURLHostParallel(ctx, URL) {
out <- &DNSMeasurement{Domain: hostname, Measurement: m.Measurement}
}
}()
return out
}
// MeasureURLHelper is a Test Helper that discovers additional
// endpoints after MeasureURL has finished discovering endpoints
// via the usual DNS mechanism. The MeasureURLHelper:
//
// - is used by experiments to call a real test helper, i.e.,
// a remote service providing extra endpoints
//
// - is used by test helpers to augment the set of endpoints
// discovered so far with the ones provided by a client.
type MeasureURLHelper interface {
// LookupExtraHTTPEndpoints searches for extra HTTP endpoints
// suitable for the given URL we're measuring.
//
// Arguments:
//
// - ctx is the context for timeout/cancellation/deadline
//
// - URL is the URL we're currently measuring
//
// - headers contains the HTTP headers we wish to use
//
// - epnts is the current list of endpoints
//
// This function SHOULD return a NEW list of extra endpoints
// it discovered and SHOULD NOT merge the epnts endpoints with
// extra endpoints it discovered. Therefore:
//
// - on any kind of error it MUST return nil, err
//
// - on success it MUST return the NEW endpoints it discovered
// as well as the TH measurement to be added to the measurement
// that the URL measurer is constructing.
//
// It is the caller's responsibility to merge the NEW list of
// endpoints with the ones it passed as argument.
//
// It is also the caller's responsibility to ENSURE that the
// newly returned endpoints only use the few headers that our
// test helper protocol allows one to set.
LookupExtraHTTPEndpoints(ctx context.Context, URL *url.URL,
headers http.Header, epnts ...*HTTPEndpoint) (
newEpnts []*HTTPEndpoint, thMeasurement interface{}, err error)
}
// MeasureURL measures an HTTP or HTTPS URL. The DNS resolvers
// and the Test Helpers we use in this measurement are the ones
// configured into the database. The default is to use the system
// resolver and to use not Test Helper. Use RegisterWCTH and
// RegisterUDPResolvers (and other similar functions that have
// not been written at the moment of writing this note) to
// augment the set of resolvers and Test Helpers we use here.
//
// Arguments:
//
// - ctx is the context for timeout/cancellation
//
// - URL is the URL to measure
//
// - header contains the HTTP headers for the request
//
// - cookies contains the cookies we should use for measuring
// this URL and possibly future redirections.
//
// To create an empty set of cookies, use NewCookieJar. It's
// normal to have empty cookies at the beginning. If we follow
// extra redirections after this run then the cookie jar will
// contain the cookies for following the next redirection.
//
// We need cookies because a small amount of URLs does not
// redirect properly without cookies. This has been
// documented at https://github.com/ooni/probe/issues/1727.
func (mx *Measurer) MeasureURL(
ctx context.Context, URL string, headers http.Header,
cookies http.CookieJar) (*URLMeasurement, error) {
mx.Logger.Infof("MeasureURL url=%s", URL)
m := &URLMeasurement{URL: URL}
begin := time.Now()
defer func() { m.TotalRuntime = time.Since(begin) }()
parsed, err := url.Parse(URL)
if err != nil {
return nil, err
}
if len(mx.Resolvers) < 1 {
return nil, errors.New("measurer: no configured resolver")
}
dnsBegin := time.Now()
for dns := range mx.LookupURLHostParallel(ctx, parsed, mx.Resolvers...) {
m.DNS = append(m.DNS, dns)
}
m.DNSRuntime = time.Since(dnsBegin)
epnts, err := AllHTTPEndpointsForURL(parsed, headers, m.DNS...)
if err != nil {
return nil, err
}
if mx.MeasureURLHelper != nil {
thBegin := time.Now()
extraEpnts, thMeasurement, _ := mx.MeasureURLHelper.LookupExtraHTTPEndpoints(
ctx, parsed, headers, epnts...)
m.THRuntime = time.Since(thBegin)
epnts = removeDuplicateHTTPEndpoints(append(epnts, extraEpnts...)...)
m.TH = thMeasurement
mx.enforceAllowedHeadersOnly(epnts)
}
epntRuntime := time.Now()
for epnt := range mx.HTTPEndpointGetParallel(ctx, cookies, epnts...) {
m.Endpoints = append(m.Endpoints, epnt)
}
mx.maybeQUICFollowUp(ctx, m, cookies, epnts...)
m.EpntsRuntime = time.Since(epntRuntime)
m.fillRedirects()
return m, nil
}
// maybeQUICFollowUp checks whether we need to use Alt-Svc to check
// for QUIC. We query for HTTPSSvc but currently only Cloudflare
// implements this proposed standard. So, this function is
// where we take care of all the other servers implementing QUIC.
func (mx *Measurer) maybeQUICFollowUp(ctx context.Context,
m *URLMeasurement, cookies http.CookieJar, epnts ...*HTTPEndpoint) {
altsvc := []string{}
for _, epnt := range m.Endpoints {
// Check whether we have a QUIC handshake. If so, then
// HTTPSSvc worked and we can stop here.
if epnt.QUICHandshake != nil {
return
}
for idx, rtrip := range epnt.HTTPRoundTrip {
if rtrip.Response == nil {
stdlog.Printf("malformed HTTPRoundTrip@%d: %+v", idx, rtrip)
continue
}
if v := rtrip.Response.Headers.Get("alt-svc"); v != "" {
altsvc = append(altsvc, v)
}
}
}
// syntax:
//
// Alt-Svc: clear
// Alt-Svc: <protocol-id>=<alt-authority>; ma=<max-age>
// Alt-Svc: <protocol-id>=<alt-authority>; ma=<max-age>; persist=1
//
// multiple entries may be separated by comma.
//
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Alt-Svc
for _, header := range altsvc {
entries := strings.Split(header, ",")
if len(entries) < 1 {
continue
}
for _, entry := range entries {
parts := strings.Split(entry, ";")
if len(parts) < 1 {
continue
}
if parts[0] == "h3=\":443\"" {
mx.doQUICFollowUp(ctx, m, cookies, epnts...)
return
}
}
}
}
// doQUICFollowUp runs when we know there's QUIC support via Alt-Svc.
func (mx *Measurer) doQUICFollowUp(ctx context.Context,
m *URLMeasurement, cookies http.CookieJar, epnts ...*HTTPEndpoint) {
quicEpnts := []*HTTPEndpoint{}
// do not mutate the existing list rather create a new one
for _, epnt := range epnts {
quicEpnts = append(quicEpnts, &HTTPEndpoint{
Domain: epnt.Domain,
Network: NetworkQUIC,
Address: epnt.Address,
SNI: epnt.SNI,
ALPN: []string{"h3"},
URL: epnt.URL,
Header: epnt.Header,
})
}
for mquic := range mx.HTTPEndpointGetParallel(ctx, cookies, quicEpnts...) {
m.Endpoints = append(m.Endpoints, mquic)
}
}
func (mx *Measurer) enforceAllowedHeadersOnly(epnts []*HTTPEndpoint) {
for _, epnt := range epnts {
epnt.Header = mx.keepOnlyAllowedHeaders(epnt.Header)
}
}
func (mx *Measurer) keepOnlyAllowedHeaders(header http.Header) (out http.Header) {
out = http.Header{}
for k, vv := range header {
switch strings.ToLower(k) {
case "accept", "accept-language", "cookie", "user-agent":
for _, v := range vv {
out.Add(k, v)
}
default:
// ignore all the other headers
}
}
return
}
// redirectionQueue is the type we use to manage the redirection
// queue and to follow a reasonable number of redirects.
type redirectionQueue struct {
q []string
cnt int
}
func (r *redirectionQueue) append(URL ...string) {
r.q = append(r.q, URL...)
}
func (r *redirectionQueue) popleft() (URL string) {
URL = r.q[0]
r.q = r.q[1:]
return
}
func (r *redirectionQueue) empty() bool {
return len(r.q) <= 0
}
func (r *redirectionQueue) redirectionsCount() int {
return r.cnt
}
// MeasureURLAndFollowRedirections is like MeasureURL except
// that it _also_ follows all the HTTP redirections.
func (mx *Measurer) MeasureURLAndFollowRedirections(ctx context.Context,
URL string, headers http.Header, cookies http.CookieJar) <-chan *URLMeasurement {
out := make(chan *URLMeasurement)
go func() {
defer close(out)
meas, err := mx.MeasureURL(ctx, URL, headers, cookies)
if err != nil {
mx.Logger.Warnf("mx.MeasureURL failed: %s", err.Error())
return
}
out <- meas
rq := &redirectionQueue{q: meas.RedirectURLs}
const maxRedirects = 7
for !rq.empty() && rq.redirectionsCount() < maxRedirects {
URL = rq.popleft()
meas, err = mx.MeasureURL(ctx, URL, headers, cookies)
if err != nil {
mx.Logger.Warnf("mx.MeasureURL failed: %s", err.Error())
return
}
out <- meas
rq.append(meas.RedirectURLs...)
}
}()
return out
}

View File

@ -0,0 +1,48 @@
package measurex
//
// Oddity
//
// Here we define the oddity type.
//
// Oddity is an unexpected result on the probe or
// or test helper side during a measurement. We will
// promote the oddity to anomaly if the probe and
// the test helper see different results.
type Oddity string
// This enumeration lists all known oddities.
var (
// tcp.connect
OddityTCPConnectTimeout = Oddity("tcp.connect.timeout")
OddityTCPConnectRefused = Oddity("tcp.connect.refused")
OddityTCPConnectHostUnreachable = Oddity("tcp.connect.host_unreachable")
OddityTCPConnectOher = Oddity("tcp.connect.other")
// tls.handshake
OddityTLSHandshakeTimeout = Oddity("tls.handshake.timeout")
OddityTLSHandshakeReset = Oddity("tls.handshake.reset")
OddityTLSHandshakeOther = Oddity("tls.handshake.other")
OddityTLSHandshakeUnexpectedEOF = Oddity("tls.handshake.unexpected_eof")
OddityTLSHandshakeInvalidHostname = Oddity("tls.handshake.invalid_hostname")
OddityTLSHandshakeUnknownAuthority = Oddity("tls.handshake.unknown_authority")
// quic.handshake
OddityQUICHandshakeTimeout = Oddity("quic.handshake.timeout")
OddityQUICHandshakeHostUnreachable = Oddity("quic.handshake.host_unreachable")
OddityQUICHandshakeOther = Oddity("quic.handshake.other")
// dns.lookup
OddityDNSLookupNXDOMAIN = Oddity("dns.lookup.nxdomain")
OddityDNSLookupTimeout = Oddity("dns.lookup.timeout")
OddityDNSLookupRefused = Oddity("dns.lookup.refused")
OddityDNSLookupBogon = Oddity("dns.lookup.bogon")
OddityDNSLookupOther = Oddity("dns.lookup.other")
// http.status
OddityStatus403 = Oddity("http.status.403")
OddityStatus404 = Oddity("http.status.404")
OddityStatus503 = Oddity("http.status.503")
OddityStatusOther = Oddity("http.status.other")
)

175
internal/measurex/quic.go Normal file
View File

@ -0,0 +1,175 @@
package measurex
//
// QUIC
//
// Wrappers for QUIC to store events into a WritableDB.
//
import (
"context"
"crypto/tls"
"net"
"time"
"github.com/lucas-clemente/quic-go"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/netxlite/quicx"
)
// QUICConn is the kind of conn used by QUIC.
type QUICConn = quicx.UDPLikeConn
// QUICDialer creates QUICSesssions.
type QUICDialer = netxlite.QUICDialer
// QUICListener creates listening connections for QUIC.
type QUICListener = netxlite.QUICListener
type quicListenerDB struct {
netxlite.QUICListener
begin time.Time
db WritableDB
}
func (ql *quicListenerDB) Listen(addr *net.UDPAddr) (QUICConn, error) {
pconn, err := ql.QUICListener.Listen(addr)
if err != nil {
return nil, err
}
return &udpLikeConnDB{
UDPLikeConn: pconn,
begin: ql.begin,
db: ql.db,
}, nil
}
type udpLikeConnDB struct {
quicx.UDPLikeConn
begin time.Time
db WritableDB
}
func (c *udpLikeConnDB) WriteTo(p []byte, addr net.Addr) (int, error) {
started := time.Since(c.begin).Seconds()
count, err := c.UDPLikeConn.WriteTo(p, addr)
finished := time.Since(c.begin).Seconds()
c.db.InsertIntoReadWrite(&NetworkEvent{
Operation: "write_to",
Network: "quic",
RemoteAddr: addr.String(),
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Count: count,
})
return count, err
}
func (c *udpLikeConnDB) ReadFrom(b []byte) (int, net.Addr, error) {
started := time.Since(c.begin).Seconds()
count, addr, err := c.UDPLikeConn.ReadFrom(b)
finished := time.Since(c.begin).Seconds()
c.db.InsertIntoReadWrite(&NetworkEvent{
Operation: "read_from",
Network: "quic",
RemoteAddr: addrStringIfNotNil(addr),
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Count: count,
})
return count, addr, err
}
func (c *udpLikeConnDB) Close() error {
started := time.Since(c.begin).Seconds()
err := c.UDPLikeConn.Close()
finished := time.Since(c.begin).Seconds()
c.db.InsertIntoClose(&NetworkEvent{
Operation: "close",
Network: "quic",
RemoteAddr: "",
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Count: 0,
})
return err
}
// QUICHandshakeEvent is the result of QUICHandshake.
type QUICHandshakeEvent = TLSHandshakeEvent
// NewQUICDialerWithoutResolver creates a new QUICDialer that is not
// attached to any resolver. This means that every attempt to dial any
// address containing a domain name will fail. This QUICDialer will
// save any event into the WritableDB. Any QUICConn created by it will
// likewise save any event into the WritableDB.
func (mx *Measurer) NewQUICDialerWithoutResolver(db WritableDB, logger Logger) QUICDialer {
return &quicDialerDB{db: db, logger: logger, begin: mx.Begin}
}
type quicDialerDB struct {
netxlite.QUICDialer
begin time.Time
db WritableDB
logger Logger
}
func (qh *quicDialerDB) DialContext(ctx context.Context, network, address string,
tlsConfig *tls.Config, quicConfig *quic.Config) (quic.EarlySession, error) {
started := time.Since(qh.begin).Seconds()
var state tls.ConnectionState
listener := &quicListenerDB{
QUICListener: netxlite.NewQUICListener(),
begin: qh.begin,
db: qh.db,
}
dialer := netxlite.NewQUICDialerWithoutResolver(listener, qh.logger)
defer dialer.CloseIdleConnections()
sess, err := dialer.DialContext(ctx, network, address, tlsConfig, quicConfig)
if err == nil {
select {
case <-sess.HandshakeComplete().Done():
state = sess.ConnectionState().TLS.ConnectionState
case <-ctx.Done():
sess, err = nil, ctx.Err()
}
}
finished := time.Since(qh.begin).Seconds()
qh.db.InsertIntoQUICHandshake(&QUICHandshakeEvent{
Network: "quic",
RemoteAddr: address,
SNI: tlsConfig.ServerName,
ALPN: tlsConfig.NextProtos,
SkipVerify: tlsConfig.InsecureSkipVerify,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Oddity: qh.computeOddity(err),
TLSVersion: netxlite.TLSVersionString(state.Version),
CipherSuite: netxlite.TLSCipherSuiteString(state.CipherSuite),
NegotiatedProto: state.NegotiatedProtocol,
PeerCerts: NewArchivalTLSCerts(peerCerts(nil, &state)),
})
return sess, err
}
func (qh *quicDialerDB) computeOddity(err error) Oddity {
if err == nil {
return ""
}
switch err.Error() {
case netxlite.FailureGenericTimeoutError:
return OddityQUICHandshakeTimeout
case netxlite.FailureHostUnreachable:
return OddityQUICHandshakeHostUnreachable
default:
return OddityQUICHandshakeOther
}
}
func (qh *quicDialerDB) CloseIdleConnections() {
// nothing to do
}

View File

@ -0,0 +1,231 @@
package measurex
//
// Resolver
//
// Wrappers for Resolver to store events into a WritableDB.
//
import (
"context"
"net"
"strings"
"time"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/netxlite/dnsx"
)
// HTTPSSvc is the result returned by HTTPSSvc queries.
type HTTPSSvc = dnsx.HTTPSSvc
// Resolver is the resolver type we use. This resolver will
// store resolve events into the DB.
type Resolver = netxlite.Resolver
// WrapResolver creates a new Resolver that saves events into the WritableDB.
func (mx *Measurer) WrapResolver(db WritableDB, r netxlite.Resolver) Resolver {
return WrapResolver(mx.Begin, db, r)
}
// WrapResolver wraps a resolver.
func WrapResolver(begin time.Time, db WritableDB, r netxlite.Resolver) Resolver {
return &resolverDB{Resolver: r, db: db, begin: begin}
}
// NewResolverSystem creates a system resolver and then wraps
// it using the WrapResolver function/
func (mx *Measurer) NewResolverSystem(db WritableDB, logger Logger) Resolver {
return mx.WrapResolver(db, netxlite.NewResolverStdlib(logger))
}
// NewResolverUDP is a convenience factory for creating a Resolver
// using UDP that saves measurements into the DB.
//
// Arguments:
//
// - db is where to save events;
//
// - logger is the logger;
//
// - address is the resolver address (e.g., "1.1.1.1:53").
func (mx *Measurer) NewResolverUDP(db WritableDB, logger Logger, address string) Resolver {
return mx.WrapResolver(db, netxlite.WrapResolver(
logger, netxlite.NewSerialResolver(
mx.WrapDNSXRoundTripper(db, netxlite.NewDNSOverUDP(
mx.NewDialerWithSystemResolver(db, logger),
address,
)))),
)
}
type resolverDB struct {
netxlite.Resolver
begin time.Time
db WritableDB
}
// DNSLookupAnswer is a DNS lookup answer.
type DNSLookupAnswer struct {
// JSON names compatible with df-002-dnst's spec
Type string `json:"answer_type"`
IPv4 string `json:"ipv4,omitempty"`
IPv6 string `json:"ivp6,omitempty"`
// Names not part of the spec.
ALPN string `json:"alpn,omitempty"`
}
// DNSLookupEvent contains the results of a DNS lookup.
type DNSLookupEvent struct {
// fields inside df-002-dnst
Answers []DNSLookupAnswer `json:"answers"`
Network string `json:"engine"`
Failure *string `json:"failure"`
Domain string `json:"hostname"`
QueryType string `json:"query_type"`
Address string `json:"resolver_address"`
Finished float64 `json:"t"`
// Names not part of the spec.
Started float64 `json:"started"`
Oddity Oddity `json:"oddity"`
}
// SupportsHTTP3 returns true if this query is for HTTPS and
// the answer contains an ALPN for "h3"
func (ev *DNSLookupEvent) SupportsHTTP3() bool {
if ev.QueryType != "HTTPS" {
return false
}
for _, ans := range ev.Answers {
switch ans.Type {
case "ALPN":
if ans.ALPN == "h3" {
return true
}
}
}
return false
}
// Addrs returns all the IPv4/IPv6 addresses
func (ev *DNSLookupEvent) Addrs() (out []string) {
for _, ans := range ev.Answers {
switch ans.Type {
case "A":
if net.ParseIP(ans.IPv4) != nil {
out = append(out, ans.IPv4)
}
case "AAAA":
if net.ParseIP(ans.IPv6) != nil {
out = append(out, ans.IPv6)
}
}
}
return
}
func (r *resolverDB) LookupHost(ctx context.Context, domain string) ([]string, error) {
started := time.Since(r.begin).Seconds()
addrs, err := r.Resolver.LookupHost(ctx, domain)
finished := time.Since(r.begin).Seconds()
for _, qtype := range []string{"A", "AAAA"} {
ev := &DNSLookupEvent{
Answers: r.computeAnswers(addrs, qtype),
Network: r.Resolver.Network(),
Address: r.Resolver.Address(),
Failure: NewArchivalFailure(err),
Domain: domain,
QueryType: qtype,
Finished: finished,
Started: started,
Oddity: r.computeOddityLookupHost(addrs, err),
}
r.db.InsertIntoLookupHost(ev)
}
return addrs, err
}
func (r *resolverDB) computeAnswers(addrs []string, qtype string) (out []DNSLookupAnswer) {
for _, addr := range addrs {
if qtype == "A" && !strings.Contains(addr, ":") {
out = append(out, DNSLookupAnswer{Type: qtype, IPv4: addr})
continue
}
if qtype == "AAAA" && strings.Contains(addr, ":") {
out = append(out, DNSLookupAnswer{Type: qtype, IPv6: addr})
continue
}
}
return
}
func (r *resolverDB) computeOddityLookupHost(addrs []string, err error) Oddity {
if err != nil {
switch err.Error() {
case netxlite.FailureGenericTimeoutError:
return OddityDNSLookupTimeout
case netxlite.FailureDNSNXDOMAINError:
return OddityDNSLookupNXDOMAIN
case netxlite.FailureDNSRefusedError:
return OddityDNSLookupRefused
default:
return OddityDNSLookupOther
}
}
for _, addr := range addrs {
if isBogon(addr) {
return OddityDNSLookupBogon
}
}
return ""
}
func (r *resolverDB) LookupHTTPS(ctx context.Context, domain string) (*HTTPSSvc, error) {
started := time.Since(r.begin).Seconds()
https, err := r.Resolver.LookupHTTPS(ctx, domain)
finished := time.Since(r.begin).Seconds()
ev := &DNSLookupEvent{
Network: r.Resolver.Network(),
Address: r.Resolver.Address(),
Domain: domain,
QueryType: "HTTPS",
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Oddity: Oddity(r.computeOddityHTTPSSvc(https, err)),
}
if err == nil {
for _, addr := range https.IPv4 {
ev.Answers = append(ev.Answers, DNSLookupAnswer{
Type: "A",
IPv4: addr,
})
}
for _, addr := range https.IPv6 {
ev.Answers = append(ev.Answers, DNSLookupAnswer{
Type: "AAAA",
IPv6: addr,
})
}
for _, alpn := range https.ALPN {
ev.Answers = append(ev.Answers, DNSLookupAnswer{
Type: "ALPN",
ALPN: alpn,
})
}
}
r.db.InsertIntoLookupHTTPSSvc(ev)
return https, err
}
func (r *resolverDB) computeOddityHTTPSSvc(https *HTTPSSvc, err error) Oddity {
if err != nil {
return r.computeOddityLookupHost(nil, err)
}
var addrs []string
addrs = append(addrs, https.IPv4...)
addrs = append(addrs, https.IPv6...)
return r.computeOddityLookupHost(addrs, nil)
}

128
internal/measurex/tls.go Normal file
View File

@ -0,0 +1,128 @@
package measurex
//
// TLS
//
// Wraps TLS code to write events into a WritableDB.
//
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"net"
"time"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// TLSHandshaker performs TLS handshakes.
type TLSHandshaker = netxlite.TLSHandshaker
// WrapTLSHandshaker wraps a netxlite.TLSHandshaker to return a new
// instance of TLSHandshaker that saves events into the DB.
func (mx *Measurer) WrapTLSHandshaker(db WritableDB, thx netxlite.TLSHandshaker) TLSHandshaker {
return &tlsHandshakerDB{TLSHandshaker: thx, db: db, begin: mx.Begin}
}
// NewTLSHandshakerStdlib creates a new TLS handshaker that
// saves results into the DB and uses the stdlib for TLS.
func (mx *Measurer) NewTLSHandshakerStdlib(db WritableDB, logger Logger) TLSHandshaker {
return mx.WrapTLSHandshaker(db, netxlite.NewTLSHandshakerStdlib(logger))
}
type tlsHandshakerDB struct {
netxlite.TLSHandshaker
begin time.Time
db WritableDB
}
// TLSHandshakeEvent contains a TLS handshake event.
type TLSHandshakeEvent struct {
// JSON names compatible with df-006-tlshandshake
CipherSuite string `json:"cipher_suite"`
Failure *string `json:"failure"`
NegotiatedProto string `json:"negotiated_proto"`
TLSVersion string `json:"tls_version"`
PeerCerts []*ArchivalBinaryData `json:"peer_certificates"`
Finished float64 `json:"t"`
// JSON names that are consistent with the
// spirit of the spec but are not in it
RemoteAddr string `json:"address"`
SNI string `json:"server_name"` // used in prod
ALPN []string `json:"alpn"`
SkipVerify bool `json:"no_tls_verify"` // used in prod
Oddity Oddity `json:"oddity"`
Network string `json:"proto"`
Started float64 `json:"started"`
}
func (thx *tlsHandshakerDB) Handshake(ctx context.Context,
conn Conn, config *tls.Config) (net.Conn, tls.ConnectionState, error) {
network := conn.RemoteAddr().Network()
remoteAddr := conn.RemoteAddr().String()
started := time.Since(thx.begin).Seconds()
tconn, state, err := thx.TLSHandshaker.Handshake(ctx, conn, config)
finished := time.Since(thx.begin).Seconds()
thx.db.InsertIntoTLSHandshake(&TLSHandshakeEvent{
Network: network,
RemoteAddr: remoteAddr,
SNI: config.ServerName,
ALPN: config.NextProtos,
SkipVerify: config.InsecureSkipVerify,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Oddity: thx.computeOddity(err),
TLSVersion: netxlite.TLSVersionString(state.Version),
CipherSuite: netxlite.TLSCipherSuiteString(state.CipherSuite),
NegotiatedProto: state.NegotiatedProtocol,
PeerCerts: NewArchivalTLSCerts(peerCerts(err, &state)),
})
return tconn, state, err
}
func (thx *tlsHandshakerDB) computeOddity(err error) Oddity {
if err == nil {
return ""
}
switch err.Error() {
case netxlite.FailureGenericTimeoutError:
return OddityTLSHandshakeTimeout
case netxlite.FailureConnectionReset:
return OddityTLSHandshakeReset
case netxlite.FailureEOFError:
return OddityTLSHandshakeUnexpectedEOF
case netxlite.FailureSSLInvalidHostname:
return OddityTLSHandshakeInvalidHostname
case netxlite.FailureSSLUnknownAuthority:
return OddityTLSHandshakeUnknownAuthority
default:
return OddityTLSHandshakeOther
}
}
func peerCerts(err error, state *tls.ConnectionState) (out [][]byte) {
var x509HostnameError x509.HostnameError
if errors.As(err, &x509HostnameError) {
// Test case: https://wrong.host.badssl.com/
return [][]byte{x509HostnameError.Certificate.Raw}
}
var x509UnknownAuthorityError x509.UnknownAuthorityError
if errors.As(err, &x509UnknownAuthorityError) {
// Test case: https://self-signed.badssl.com/. This error has
// never been among the ones returned by MK.
return [][]byte{x509UnknownAuthorityError.Cert.Raw}
}
var x509CertificateInvalidError x509.CertificateInvalidError
if errors.As(err, &x509CertificateInvalidError) {
// Test case: https://expired.badssl.com/
return [][]byte{x509CertificateInvalidError.Cert.Raw}
}
for _, cert := range state.PeerCertificates {
out = append(out, cert.Raw)
}
return
}

View File

@ -0,0 +1,81 @@
package measurex
import (
"net/http"
"net/url"
"time"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// NewTracingHTTPTransport creates a new HTTPTransport
// instance with events tracing.
//
// Arguments:
//
// - logger is the logger to use
//
// - begin is the zero time for measurements
//
// - db is the DB in which to write events that will
// eventually become the measurement
//
// - dialer is the base dialer to establish conns
//
// - resolver is the underlying resolver to use
//
// - handshake is the TLS handshaker to use
func NewTracingHTTPTransport(logger Logger, begin time.Time, db WritableDB,
resolver Resolver, dialer Dialer, handshaker TLSHandshaker) *HTTPTransportDB {
resolver = WrapResolver(begin, db, resolver)
dialer = netxlite.WrapDialer(logger, resolver, WrapDialer(begin, db, dialer))
tlsDialer := netxlite.NewTLSDialer(dialer, handshaker)
return WrapHTTPTransport(
begin, db, netxlite.NewHTTPTransport(logger, dialer, tlsDialer))
}
// NewTracingHTTPTransportWithDefaultSettings creates a new
// HTTP transport with tracing capabilities and default settings.
//
// Arguments:
//
// - begin is the zero time for measurements
//
// - logger is the logger to use
//
// - db is the DB in which to write events that will
// eventually become the measurement
//
func NewTracingHTTPTransportWithDefaultSettings(
begin time.Time, logger Logger, db WritableDB) *HTTPTransportDB {
return NewTracingHTTPTransport(logger, begin, db,
netxlite.NewResolverStdlib(logger),
netxlite.NewDialerWithoutResolver(logger),
netxlite.NewTLSHandshakerStdlib(logger))
}
func (mx *Measurer) NewTracingHTTPTransportWithDefaultSettings(
logger Logger, db WritableDB) *HTTPTransportDB {
return NewTracingHTTPTransport(
mx.Logger, mx.Begin, db, mx.NewResolverSystem(db, mx.Logger),
mx.NewDialerWithoutResolver(db, mx.Logger),
mx.TLSHandshaker)
}
// UnmeasuredHTTPEndpoints returns the endpoints whose IP address
// has been resolved but for which we don't have any measurement
// inside of the given database. The returned list will be
// empty if there is no such endpoint in the DB. This function will
// return an error if the URL is not valid or not HTTP/HTTPS.
func UnmeasuredHTTPEndpoints(db *MeasurementDB, URL string,
headers http.Header) ([]*HTTPEndpoint, error) {
parsedURL, err := url.Parse(URL)
if err != nil {
return nil, err
}
m := &DNSMeasurement{
Domain: parsedURL.Hostname(),
Measurement: db.AsMeasurement(),
}
return AllHTTPEndpointsForURL(parsedURL, headers, m)
}

View File

@ -0,0 +1,88 @@
package measurex
import (
"errors"
"net"
"net/url"
)
//
// Utils
//
// This is where we put free functions.
//
// ALPNForHTTPEndpoint selects the correct ALPN for an HTTP endpoint
// given the network. On failure, we return a nil list.
func ALPNForHTTPEndpoint(network EndpointNetwork) []string {
switch network {
case NetworkQUIC:
return []string{"h3"}
case NetworkTCP:
return []string{"h2", "http/1.1"}
default:
return nil
}
}
// addrStringIfNotNil returns the string of the given addr
// unless the addr is nil, in which case it returns an empty string.
func addrStringIfNotNil(addr net.Addr) (out string) {
if addr != nil {
out = addr.String()
}
return
}
// ErrCannotDeterminePortFromURL indicates that we could not determine
// the correct port from the URL authority and scheme.
var ErrCannotDeterminePortFromURL = errors.New("cannot determine port from URL")
// PortFromURL returns the port determined from the URL or an error.
func PortFromURL(URL *url.URL) (string, error) {
switch {
case URL.Port() != "":
return URL.Port(), nil
case URL.Scheme == "https":
return "443", nil
case URL.Scheme == "http":
return "80", nil
default:
return "", ErrCannotDeterminePortFromURL
}
}
// removeDuplicateEndpoints removes duplicate endpoints from a list of endpoints.
func removeDuplicateEndpoints(epnts ...*Endpoint) (out []*Endpoint) {
duplicates := make(map[string]*Endpoint)
for _, epnt := range epnts {
duplicates[epnt.String()] = epnt
}
for _, epnt := range duplicates {
out = append(out, epnt)
}
return
}
// removeDuplicateHTTPEndpoints removes duplicate endpoints from a list of endpoints.
func removeDuplicateHTTPEndpoints(epnts ...*HTTPEndpoint) (out []*HTTPEndpoint) {
duplicates := make(map[string]*HTTPEndpoint)
for _, epnt := range epnts {
duplicates[epnt.String()] = epnt
}
for _, epnt := range duplicates {
out = append(out, epnt)
}
return
}
// HTTPEndpointsToEndpoints convers HTTPEndpoints to Endpoints
func HTTPEndpointsToEndpoints(in []*HTTPEndpoint) (out []*Endpoint) {
for _, epnt := range in {
out = append(out, &Endpoint{
Network: epnt.Network,
Address: epnt.Address,
})
}
return
}