refactor(measurexlite): make buffered channels private (#864)
Closes https://github.com/ooni/probe/issues/2215
This commit is contained in:
parent
342a74cad8
commit
e1d014e826
|
@ -182,7 +182,7 @@ func (m *Measurer) quicHandshake(ctx context.Context, index int64,
|
||||||
}
|
}
|
||||||
_, err := dialer.DialContext(ctx, "udp", address, tlsConfig, &quic.Config{})
|
_, err := dialer.DialContext(ctx, "udp", address, tlsConfig, &quic.Config{})
|
||||||
ol.Stop(err)
|
ol.Stop(err)
|
||||||
sp.QUICHandshake = <-trace.QUICHandshake
|
sp.QUICHandshake = trace.FirstQUICHandshakeOrNil() // record the first handshake from the buffer
|
||||||
sp.NetworkEvents = append(sp.NetworkEvents, trace.NetworkEvents()...)
|
sp.NetworkEvents = append(sp.NetworkEvents, trace.NetworkEvents()...)
|
||||||
return sp
|
return sp
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,7 +141,7 @@ func (m *Measurer) tcpConnect(ctx context.Context, index int64,
|
||||||
ol.Stop(err)
|
ol.Stop(err)
|
||||||
measurexlite.MaybeClose(conn)
|
measurexlite.MaybeClose(conn)
|
||||||
sp := &SinglePing{
|
sp := &SinglePing{
|
||||||
TCPConnect: <-trace.TCPConnect,
|
TCPConnect: trace.FirstTCPConnectOrNil(), // record the first connect from the buffer
|
||||||
}
|
}
|
||||||
return sp
|
return sp
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,7 +176,7 @@ func (m *Measurer) tlsConnectAndHandshake(ctx context.Context, index int64,
|
||||||
sni := m.config.sni(address)
|
sni := m.config.sni(address)
|
||||||
ol := measurexlite.NewOperationLogger(logger, "TLSPing #%d %s %s %v", index, address, sni, alpn)
|
ol := measurexlite.NewOperationLogger(logger, "TLSPing #%d %s %s %v", index, address, sni, alpn)
|
||||||
conn, err := dialer.DialContext(ctx, "tcp", address)
|
conn, err := dialer.DialContext(ctx, "tcp", address)
|
||||||
sp.TCPConnect = <-trace.TCPConnect
|
sp.TCPConnect = trace.FirstTCPConnectOrNil() // record the first connect from the buffer
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ol.Stop(err)
|
ol.Stop(err)
|
||||||
return sp
|
return sp
|
||||||
|
@ -191,7 +191,7 @@ func (m *Measurer) tlsConnectAndHandshake(ctx context.Context, index int64,
|
||||||
}
|
}
|
||||||
_, _, err = thx.Handshake(ctx, conn, config)
|
_, _, err = thx.Handshake(ctx, conn, config)
|
||||||
ol.Stop(err)
|
ol.Stop(err)
|
||||||
sp.TLSHandshake = <-trace.TLSHandshake
|
sp.TLSHandshake = trace.FirstTLSHandshakeOrNil() // record the first handshake from the buffer
|
||||||
sp.NetworkEvents = trace.NetworkEvents()
|
sp.NetworkEvents = trace.NetworkEvents()
|
||||||
return sp
|
return sp
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ func (c *connTrace) Read(b []byte) (int, error) {
|
||||||
count, err := c.Conn.Read(b)
|
count, err := c.Conn.Read(b)
|
||||||
finished := c.tx.TimeSince(c.tx.ZeroTime)
|
finished := c.tx.TimeSince(c.tx.ZeroTime)
|
||||||
select {
|
select {
|
||||||
case c.tx.NetworkEvent <- NewArchivalNetworkEvent(
|
case c.tx.networkEvent <- NewArchivalNetworkEvent(
|
||||||
c.tx.Index, started, netxlite.ReadOperation, network, addr, count, err, finished):
|
c.tx.Index, started, netxlite.ReadOperation, network, addr, count, err, finished):
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
|
@ -62,7 +62,7 @@ func (c *connTrace) Write(b []byte) (int, error) {
|
||||||
count, err := c.Conn.Write(b)
|
count, err := c.Conn.Write(b)
|
||||||
finished := c.tx.TimeSince(c.tx.ZeroTime)
|
finished := c.tx.TimeSince(c.tx.ZeroTime)
|
||||||
select {
|
select {
|
||||||
case c.tx.NetworkEvent <- NewArchivalNetworkEvent(
|
case c.tx.networkEvent <- NewArchivalNetworkEvent(
|
||||||
c.tx.Index, started, netxlite.WriteOperation, network, addr, count, err, finished):
|
c.tx.Index, started, netxlite.WriteOperation, network, addr, count, err, finished):
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
|
@ -100,7 +100,7 @@ func (c *udpLikeConnTrace) ReadFrom(b []byte) (int, net.Addr, error) {
|
||||||
finished := c.tx.TimeSince(c.tx.ZeroTime)
|
finished := c.tx.TimeSince(c.tx.ZeroTime)
|
||||||
address := addrStringIfNotNil(addr)
|
address := addrStringIfNotNil(addr)
|
||||||
select {
|
select {
|
||||||
case c.tx.NetworkEvent <- NewArchivalNetworkEvent(
|
case c.tx.networkEvent <- NewArchivalNetworkEvent(
|
||||||
c.tx.Index, started, netxlite.ReadFromOperation, "udp", address, count, err, finished):
|
c.tx.Index, started, netxlite.ReadFromOperation, "udp", address, count, err, finished):
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
|
@ -114,7 +114,7 @@ func (c *udpLikeConnTrace) WriteTo(b []byte, addr net.Addr) (int, error) {
|
||||||
count, err := c.UDPLikeConn.WriteTo(b, addr)
|
count, err := c.UDPLikeConn.WriteTo(b, addr)
|
||||||
finished := c.tx.TimeSince(c.tx.ZeroTime)
|
finished := c.tx.TimeSince(c.tx.ZeroTime)
|
||||||
select {
|
select {
|
||||||
case c.tx.NetworkEvent <- NewArchivalNetworkEvent(
|
case c.tx.networkEvent <- NewArchivalNetworkEvent(
|
||||||
c.tx.Index, started, netxlite.WriteToOperation, "udp", address, count, err, finished):
|
c.tx.Index, started, netxlite.WriteToOperation, "udp", address, count, err, finished):
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
|
@ -155,10 +155,20 @@ func NewAnnotationArchivalNetworkEvent(
|
||||||
func (tx *Trace) NetworkEvents() (out []*model.ArchivalNetworkEvent) {
|
func (tx *Trace) NetworkEvents() (out []*model.ArchivalNetworkEvent) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ev := <-tx.NetworkEvent:
|
case ev := <-tx.networkEvent:
|
||||||
out = append(out, ev)
|
out = append(out, ev)
|
||||||
default:
|
default:
|
||||||
return // done
|
return // done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FirstNetworkEventOrNil drains the network events buffered inside the NetworkEvents channel
|
||||||
|
// and returns the first NetworkEvent, if any. Otherwise, it returns nil.
|
||||||
|
func (tx *Trace) FirstNetworkEventOrNil() *model.ArchivalNetworkEvent {
|
||||||
|
ev := tx.NetworkEvents()
|
||||||
|
if len(ev) < 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return ev[0]
|
||||||
|
}
|
||||||
|
|
|
@ -117,7 +117,7 @@ func TestWrapNetConn(t *testing.T) {
|
||||||
}
|
}
|
||||||
zeroTime := time.Now()
|
zeroTime := time.Now()
|
||||||
trace := NewTrace(0, zeroTime)
|
trace := NewTrace(0, zeroTime)
|
||||||
trace.NetworkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
trace.networkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
||||||
conn := trace.WrapNetConn(underlying)
|
conn := trace.WrapNetConn(underlying)
|
||||||
const bufsiz = 128
|
const bufsiz = 128
|
||||||
buffer := make([]byte, bufsiz)
|
buffer := make([]byte, bufsiz)
|
||||||
|
@ -201,7 +201,7 @@ func TestWrapNetConn(t *testing.T) {
|
||||||
}
|
}
|
||||||
zeroTime := time.Now()
|
zeroTime := time.Now()
|
||||||
trace := NewTrace(0, zeroTime)
|
trace := NewTrace(0, zeroTime)
|
||||||
trace.NetworkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
trace.networkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
||||||
conn := trace.WrapNetConn(underlying)
|
conn := trace.WrapNetConn(underlying)
|
||||||
const bufsiz = 128
|
const bufsiz = 128
|
||||||
buffer := make([]byte, bufsiz)
|
buffer := make([]byte, bufsiz)
|
||||||
|
@ -292,7 +292,7 @@ func TestWrapUDPLikeConn(t *testing.T) {
|
||||||
}
|
}
|
||||||
zeroTime := time.Now()
|
zeroTime := time.Now()
|
||||||
trace := NewTrace(0, zeroTime)
|
trace := NewTrace(0, zeroTime)
|
||||||
trace.NetworkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
trace.networkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
||||||
conn := trace.WrapUDPLikeConn(underlying)
|
conn := trace.WrapUDPLikeConn(underlying)
|
||||||
const bufsiz = 128
|
const bufsiz = 128
|
||||||
buffer := make([]byte, bufsiz)
|
buffer := make([]byte, bufsiz)
|
||||||
|
@ -364,7 +364,7 @@ func TestWrapUDPLikeConn(t *testing.T) {
|
||||||
}
|
}
|
||||||
zeroTime := time.Now()
|
zeroTime := time.Now()
|
||||||
trace := NewTrace(0, zeroTime)
|
trace := NewTrace(0, zeroTime)
|
||||||
trace.NetworkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
trace.networkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
||||||
conn := trace.WrapUDPLikeConn(underlying)
|
conn := trace.WrapUDPLikeConn(underlying)
|
||||||
const bufsiz = 128
|
const bufsiz = 128
|
||||||
buffer := make([]byte, bufsiz)
|
buffer := make([]byte, bufsiz)
|
||||||
|
@ -387,6 +387,49 @@ func TestWrapUDPLikeConn(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFirstNetworkEvent(t *testing.T) {
|
||||||
|
t.Run("returns nil when buffer is empty", func(t *testing.T) {
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
got := trace.FirstNetworkEventOrNil()
|
||||||
|
if got != nil {
|
||||||
|
t.Fatal("expected nil event")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("return first non-nil network event", func(t *testing.T) {
|
||||||
|
filler := func(tx *Trace, events []*model.ArchivalNetworkEvent) {
|
||||||
|
for _, ev := range events {
|
||||||
|
tx.networkEvent <- ev
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
expect := []*model.ArchivalNetworkEvent{{
|
||||||
|
Address: "1.1.1.1:443",
|
||||||
|
Failure: nil,
|
||||||
|
NumBytes: 0,
|
||||||
|
Operation: "read_from",
|
||||||
|
Proto: "udp",
|
||||||
|
T: 1.0,
|
||||||
|
Tags: []string{},
|
||||||
|
}, {
|
||||||
|
Address: "1.1.1.1:443",
|
||||||
|
Failure: nil,
|
||||||
|
NumBytes: 0,
|
||||||
|
Operation: "write_to",
|
||||||
|
Proto: "udp",
|
||||||
|
T: 1.0,
|
||||||
|
Tags: []string{},
|
||||||
|
}}
|
||||||
|
filler(trace, expect)
|
||||||
|
got := trace.FirstNetworkEventOrNil()
|
||||||
|
if diff := cmp.Diff(got, expect[0]); diff != "" {
|
||||||
|
t.Fatal(diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewAnnotationArchivalNetworkEvent(t *testing.T) {
|
func TestNewAnnotationArchivalNetworkEvent(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
index int64 = 3
|
index int64 = 3
|
||||||
|
|
|
@ -54,7 +54,7 @@ func (tx *Trace) OnConnectDone(
|
||||||
switch network {
|
switch network {
|
||||||
case "tcp", "tcp4", "tcp6":
|
case "tcp", "tcp4", "tcp6":
|
||||||
select {
|
select {
|
||||||
case tx.TCPConnect <- NewArchivalTCPConnectResult(
|
case tx.tcpConnect <- NewArchivalTCPConnectResult(
|
||||||
tx.Index,
|
tx.Index,
|
||||||
started.Sub(tx.ZeroTime),
|
started.Sub(tx.ZeroTime),
|
||||||
remoteAddr,
|
remoteAddr,
|
||||||
|
@ -112,10 +112,20 @@ func archivalPortToString(sport string) int {
|
||||||
func (tx *Trace) TCPConnects() (out []*model.ArchivalTCPConnectResult) {
|
func (tx *Trace) TCPConnects() (out []*model.ArchivalTCPConnectResult) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ev := <-tx.TCPConnect:
|
case ev := <-tx.tcpConnect:
|
||||||
out = append(out, ev)
|
out = append(out, ev)
|
||||||
default:
|
default:
|
||||||
return // done
|
return // done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FirstTCPConnectOrNil drains the network events buffered inside the TCPConnect channel
|
||||||
|
// and returns the first TCPConnect, if any. Otherwise, it returns nil.
|
||||||
|
func (tx *Trace) FirstTCPConnectOrNil() *model.ArchivalTCPConnectResult {
|
||||||
|
ev := tx.TCPConnects()
|
||||||
|
if len(ev) < 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return ev[0]
|
||||||
|
}
|
||||||
|
|
|
@ -121,7 +121,7 @@ func TestNewDialerWithoutResolver(t *testing.T) {
|
||||||
t.Run("DialContext discards events when buffer is full", func(t *testing.T) {
|
t.Run("DialContext discards events when buffer is full", func(t *testing.T) {
|
||||||
zeroTime := time.Now()
|
zeroTime := time.Now()
|
||||||
trace := NewTrace(0, zeroTime)
|
trace := NewTrace(0, zeroTime)
|
||||||
trace.TCPConnect = make(chan *model.ArchivalTCPConnectResult) // no buffer
|
trace.tcpConnect = make(chan *model.ArchivalTCPConnectResult) // no buffer
|
||||||
dialer := trace.NewDialerWithoutResolver(model.DiscardLogger)
|
dialer := trace.NewDialerWithoutResolver(model.DiscardLogger)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
cancel() // we cancel immediately so connect is ~instantaneous
|
cancel() // we cancel immediately so connect is ~instantaneous
|
||||||
|
@ -177,6 +177,49 @@ func TestNewDialerWithoutResolver(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFirstTCPConnect(t *testing.T) {
|
||||||
|
t.Run("returns nil when buffer is empty", func(t *testing.T) {
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
got := trace.FirstTCPConnectOrNil()
|
||||||
|
if got != nil {
|
||||||
|
t.Fatal("expected nil event")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("return first non-nil TCPConnect", func(t *testing.T) {
|
||||||
|
filler := func(tx *Trace, events []*model.ArchivalTCPConnectResult) {
|
||||||
|
for _, ev := range events {
|
||||||
|
tx.tcpConnect <- ev
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
expect := []*model.ArchivalTCPConnectResult{{
|
||||||
|
IP: "1.1.1.1",
|
||||||
|
Port: 443,
|
||||||
|
Status: model.ArchivalTCPConnectStatus{
|
||||||
|
Blocked: nil,
|
||||||
|
Failure: nil,
|
||||||
|
Success: true,
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
IP: "0.0.0.0",
|
||||||
|
Port: 443,
|
||||||
|
Status: model.ArchivalTCPConnectStatus{
|
||||||
|
Blocked: nil,
|
||||||
|
Failure: nil,
|
||||||
|
Success: true,
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
filler(trace, expect)
|
||||||
|
got := trace.FirstTCPConnectOrNil()
|
||||||
|
if diff := cmp.Diff(got, expect[0]); diff != "" {
|
||||||
|
t.Fatal(diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestArchivalSplitHostPort(t *testing.T) {
|
func TestArchivalSplitHostPort(t *testing.T) {
|
||||||
addr, port := archivalSplitHostPort("1.1.1.1") // missing port
|
addr, port := archivalSplitHostPort("1.1.1.1") // missing port
|
||||||
if addr != "" {
|
if addr != "" {
|
||||||
|
|
|
@ -83,7 +83,7 @@ func (tx *Trace) OnDNSRoundTripForLookupHost(started time.Time, reso model.Resol
|
||||||
response model.DNSResponse, addrs []string, err error, finished time.Time) {
|
response model.DNSResponse, addrs []string, err error, finished time.Time) {
|
||||||
t := finished.Sub(tx.ZeroTime)
|
t := finished.Sub(tx.ZeroTime)
|
||||||
select {
|
select {
|
||||||
case tx.DNSLookup <- NewArchivalDNSLookupResultFromRoundTrip(
|
case tx.dnsLookup <- NewArchivalDNSLookupResultFromRoundTrip(
|
||||||
tx.Index,
|
tx.Index,
|
||||||
started.Sub(tx.ZeroTime),
|
started.Sub(tx.ZeroTime),
|
||||||
reso,
|
reso,
|
||||||
|
@ -150,10 +150,20 @@ func archivalAnswersFromAddrs(addrs []string) (out []model.ArchivalDNSAnswer) {
|
||||||
func (tx *Trace) DNSLookupsFromRoundTrip() (out []*model.ArchivalDNSLookupResult) {
|
func (tx *Trace) DNSLookupsFromRoundTrip() (out []*model.ArchivalDNSLookupResult) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ev := <-tx.DNSLookup:
|
case ev := <-tx.dnsLookup:
|
||||||
out = append(out, ev)
|
out = append(out, ev)
|
||||||
default:
|
default:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FirstDNSLookupOrNil drains the network events buffered inside the DNSLookup channel
|
||||||
|
// and returns the first DNSLookup, if any. Otherwise, it returns nil.
|
||||||
|
func (tx *Trace) FirstDNSLookup() *model.ArchivalDNSLookupResult {
|
||||||
|
ev := tx.DNSLookupsFromRoundTrip()
|
||||||
|
if len(ev) < 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return ev[0]
|
||||||
|
}
|
||||||
|
|
|
@ -193,7 +193,7 @@ func TestNewResolver(t *testing.T) {
|
||||||
zeroTime := time.Now()
|
zeroTime := time.Now()
|
||||||
td := testingx.NewTimeDeterministic(zeroTime)
|
td := testingx.NewTimeDeterministic(zeroTime)
|
||||||
trace := NewTrace(0, zeroTime)
|
trace := NewTrace(0, zeroTime)
|
||||||
trace.DNSLookup = make(chan *model.ArchivalDNSLookupResult) // no buffer
|
trace.dnsLookup = make(chan *model.ArchivalDNSLookupResult) // no buffer
|
||||||
trace.TimeNowFn = td.Now
|
trace.TimeNowFn = td.Now
|
||||||
txp := &mocks.DNSTransport{
|
txp := &mocks.DNSTransport{
|
||||||
MockRoundTrip: func(ctx context.Context, query model.DNSQuery) (model.DNSResponse, error) {
|
MockRoundTrip: func(ctx context.Context, query model.DNSQuery) (model.DNSResponse, error) {
|
||||||
|
@ -285,6 +285,43 @@ func TestNewWrappedResolvers(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFirstDNSLookup(t *testing.T) {
|
||||||
|
t.Run("returns nil when buffer is empty", func(t *testing.T) {
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
got := trace.FirstDNSLookup()
|
||||||
|
if got != nil {
|
||||||
|
t.Fatal("expected nil event")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("return first non-nil DNSLookup", func(t *testing.T) {
|
||||||
|
filler := func(tx *Trace, events []*model.ArchivalDNSLookupResult) {
|
||||||
|
for _, ev := range events {
|
||||||
|
tx.dnsLookup <- ev
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
expect := []*model.ArchivalDNSLookupResult{{
|
||||||
|
Engine: "doh",
|
||||||
|
Failure: nil,
|
||||||
|
Hostname: "example.com",
|
||||||
|
QueryType: "A",
|
||||||
|
}, {
|
||||||
|
Engine: "doh",
|
||||||
|
Failure: nil,
|
||||||
|
Hostname: "example.com",
|
||||||
|
QueryType: "AAAA",
|
||||||
|
}}
|
||||||
|
filler(trace, expect)
|
||||||
|
got := trace.FirstDNSLookup()
|
||||||
|
if diff := cmp.Diff(got, expect[0]); diff != "" {
|
||||||
|
t.Fatal(diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestAnswersFromAddrs(t *testing.T) {
|
func TestAnswersFromAddrs(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
|
|
@ -71,7 +71,7 @@ func (qdx *quicDialerTrace) CloseIdleConnections() {
|
||||||
func (tx *Trace) OnQUICHandshakeStart(now time.Time, remoteAddr string, config *quic.Config) {
|
func (tx *Trace) OnQUICHandshakeStart(now time.Time, remoteAddr string, config *quic.Config) {
|
||||||
t := now.Sub(tx.ZeroTime)
|
t := now.Sub(tx.ZeroTime)
|
||||||
select {
|
select {
|
||||||
case tx.NetworkEvent <- NewAnnotationArchivalNetworkEvent(tx.Index, t, "quic_handshake_start"):
|
case tx.networkEvent <- NewAnnotationArchivalNetworkEvent(tx.Index, t, "quic_handshake_start"):
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,7 +85,7 @@ func (tx *Trace) OnQUICHandshakeDone(started time.Time, remoteAddr string, qconn
|
||||||
state = qconn.ConnectionState().TLS.ConnectionState
|
state = qconn.ConnectionState().TLS.ConnectionState
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case tx.QUICHandshake <- NewArchivalTLSOrQUICHandshakeResult(
|
case tx.quicHandshake <- NewArchivalTLSOrQUICHandshakeResult(
|
||||||
tx.Index,
|
tx.Index,
|
||||||
started.Sub(tx.ZeroTime),
|
started.Sub(tx.ZeroTime),
|
||||||
"quic",
|
"quic",
|
||||||
|
@ -98,7 +98,7 @@ func (tx *Trace) OnQUICHandshakeDone(started time.Time, remoteAddr string, qconn
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case tx.NetworkEvent <- NewAnnotationArchivalNetworkEvent(tx.Index, t, "quic_handshake_done"):
|
case tx.networkEvent <- NewAnnotationArchivalNetworkEvent(tx.Index, t, "quic_handshake_done"):
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -107,10 +107,20 @@ func (tx *Trace) OnQUICHandshakeDone(started time.Time, remoteAddr string, qconn
|
||||||
func (tx *Trace) QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) {
|
func (tx *Trace) QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ev := <-tx.QUICHandshake:
|
case ev := <-tx.quicHandshake:
|
||||||
out = append(out, ev)
|
out = append(out, ev)
|
||||||
default:
|
default:
|
||||||
return // done
|
return // done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FirstQUICHandshakeOrNil drains the network events buffered inside the QUICHandshake channel
|
||||||
|
// and returns the first QUICHandshake, if any. Otherwise, it returns nil.
|
||||||
|
func (tx *Trace) FirstQUICHandshakeOrNil() *model.ArchivalTLSOrQUICHandshakeResult {
|
||||||
|
ev := tx.QUICHandshakes()
|
||||||
|
if len(ev) < 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return ev[0]
|
||||||
|
}
|
||||||
|
|
|
@ -259,8 +259,8 @@ func TestNewQUICDialerWithoutResolver(t *testing.T) {
|
||||||
mockedErr := errors.New("mocked")
|
mockedErr := errors.New("mocked")
|
||||||
zeroTime := time.Now()
|
zeroTime := time.Now()
|
||||||
trace := NewTrace(0, zeroTime)
|
trace := NewTrace(0, zeroTime)
|
||||||
trace.NetworkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
trace.networkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
||||||
trace.QUICHandshake = make(chan *model.ArchivalTLSOrQUICHandshakeResult) // no buffer
|
trace.quicHandshake = make(chan *model.ArchivalTLSOrQUICHandshakeResult) // no buffer
|
||||||
pconn := &mocks.UDPLikeConn{
|
pconn := &mocks.UDPLikeConn{
|
||||||
MockLocalAddr: func() net.Addr {
|
MockLocalAddr: func() net.Addr {
|
||||||
return &net.UDPAddr{
|
return &net.UDPAddr{
|
||||||
|
@ -313,3 +313,54 @@ func TestNewQUICDialerWithoutResolver(t *testing.T) {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFirstQUICHandshake(t *testing.T) {
|
||||||
|
t.Run("returns nil when buffer is empty", func(t *testing.T) {
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
got := trace.FirstQUICHandshakeOrNil()
|
||||||
|
if got != nil {
|
||||||
|
t.Fatal("expected nil event")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("return first non-nil QUICHandshake", func(t *testing.T) {
|
||||||
|
filler := func(tx *Trace, events []*model.ArchivalTLSOrQUICHandshakeResult) {
|
||||||
|
for _, ev := range events {
|
||||||
|
tx.quicHandshake <- ev
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
expect := []*model.ArchivalTLSOrQUICHandshakeResult{{
|
||||||
|
Network: "quic",
|
||||||
|
Address: "1.1.1.1:443",
|
||||||
|
CipherSuite: "",
|
||||||
|
Failure: nil,
|
||||||
|
NegotiatedProtocol: "",
|
||||||
|
NoTLSVerify: true,
|
||||||
|
PeerCertificates: []model.ArchivalMaybeBinaryData{},
|
||||||
|
ServerName: "dns.cloudflare.com",
|
||||||
|
T: time.Second.Seconds(),
|
||||||
|
Tags: []string{},
|
||||||
|
TLSVersion: "",
|
||||||
|
}, {
|
||||||
|
Network: "quic",
|
||||||
|
Address: "8.8.8.8:443",
|
||||||
|
CipherSuite: "",
|
||||||
|
Failure: nil,
|
||||||
|
NegotiatedProtocol: "",
|
||||||
|
NoTLSVerify: true,
|
||||||
|
PeerCertificates: []model.ArchivalMaybeBinaryData{},
|
||||||
|
ServerName: "dns.google.com",
|
||||||
|
T: time.Second.Seconds(),
|
||||||
|
Tags: []string{},
|
||||||
|
TLSVersion: "",
|
||||||
|
}}
|
||||||
|
filler(trace, expect)
|
||||||
|
got := trace.FirstQUICHandshakeOrNil()
|
||||||
|
if diff := cmp.Diff(got, expect[0]); diff != "" {
|
||||||
|
t.Fatal(diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -44,7 +44,7 @@ func (thx *tlsHandshakerTrace) Handshake(
|
||||||
func (tx *Trace) OnTLSHandshakeStart(now time.Time, remoteAddr string, config *tls.Config) {
|
func (tx *Trace) OnTLSHandshakeStart(now time.Time, remoteAddr string, config *tls.Config) {
|
||||||
t := now.Sub(tx.ZeroTime)
|
t := now.Sub(tx.ZeroTime)
|
||||||
select {
|
select {
|
||||||
case tx.NetworkEvent <- NewAnnotationArchivalNetworkEvent(tx.Index, t, "tls_handshake_start"):
|
case tx.networkEvent <- NewAnnotationArchivalNetworkEvent(tx.Index, t, "tls_handshake_start"):
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ func (tx *Trace) OnTLSHandshakeDone(started time.Time, remoteAddr string, config
|
||||||
state tls.ConnectionState, err error, finished time.Time) {
|
state tls.ConnectionState, err error, finished time.Time) {
|
||||||
t := finished.Sub(tx.ZeroTime)
|
t := finished.Sub(tx.ZeroTime)
|
||||||
select {
|
select {
|
||||||
case tx.TLSHandshake <- NewArchivalTLSOrQUICHandshakeResult(
|
case tx.tlsHandshake <- NewArchivalTLSOrQUICHandshakeResult(
|
||||||
tx.Index,
|
tx.Index,
|
||||||
started.Sub(tx.ZeroTime),
|
started.Sub(tx.ZeroTime),
|
||||||
"tls",
|
"tls",
|
||||||
|
@ -67,7 +67,7 @@ func (tx *Trace) OnTLSHandshakeDone(started time.Time, remoteAddr string, config
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case tx.NetworkEvent <- NewAnnotationArchivalNetworkEvent(tx.Index, t, "tls_handshake_done"):
|
case tx.networkEvent <- NewAnnotationArchivalNetworkEvent(tx.Index, t, "tls_handshake_done"):
|
||||||
default: // buffer is full
|
default: // buffer is full
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,10 +137,20 @@ func TLSPeerCerts(
|
||||||
func (tx *Trace) TLSHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) {
|
func (tx *Trace) TLSHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ev := <-tx.TLSHandshake:
|
case ev := <-tx.tlsHandshake:
|
||||||
out = append(out, ev)
|
out = append(out, ev)
|
||||||
default:
|
default:
|
||||||
return // done
|
return // done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FirstTLSHandshakeOrNil drains the network events buffered inside the TLSHandshake channel
|
||||||
|
// and returns the first TLSHandshake, if any. Otherwise, it returns nil.
|
||||||
|
func (tx *Trace) FirstTLSHandshakeOrNil() *model.ArchivalTLSOrQUICHandshakeResult {
|
||||||
|
ev := tx.TLSHandshakes()
|
||||||
|
if len(ev) < 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return ev[0]
|
||||||
|
}
|
||||||
|
|
|
@ -182,8 +182,8 @@ func TestNewTLSHandshakerStdlib(t *testing.T) {
|
||||||
mockedErr := errors.New("mocked")
|
mockedErr := errors.New("mocked")
|
||||||
zeroTime := time.Now()
|
zeroTime := time.Now()
|
||||||
trace := NewTrace(0, zeroTime)
|
trace := NewTrace(0, zeroTime)
|
||||||
trace.NetworkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
trace.networkEvent = make(chan *model.ArchivalNetworkEvent) // no buffer
|
||||||
trace.TLSHandshake = make(chan *model.ArchivalTLSOrQUICHandshakeResult) // no buffer
|
trace.tlsHandshake = make(chan *model.ArchivalTLSOrQUICHandshakeResult) // no buffer
|
||||||
thx := trace.NewTLSHandshakerStdlib(model.DiscardLogger)
|
thx := trace.NewTLSHandshakerStdlib(model.DiscardLogger)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
tcpConn := &mocks.Conn{
|
tcpConn := &mocks.Conn{
|
||||||
|
@ -341,6 +341,57 @@ func TestNewTLSHandshakerStdlib(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFirstTLSHandshake(t *testing.T) {
|
||||||
|
t.Run("returns nil when buffer is empty", func(t *testing.T) {
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
got := trace.FirstTLSHandshakeOrNil()
|
||||||
|
if got != nil {
|
||||||
|
t.Fatal("expected nil event")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("return first non-nil TLSHandshake", func(t *testing.T) {
|
||||||
|
filler := func(tx *Trace, events []*model.ArchivalTLSOrQUICHandshakeResult) {
|
||||||
|
for _, ev := range events {
|
||||||
|
tx.tlsHandshake <- ev
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zeroTime := time.Now()
|
||||||
|
trace := NewTrace(0, zeroTime)
|
||||||
|
expect := []*model.ArchivalTLSOrQUICHandshakeResult{{
|
||||||
|
Network: "tls",
|
||||||
|
Address: "1.1.1.1:443",
|
||||||
|
CipherSuite: "",
|
||||||
|
Failure: nil,
|
||||||
|
NegotiatedProtocol: "",
|
||||||
|
NoTLSVerify: true,
|
||||||
|
PeerCertificates: []model.ArchivalMaybeBinaryData{},
|
||||||
|
ServerName: "dns.cloudflare.com",
|
||||||
|
T: time.Second.Seconds(),
|
||||||
|
Tags: []string{},
|
||||||
|
TLSVersion: "",
|
||||||
|
}, {
|
||||||
|
Network: "tls",
|
||||||
|
Address: "8.8.8.8:443",
|
||||||
|
CipherSuite: "",
|
||||||
|
Failure: nil,
|
||||||
|
NegotiatedProtocol: "",
|
||||||
|
NoTLSVerify: true,
|
||||||
|
PeerCertificates: []model.ArchivalMaybeBinaryData{},
|
||||||
|
ServerName: "dns.google.com",
|
||||||
|
T: time.Second.Seconds(),
|
||||||
|
Tags: []string{},
|
||||||
|
TLSVersion: "",
|
||||||
|
}}
|
||||||
|
filler(trace, expect)
|
||||||
|
got := trace.FirstTLSHandshakeOrNil()
|
||||||
|
if diff := cmp.Diff(got, expect[0]); diff != "" {
|
||||||
|
t.Fatal(diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestTLSPeerCerts(t *testing.T) {
|
func TestTLSPeerCerts(t *testing.T) {
|
||||||
type args struct {
|
type args struct {
|
||||||
state tls.ConnectionState
|
state tls.ConnectionState
|
||||||
|
|
|
@ -34,9 +34,9 @@ type Trace struct {
|
||||||
// traces, you can use zero to indicate the "default" trace.
|
// traces, you can use zero to indicate the "default" trace.
|
||||||
Index int64
|
Index int64
|
||||||
|
|
||||||
// NetworkEvent is MANDATORY and buffers network events. If you create
|
// networkEvent is MANDATORY and buffers network events. If you create
|
||||||
// this channel manually, ensure it has some buffer.
|
// this channel manually, ensure it has some buffer.
|
||||||
NetworkEvent chan *model.ArchivalNetworkEvent
|
networkEvent chan *model.ArchivalNetworkEvent
|
||||||
|
|
||||||
// NewStdlibResolverFn is OPTIONAL and can be used to overide
|
// NewStdlibResolverFn is OPTIONAL and can be used to overide
|
||||||
// calls to the netxlite.NewStdlibResolver factory.
|
// calls to the netxlite.NewStdlibResolver factory.
|
||||||
|
@ -62,21 +62,21 @@ type Trace struct {
|
||||||
// calls to the netxlite.NewQUICDialerWithoutResolver factory.
|
// calls to the netxlite.NewQUICDialerWithoutResolver factory.
|
||||||
NewQUICDialerWithoutResolverFn func(listener model.QUICListener, dl model.DebugLogger) model.QUICDialer
|
NewQUICDialerWithoutResolverFn func(listener model.QUICListener, dl model.DebugLogger) model.QUICDialer
|
||||||
|
|
||||||
// DNSLookup is MANDATORY and buffers DNS Lookup observations. If you create
|
// dnsLookup is MANDATORY and buffers DNS Lookup observations. If you create
|
||||||
// this channel manually, ensure it has some buffer.
|
// this channel manually, ensure it has some buffer.
|
||||||
DNSLookup chan *model.ArchivalDNSLookupResult
|
dnsLookup chan *model.ArchivalDNSLookupResult
|
||||||
|
|
||||||
// TCPConnect is MANDATORY and buffers TCP connect observations. If you create
|
// tcpConnect is MANDATORY and buffers TCP connect observations. If you create
|
||||||
// this channel manually, ensure it has some buffer.
|
// this channel manually, ensure it has some buffer.
|
||||||
TCPConnect chan *model.ArchivalTCPConnectResult
|
tcpConnect chan *model.ArchivalTCPConnectResult
|
||||||
|
|
||||||
// TLSHandshake is MANDATORY and buffers TLS handshake observations. If you create
|
// tlsHandshake is MANDATORY and buffers TLS handshake observations. If you create
|
||||||
// this channel manually, ensure it has some buffer.
|
// this channel manually, ensure it has some buffer.
|
||||||
TLSHandshake chan *model.ArchivalTLSOrQUICHandshakeResult
|
tlsHandshake chan *model.ArchivalTLSOrQUICHandshakeResult
|
||||||
|
|
||||||
// QUICHandshake is MANDATORY and buffers QUIC handshake observations. If you create
|
// quicHandshake is MANDATORY and buffers QUIC handshake observations. If you create
|
||||||
// this channel manually, ensure it has some buffer.
|
// this channel manually, ensure it has some buffer.
|
||||||
QUICHandshake chan *model.ArchivalTLSOrQUICHandshakeResult
|
quicHandshake chan *model.ArchivalTLSOrQUICHandshakeResult
|
||||||
|
|
||||||
// TimeNowFn is OPTIONAL and can be used to override calls to time.Now
|
// TimeNowFn is OPTIONAL and can be used to override calls to time.Now
|
||||||
// to produce deterministic timing when testing.
|
// to produce deterministic timing when testing.
|
||||||
|
@ -122,25 +122,25 @@ const (
|
||||||
func NewTrace(index int64, zeroTime time.Time) *Trace {
|
func NewTrace(index int64, zeroTime time.Time) *Trace {
|
||||||
return &Trace{
|
return &Trace{
|
||||||
Index: index,
|
Index: index,
|
||||||
NetworkEvent: make(
|
networkEvent: make(
|
||||||
chan *model.ArchivalNetworkEvent,
|
chan *model.ArchivalNetworkEvent,
|
||||||
NetworkEventBufferSize,
|
NetworkEventBufferSize,
|
||||||
),
|
),
|
||||||
NewDialerWithoutResolverFn: nil, // use default
|
NewDialerWithoutResolverFn: nil, // use default
|
||||||
NewTLSHandshakerStdlibFn: nil, // use default
|
NewTLSHandshakerStdlibFn: nil, // use default
|
||||||
DNSLookup: make(
|
dnsLookup: make(
|
||||||
chan *model.ArchivalDNSLookupResult,
|
chan *model.ArchivalDNSLookupResult,
|
||||||
DNSLookupBufferSize,
|
DNSLookupBufferSize,
|
||||||
),
|
),
|
||||||
TCPConnect: make(
|
tcpConnect: make(
|
||||||
chan *model.ArchivalTCPConnectResult,
|
chan *model.ArchivalTCPConnectResult,
|
||||||
TCPConnectBufferSize,
|
TCPConnectBufferSize,
|
||||||
),
|
),
|
||||||
TLSHandshake: make(
|
tlsHandshake: make(
|
||||||
chan *model.ArchivalTLSOrQUICHandshakeResult,
|
chan *model.ArchivalTLSOrQUICHandshakeResult,
|
||||||
TLSHandshakeBufferSize,
|
TLSHandshakeBufferSize,
|
||||||
),
|
),
|
||||||
QUICHandshake: make(
|
quicHandshake: make(
|
||||||
chan *model.ArchivalTLSOrQUICHandshakeResult,
|
chan *model.ArchivalTLSOrQUICHandshakeResult,
|
||||||
QUICHandshakeBufferSize,
|
QUICHandshakeBufferSize,
|
||||||
),
|
),
|
||||||
|
|
|
@ -37,7 +37,7 @@ func TestNewTrace(t *testing.T) {
|
||||||
ev := &model.ArchivalNetworkEvent{}
|
ev := &model.ArchivalNetworkEvent{}
|
||||||
ff.Fill(ev)
|
ff.Fill(ev)
|
||||||
select {
|
select {
|
||||||
case trace.NetworkEvent <- ev:
|
case trace.networkEvent <- ev:
|
||||||
idx++
|
idx++
|
||||||
default:
|
default:
|
||||||
break Loop
|
break Loop
|
||||||
|
@ -92,7 +92,7 @@ func TestNewTrace(t *testing.T) {
|
||||||
ev := &model.ArchivalDNSLookupResult{}
|
ev := &model.ArchivalDNSLookupResult{}
|
||||||
ff.Fill(ev)
|
ff.Fill(ev)
|
||||||
select {
|
select {
|
||||||
case trace.DNSLookup <- ev:
|
case trace.dnsLookup <- ev:
|
||||||
idx++
|
idx++
|
||||||
default:
|
default:
|
||||||
break Loop
|
break Loop
|
||||||
|
@ -111,7 +111,7 @@ func TestNewTrace(t *testing.T) {
|
||||||
ev := &model.ArchivalTCPConnectResult{}
|
ev := &model.ArchivalTCPConnectResult{}
|
||||||
ff.Fill(ev)
|
ff.Fill(ev)
|
||||||
select {
|
select {
|
||||||
case trace.TCPConnect <- ev:
|
case trace.tcpConnect <- ev:
|
||||||
idx++
|
idx++
|
||||||
default:
|
default:
|
||||||
break Loop
|
break Loop
|
||||||
|
@ -130,7 +130,7 @@ func TestNewTrace(t *testing.T) {
|
||||||
ev := &model.ArchivalTLSOrQUICHandshakeResult{}
|
ev := &model.ArchivalTLSOrQUICHandshakeResult{}
|
||||||
ff.Fill(ev)
|
ff.Fill(ev)
|
||||||
select {
|
select {
|
||||||
case trace.TLSHandshake <- ev:
|
case trace.tlsHandshake <- ev:
|
||||||
idx++
|
idx++
|
||||||
default:
|
default:
|
||||||
break Loop
|
break Loop
|
||||||
|
@ -149,7 +149,7 @@ func TestNewTrace(t *testing.T) {
|
||||||
ev := &model.ArchivalTLSOrQUICHandshakeResult{}
|
ev := &model.ArchivalTLSOrQUICHandshakeResult{}
|
||||||
ff.Fill(ev)
|
ff.Fill(ev)
|
||||||
select {
|
select {
|
||||||
case trace.QUICHandshake <- ev:
|
case trace.quicHandshake <- ev:
|
||||||
idx++
|
idx++
|
||||||
default:
|
default:
|
||||||
break Loop
|
break Loop
|
||||||
|
|
Loading…
Reference in New Issue
Block a user