fix(measurex): use same keys of the OONI data format (#572)

This change should simplify the pipeline's job.

Reference issue: https://github.com/ooni/probe/issues/1817.

I previously dismissed this possibility, but now it seems clear it
is simpler to have a very tabular data format internally and to
convert such a format to OONI's data format when serializing.

The OONI data format is what the pipeline expects, but processing
is easier with a more linear/tabular format.
This commit is contained in:
Simone Basso
2021-11-05 10:46:45 +01:00
committed by GitHub
parent 6f90d29bfa
commit aa27bbe33f
35 changed files with 1571 additions and 1025 deletions
+525 -8
View File
@@ -1,8 +1,11 @@
package measurex
import (
"net"
"net/http"
"strconv"
"strings"
"time"
)
//
@@ -11,6 +14,10 @@ import (
// This file defines helpers to serialize to the OONI data format.
//
//
// BinaryData
//
// ArchivalBinaryData is the archival format for binary data.
type ArchivalBinaryData struct {
Data []byte `json:"data"`
@@ -29,6 +36,130 @@ func NewArchivalBinaryData(data []byte) (out *ArchivalBinaryData) {
return
}
//
// NetworkEvent
//
// ArchivalNetworkEvent is the OONI data format representation
// of a network event according to df-008-netevents.
type ArchivalNetworkEvent struct {
// JSON names compatible with df-008-netevents
RemoteAddr string `json:"address"`
Failure *string `json:"failure"`
Count int `json:"num_bytes,omitempty"`
Operation string `json:"operation"`
Network string `json:"proto"`
Finished float64 `json:"t"`
Started float64 `json:"started"`
// Names that are not part of the spec.
Oddity Oddity `json:"oddity"`
}
// NewArchivalNetworkEvent converts a network event to its archival format.
func NewArchivalNetworkEvent(in *NetworkEvent) *ArchivalNetworkEvent {
return &ArchivalNetworkEvent{
RemoteAddr: in.RemoteAddr,
Failure: in.Failure,
Count: in.Count,
Operation: in.Operation,
Network: in.Network,
Finished: in.Finished,
Started: in.Started,
Oddity: in.Oddity,
}
}
// NewArchivalNetworkEventList converts a list of NetworkEvent
// to a list of ArchivalNetworkEvent.
func NewArchivalNetworkEventList(in []*NetworkEvent) (out []*ArchivalNetworkEvent) {
for _, ev := range in {
out = append(out, NewArchivalNetworkEvent(ev))
}
return
}
//
// DNSRoundTripEvent
//
// ArchivalDNSRoundTripEvent is the OONI data format representation
// of a DNS round trip, which is currently not specified.
//
// We are trying to use names compatible with the names currently
// used by other specifications we currently use.
type ArchivalDNSRoundTripEvent struct {
Network string `json:"engine"`
Address string `json:"resolver_address"`
Query *ArchivalBinaryData `json:"raw_query"`
Started float64 `json:"started"`
Finished float64 `json:"t"`
Failure *string `json:"failure"`
Reply *ArchivalBinaryData `json:"raw_reply"`
}
// NewArchivalDNSRoundTripEvent converts a DNSRoundTripEvent into is archival format.
func NewArchivalDNSRoundTripEvent(in *DNSRoundTripEvent) *ArchivalDNSRoundTripEvent {
return &ArchivalDNSRoundTripEvent{
Network: in.Network,
Address: in.Address,
Query: NewArchivalBinaryData(in.Query),
Started: in.Started,
Finished: in.Finished,
Failure: in.Failure,
Reply: NewArchivalBinaryData(in.Reply),
}
}
// NewArchivalDNSRoundTripEventList converts a DNSRoundTripEvent
// list to the corresponding archival format.
func NewArchivalDNSRoundTripEventList(in []*DNSRoundTripEvent) (out []*ArchivalDNSRoundTripEvent) {
for _, ev := range in {
out = append(out, NewArchivalDNSRoundTripEvent(ev))
}
return
}
//
// HTTPRoundTrip
//
// ArchivalHTTPRequest is the archival format of an HTTP
// request according to df-001-http.md.
type ArchivalHTTPRequest struct {
Method string `json:"method"`
URL string `json:"url"`
Headers ArchivalHeaders `json:"headers"`
}
// ArchivalHTTPResponse is the archival format of an HTTP
// response according to df-001-http.md.
type ArchivalHTTPResponse struct {
// Names consistent with df-001-http.md
Code int64 `json:"code"`
Headers ArchivalHeaders `json:"headers"`
Body *ArchivalBinaryData `json:"body"`
BodyIsTruncated bool `json:"body_is_truncated"`
// Fields not part of the spec
BodyLength int64 `json:"x_body_length"`
BodyIsUTF8 bool `json:"x_body_is_utf8"`
}
// ArchivalHTTPRoundTripEvent is the archival format of an
// HTTP response according to df-001-http.md.
type ArchivalHTTPRoundTripEvent struct {
// JSON names following the df-001-httpt data format.
Failure *string `json:"failure"`
Request *HTTPRequest `json:"request"`
Response *HTTPResponse `json:"response"`
Finished float64 `json:"t"`
Started float64 `json:"started"`
// Names not in the specification
Oddity Oddity `json:"oddity"`
}
// ArchivalHeaders is a list of HTTP headers.
type ArchivalHeaders map[string]string
@@ -54,6 +185,64 @@ func NewArchivalHeaders(in http.Header) (out ArchivalHeaders) {
return
}
// NewArchivalHTTPRoundTripEvent converts an HTTPRoundTrip to its archival format.
func NewArchivalHTTPRoundTripEvent(in *HTTPRoundTripEvent) *ArchivalHTTPRoundTripEvent {
return &ArchivalHTTPRoundTripEvent{
Failure: in.Failure,
Request: &HTTPRequest{
Method: in.Method,
URL: in.URL,
Headers: NewArchivalHeaders(in.RequestHeaders),
},
Response: &HTTPResponse{
Code: in.StatusCode,
Headers: NewArchivalHeaders(in.ResponseHeaders),
Body: NewArchivalBinaryData(in.ResponseBody),
BodyLength: in.ResponseBodyLength,
BodyIsTruncated: in.ResponseBodyIsTruncated,
BodyIsUTF8: in.ResponseBodyIsUTF8,
},
Finished: in.Finished,
Started: in.Started,
Oddity: in.Oddity,
}
}
// NewArchivalHTTPRoundTripEventList converts a list of
// HTTPRoundTripEvent to a list of ArchivalRoundTripEvent.
func NewArchivalHTTPRoundTripEventList(in []*HTTPRoundTripEvent) (out []*ArchivalHTTPRoundTripEvent) {
for _, ev := range in {
out = append(out, NewArchivalHTTPRoundTripEvent(ev))
}
return
}
//
// QUICTLSHandshakeEvent
//
// ArchivalQUICTLSHandshakeEvent is the archival data format for a
// QUIC or TLS handshake event according to df-006-tlshandshake.
type ArchivalQUICTLSHandshakeEvent struct {
// JSON names compatible with df-006-tlshandshake
CipherSuite string `json:"cipher_suite"`
Failure *string `json:"failure"`
NegotiatedProto string `json:"negotiated_proto"`
TLSVersion string `json:"tls_version"`
PeerCerts []*ArchivalBinaryData `json:"peer_certificates"`
Finished float64 `json:"t"`
// JSON names that are consistent with the
// spirit of the spec but are not in it
RemoteAddr string `json:"address"`
SNI string `json:"server_name"` // used in prod
ALPN []string `json:"alpn"`
SkipVerify bool `json:"no_tls_verify"` // used in prod
Oddity Oddity `json:"oddity"`
Network string `json:"proto"`
Started float64 `json:"started"`
}
// NewArchivalTLSCertList builds a new []ArchivalBinaryData
// from a list of raw x509 certificates data.
func NewArchivalTLSCerts(in [][]byte) (out []*ArchivalBinaryData) {
@@ -66,13 +255,341 @@ func NewArchivalTLSCerts(in [][]byte) (out []*ArchivalBinaryData) {
return
}
// NewArchivalFailure creates an archival failure from an error. We
// cannot round trip an error using JSON, so we serialize to this
// intermediate format that is a sort of Optional<string>.
func NewArchivalFailure(err error) *string {
if err == nil {
return nil
// NewArchivalQUICTLSHandshakeEvent converts a QUICTLSHandshakeEvent
// to its archival data format.
func NewArchivalQUICTLSHandshakeEvent(in *QUICTLSHandshakeEvent) *ArchivalQUICTLSHandshakeEvent {
return &ArchivalQUICTLSHandshakeEvent{
CipherSuite: in.CipherSuite,
Failure: in.Failure,
NegotiatedProto: in.NegotiatedProto,
TLSVersion: in.TLSVersion,
PeerCerts: NewArchivalTLSCerts(in.PeerCerts),
Finished: in.Finished,
RemoteAddr: in.RemoteAddr,
SNI: in.SNI,
ALPN: in.ALPN,
SkipVerify: in.SkipVerify,
Oddity: in.Oddity,
Network: in.Network,
Started: in.Started,
}
s := err.Error()
return &s
}
// NewArchivalQUICTLSHandshakeEventList converts a list of
// QUICTLSHandshakeEvent to a list of ArchivalQUICTLSHandshakeEvent.
func NewArchivalQUICTLSHandshakeEventList(in []*QUICTLSHandshakeEvent) (out []*ArchivalQUICTLSHandshakeEvent) {
for _, ev := range in {
out = append(out, NewArchivalQUICTLSHandshakeEvent(ev))
}
return
}
//
// DNSLookup
//
// ArchivalDNSLookupAnswer is the archival format of a
// DNS lookup answer according to df-002-dnst.
type ArchivalDNSLookupAnswer struct {
// JSON names compatible with df-002-dnst's spec
Type string `json:"answer_type"`
IPv4 string `json:"ipv4,omitempty"`
IPv6 string `json:"ivp6,omitempty"`
// Names not part of the spec.
ALPN string `json:"alpn,omitempty"`
}
// ArchivalDNSLookupEvent is the archival data format
// of a DNS lookup according to df-002-dnst.
type ArchivalDNSLookupEvent struct {
// fields inside df-002-dnst
Answers []ArchivalDNSLookupAnswer `json:"answers"`
Network string `json:"engine"`
Failure *string `json:"failure"`
Domain string `json:"hostname"`
QueryType string `json:"query_type"`
Address string `json:"resolver_address"`
Finished float64 `json:"t"`
// Names not part of the spec.
Started float64 `json:"started"`
Oddity Oddity `json:"oddity"`
}
// NewArchivalDNSLookupAnswers creates a list of ArchivalDNSLookupAnswer.
func NewArchivalDNSLookupAnswers(in *DNSLookupEvent) (out []ArchivalDNSLookupAnswer) {
for _, ip := range in.A {
out = append(out, ArchivalDNSLookupAnswer{
Type: "A",
IPv4: ip,
})
}
for _, ip := range in.AAAA {
out = append(out, ArchivalDNSLookupAnswer{
Type: "AAAA",
IPv6: ip,
})
}
for _, alpn := range in.ALPN {
out = append(out, ArchivalDNSLookupAnswer{
Type: "ALPN",
ALPN: alpn,
})
}
return
}
// NewArchivalDNSLookupEvent converts a DNSLookupEvent
// to its archival representation.
func NewArchivalDNSLookupEvent(in *DNSLookupEvent) *ArchivalDNSLookupEvent {
return &ArchivalDNSLookupEvent{
Answers: NewArchivalDNSLookupAnswers(in),
Network: in.Network,
Failure: in.Failure,
Domain: in.Domain,
QueryType: in.QueryType,
Address: in.Address,
Finished: in.Finished,
Started: in.Started,
Oddity: in.Oddity,
}
}
// NewArchivalDNSLookupEventList converts a list of DNSLookupEvent
// to a list of ArchivalDNSLookupEvent.
func NewArchivalDNSLookupEventList(in []*DNSLookupEvent) (out []*ArchivalDNSLookupEvent) {
for _, ev := range in {
out = append(out, NewArchivalDNSLookupEvent(ev))
}
return
}
//
// TCPConnect
//
// ArchivalTCPConnect is the archival form of TCP connect
// events in compliance with df-005-tcpconnect.
type ArchivalTCPConnect struct {
// Names part of the spec.
IP string `json:"ip"`
Port int64 `json:"port"`
Finished float64 `json:"t"`
Status *ArchivalTCPConnectStatus `json:"status"`
// Names not part of the spec.
Started float64 `json:"started"`
Oddity Oddity `json:"oddity"`
}
// ArchivalTCPConnectStatus contains the status of a TCP connect.
type ArchivalTCPConnectStatus struct {
Blocked bool `json:"blocked"`
Failure *string `json:"failure"`
Success bool `json:"success"`
}
// NewArchivalTCPConnect converts a NetworkEvent to an ArchivalTCPConnect.
func NewArchivalTCPConnect(in *NetworkEvent) *ArchivalTCPConnect {
// We ignore errors because values come from Go code that
// emits correct serialization of TCP/UDP addresses.
addr, port, _ := net.SplitHostPort(in.RemoteAddr)
portnum, _ := strconv.Atoi(port)
return &ArchivalTCPConnect{
IP: addr,
Port: int64(portnum),
Finished: in.Finished,
Status: &ArchivalTCPConnectStatus{
Blocked: in.Failure != nil,
Failure: in.Failure,
Success: in.Failure == nil,
},
Started: in.Started,
Oddity: in.Oddity,
}
}
// NewArchivalTCPConnectList converts a list of NetworkEvent
// to a list of ArchivalTCPConnect. In doing that, the code
// only considers "connect" events using the TCP protocol.
func NewArchivalTCPConnectList(in []*NetworkEvent) (out []*ArchivalTCPConnect) {
for _, ev := range in {
if ev.Operation != "connect" {
continue
}
switch ev.Network {
case "tcp", "tcp4", "tcp6":
out = append(out, NewArchivalTCPConnect(ev))
default:
// nothing
}
}
return
}
//
// URLMeasurement
//
// ArchivalURLMeasurement is the archival representation of URLMeasurement
type ArchivalURLMeasurement struct {
URL string `json:"url"`
DNS []*ArchivalDNSMeasurement `json:"dns"`
Endpoints []*ArchivalHTTPEndpointMeasurement `json:"endpoints"`
TH *ArchivalTHMeasurement `json:"th"`
TotalRuntime time.Duration `json:"x_total_runtime"`
DNSRuntime time.Duration `json:"x_dns_runtime"`
THRuntime time.Duration `json:"x_th_runtime"`
EpntsRuntime time.Duration `json:"x_epnts_runtime"`
}
// NewArchivalURLMeasurement creates the archival representation
// of an URLMeasurement data structure.
func NewArchivalURLMeasurement(in *URLMeasurement) *ArchivalURLMeasurement {
return &ArchivalURLMeasurement{
URL: in.URL,
DNS: NewArchivalDNSMeasurementList(in.DNS),
Endpoints: NewArchivalHTTPEndpointMeasurementList(in.Endpoints),
TH: NewArchivalTHMeasurement(in.TH),
TotalRuntime: in.TotalRuntime,
DNSRuntime: in.DNSRuntime,
THRuntime: in.THRuntime,
EpntsRuntime: in.EpntsRuntime,
}
}
//
// EndpointMeasurement
//
// ArchivalEndpointMeasurement is the archival representation of EndpointMeasurement.
type ArchivalEndpointMeasurement struct {
// Network is the network of this endpoint.
Network EndpointNetwork `json:"network"`
// Address is the address of this endpoint.
Address string `json:"address"`
// An EndpointMeasurement is a Measurement.
*ArchivalMeasurement
}
// NewArchivalEndpointMeasurement converts an EndpointMeasurement
// to the corresponding archival data format.
func NewArchivalEndpointMeasurement(in *EndpointMeasurement) *ArchivalEndpointMeasurement {
return &ArchivalEndpointMeasurement{
Network: in.Network,
Address: in.Address,
ArchivalMeasurement: NewArchivalMeasurement(in.Measurement),
}
}
//
// THMeasurement
//
// ArchivalTHMeasurement is the archival representation of THMeasurement.
type ArchivalTHMeasurement struct {
DNS []*ArchivalDNSMeasurement `json:"dns"`
Endpoints []*ArchivalHTTPEndpointMeasurement `json:"endpoints"`
}
// NewArchivalTHMeasurement creates the archival representation of THMeasurement.
func NewArchivalTHMeasurement(in *THMeasurement) *ArchivalTHMeasurement {
return &ArchivalTHMeasurement{
DNS: NewArchivalDNSMeasurementList(in.DNS),
Endpoints: NewArchivalHTTPEndpointMeasurementList(in.Endpoints),
}
}
//
// DNSMeasurement
//
// ArchivalDNSMeasurement is the archival representation of DNSMeasurement.
type ArchivalDNSMeasurement struct {
Domain string `json:"domain"`
*ArchivalMeasurement
}
// NewArchivalDNSMeasurement converts a DNSMeasurement to an ArchivalDNSMeasurement.
func NewArchivalDNSMeasurement(in *DNSMeasurement) *ArchivalDNSMeasurement {
return &ArchivalDNSMeasurement{
Domain: in.Domain,
ArchivalMeasurement: NewArchivalMeasurement(in.Measurement),
}
}
// NewArchivalDNSMeasurementList converts a list of DNSMeasurement
// to a list of ArchivalDNSMeasurement.
func NewArchivalDNSMeasurementList(in []*DNSMeasurement) (out []*ArchivalDNSMeasurement) {
for _, m := range in {
out = append(out, NewArchivalDNSMeasurement(m))
}
return
}
//
// HTTPEndpointMeasurement
//
// ArchivalHTTPEndpointMeasurement is the archival representation
// of an HTTPEndpointMeasurement.
type ArchivalHTTPEndpointMeasurement struct {
URL string `json:"url"`
Network EndpointNetwork `json:"network"`
Address string `json:"address"`
*ArchivalMeasurement
}
// NewArchivalHTTPEndpointMeasurement converts an HTTPEndpointMeasurement
// to an ArchivalHTTPEndpointMeasurement.
func NewArchivalHTTPEndpointMeasurement(in *HTTPEndpointMeasurement) *ArchivalHTTPEndpointMeasurement {
return &ArchivalHTTPEndpointMeasurement{
URL: in.URL,
Network: in.Network,
Address: in.Address,
ArchivalMeasurement: NewArchivalMeasurement(in.Measurement),
}
}
// NewArchivalHTTPEndpointMeasurementList converts a list of HTTPEndpointMeasurement
// to a list of ArchivalHTTPEndpointMeasurement.
func NewArchivalHTTPEndpointMeasurementList(in []*HTTPEndpointMeasurement) (out []*ArchivalHTTPEndpointMeasurement) {
for _, m := range in {
out = append(out, NewArchivalHTTPEndpointMeasurement(m))
}
return
}
//
// Measurement
//
// ArchivalMeasurement is the archival representation of a Measurement.
type ArchivalMeasurement struct {
NetworkEvents []*ArchivalNetworkEvent `json:"network_events,omitempty"`
DNSEvents []*ArchivalDNSRoundTripEvent `json:"dns_events,omitempty"`
Queries []*ArchivalDNSLookupEvent `json:"queries,omitempty"`
TCPConnect []*ArchivalTCPConnect `json:"tcp_connect,omitempty"`
TLSHandshakes []*ArchivalQUICTLSHandshakeEvent `json:"tls_handshakes,omitempty"`
QUICHandshakes []*ArchivalQUICTLSHandshakeEvent `json:"quic_handshakes,omitempty"`
Requests []*ArchivalHTTPRoundTripEvent `json:"requests,omitempty"`
}
// NewArchivalMeasurement converts a Measurement to ArchivalMeasurement.
func NewArchivalMeasurement(in *Measurement) *ArchivalMeasurement {
out := &ArchivalMeasurement{
NetworkEvents: NewArchivalNetworkEventList(in.ReadWrite),
DNSEvents: NewArchivalDNSRoundTripEventList(in.DNSRoundTrip),
Queries: nil, // done below
TCPConnect: NewArchivalTCPConnectList(in.Connect),
TLSHandshakes: NewArchivalQUICTLSHandshakeEventList(in.TLSHandshake),
QUICHandshakes: NewArchivalQUICTLSHandshakeEventList(in.QUICHandshake),
Requests: NewArchivalHTTPRoundTripEventList(in.HTTPRoundTrip),
}
out.Queries = append(out.Queries, NewArchivalDNSLookupEventList(in.LookupHost)...)
out.Queries = append(out.Queries, NewArchivalDNSLookupEventList(in.LookupHTTPSSvc)...)
return out
}
+8 -8
View File
@@ -28,7 +28,7 @@ type WritableDB interface {
InsertIntoClose(ev *NetworkEvent)
// InsertIntoTLSHandshake saves a TLS handshake event.
InsertIntoTLSHandshake(ev *TLSHandshakeEvent)
InsertIntoTLSHandshake(ev *QUICTLSHandshakeEvent)
// InsertIntoLookupHost saves a lookup host event.
InsertIntoLookupHost(ev *DNSLookupEvent)
@@ -46,7 +46,7 @@ type WritableDB interface {
InsertIntoHTTPRedirect(ev *HTTPRedirectEvent)
// InsertIntoQUICHandshake saves a QUIC handshake event.
InsertIntoQUICHandshake(ev *QUICHandshakeEvent)
InsertIntoQUICHandshake(ev *QUICTLSHandshakeEvent)
}
// MeasurementDB is a WritableDB that also allows high-level code
@@ -56,13 +56,13 @@ type MeasurementDB struct {
dialTable []*NetworkEvent
readWriteTable []*NetworkEvent
closeTable []*NetworkEvent
tlsHandshakeTable []*TLSHandshakeEvent
tlsHandshakeTable []*QUICTLSHandshakeEvent
lookupHostTable []*DNSLookupEvent
lookupHTTPSvcTable []*DNSLookupEvent
dnsRoundTripTable []*DNSRoundTripEvent
httpRoundTripTable []*HTTPRoundTripEvent
httpRedirectTable []*HTTPRedirectEvent
quicHandshakeTable []*QUICHandshakeEvent
quicHandshakeTable []*QUICTLSHandshakeEvent
// mu protects all the fields
mu sync.Mutex
@@ -126,14 +126,14 @@ func (db *MeasurementDB) selectAllFromCloseUnlocked() (out []*NetworkEvent) {
}
// InsertIntoTLSHandshake implements EventDB.InsertIntoTLSHandshake.
func (db *MeasurementDB) InsertIntoTLSHandshake(ev *TLSHandshakeEvent) {
func (db *MeasurementDB) InsertIntoTLSHandshake(ev *QUICTLSHandshakeEvent) {
db.mu.Lock()
db.tlsHandshakeTable = append(db.tlsHandshakeTable, ev)
db.mu.Unlock()
}
// selectAllFromTLSHandshakeUnlocked returns all TLS handshake events.
func (db *MeasurementDB) selectAllFromTLSHandshakeUnlocked() (out []*TLSHandshakeEvent) {
func (db *MeasurementDB) selectAllFromTLSHandshakeUnlocked() (out []*QUICTLSHandshakeEvent) {
out = append(out, db.tlsHandshakeTable...)
return
}
@@ -204,14 +204,14 @@ func (db *MeasurementDB) selectAllFromHTTPRedirectUnlocked() (out []*HTTPRedirec
}
// InsertIntoQUICHandshake implements EventDB.InsertIntoQUICHandshake.
func (db *MeasurementDB) InsertIntoQUICHandshake(ev *QUICHandshakeEvent) {
func (db *MeasurementDB) InsertIntoQUICHandshake(ev *QUICTLSHandshakeEvent) {
db.mu.Lock()
db.quicHandshakeTable = append(db.quicHandshakeTable, ev)
db.mu.Unlock()
}
// selectAllFromQUICHandshakeUnlocked returns all QUIC handshake events.
func (db *MeasurementDB) selectAllFromQUICHandshakeUnlocked() (out []*QUICHandshakeEvent) {
func (db *MeasurementDB) selectAllFromQUICHandshakeUnlocked() (out []*QUICTLSHandshakeEvent) {
out = append(out, db.quicHandshakeTable...)
return
}
+12 -15
View File
@@ -54,17 +54,14 @@ type dialerDB struct {
// NetworkEvent contains a network event. This kind of events
// are generated by Dialer, QUICDialer, Conn, QUICConn.
type NetworkEvent struct {
// JSON names compatible with df-008-netevents
RemoteAddr string `json:"address"`
Failure *string `json:"failure"`
Count int `json:"num_bytes,omitempty"`
Operation string `json:"operation"`
Network string `json:"proto"`
Finished float64 `json:"t"`
Started float64 `json:"started"`
// Names that are not part of the spec.
Oddity Oddity `json:"oddity"`
RemoteAddr string
Failure *string
Count int
Operation string
Network string
Oddity Oddity
Finished float64
Started float64
}
func (d *dialerDB) DialContext(
@@ -78,7 +75,7 @@ func (d *dialerDB) DialContext(
RemoteAddr: address,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Oddity: d.computeOddity(err),
Count: 0,
})
@@ -128,7 +125,7 @@ func (c *connDB) Read(b []byte) (int, error) {
RemoteAddr: c.remoteAddr,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Count: count,
})
return count, err
@@ -144,7 +141,7 @@ func (c *connDB) Write(b []byte) (int, error) {
RemoteAddr: c.remoteAddr,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Count: count,
})
return count, err
@@ -160,7 +157,7 @@ func (c *connDB) Close() error {
RemoteAddr: c.remoteAddr,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Count: 0,
})
return err
+10 -12
View File
@@ -32,15 +32,13 @@ type dnsxRoundTripperDB struct {
// DNSRoundTripEvent contains the result of a DNS round trip.
type DNSRoundTripEvent struct {
// This data structure is not in df-002-dns but the names and
// semantics try to be consistent with such a spec.
Network string `json:"engine"`
Address string `json:"resolver_address"`
Query *ArchivalBinaryData `json:"raw_query"`
Started float64 `json:"started"`
Finished float64 `json:"t"`
Failure *string `json:"failure"`
Reply *ArchivalBinaryData `json:"raw_reply"`
Network string
Address string
Query []byte
Started float64
Finished float64
Failure *string
Reply []byte
}
func (txp *dnsxRoundTripperDB) RoundTrip(ctx context.Context, query []byte) ([]byte, error) {
@@ -50,11 +48,11 @@ func (txp *dnsxRoundTripperDB) RoundTrip(ctx context.Context, query []byte) ([]b
txp.db.InsertIntoDNSRoundTrip(&DNSRoundTripEvent{
Network: txp.DNSTransport.Network(),
Address: txp.DNSTransport.Address(),
Query: NewArchivalBinaryData(query),
Query: query,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Reply: NewArchivalBinaryData(reply),
Failure: NewFailure(err),
Reply: reply,
})
return reply, err
}
+12
View File
@@ -0,0 +1,12 @@
package measurex
// NewFailure creates a serializable failure from an error. We
// cannot round trip an error using JSON, so we serialize to this
// intermediate format that is a sort of Optional<string>.
func NewFailure(err error) *string {
if err == nil {
return nil
}
s := err.Error()
return &s
}
+25 -25
View File
@@ -124,31 +124,33 @@ type HTTPResponse struct {
// HTTPRoundTripEvent contains information about an HTTP round trip.
type HTTPRoundTripEvent struct {
// JSON names following the df-001-httpt data format.
Failure *string `json:"failure"`
Request *HTTPRequest `json:"request"`
Response *HTTPResponse `json:"response"`
Finished float64 `json:"t"`
Started float64 `json:"started"`
// Names not in the specification
Oddity Oddity `json:"oddity"`
Failure *string
Method string
URL string
RequestHeaders http.Header
StatusCode int64
ResponseHeaders http.Header
ResponseBody []byte
ResponseBodyLength int64
ResponseBodyIsTruncated bool
ResponseBodyIsUTF8 bool
Finished float64
Started float64
Oddity Oddity
}
func (txp *HTTPTransportDB) RoundTrip(req *http.Request) (*http.Response, error) {
started := time.Since(txp.Begin).Seconds()
resp, err := txp.HTTPTransport.RoundTrip(req)
rt := &HTTPRoundTripEvent{
Request: &HTTPRequest{
Method: req.Method,
URL: req.URL.String(),
Headers: NewArchivalHeaders(req.Header),
},
Started: started,
Method: req.Method,
URL: req.URL.String(),
RequestHeaders: req.Header,
Started: started,
}
if err != nil {
rt.Finished = time.Since(txp.Begin).Seconds()
rt.Failure = NewArchivalFailure(err)
rt.Failure = NewFailure(err)
txp.DB.InsertIntoHTTPRoundTrip(rt)
return nil, err
}
@@ -162,10 +164,8 @@ func (txp *HTTPTransportDB) RoundTrip(req *http.Request) (*http.Response, error)
case resp.StatusCode >= 400:
rt.Oddity = OddityStatusOther
}
rt.Response = &HTTPResponse{
Code: int64(resp.StatusCode),
Headers: NewArchivalHeaders(resp.Header),
}
rt.StatusCode = int64(resp.StatusCode)
rt.ResponseHeaders = resp.Header
r := io.LimitReader(resp.Body, txp.MaxBodySnapshotSize)
body, err := netxlite.ReadAllContext(req.Context(), r)
if errors.Is(err, io.EOF) && resp.Close {
@@ -173,7 +173,7 @@ func (txp *HTTPTransportDB) RoundTrip(req *http.Request) (*http.Response, error)
}
if err != nil {
rt.Finished = time.Since(txp.Begin).Seconds()
rt.Failure = NewArchivalFailure(err)
rt.Failure = NewFailure(err)
txp.DB.InsertIntoHTTPRoundTrip(rt)
return nil, err
}
@@ -181,10 +181,10 @@ func (txp *HTTPTransportDB) RoundTrip(req *http.Request) (*http.Response, error)
Reader: io.MultiReader(bytes.NewReader(body), resp.Body),
Closer: resp.Body,
}
rt.Response.Body = NewArchivalBinaryData(body)
rt.Response.BodyLength = int64(len(body))
rt.Response.BodyIsTruncated = int64(len(body)) >= txp.MaxBodySnapshotSize
rt.Response.BodyIsUTF8 = utf8.Valid(body)
rt.ResponseBody = body
rt.ResponseBodyLength = int64(len(body))
rt.ResponseBodyIsTruncated = int64(len(body)) >= txp.MaxBodySnapshotSize
rt.ResponseBodyIsUTF8 = utf8.Valid(body)
rt.Finished = time.Since(txp.Begin).Seconds()
txp.DB.InsertIntoHTTPRoundTrip(rt)
return resp, nil
+36 -26
View File
@@ -18,33 +18,33 @@ import (
// a bunch of measurements detailing each measurement step.
type URLMeasurement struct {
// URL is the URL we're measuring.
URL string `json:"url"`
URL string
// DNS contains all the DNS related measurements.
DNS []*DNSMeasurement `json:"dns"`
DNS []*DNSMeasurement
// Endpoints contains a measurement for each endpoint
// that we discovered via DNS or TH.
Endpoints []*HTTPEndpointMeasurement `json:"endpoints"`
Endpoints []*HTTPEndpointMeasurement
// RedirectURLs contain the URLs to which we should fetch
// if we choose to follow redirections.
RedirectURLs []string `json:"-"`
RedirectURLs []string
// THMeasurement is the measurement collected by the TH.
TH interface{} `json:"th,omitempty"`
// TH is the measurement collected by the TH.
TH *THMeasurement
// TotalRuntime is the total time to measure this URL.
TotalRuntime time.Duration `json:"-"`
TotalRuntime time.Duration
// DNSRuntime is the time to run all DNS checks.
DNSRuntime time.Duration `json:"x_dns_runtime"`
DNSRuntime time.Duration
// THRuntime is the total time to invoke all test helpers.
THRuntime time.Duration `json:"x_th_runtime"`
THRuntime time.Duration
// EpntsRuntime is the total time to check all the endpoints.
EpntsRuntime time.Duration `json:"x_epnts_runtime"`
EpntsRuntime time.Duration
}
// fillRedirects takes in input a complete URLMeasurement and fills
@@ -67,40 +67,40 @@ func (m *URLMeasurement) fillRedirects() {
// data format is not compatible with the OONI data format.
type Measurement struct {
// Connect contains all the connect operations.
Connect []*NetworkEvent `json:"connect,omitempty"`
Connect []*NetworkEvent
// ReadWrite contains all the read and write operations.
ReadWrite []*NetworkEvent `json:"read_write,omitempty"`
ReadWrite []*NetworkEvent
// Close contains all the close operations.
Close []*NetworkEvent `json:"-"`
Close []*NetworkEvent
// TLSHandshake contains all the TLS handshakes.
TLSHandshake []*TLSHandshakeEvent `json:"tls_handshake,omitempty"`
TLSHandshake []*QUICTLSHandshakeEvent
// QUICHandshake contains all the QUIC handshakes.
QUICHandshake []*QUICHandshakeEvent `json:"quic_handshake,omitempty"`
QUICHandshake []*QUICTLSHandshakeEvent
// LookupHost contains all the host lookups.
LookupHost []*DNSLookupEvent `json:"lookup_host,omitempty"`
LookupHost []*DNSLookupEvent
// LookupHTTPSSvc contains all the HTTPSSvc lookups.
LookupHTTPSSvc []*DNSLookupEvent `json:"lookup_httpssvc,omitempty"`
LookupHTTPSSvc []*DNSLookupEvent
// DNSRoundTrip contains all the DNS round trips.
DNSRoundTrip []*DNSRoundTripEvent `json:"dns_round_trip,omitempty"`
DNSRoundTrip []*DNSRoundTripEvent
// HTTPRoundTrip contains all the HTTP round trips.
HTTPRoundTrip []*HTTPRoundTripEvent `json:"http_round_trip,omitempty"`
HTTPRoundTrip []*HTTPRoundTripEvent
// HTTPRedirect contains all the redirections.
HTTPRedirect []*HTTPRedirectEvent `json:"-"`
HTTPRedirect []*HTTPRedirectEvent
}
// DNSMeasurement is a DNS measurement.
type DNSMeasurement struct {
// Domain is the domain this measurement refers to.
Domain string `json:"domain"`
Domain string
// A DNSMeasurement is a Measurement.
*Measurement
@@ -239,10 +239,10 @@ func AllHTTPEndpointsForURL(URL *url.URL,
// EndpointMeasurement is an endpoint measurement.
type EndpointMeasurement struct {
// Network is the network of this endpoint.
Network EndpointNetwork `json:"network"`
Network EndpointNetwork
// Address is the address of this endpoint.
Address string `json:"address"`
Address string
// An EndpointMeasurement is a Measurement.
*Measurement
@@ -251,14 +251,24 @@ type EndpointMeasurement struct {
// HTTPEndpointMeasurement is an HTTP endpoint measurement.
type HTTPEndpointMeasurement struct {
// URL is the URL this measurement refers to.
URL string `json:"url"`
URL string
// Network is the network of this endpoint.
Network EndpointNetwork `json:"network"`
Network EndpointNetwork
// Address is the address of this endpoint.
Address string `json:"address"`
Address string
// An HTTPEndpointMeasurement is a Measurement.
*Measurement
}
// THMeasurement is the measurement performed by the TH.
type THMeasurement struct {
// DNS contains all the DNS related measurements.
DNS []*DNSMeasurement
// Endpoints contains a measurement for each endpoint
// that was discovered by the probe or the TH.
Endpoints []*HTTPEndpointMeasurement
}
+3 -8
View File
@@ -13,7 +13,6 @@ import (
"context"
"crypto/tls"
"errors"
stdlog "log"
"net"
"net/http"
"net/url"
@@ -712,7 +711,7 @@ type MeasureURLHelper interface {
// test helper protocol allows one to set.
LookupExtraHTTPEndpoints(ctx context.Context, URL *url.URL,
headers http.Header, epnts ...*HTTPEndpoint) (
newEpnts []*HTTPEndpoint, thMeasurement interface{}, err error)
newEpnts []*HTTPEndpoint, thMeasurement *THMeasurement, err error)
}
// MeasureURL measures an HTTP or HTTPS URL. The DNS resolvers
@@ -802,12 +801,8 @@ func (mx *Measurer) maybeQUICFollowUp(ctx context.Context,
if epnt.QUICHandshake != nil {
return
}
for idx, rtrip := range epnt.HTTPRoundTrip {
if rtrip.Response == nil {
stdlog.Printf("malformed HTTPRoundTrip@%d: %+v", idx, rtrip)
continue
}
if v := rtrip.Response.Headers.Get("alt-svc"); v != "" {
for _, rtrip := range epnt.HTTPRoundTrip {
if v := rtrip.ResponseHeaders.Get("alt-svc"); v != "" {
altsvc = append(altsvc, v)
}
}
+6 -9
View File
@@ -60,7 +60,7 @@ func (c *udpLikeConnDB) WriteTo(p []byte, addr net.Addr) (int, error) {
RemoteAddr: addr.String(),
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Count: count,
})
return count, err
@@ -76,7 +76,7 @@ func (c *udpLikeConnDB) ReadFrom(b []byte) (int, net.Addr, error) {
RemoteAddr: addrStringIfNotNil(addr),
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Count: count,
})
return count, addr, err
@@ -92,15 +92,12 @@ func (c *udpLikeConnDB) Close() error {
RemoteAddr: "",
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Count: 0,
})
return err
}
// QUICHandshakeEvent is the result of QUICHandshake.
type QUICHandshakeEvent = TLSHandshakeEvent
// NewQUICDialerWithoutResolver creates a new QUICDialer that is not
// attached to any resolver. This means that every attempt to dial any
// address containing a domain name will fail. This QUICDialer will
@@ -138,7 +135,7 @@ func (qh *quicDialerDB) DialContext(ctx context.Context, network, address string
}
}
finished := time.Since(qh.begin).Seconds()
qh.db.InsertIntoQUICHandshake(&QUICHandshakeEvent{
qh.db.InsertIntoQUICHandshake(&QUICTLSHandshakeEvent{
Network: "quic",
RemoteAddr: address,
SNI: tlsConfig.ServerName,
@@ -146,12 +143,12 @@ func (qh *quicDialerDB) DialContext(ctx context.Context, network, address string
SkipVerify: tlsConfig.InsecureSkipVerify,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Oddity: qh.computeOddity(err),
TLSVersion: netxlite.TLSVersionString(state.Version),
CipherSuite: netxlite.TLSCipherSuiteString(state.CipherSuite),
NegotiatedProto: state.NegotiatedProtocol,
PeerCerts: NewArchivalTLSCerts(peerCerts(nil, &state)),
PeerCerts: peerCerts(nil, &state),
})
return sess, err
}
+42 -79
View File
@@ -8,7 +8,6 @@ package measurex
import (
"context"
"net"
"strings"
"time"
@@ -65,31 +64,19 @@ type resolverDB struct {
db WritableDB
}
// DNSLookupAnswer is a DNS lookup answer.
type DNSLookupAnswer struct {
// JSON names compatible with df-002-dnst's spec
Type string `json:"answer_type"`
IPv4 string `json:"ipv4,omitempty"`
IPv6 string `json:"ivp6,omitempty"`
// Names not part of the spec.
ALPN string `json:"alpn,omitempty"`
}
// DNSLookupEvent contains the results of a DNS lookup.
type DNSLookupEvent struct {
// fields inside df-002-dnst
Answers []DNSLookupAnswer `json:"answers"`
Network string `json:"engine"`
Failure *string `json:"failure"`
Domain string `json:"hostname"`
QueryType string `json:"query_type"`
Address string `json:"resolver_address"`
Finished float64 `json:"t"`
// Names not part of the spec.
Started float64 `json:"started"`
Oddity Oddity `json:"oddity"`
Network string
Failure *string
Domain string
QueryType string
Address string
Finished float64
Started float64
Oddity Oddity
A []string
AAAA []string
ALPN []string
}
// SupportsHTTP3 returns true if this query is for HTTPS and
@@ -98,12 +85,9 @@ func (ev *DNSLookupEvent) SupportsHTTP3() bool {
if ev.QueryType != "HTTPS" {
return false
}
for _, ans := range ev.Answers {
switch ans.Type {
case "ALPN":
if ans.ALPN == "h3" {
return true
}
for _, alpn := range ev.ALPN {
if alpn == "h3" {
return true
}
}
return false
@@ -111,18 +95,8 @@ func (ev *DNSLookupEvent) SupportsHTTP3() bool {
// Addrs returns all the IPv4/IPv6 addresses
func (ev *DNSLookupEvent) Addrs() (out []string) {
for _, ans := range ev.Answers {
switch ans.Type {
case "A":
if net.ParseIP(ans.IPv4) != nil {
out = append(out, ans.IPv4)
}
case "AAAA":
if net.ParseIP(ans.IPv6) != nil {
out = append(out, ans.IPv6)
}
}
}
out = append(out, ev.A...)
out = append(out, ev.AAAA...)
return
}
@@ -130,35 +104,39 @@ func (r *resolverDB) LookupHost(ctx context.Context, domain string) ([]string, e
started := time.Since(r.begin).Seconds()
addrs, err := r.Resolver.LookupHost(ctx, domain)
finished := time.Since(r.begin).Seconds()
for _, qtype := range []string{"A", "AAAA"} {
ev := &DNSLookupEvent{
Answers: r.computeAnswers(addrs, qtype),
Network: r.Resolver.Network(),
Address: r.Resolver.Address(),
Failure: NewArchivalFailure(err),
Domain: domain,
QueryType: qtype,
Finished: finished,
Started: started,
Oddity: r.computeOddityLookupHost(addrs, err),
}
r.db.InsertIntoLookupHost(ev)
}
r.saveLookupResults(domain, started, finished, err, addrs, "A")
r.saveLookupResults(domain, started, finished, err, addrs, "AAAA")
return addrs, err
}
func (r *resolverDB) computeAnswers(addrs []string, qtype string) (out []DNSLookupAnswer) {
func (r *resolverDB) saveLookupResults(domain string, started, finished float64,
err error, addrs []string, qtype string) {
ev := &DNSLookupEvent{
Network: r.Resolver.Network(),
Address: r.Resolver.Address(),
Failure: NewFailure(err),
Domain: domain,
QueryType: qtype,
Finished: finished,
Started: started,
}
for _, addr := range addrs {
if qtype == "A" && !strings.Contains(addr, ":") {
out = append(out, DNSLookupAnswer{Type: qtype, IPv4: addr})
ev.A = append(ev.A, addr)
continue
}
if qtype == "AAAA" && strings.Contains(addr, ":") {
out = append(out, DNSLookupAnswer{Type: qtype, IPv6: addr})
ev.AAAA = append(ev.AAAA, addr)
continue
}
}
return
switch qtype {
case "A":
ev.Oddity = r.computeOddityLookupHost(ev.A, err)
case "AAAA":
ev.Oddity = r.computeOddityLookupHost(ev.AAAA, err)
}
r.db.InsertIntoLookupHost(ev)
}
func (r *resolverDB) computeOddityLookupHost(addrs []string, err error) Oddity {
@@ -193,28 +171,13 @@ func (r *resolverDB) LookupHTTPS(ctx context.Context, domain string) (*HTTPSSvc,
QueryType: "HTTPS",
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Oddity: Oddity(r.computeOddityHTTPSSvc(https, err)),
}
if err == nil {
for _, addr := range https.IPv4 {
ev.Answers = append(ev.Answers, DNSLookupAnswer{
Type: "A",
IPv4: addr,
})
}
for _, addr := range https.IPv6 {
ev.Answers = append(ev.Answers, DNSLookupAnswer{
Type: "AAAA",
IPv6: addr,
})
}
for _, alpn := range https.ALPN {
ev.Answers = append(ev.Answers, DNSLookupAnswer{
Type: "ALPN",
ALPN: alpn,
})
}
ev.A = append(ev.A, https.IPv4...)
ev.AAAA = append(ev.AAAA, https.IPv6...)
ev.ALPN = append(ev.ALPN, https.ALPN...)
}
r.db.InsertIntoLookupHTTPSSvc(ev)
return https, err
+18 -22
View File
@@ -38,25 +38,21 @@ type tlsHandshakerDB struct {
db WritableDB
}
// TLSHandshakeEvent contains a TLS handshake event.
type TLSHandshakeEvent struct {
// JSON names compatible with df-006-tlshandshake
CipherSuite string `json:"cipher_suite"`
Failure *string `json:"failure"`
NegotiatedProto string `json:"negotiated_proto"`
TLSVersion string `json:"tls_version"`
PeerCerts []*ArchivalBinaryData `json:"peer_certificates"`
Finished float64 `json:"t"`
// JSON names that are consistent with the
// spirit of the spec but are not in it
RemoteAddr string `json:"address"`
SNI string `json:"server_name"` // used in prod
ALPN []string `json:"alpn"`
SkipVerify bool `json:"no_tls_verify"` // used in prod
Oddity Oddity `json:"oddity"`
Network string `json:"proto"`
Started float64 `json:"started"`
// QUICTLSHandshakeEvent contains a QUIC or TLS handshake event.
type QUICTLSHandshakeEvent struct {
CipherSuite string
Failure *string
NegotiatedProto string
TLSVersion string
PeerCerts [][]byte
Finished float64
RemoteAddr string
SNI string
ALPN []string
SkipVerify bool
Oddity Oddity
Network string
Started float64
}
func (thx *tlsHandshakerDB) Handshake(ctx context.Context,
@@ -66,7 +62,7 @@ func (thx *tlsHandshakerDB) Handshake(ctx context.Context,
started := time.Since(thx.begin).Seconds()
tconn, state, err := thx.TLSHandshaker.Handshake(ctx, conn, config)
finished := time.Since(thx.begin).Seconds()
thx.db.InsertIntoTLSHandshake(&TLSHandshakeEvent{
thx.db.InsertIntoTLSHandshake(&QUICTLSHandshakeEvent{
Network: network,
RemoteAddr: remoteAddr,
SNI: config.ServerName,
@@ -74,12 +70,12 @@ func (thx *tlsHandshakerDB) Handshake(ctx context.Context,
SkipVerify: config.InsecureSkipVerify,
Started: started,
Finished: finished,
Failure: NewArchivalFailure(err),
Failure: NewFailure(err),
Oddity: thx.computeOddity(err),
TLSVersion: netxlite.TLSVersionString(state.Version),
CipherSuite: netxlite.TLSCipherSuiteString(state.CipherSuite),
NegotiatedProto: state.NegotiatedProtocol,
PeerCerts: NewArchivalTLSCerts(peerCerts(err, &state)),
PeerCerts: peerCerts(err, &state),
})
return tconn, state, err
}