feat: tlsping and tcpping using step-by-step (#815)
## Checklist - [x] I have read the [contribution guidelines](https://github.com/ooni/probe-cli/blob/master/CONTRIBUTING.md) - [x] reference issue for this pull request: https://github.com/ooni/probe/issues/2158 - [x] if you changed anything related how experiments work and you need to reflect these changes in the ooni/spec repository, please link to the related ooni/spec pull request: https://github.com/ooni/spec/pull/250 ## Description This diff refactors the codebase to reimplement tlsping and tcpping to use the step-by-step measurements style. See docs/design/dd-003-step-by-step.md for more information on the step-by-step measurement style.
This commit is contained in:
@@ -10,13 +10,13 @@ import (
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/ooni/probe-cli/v3/internal/measurex"
|
||||
"github.com/ooni/probe-cli/v3/internal/measurexlite"
|
||||
"github.com/ooni/probe-cli/v3/internal/model"
|
||||
)
|
||||
|
||||
const (
|
||||
testName = "tcpping"
|
||||
testVersion = "0.1.0"
|
||||
testVersion = "0.2.0"
|
||||
)
|
||||
|
||||
// Config contains the experiment configuration.
|
||||
@@ -49,7 +49,7 @@ type TestKeys struct {
|
||||
|
||||
// SinglePing contains the results of a single ping.
|
||||
type SinglePing struct {
|
||||
TCPConnect []*measurex.ArchivalTCPConnect `json:"tcp_connect"`
|
||||
TCPConnect *model.ArchivalTCPConnectResult `json:"tcp_connect"`
|
||||
}
|
||||
|
||||
// Measurer performs the measurement.
|
||||
@@ -103,42 +103,47 @@ func (m *Measurer) Run(
|
||||
}
|
||||
tk := new(TestKeys)
|
||||
measurement.TestKeys = tk
|
||||
out := make(chan *measurex.EndpointMeasurement)
|
||||
mxmx := measurex.NewMeasurerWithDefaultSettings()
|
||||
go m.tcpPingLoop(ctx, mxmx, parsed.Host, out)
|
||||
out := make(chan *SinglePing)
|
||||
go m.tcpPingLoop(ctx, measurement.MeasurementStartTimeSaved, sess.Logger(), parsed.Host, out)
|
||||
for len(tk.Pings) < int(m.config.repetitions()) {
|
||||
meas := <-out
|
||||
tk.Pings = append(tk.Pings, &SinglePing{
|
||||
TCPConnect: measurex.NewArchivalTCPConnectList(meas.Connect),
|
||||
})
|
||||
tk.Pings = append(tk.Pings, <-out)
|
||||
}
|
||||
return nil // return nil so we always submit the measurement
|
||||
}
|
||||
|
||||
// tcpPingLoop sends all the ping requests and emits the results onto the out channel.
|
||||
func (m *Measurer) tcpPingLoop(ctx context.Context, mxmx *measurex.Measurer,
|
||||
address string, out chan<- *measurex.EndpointMeasurement) {
|
||||
func (m *Measurer) tcpPingLoop(ctx context.Context, zeroTime time.Time,
|
||||
logger model.Logger, address string, out chan<- *SinglePing) {
|
||||
ticker := time.NewTicker(m.config.delay())
|
||||
defer ticker.Stop()
|
||||
for i := int64(0); i < m.config.repetitions(); i++ {
|
||||
go m.tcpPingAsync(ctx, mxmx, address, out)
|
||||
go m.tcpPingAsync(ctx, i, zeroTime, logger, address, out)
|
||||
<-ticker.C
|
||||
}
|
||||
}
|
||||
|
||||
// tcpPingAsync performs a TCP ping and emits the result onto the out channel.
|
||||
func (m *Measurer) tcpPingAsync(ctx context.Context, mxmx *measurex.Measurer,
|
||||
address string, out chan<- *measurex.EndpointMeasurement) {
|
||||
out <- m.tcpConnect(ctx, mxmx, address)
|
||||
func (m *Measurer) tcpPingAsync(ctx context.Context, index int64,
|
||||
zeroTime time.Time, logger model.Logger, address string, out chan<- *SinglePing) {
|
||||
out <- m.tcpConnect(ctx, index, zeroTime, logger, address)
|
||||
}
|
||||
|
||||
// tcpConnect performs a TCP connect and returns the result to the caller.
|
||||
func (m *Measurer) tcpConnect(ctx context.Context, mxmx *measurex.Measurer,
|
||||
address string) *measurex.EndpointMeasurement {
|
||||
func (m *Measurer) tcpConnect(ctx context.Context, index int64,
|
||||
zeroTime time.Time, logger model.Logger, address string) *SinglePing {
|
||||
// TODO(bassosimone): make the timeout user-configurable
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
return mxmx.TCPConnect(ctx, address)
|
||||
trace := measurexlite.NewTrace(index, zeroTime)
|
||||
dialer := trace.NewDialerWithoutResolver(logger)
|
||||
ol := measurexlite.NewOperationLogger(logger, "TCPPing #%d %s", index, address)
|
||||
conn, err := dialer.DialContext(ctx, "tcp", address)
|
||||
ol.Stop(err)
|
||||
measurexlite.MaybeClose(conn)
|
||||
sp := &SinglePing{
|
||||
TCPConnect: <-trace.TCPConnect,
|
||||
}
|
||||
return sp
|
||||
}
|
||||
|
||||
// NewExperimentMeasurer creates a new ExperimentMeasurer.
|
||||
|
||||
@@ -40,14 +40,16 @@ func TestMeasurer_run(t *testing.T) {
|
||||
if m.ExperimentName() != "tcpping" {
|
||||
t.Fatal("invalid experiment name")
|
||||
}
|
||||
if m.ExperimentVersion() != "0.1.0" {
|
||||
if m.ExperimentVersion() != "0.2.0" {
|
||||
t.Fatal("invalid experiment version")
|
||||
}
|
||||
ctx := context.Background()
|
||||
meas := &model.Measurement{
|
||||
Input: model.MeasurementTarget(input),
|
||||
}
|
||||
sess := &mockable.Session{}
|
||||
sess := &mockable.Session{
|
||||
MockableLogger: model.DiscardLogger,
|
||||
}
|
||||
callbacks := model.NewPrinterCallbacks(model.DiscardLogger)
|
||||
err := m.Run(ctx, sess, meas, callbacks)
|
||||
return meas, m, err
|
||||
|
||||
Reference in New Issue
Block a user