diff --git a/internal/engine/experiment/smtp/smtp.go b/internal/engine/experiment/smtp/smtp.go index 50b66af..97c183f 100644 --- a/internal/engine/experiment/smtp/smtp.go +++ b/internal/engine/experiment/smtp/smtp.go @@ -88,14 +88,24 @@ func config(input model.MeasurementTarget) (*Config, error) { } // TestKeys contains the experiment results + type TestKeys struct { - Queries []*model.ArchivalDNSLookupResult `json:"queries"` - TCPConnect []*model.ArchivalTCPConnectResult `json:"tcp_connect"` - TLSHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"tls_handshakes"` - Errors map[string][]*string `json:"smtp"` - NoOpCounter uint8 `json:"successful_noops"` + Queries []*model.ArchivalDNSLookupResult `json:"queries"` + Runs map[string]*IndividualTestKeys `json:"runs"` // Used for global failure (DNS resolution) Failure string `json:"failure"` + // Indicates global failure or individual test failure + Failed bool `json:"failed"` +} + +// IndividualTestKeys contains results for TCP/IP level stuff for each address found +// in the DNS lookup +type IndividualTestKeys struct { + NoOpCounter uint8 + TCPConnect []*model.ArchivalTCPConnectResult `json:"tcp_connect"` + TLSHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"tls_handshakes"` + // Individual failure aborting the test run for this address/port combo + Failure *string `json:"failure"` } type Measurer struct { @@ -117,6 +127,8 @@ func (m Measurer) ExperimentVersion() string { return testVersion } +// Manages sequential TCP sessions to the same hostname (over different IPs) +// don't use in parallel! type TCPRunner struct { trace *measurexlite.Trace logger model.Logger @@ -125,31 +137,31 @@ type TCPRunner struct { tlsconfig *tls.Config host string port string + // addr is changed everytime TCPRunner.conn(addr) is called + addr string } type TCPSession struct { addr string port string runner *TCPRunner - errors []*string + tk *IndividualTestKeys tls bool raw_conn *net.Conn tls_conn *net.Conn } -func (s TCPSession) Close() { +func (s *TCPSession) Close() { if s.tls { - var conn = *s.tls_conn - conn.Close() - //*(s.tls_conn).Close() + var conn = *s.tls_conn + conn.Close() } else { - var conn = *s.raw_conn - conn.Close() - //*(s.raw_conn).Close() - } + var conn = *s.raw_conn + conn.Close() + } } -func (s TCPSession) current_conn() net.Conn { +func (s *TCPSession) current_conn() net.Conn { if s.tls { return *s.tls_conn } else { @@ -157,52 +169,69 @@ func (s TCPSession) current_conn() net.Conn { } } -func (r TCPRunner) conn(addr string, port string) (*TCPSession, bool) { - key := net.JoinHostPort(addr, port) - // Initialize errors - if r.tk.Errors == nil { - r.tk.Errors = make(map[string][]*string) - } - r.tk.Errors[key] = []*string{} +func (r *TCPRunner) run_key() string { + return net.JoinHostPort(r.addr, r.port) +} + +func (r *TCPRunner) get_run() *IndividualTestKeys { + if r.tk.Runs == nil { + r.tk.Runs = make(map[string]*IndividualTestKeys) + } + key := r.run_key() + val, exists := r.tk.Runs[key] + if exists { + return val + } else { + r.tk.Runs[key] = &IndividualTestKeys{} + return r.tk.Runs[key] + } +} + +func (r *TCPRunner) conn(addr string, port string) (*TCPSession, bool) { + r.addr = addr + run := r.get_run() s := new(TCPSession) - if !s.conn(addr, port, r, r.tk.Errors[key]) { + if !s.conn(addr, port, r, run) { return nil, false } return s, true } -func (r TCPRunner) dial(addr string, port string) (net.Conn, error) { +func (r *TCPRunner) dial(addr string, port string) (net.Conn, error) { dialer := r.trace.NewDialerWithoutResolver(r.logger) conn, err := dialer.DialContext(r.ctx, "tcp", net.JoinHostPort(addr, port)) - r.tk.TCPConnect = append(r.tk.TCPConnect, r.trace.TCPConnects()...) + run := r.get_run() + run.TCPConnect = append(run.TCPConnect, r.trace.TCPConnects()...) return conn, err } -func (s TCPSession) conn(addr string, port string, runner TCPRunner, errors []*string) bool { +func (s *TCPSession) conn(addr string, port string, runner *TCPRunner, tk *IndividualTestKeys) bool { // Initialize addr field and corresponding errors in TestKeys s.addr = addr s.port = port s.tls = false - s.runner = &runner - s.errors = errors + s.runner = runner + s.tk = tk conn, err := runner.dial(addr, port) if err != nil { s.error(err) return false } - s.raw_conn = &conn + s.raw_conn = &conn return true } -func (s TCPSession) error(err error) { - s.errors = append(s.errors, tracex.NewFailure(err)) +func (s *TCPSession) error(err error) { + s.runner.tk.Failed = true + s.tk.Failure = tracex.NewFailure(err) + //s. = append(s.errors, tracex.NewFailure(err)) } -func (r TCPRunner) resolve(host string) ([]string, bool) { +func (r *TCPRunner) resolve(host string) ([]string, bool) { r.logger.Infof("Resolving DNS for %s", host) resolver := r.trace.NewStdlibResolver(r.logger) addrs, err := resolver.LookupHost(r.ctx, host) @@ -216,7 +245,7 @@ func (r TCPRunner) resolve(host string) ([]string, bool) { return addrs, true } -func (s TCPSession) handshake() bool { +func (s *TCPSession) handshake() bool { if s.tls { // TLS already initialized... return true @@ -224,7 +253,7 @@ func (s TCPSession) handshake() bool { s.runner.logger.Infof("Starting TLS handshake with %s:%s", s.addr, s.port) thx := s.runner.trace.NewTLSHandshakerStdlib(s.runner.logger) tconn, _, err := thx.Handshake(s.runner.ctx, *s.raw_conn, s.runner.tlsconfig) - s.runner.tk.TLSHandshakes = append(s.runner.tk.TLSHandshakes, s.runner.trace.FirstTLSHandshakeOrNil()) + s.tk.TLSHandshakes = append(s.tk.TLSHandshakes, s.runner.trace.FirstTLSHandshakeOrNil()) if err != nil { s.error(err) return false @@ -236,7 +265,7 @@ func (s TCPSession) handshake() bool { return true } -func (s TCPSession) starttls(message string) bool { +func (s *TCPSession) starttls(message string) bool { if s.tls { // TLS already initialized... return true @@ -248,7 +277,7 @@ func (s TCPSession) starttls(message string) bool { return s.handshake() } -func (s TCPSession) smtp(ehlo string, noop uint8) bool { +func (s *TCPSession) smtp(ehlo string, noop uint8) bool { // Auto-choose plaintext/TCP session client, err := smtp.NewClient(s.current_conn(), ehlo) if err != nil { @@ -264,10 +293,10 @@ func (s TCPSession) smtp(ehlo string, noop uint8) bool { if noop > 0 { s.runner.logger.Infof("Trying to generate more no-op traffic") // TODO: noop counter per IP address - s.runner.tk.NoOpCounter = 0 - for s.runner.tk.NoOpCounter < noop { - s.runner.tk.NoOpCounter += 1 - s.runner.logger.Infof("NoOp Iteration %d", s.runner.tk.NoOpCounter) + s.tk.NoOpCounter = 0 + for s.tk.NoOpCounter < noop { + s.tk.NoOpCounter += 1 + s.runner.logger.Infof("NoOp Iteration %d", s.tk.NoOpCounter) err = client.Noop() if err != nil { s.error(err) @@ -275,11 +304,11 @@ func (s TCPSession) smtp(ehlo string, noop uint8) bool { } } - if s.runner.tk.NoOpCounter == noop { + if s.tk.NoOpCounter == noop { s.runner.logger.Infof("Successfully generated no-op traffic") return true } else { - s.runner.logger.Infof("Failed no-op traffic at iteration %d", s.runner.tk.NoOpCounter) + s.runner.logger.Infof("Failed no-op traffic at iteration %d", s.tk.NoOpCounter) return false } } @@ -313,7 +342,7 @@ func (m Measurer) Run( ServerName: config.host, } - runner := TCPRunner{ + runner := &TCPRunner{ trace: trace, logger: log, ctx: ctx,