From 5501b2201adb22443544be4489cb1996f53c0491 Mon Sep 17 00:00:00 2001 From: DecFox <33030671+DecFox@users.noreply.github.com> Date: Fri, 8 Jul 2022 23:12:24 +0530 Subject: [PATCH] feat: dnsping using step-by-step (#831) Reference issue for this pull request: https://github.com/ooni/probe/issues/2159 This diff refactors the `dnsping` experiment to use the [step-by-step measurement style](https://github.com/ooni/probe-cli/blob/master/docs/design/dd-003-step-by-step.md). Co-authored-by: decfox Co-authored-by: Simone Basso --- internal/engine/experiment/dnsping/dnsping.go | 102 +++--- .../engine/experiment/dnsping/dnsping_test.go | 2 +- .../engine/experiment/dnsping/testkeys.go | 35 +++ internal/measurexlite/dns.go | 170 ++++++++++ internal/measurexlite/dns_test.go | 296 ++++++++++++++++++ internal/measurexlite/trace.go | 50 ++- internal/measurexlite/trace_test.go | 78 +++++ internal/model/mocks/trace.go | 8 + internal/model/mocks/trace_test.go | 24 ++ internal/model/netx.go | 22 ++ internal/netxlite/integration_test.go | 8 +- internal/netxlite/resolvercore.go | 8 +- internal/netxlite/resolvercore_test.go | 8 +- internal/netxlite/resolverparallel.go | 5 + internal/netxlite/trace.go | 6 + internal/tutorial/netxlite/chapter06/main.go | 2 +- 16 files changed, 747 insertions(+), 77 deletions(-) create mode 100644 internal/engine/experiment/dnsping/testkeys.go create mode 100644 internal/measurexlite/dns.go create mode 100644 internal/measurexlite/dns_test.go diff --git a/internal/engine/experiment/dnsping/dnsping.go b/internal/engine/experiment/dnsping/dnsping.go index bc77b94..b03f005 100644 --- a/internal/engine/experiment/dnsping/dnsping.go +++ b/internal/engine/experiment/dnsping/dnsping.go @@ -9,15 +9,18 @@ import ( "fmt" "net/url" "strings" + "sync" "time" - "github.com/ooni/probe-cli/v3/internal/measurex" + "github.com/miekg/dns" + "github.com/ooni/probe-cli/v3/internal/measurexlite" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" ) const ( testName = "dnsping" - testVersion = "0.1.0" + testVersion = "0.2.0" ) // Config contains the experiment configuration. @@ -53,21 +56,6 @@ func (c Config) domains() string { return "edge-chat.instagram.com example.com" } -// TestKeys contains the experiment results. -type TestKeys struct { - Pings []*SinglePing `json:"pings"` -} - -// TODO(bassosimone): save more data once the dnsping improvements at -// github.com/bassosimone/websteps-illustrated contains have been merged -// into this repository. When this happens, we'll able to save raw -// queries and network events of each individual query. - -// SinglePing contains the results of a single ping. -type SinglePing struct { - Queries []*measurex.ArchivalDNSLookupEvent `json:"queries"` -} - // Measurer performs the measurement. type Measurer struct { config Config @@ -117,69 +105,61 @@ func (m *Measurer) Run( if parsed.Port() == "" { return errMissingPort } - tk := new(TestKeys) + tk := NewTestKeys() measurement.TestKeys = tk - mxmx := measurex.NewMeasurerWithDefaultSettings() - out := make(chan *measurex.DNSMeasurement) domains := strings.Split(m.config.domains(), " ") + wg := new(sync.WaitGroup) + wg.Add(len(domains)) for _, domain := range domains { - go m.dnsPingLoop(ctx, mxmx, parsed.Host, domain, out) - } - // The following multiplication could overflow but we're always using small - // numbers so it's fine for us not to bother with checking for that. - // - // We emit two results (A and AAAA) for each domain and repetition. - numResults := int(m.config.repetitions()) * len(domains) * 2 - for len(tk.Pings) < numResults { - meas := <-out - // TODO(bassosimone): when we merge the improvements at - // https://github.com/bassosimone/websteps-illustrated it - // will become unnecessary to split with query type - // as we're doing below. - queries := measurex.NewArchivalDNSLookupEventList(meas.LookupHost) - tk.Pings = append(tk.Pings, m.onlyQueryWithType(queries, "A")...) - tk.Pings = append(tk.Pings, m.onlyQueryWithType(queries, "AAAA")...) + go m.dnsPingLoop(ctx, measurement.MeasurementStartTimeSaved, sess.Logger(), parsed.Host, domain, wg, tk) } + wg.Wait() return nil // return nil so we always submit the measurement } -// onlyQueryWithType returns only the queries with the given type. -func (m *Measurer) onlyQueryWithType( - in []*measurex.ArchivalDNSLookupEvent, kind string) (out []*SinglePing) { - for _, query := range in { - if query.QueryType == kind { - out = append(out, &SinglePing{ - Queries: []*measurex.ArchivalDNSLookupEvent{query}, - }) - } - } - return -} - // dnsPingLoop sends all the ping requests and emits the results onto the out channel. -func (m *Measurer) dnsPingLoop(ctx context.Context, mxmx *measurex.Measurer, - address string, domain string, out chan<- *measurex.DNSMeasurement) { +func (m *Measurer) dnsPingLoop(ctx context.Context, zeroTime time.Time, logger model.Logger, + address string, domain string, wg *sync.WaitGroup, tk *TestKeys) { + defer wg.Done() ticker := time.NewTicker(m.config.delay()) defer ticker.Stop() for i := int64(0); i < m.config.repetitions(); i++ { - go m.dnsPingAsync(ctx, mxmx, address, domain, out) + wg.Add(1) + go m.dnsRoundTrip(ctx, i, zeroTime, logger, address, domain, wg, tk) <-ticker.C } } -// dnsPingAsync performs a DNS ping and emits the result onto the out channel. -func (m *Measurer) dnsPingAsync(ctx context.Context, mxmx *measurex.Measurer, - address string, domain string, out chan<- *measurex.DNSMeasurement) { - out <- m.dnsRoundTrip(ctx, mxmx, address, domain) -} - // dnsRoundTrip performs a round trip and returns the results to the caller. -func (m *Measurer) dnsRoundTrip(ctx context.Context, mxmx *measurex.Measurer, - address string, domain string) *measurex.DNSMeasurement { +func (m *Measurer) dnsRoundTrip(ctx context.Context, index int64, zeroTime time.Time, + logger model.Logger, address string, domain string, wg *sync.WaitGroup, tk *TestKeys) { // TODO(bassosimone): make the timeout user-configurable ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - return mxmx.LookupHostUDP(ctx, domain, address) + defer wg.Done() + pings := []*SinglePing{} + trace := measurexlite.NewTrace(index, zeroTime) + ol := measurexlite.NewOperationLogger(logger, "DNSPing #%d %s %s", index, address, domain) + // TODO(bassosimone, DecFox): what should we do if the user passes us a resolver with a + // domain name in terms of saving its results? Shall we save also the system resolver's lookups? + // Shall we, otherwise, pre-resolve the domain name to IP addresses once and for all? In such + // a case, shall we use all the available IP addresses or just some of them? + dialer := netxlite.NewDialerWithStdlibResolver(logger) + resolver := trace.NewParallelUDPResolver(logger, dialer, address) + _, err := resolver.LookupHost(ctx, domain) + ol.Stop(err) + // Add the dns.TypeA ping + pings = append(pings, m.makePingFromLookup(<-trace.DNSLookup[dns.TypeA])) + // Add the dns.TypeAAAA ping + pings = append(pings, m.makePingFromLookup(<-trace.DNSLookup[dns.TypeAAAA])) + tk.addPings(pings) +} + +// makePingfromLookup returns a SinglePing from the result of a single query +func (m *Measurer) makePingFromLookup(lookup *model.ArchivalDNSLookupResult) (pings *SinglePing) { + return &SinglePing{ + Query: lookup, + } } // NewExperimentMeasurer creates a new ExperimentMeasurer. diff --git a/internal/engine/experiment/dnsping/dnsping_test.go b/internal/engine/experiment/dnsping/dnsping_test.go index 8f8702f..5fdc012 100644 --- a/internal/engine/experiment/dnsping/dnsping_test.go +++ b/internal/engine/experiment/dnsping/dnsping_test.go @@ -50,7 +50,7 @@ func TestMeasurer_run(t *testing.T) { if m.ExperimentName() != "dnsping" { t.Fatal("invalid experiment name") } - if m.ExperimentVersion() != "0.1.0" { + if m.ExperimentVersion() != "0.2.0" { t.Fatal("invalid experiment version") } ctx := context.Background() diff --git a/internal/engine/experiment/dnsping/testkeys.go b/internal/engine/experiment/dnsping/testkeys.go new file mode 100644 index 0000000..21cf3e3 --- /dev/null +++ b/internal/engine/experiment/dnsping/testkeys.go @@ -0,0 +1,35 @@ +package dnsping + +import ( + "sync" + + "github.com/ooni/probe-cli/v3/internal/model" +) + +// TestKeys contains the experiment results. +type TestKeys struct { + Pings []*SinglePing `json:"pings"` + + // mu provides mutual exclusion + mu sync.Mutex +} + +// SinglePing contains the results of a single ping. +type SinglePing struct { + Query *model.ArchivalDNSLookupResult `json:"query"` +} + +// NewTestKeys creates new dnsping TestKeys +func NewTestKeys() *TestKeys { + return &TestKeys{ + Pings: []*SinglePing{}, + mu: sync.Mutex{}, + } +} + +// addSinglePing adds []*SinglePing to the test keys +func (tk *TestKeys) addPings(pings []*SinglePing) { + tk.mu.Lock() + tk.Pings = append(tk.Pings, pings...) + tk.mu.Unlock() +} diff --git a/internal/measurexlite/dns.go b/internal/measurexlite/dns.go new file mode 100644 index 0000000..37ab72b --- /dev/null +++ b/internal/measurexlite/dns.go @@ -0,0 +1,170 @@ +package measurexlite + +// +// DNS Lookup with tracing +// + +import ( + "context" + "log" + "net" + "time" + + "github.com/miekg/dns" + "github.com/ooni/probe-cli/v3/internal/engine/geolocate" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/tracex" +) + +// newParallelResolverTrace is equivalent to netxlite.NewParallelResolver +// except that it returns a model.Resolver that uses this trace. +func (tx *Trace) newParallelResolverTrace(newResolver func() model.Resolver) model.Resolver { + return &resolverTrace{ + r: tx.newParallelResolver(newResolver), + tx: tx, + } +} + +// resolverTrace is a trace-aware resolver +type resolverTrace struct { + r model.Resolver + tx *Trace +} + +var _ model.Resolver = &resolverTrace{} + +// Address implements model.Resolver.Address +func (r *resolverTrace) Address() string { + return r.r.Address() +} + +// Network implements model.Resolver.Network +func (r *resolverTrace) Network() string { + return r.r.Network() +} + +// CloseIdleConnections implements model.Resolver.CloseIdleConnections +func (r *resolverTrace) CloseIdleConnections() { + r.r.CloseIdleConnections() +} + +// LookupHost implements model.Resolver.LookupHost +func (r *resolverTrace) LookupHost(ctx context.Context, hostname string) ([]string, error) { + return r.r.LookupHost(netxlite.ContextWithTrace(ctx, r.tx), hostname) +} + +// LookupHTTPS implements model.Resolver.LookupHTTPS +func (r *resolverTrace) LookupHTTPS(ctx context.Context, domain string) (*model.HTTPSSvc, error) { + return r.r.LookupHTTPS(netxlite.ContextWithTrace(ctx, r.tx), domain) +} + +// LookupNS implements model.Resolver.LookupNS +func (r *resolverTrace) LookupNS(ctx context.Context, domain string) ([]*net.NS, error) { + return r.r.LookupNS(netxlite.ContextWithTrace(ctx, r.tx), domain) +} + +// NewParallelUDPResolver returns a trace-ware parallel UDP resolver +func (tx *Trace) NewParallelUDPResolver(logger model.Logger, dialer model.Dialer, address string) model.Resolver { + return tx.newParallelResolverTrace(func() model.Resolver { + return netxlite.NewParallelUDPResolver(logger, dialer, address) + }) +} + +// NewParallelDNSOverHTTPSResolver returns a trace-aware parallel DoH resolver +func (tx *Trace) NewParallelDNSOverHTTPSResolver(logger model.Logger, URL string) model.Resolver { + return tx.newParallelResolverTrace(func() model.Resolver { + return netxlite.NewParallelDNSOverHTTPSResolver(logger, URL) + }) +} + +// OnDNSRoundTripForLookupHost implements model.Trace.OnDNSRoundTripForLookupHost +func (tx *Trace) OnDNSRoundTripForLookupHost(started time.Time, reso model.Resolver, query model.DNSQuery, + response model.DNSResponse, addrs []string, err error, finished time.Time) { + ch := tx.DNSLookup[query.Type()] + if ch == nil { + // Prevent blocking forever. See https://dave.cheney.net/2014/03/19/channel-axioms. + log.Printf("BUG: Requested query type %s has no valid channel to buffer results", dns.TypeToString[query.Type()]) + return + } + select { + case ch <- NewArchivalDNSLookupResultFromRoundTrip( + tx.Index, + started.Sub(tx.ZeroTime), + reso, + query, + response, + addrs, + err, + finished.Sub(tx.ZeroTime), + ): + default: + } +} + +// NewArchivalDNSLookupResultFromRoundTrip generates a model.ArchivalDNSLookupResultFromRoundTrip +// from the available information right after the DNS RoundTrip +func NewArchivalDNSLookupResultFromRoundTrip(index int64, started time.Duration, reso model.Resolver, query model.DNSQuery, + response model.DNSResponse, addrs []string, err error, finished time.Duration) *model.ArchivalDNSLookupResult { + return &model.ArchivalDNSLookupResult{ + Answers: archivalAnswersFromAddrs(addrs), + Engine: reso.Network(), + Failure: tracex.NewFailure(err), + Hostname: query.Domain(), + QueryType: dns.TypeToString[query.Type()], + ResolverHostname: nil, + ResolverAddress: reso.Address(), + T: finished.Seconds(), + } +} + +// archivalAnswersFromAddrs generates model.ArchivalDNSAnswer from an array of addresses +func archivalAnswersFromAddrs(addrs []string) (out []model.ArchivalDNSAnswer) { + for _, addr := range addrs { + ipv6, err := netxlite.IsIPv6(addr) + if err != nil { + log.Printf("BUG: NewArchivalDNSLookupResult: invalid IP address: %s", addr) + continue + } + asn, org, _ := geolocate.LookupASN(addr) + switch ipv6 { + case false: + out = append(out, model.ArchivalDNSAnswer{ + ASN: int64(asn), + ASOrgName: org, + AnswerType: "A", + Hostname: "", + IPv4: addr, + TTL: nil, + }) + case true: + out = append(out, model.ArchivalDNSAnswer{ + ASN: int64(asn), + ASOrgName: org, + AnswerType: "AAAA", + Hostname: "", + IPv6: addr, + TTL: nil, + }) + } + } + return +} + +// DNSLookupsFromRoundTrip drains the network events buffered inside the corresponding query channel +func (tx *Trace) DNSLookupsFromRoundTrip(query uint16) (out []*model.ArchivalDNSLookupResult) { + ch := tx.DNSLookup[query] + if ch == nil { + // Prevent blocking forever. See https://dave.cheney.net/2014/03/19/channel-axioms. + log.Printf("BUG: Requested query type %s has no valid channel to buffer results", dns.TypeToString[query]) + return + } + for { + select { + case ev := <-ch: + out = append(out, ev) + default: + return + } + } +} diff --git a/internal/measurexlite/dns_test.go b/internal/measurexlite/dns_test.go new file mode 100644 index 0000000..f3c9c6c --- /dev/null +++ b/internal/measurexlite/dns_test.go @@ -0,0 +1,296 @@ +package measurexlite + +import ( + "context" + "testing" + "time" + + "github.com/miekg/dns" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/model/mocks" + "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/testingx" +) + +func TestNewUnwrappedParallelResolver(t *testing.T) { + t.Run("NewUnwrappedParallelResolver creates an UnwrappedParallelResolver with Trace", func(t *testing.T) { + underlying := &mocks.Resolver{} + zeroTime := time.Now() + trace := NewTrace(0, zeroTime) + trace.NewParallelResolverFn = func() model.Resolver { + return underlying + } + resolver := trace.newParallelResolverTrace(func() model.Resolver { + return nil + }) + resolvert := resolver.(*resolverTrace) + if resolvert.r != underlying { + t.Fatal("invalid parallel resolver") + } + if resolvert.tx != trace { + t.Fatal("invalid trace") + } + }) + + t.Run("Trace-aware resolver forwards underlying functions", func(t *testing.T) { + var called bool + zeroTime := time.Now() + trace := NewTrace(0, zeroTime) + newMockResolver := func() model.Resolver { + return &mocks.Resolver{ + MockAddress: func() string { + return "dns.google" + }, + MockNetwork: func() string { + return "udp" + }, + MockCloseIdleConnections: func() { + called = true + }, + } + } + resolver := trace.newParallelResolver(newMockResolver) + + t.Run("Address is correctly forwarded", func(t *testing.T) { + got := resolver.Address() + if got != "dns.google" { + t.Fatal("Address not called") + } + }) + + t.Run("Network is correctly forwarded", func(t *testing.T) { + got := resolver.Network() + if got != "udp" { + t.Fatal("Network not called") + } + }) + + t.Run("CloseIdleConnections is correctly forwarded", func(t *testing.T) { + resolver.CloseIdleConnections() + if !called { + t.Fatal("CloseIdleConnections not called") + } + }) + }) + + t.Run("LookupHost saves into trace", func(t *testing.T) { + zeroTime := time.Now() + td := testingx.NewTimeDeterministic(zeroTime) + trace := NewTrace(0, zeroTime) + trace.TimeNowFn = td.Now + txp := &mocks.DNSTransport{ + MockRoundTrip: func(ctx context.Context, query model.DNSQuery) (model.DNSResponse, error) { + response := &mocks.DNSResponse{ + MockDecodeLookupHost: func() ([]string, error) { + if query.Type() != dns.TypeA { + return []string{"fe80::a00:20ff:feb9:4c54"}, nil + } + return []string{"1.1.1.1"}, nil + }, + } + return response, nil + }, + MockRequiresPadding: func() bool { + return true + }, + MockNetwork: func() string { + return "" + }, + MockAddress: func() string { + return "dns.google" + }, + } + newResolver := func() model.Resolver { + return netxlite.NewUnwrappedParallelResolver(txp) + } + resolver := trace.newParallelResolverTrace(newResolver) + ctx := context.Background() + addrs, err := resolver.LookupHost(ctx, "example.com") + if err != nil { + t.Fatal("unexpected err", err) + } + if len(addrs) != 2 { + t.Fatal("unexpected array output", addrs) + } + if addrs[0] != "1.1.1.1" && addrs[1] != "1.1.1.1" { + t.Fatal("unexpected array output", addrs) + } + if addrs[0] != "fe80::a00:20ff:feb9:4c54" && addrs[1] != "fe80::a00:20ff:feb9:4c54" { + t.Fatal("unexpected array output", addrs) + } + + t.Run("DNSLookups QueryType A", func(t *testing.T) { + events := trace.DNSLookupsFromRoundTrip(dns.TypeA) + if len(events) != 1 { + t.Fatal("expected to see single DNSLookup event") + } + lookup := events[0] + answers := lookup.Answers + if lookup.Failure != nil { + t.Fatal("unexpected err", *(lookup.Failure)) + } + if lookup.ResolverAddress != "dns.google" { + t.Fatal("unexpected address field") + } + if len(answers) != 1 { + t.Fatal("expected 1 DNS answer, got", len(answers)) + } + if answers[0].AnswerType != "A" || answers[0].IPv4 != "1.1.1.1" { + t.Fatal("unexpected DNS answer", answers) + } + }) + + t.Run("DNSLookups QueryType AAAA", func(t *testing.T) { + events := trace.DNSLookupsFromRoundTrip(dns.TypeAAAA) + if len(events) != 1 { + t.Fatal("expected to see single DNSLookup event") + } + lookup := events[0] + answers := lookup.Answers + if lookup.Failure != nil { + t.Fatal("unexpected err", *(lookup.Failure)) + } + if lookup.ResolverAddress != "dns.google" { + t.Fatal("unexpected address field") + } + if len(answers) != 1 { + t.Fatal("expected 1 DNS answer, got", len(answers)) + } + if answers[0].AnswerType != "AAAA" || answers[0].IPv6 != "fe80::a00:20ff:feb9:4c54" { + t.Fatal("unexpected DNS answer", answers) + } + }) + }) + + t.Run("LookupHost discards events when buffers are full", func(t *testing.T) { + zeroTime := time.Now() + td := testingx.NewTimeDeterministic(zeroTime) + trace := NewTrace(0, zeroTime) + trace.DNSLookup = map[uint16]chan *model.ArchivalDNSLookupResult{ + dns.TypeA: make(chan *model.ArchivalDNSLookupResult), // no buffer + dns.TypeAAAA: make(chan *model.ArchivalDNSLookupResult), // no buffer + } + trace.TimeNowFn = td.Now + txp := &mocks.DNSTransport{ + MockRoundTrip: func(ctx context.Context, query model.DNSQuery) (model.DNSResponse, error) { + response := &mocks.DNSResponse{ + MockDecodeLookupHost: func() ([]string, error) { + if query.Type() != dns.TypeA { + return []string{"fe80::a00:20ff:feb9:4c54"}, nil + } + return []string{"1.1.1.1"}, nil + }, + } + return response, nil + }, + MockRequiresPadding: func() bool { + return true + }, + MockNetwork: func() string { + return "" + }, + MockAddress: func() string { + return "dns.google" + }, + } + newResolver := func() model.Resolver { + return netxlite.NewUnwrappedParallelResolver(txp) + } + resolver := trace.newParallelResolverTrace(newResolver) + ctx := context.Background() + addrs, err := resolver.LookupHost(ctx, "example.com") + if err != nil { + t.Fatal("unexpected err", err) + } + if len(addrs) != 2 { + t.Fatal("unexpected array output", addrs) + } + + t.Run("DNSLookups QueryType A", func(t *testing.T) { + events := trace.DNSLookupsFromRoundTrip(dns.TypeA) + if len(events) != 0 { + t.Fatal("expected to see no DNSLookup") + } + }) + t.Run("DNSLookups QueryType AAAA", func(t *testing.T) { + events := trace.DNSLookupsFromRoundTrip(dns.TypeAAAA) + if len(events) != 0 { + t.Fatal("expected to see no DNSLookup") + } + }) + }) +} + +func TestAnswersFromAddrs(t *testing.T) { + tests := []struct { + name string + args []string + }{{ + name: "with valid input", + args: []string{"1.1.1.1", "fe80::a00:20ff:feb9:4c54"}, + }, { + name: "with invalid IPv4 address", + args: []string{"1.1.1.1.1", "fe80::a00:20ff:feb9:4c54"}, + }, { + name: "with invalid IPv6 address", + args: []string{"1.1.1.1", "fe80::a00:20ff:feb9:::4c54"}, + }, { + name: "with empty input", + args: []string{}, + }, { + name: "with nil input", + args: nil, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := archivalAnswersFromAddrs(tt.args) + var idx int + for _, inp := range tt.args { + ip6, err := netxlite.IsIPv6(inp) + if err != nil { + continue + } + if idx >= len(got) { + t.Fatal("unexpected array length") + } + answer := got[idx] + if ip6 { + if answer.AnswerType != "AAAA" || answer.IPv6 != inp { + t.Fatal("unexpected output", answer) + } + } else { + if answer.AnswerType != "A" || answer.IPv4 != inp { + t.Fatal("unexpected output", answer) + } + } + idx++ + } + if idx != len(got) { + t.Fatal("unexpected array length", len(got)) + } + }) + } +} + +func TestDNSLookupsFromRoundTrips(t *testing.T) { + zeroTime := time.Now() + trace := NewTrace(0, zeroTime) + checkPanic := func(query uint16, f func(uint16) []*model.ArchivalDNSLookupResult) { + defer func() { + if r := recover(); r != nil { + t.Fatal("unexpected panic encoutered") + } + }() + f(query) + } + t.Run("DNSLookup is nil", func(t *testing.T) { + trace.DNSLookup = nil + checkPanic(dns.TypeA, trace.DNSLookupsFromRoundTrip) + }) + t.Run("Query has nil channel", func(t *testing.T) { + trace.DNSLookup = map[uint16]chan *model.ArchivalDNSLookupResult{ + dns.TypeA: nil, + } + checkPanic(dns.TypeA, trace.DNSLookupsFromRoundTrip) + }) +} diff --git a/internal/measurexlite/trace.go b/internal/measurexlite/trace.go index 93ad677..092b2a4 100644 --- a/internal/measurexlite/trace.go +++ b/internal/measurexlite/trace.go @@ -7,6 +7,7 @@ package measurexlite import ( "time" + "github.com/miekg/dns" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" ) @@ -31,13 +32,17 @@ import ( type Trace struct { // Index is the MANDATORY unique index of this trace within the // current measurement. If you don't care about uniquely identifying - // treaces, you can use zero to indicate the "default" trace. + // traces, you can use zero to indicate the "default" trace. Index int64 // NetworkEvent is MANDATORY and buffers network events. If you create // this channel manually, ensure it has some buffer. NetworkEvent chan *model.ArchivalNetworkEvent + // NewParallelResolverFn is OPTIONAL and can be used to overide + // calls to the netxlite.NewParallelResolver factory. + NewParallelResolverFn func() model.Resolver + // NewDialerWithoutResolverFn is OPTIONAL and can be used to override // calls to the netxlite.NewDialerWithoutResolver factory. NewDialerWithoutResolverFn func(dl model.DebugLogger) model.Dialer @@ -46,6 +51,14 @@ type Trace struct { // calls to the netxlite.NewTLSHandshakerStdlib factory. NewTLSHandshakerStdlibFn func(dl model.DebugLogger) model.TLSHandshaker + // DNSLookup is MANDATORY and buffers DNSLookup results based on the + // query type. When we create this map using NewTrace, we will create + // an entry for each dns.Type in DNSQueryTypes. If you create this channel + // manually, you probably want to to the same (and most likely you also + // want to create buffered channels). Note that the code will print a + // warning and otherwise ignore all the query types not included in this map. + DNSLookup map[uint16]chan *model.ArchivalDNSLookupResult + // TCPConnect is MANDATORY and buffers TCP connect observations. If you create // this channel manually, ensure it has some buffer. TCPConnect chan *model.ArchivalTCPConnectResult @@ -67,6 +80,10 @@ const ( // the Trace's NetworkEvent buffered channel. NetworkEventBufferSize = 64 + // DNSLookupBufferSize is the buffer size for constructing + // the Trace's DNSLookup map of buffered channels. + DNSLookupBufferSize = 8 + // TCPConnectBufferSize is the buffer size for constructing // the Trace's TCPConnect buffered channel. TCPConnectBufferSize = 8 @@ -76,6 +93,25 @@ const ( TLSHandshakeBufferSize = 8 ) +// DNSQueryTypes contains the list of DNS query types for which +// NewTrace create entries in Trace.DNSLookup. +var DNSQueryTypes = []uint16{ + dns.TypeANY, + dns.TypeA, + dns.TypeAAAA, + dns.TypeCNAME, + dns.TypeNS, +} + +// newDefaultDNSLookupMap is a convenience factory for creating Trace.DNSLookup +func newDefaultDNSLookupMap() map[uint16]chan *model.ArchivalDNSLookupResult { + out := make(map[uint16]chan *model.ArchivalDNSLookupResult) + for _, qtype := range DNSQueryTypes { + out[qtype] = make(chan *model.ArchivalDNSLookupResult, DNSLookupBufferSize) + } + return out +} + // NewTrace creates a new instance of Trace using default settings. // // We create buffered channels using as buffer sizes the constants that @@ -96,6 +132,7 @@ func NewTrace(index int64, zeroTime time.Time) *Trace { ), NewDialerWithoutResolverFn: nil, // use default NewTLSHandshakerStdlibFn: nil, // use default + DNSLookup: newDefaultDNSLookupMap(), TCPConnect: make( chan *model.ArchivalTCPConnectResult, TCPConnectBufferSize, @@ -110,7 +147,7 @@ func NewTrace(index int64, zeroTime time.Time) *Trace { } // newDialerWithoutResolver indirectly calls netxlite.NewDialerWithoutResolver -// thus allows us to mock this func for testing. +// thus allowing us to mock this func for testing. func (tx *Trace) newDialerWithoutResolver(dl model.DebugLogger) model.Dialer { if tx.NewDialerWithoutResolverFn != nil { return tx.NewDialerWithoutResolverFn(dl) @@ -118,6 +155,15 @@ func (tx *Trace) newDialerWithoutResolver(dl model.DebugLogger) model.Dialer { return netxlite.NewDialerWithoutResolver(dl) } +// newParallelResolver indirectly calls the passed netxlite.NewParallerResolver +// thus allowing us to mock this function for testing +func (tx *Trace) newParallelResolver(newResolver func() model.Resolver) model.Resolver { + if tx.NewParallelResolverFn != nil { + return tx.NewParallelResolverFn() + } + return newResolver() +} + // newTLSHandshakerStdlib indirectly calls netxlite.NewTLSHandshakerStdlib // thus allowing us to mock this func for testing. func (tx *Trace) newTLSHandshakerStdlib(dl model.DebugLogger) model.TLSHandshaker { diff --git a/internal/measurexlite/trace_test.go b/internal/measurexlite/trace_test.go index aba42f0..54fae61 100644 --- a/internal/measurexlite/trace_test.go +++ b/internal/measurexlite/trace_test.go @@ -46,6 +46,12 @@ func TestNewTrace(t *testing.T) { } }) + t.Run("NewParallelResolverFn is nil", func(t *testing.T) { + if trace.NewParallelResolverFn != nil { + t.Fatal("expected nil NewUnwrappedParallelResolverFn") + } + }) + t.Run("NewDialerWithoutResolverFn is nil", func(t *testing.T) { if trace.NewDialerWithoutResolverFn != nil { t.Fatal("expected nil NewDialerWithoutResolverFn") @@ -58,6 +64,27 @@ func TestNewTrace(t *testing.T) { } }) + t.Run("DNSLookup has the expected buffer size", func(t *testing.T) { + ff := &testingx.FakeFiller{} + for _, qtype := range DNSQueryTypes { + var count int + Loop: + for { + ev := &model.ArchivalDNSLookupResult{} + ff.Fill(ev) + select { + case trace.DNSLookup[qtype] <- ev: + count++ + default: + break Loop + } + } + if count != DNSLookupBufferSize { + t.Fatal("invalid DNSLookup A channel buffer size") + } + } + }) + t.Run("TCPConnect has the expected buffer size", func(t *testing.T) { ff := &testingx.FakeFiller{} var idx int @@ -111,6 +138,57 @@ func TestNewTrace(t *testing.T) { } func TestTrace(t *testing.T) { + t.Run("NewParallelResolverFn works as intended", func(t *testing.T) { + t.Run("when not nil", func(t *testing.T) { + mockedErr := errors.New("mocked") + tx := &Trace{ + NewParallelResolverFn: func() model.Resolver { + return &mocks.Resolver{ + MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { + return []string{}, mockedErr + }, + } + }, + } + resolver := tx.newParallelResolver(func() model.Resolver { + return nil + }) + ctx := context.Background() + addrs, err := resolver.LookupHost(ctx, "example.com") + if !errors.Is(err, mockedErr) { + t.Fatal("unexpected err", err) + } + if len(addrs) != 0 { + t.Fatal("expected array of size 0") + } + }) + + t.Run("when nil", func(t *testing.T) { + tx := &Trace{ + NewParallelResolverFn: nil, + } + newResolver := func() model.Resolver { + return &mocks.Resolver{ + MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { + return []string{"1.1.1.1"}, nil + }, + } + } + resolver := tx.newParallelResolver(newResolver) + ctx := context.Background() + addrs, err := resolver.LookupHost(ctx, "example.com") + if err != nil { + t.Fatal("unexpected err", err) + } + if len(addrs) != 1 { + t.Fatal("expected array of size 1") + } + if addrs[0] != "1.1.1.1" { + t.Fatal("unexpected array output", addrs) + } + }) + }) + t.Run("NewDialerWithoutResolverFn works as intended", func(t *testing.T) { t.Run("when not nil", func(t *testing.T) { mockedErr := errors.New("mocked") diff --git a/internal/model/mocks/trace.go b/internal/model/mocks/trace.go index 1049b51..363e4a2 100644 --- a/internal/model/mocks/trace.go +++ b/internal/model/mocks/trace.go @@ -15,6 +15,9 @@ import ( type Trace struct { MockTimeNow func() time.Time + MockOnDNSRoundTripForLookupHost func(started time.Time, reso model.Resolver, query model.DNSQuery, + response model.DNSResponse, addrs []string, err error, finished time.Time) + MockOnConnectDone func( started time.Time, network, domain, remoteAddr string, err error, finished time.Time) @@ -30,6 +33,11 @@ func (t *Trace) TimeNow() time.Time { return t.MockTimeNow() } +func (t *Trace) OnDNSRoundTripForLookupHost(started time.Time, reso model.Resolver, query model.DNSQuery, + response model.DNSResponse, addrs []string, err error, finished time.Time) { + t.MockOnDNSRoundTripForLookupHost(started, reso, query, response, addrs, err, finished) +} + func (t *Trace) OnConnectDone( started time.Time, network, domain, remoteAddr string, err error, finished time.Time) { t.MockOnConnectDone(started, network, domain, remoteAddr, err, finished) diff --git a/internal/model/mocks/trace_test.go b/internal/model/mocks/trace_test.go index d7ebcc8..95aa995 100644 --- a/internal/model/mocks/trace_test.go +++ b/internal/model/mocks/trace_test.go @@ -4,6 +4,8 @@ import ( "crypto/tls" "testing" "time" + + "github.com/ooni/probe-cli/v3/internal/model" ) func TestTrace(t *testing.T) { @@ -19,6 +21,28 @@ func TestTrace(t *testing.T) { } }) + t.Run("OnDNSRoundTripForLookupHost", func(t *testing.T) { + var called bool + tx := &Trace{ + MockOnDNSRoundTripForLookupHost: func(started time.Time, reso model.Resolver, query model.DNSQuery, + response model.DNSResponse, addrs []string, err error, finished time.Time) { + called = true + }, + } + tx.OnDNSRoundTripForLookupHost( + time.Now(), + &Resolver{}, + &DNSQuery{}, + &DNSResponse{}, + []string{}, + nil, + time.Now(), + ) + if !called { + t.Fatal("not called") + } + }) + t.Run("OnConnectDone", func(t *testing.T) { var called bool tx := &Trace{ diff --git a/internal/model/netx.go b/internal/model/netx.go index a2fab89..4640d86 100644 --- a/internal/model/netx.go +++ b/internal/model/netx.go @@ -303,6 +303,28 @@ type Trace interface { // can use functionality exported by the ./internal/testingx pkg. TimeNow() time.Time + // OnDNSRoundTripForLookupHost is used with a DNSTransport and called + // when the RoundTrip terminates. + // + // Arguments: + // + // - started is when we called transport.RoundTrip + // + // - reso is the parent resolver for the trace; + // + // - query is the non-nil DNS query we use for the RoundTrip + // + // - response is a valid DNS response, obtained after the RoundTrip; + // + // - addrs is the list of addresses obtained after the RoundTrip, which + // is empty if the RoundTrip failed + // + // - err is the result of DNSLookup; either an error or nil + // + // - finished is the time right after the RoundTrip + OnDNSRoundTripForLookupHost(started time.Time, reso Resolver, query DNSQuery, + response DNSResponse, addrs []string, err error, finished time.Time) + // OnConnectDone is called when connect terminates. // // Arguments: diff --git a/internal/netxlite/integration_test.go b/internal/netxlite/integration_test.go index 4b4d2f3..69b9da6 100644 --- a/internal/netxlite/integration_test.go +++ b/internal/netxlite/integration_test.go @@ -104,7 +104,7 @@ func TestMeasureWithUDPResolver(t *testing.T) { t.Run("on success", func(t *testing.T) { dlr := netxlite.NewDialerWithoutResolver(log.Log) - r := netxlite.NewParallelResolverUDP(log.Log, dlr, "8.8.4.4:53") + r := netxlite.NewParallelUDPResolver(log.Log, dlr, "8.8.4.4:53") defer r.CloseIdleConnections() ctx := context.Background() addrs, err := r.LookupHost(ctx, "dns.google.com") @@ -128,7 +128,7 @@ func TestMeasureWithUDPResolver(t *testing.T) { } defer listener.Close() dlr := netxlite.NewDialerWithoutResolver(log.Log) - r := netxlite.NewParallelResolverUDP(log.Log, dlr, listener.LocalAddr().String()) + r := netxlite.NewParallelUDPResolver(log.Log, dlr, listener.LocalAddr().String()) defer r.CloseIdleConnections() ctx := context.Background() addrs, err := r.LookupHost(ctx, "ooni.org") @@ -152,7 +152,7 @@ func TestMeasureWithUDPResolver(t *testing.T) { } defer listener.Close() dlr := netxlite.NewDialerWithoutResolver(log.Log) - r := netxlite.NewParallelResolverUDP(log.Log, dlr, listener.LocalAddr().String()) + r := netxlite.NewParallelUDPResolver(log.Log, dlr, listener.LocalAddr().String()) defer r.CloseIdleConnections() ctx := context.Background() addrs, err := r.LookupHost(ctx, "ooni.org") @@ -176,7 +176,7 @@ func TestMeasureWithUDPResolver(t *testing.T) { } defer listener.Close() dlr := netxlite.NewDialerWithoutResolver(log.Log) - r := netxlite.NewParallelResolverUDP(log.Log, dlr, listener.LocalAddr().String()) + r := netxlite.NewParallelUDPResolver(log.Log, dlr, listener.LocalAddr().String()) defer r.CloseIdleConnections() ctx := context.Background() addrs, err := r.LookupHost(ctx, "ooni.org") diff --git a/internal/netxlite/resolvercore.go b/internal/netxlite/resolvercore.go index 12ec5a9..ccc7a78 100644 --- a/internal/netxlite/resolvercore.go +++ b/internal/netxlite/resolvercore.go @@ -49,7 +49,7 @@ func NewUnwrappedStdlibResolver(wrappers ...model.DNSTransportWrapper) model.Res } } -// NewSerialResolverUDP creates a new Resolver using DNS-over-UDP +// NewSerialUDPResolver creates a new Resolver using DNS-over-UDP // that performs serial A/AAAA lookups during LookupHost. // // Deprecated: use NewParallelResolverUDP. @@ -64,14 +64,14 @@ func NewUnwrappedStdlibResolver(wrappers ...model.DNSTransportWrapper) model.Res // // - wrappers is the optional list of wrappers to wrap the underlying // transport. Any nil wrapper will be silently ignored. -func NewSerialResolverUDP(logger model.DebugLogger, dialer model.Dialer, +func NewSerialUDPResolver(logger model.DebugLogger, dialer model.Dialer, address string, wrappers ...model.DNSTransportWrapper) model.Resolver { return WrapResolver(logger, NewUnwrappedSerialResolver( WrapDNSTransport(NewUnwrappedDNSOverUDPTransport(dialer, address), wrappers...), )) } -// NewParallelResolverUDP creates a new Resolver using DNS-over-UDP +// NewParallelUDPResolver creates a new Resolver using DNS-over-UDP // that performs parallel A/AAAA lookups during LookupHost. // // Arguments: @@ -84,7 +84,7 @@ func NewSerialResolverUDP(logger model.DebugLogger, dialer model.Dialer, // // - wrappers is the optional list of wrappers to wrap the underlying // transport. Any nil wrapper will be silently ignored. -func NewParallelResolverUDP(logger model.DebugLogger, dialer model.Dialer, +func NewParallelUDPResolver(logger model.DebugLogger, dialer model.Dialer, address string, wrappers ...model.DNSTransportWrapper) model.Resolver { return WrapResolver(logger, NewUnwrappedParallelResolver( WrapDNSTransport(NewUnwrappedDNSOverUDPTransport(dialer, address), wrappers...), diff --git a/internal/netxlite/resolvercore_test.go b/internal/netxlite/resolvercore_test.go index 6ea5c5a..eca36fc 100644 --- a/internal/netxlite/resolvercore_test.go +++ b/internal/netxlite/resolvercore_test.go @@ -33,9 +33,9 @@ func TestNewResolverSystem(t *testing.T) { typecheckForSystemResolver(t, resolver, model.DiscardLogger) } -func TestNewSerialResolverUDP(t *testing.T) { +func TestNewSerialUDPResolver(t *testing.T) { d := NewDialerWithoutResolver(log.Log) - resolver := NewSerialResolverUDP(log.Log, d, "1.1.1.1:53") + resolver := NewSerialUDPResolver(log.Log, d, "1.1.1.1:53") idna := resolver.(*resolverIDNA) logger := idna.Resolver.(*resolverLogger) if logger.Logger != log.Log { @@ -51,9 +51,9 @@ func TestNewSerialResolverUDP(t *testing.T) { } } -func TestNewParallelResolverUDP(t *testing.T) { +func TestNewParallelUDPResolver(t *testing.T) { d := NewDialerWithoutResolver(log.Log) - resolver := NewParallelResolverUDP(log.Log, d, "1.1.1.1:53") + resolver := NewParallelUDPResolver(log.Log, d, "1.1.1.1:53") idna := resolver.(*resolverIDNA) logger := idna.Resolver.(*resolverLogger) if logger.Logger != log.Log { diff --git a/internal/netxlite/resolverparallel.go b/internal/netxlite/resolverparallel.go index c210290..129fe15 100644 --- a/internal/netxlite/resolverparallel.go +++ b/internal/netxlite/resolverparallel.go @@ -98,9 +98,13 @@ type parallelResolverResult struct { func (r *ParallelResolver) lookupHost(ctx context.Context, hostname string, qtype uint16, out chan<- *parallelResolverResult) { encoder := &DNSEncoderMiekg{} + trace := ContextTraceOrDefault(ctx) query := encoder.Encode(hostname, qtype, r.Txp.RequiresPadding()) + started := trace.TimeNow() response, err := r.Txp.RoundTrip(ctx, query) + finished := trace.TimeNow() if err != nil { + trace.OnDNSRoundTripForLookupHost(started, r, query, response, []string{}, err, finished) out <- ¶llelResolverResult{ addrs: []string{}, err: err, @@ -108,6 +112,7 @@ func (r *ParallelResolver) lookupHost(ctx context.Context, hostname string, return } addrs, err := response.DecodeLookupHost() + trace.OnDNSRoundTripForLookupHost(started, r, query, response, addrs, err, finished) out <- ¶llelResolverResult{ addrs: addrs, err: err, diff --git a/internal/netxlite/trace.go b/internal/netxlite/trace.go index 9efc175..201b309 100644 --- a/internal/netxlite/trace.go +++ b/internal/netxlite/trace.go @@ -49,6 +49,12 @@ func (*traceDefault) TimeNow() time.Time { return time.Now() } +// OnDNSRoundTripForLookupHost implements model.Trace.OnDNSRoundTripForLookupHost. +func (*traceDefault) OnDNSRoundTripForLookupHost(started time.Time, reso model.Resolver, query model.DNSQuery, + response model.DNSResponse, addrs []string, err error, finished time.Time) { + // nothing +} + // OnConnectDone implements model.Trace.OnConnectDone. func (*traceDefault) OnConnectDone( started time.Time, network, domain, remoteAddr string, err error, finished time.Time) { diff --git a/internal/tutorial/netxlite/chapter06/main.go b/internal/tutorial/netxlite/chapter06/main.go index 0e5a9a4..99a4313 100644 --- a/internal/tutorial/netxlite/chapter06/main.go +++ b/internal/tutorial/netxlite/chapter06/main.go @@ -54,7 +54,7 @@ func main() { // UDP endpoint address at which the server is listening. // // ```Go - reso := netxlite.NewParallelResolverUDP(log.Log, dialer, *serverAddr) + reso := netxlite.NewParallelUDPResolver(log.Log, dialer, *serverAddr) // ``` // // The API we invoke is the same as in the previous chapter, though,