refactor(tor): rewrite using measurex (#652)

This diff rewrites the tor experiment to use measurex "easy" API.

To this end, we need to introduce an "easy" measurex API, which basically
performs easy measurements returning two pieces of data:

1. the resulting measurement, which is already using the OONI
archival data format and is always non-nil

2. a failure (i.e., the pointer to an error string), which
is nil on success and points to a string on failure

With this change, we should now be able to completely dispose of
the original netx API, which was only used by tor.

Reference issue: https://github.com/ooni/probe/issues/1688.
This commit is contained in:
Simone Basso
2022-01-05 18:41:11 +01:00
committed by GitHub
parent f0181c432f
commit dfa5e708fe
5 changed files with 421 additions and 141 deletions
+64 -85
View File
@@ -13,10 +13,8 @@ import (
"time"
"github.com/ooni/probe-cli/v3/internal/atomicx"
"github.com/ooni/probe-cli/v3/internal/engine/httpheader"
"github.com/ooni/probe-cli/v3/internal/engine/legacy/netxlogger"
"github.com/ooni/probe-cli/v3/internal/engine/legacy/oonidatamodel"
"github.com/ooni/probe-cli/v3/internal/engine/legacy/oonitemplates"
"github.com/ooni/probe-cli/v3/internal/engine/netx/archival"
"github.com/ooni/probe-cli/v3/internal/measurex"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
@@ -31,7 +29,7 @@ const (
testName = "tor"
// testVersion is the version of this experiment
testVersion = "0.3.0"
testVersion = "0.4.0"
)
// Config contains the experiment config.
@@ -44,18 +42,18 @@ type Summary struct {
// TargetResults contains the results of measuring a target.
type TargetResults struct {
Agent string `json:"agent"`
Failure *string `json:"failure"`
NetworkEvents oonidatamodel.NetworkEventsList `json:"network_events"`
Queries oonidatamodel.DNSQueriesList `json:"queries"`
Requests oonidatamodel.RequestList `json:"requests"`
Summary map[string]Summary `json:"summary"`
TargetAddress string `json:"target_address"`
TargetName string `json:"target_name,omitempty"`
TargetProtocol string `json:"target_protocol"`
TargetSource string `json:"target_source,omitempty"`
TCPConnect oonidatamodel.TCPConnectList `json:"tcp_connect"`
TLSHandshakes oonidatamodel.TLSHandshakesList `json:"tls_handshakes"`
Agent string `json:"agent"`
Failure *string `json:"failure"`
NetworkEvents []*measurex.ArchivalNetworkEvent `json:"network_events"`
Queries []*measurex.ArchivalDNSLookupEvent `json:"queries"`
Requests []*measurex.ArchivalHTTPRoundTripEvent `json:"requests"`
Summary map[string]Summary `json:"summary"`
TargetAddress string `json:"target_address"`
TargetName string `json:"target_name,omitempty"`
TargetProtocol string `json:"target_protocol"`
TargetSource string `json:"target_source,omitempty"`
TCPConnect []*measurex.ArchivalTCPConnect `json:"tcp_connect"`
TLSHandshakes []*measurex.ArchivalQUICTLSHandshakeEvent `json:"tls_handshakes"`
// Only for testing. We don't care about this field otherwise. We
// cannot make this private because otherwise the IP address sanitizer
@@ -64,11 +62,11 @@ type TargetResults struct {
}
func registerExtensions(m *model.Measurement) {
oonidatamodel.ExtHTTP.AddTo(m)
oonidatamodel.ExtNetevents.AddTo(m)
oonidatamodel.ExtDNS.AddTo(m)
oonidatamodel.ExtTCPConnect.AddTo(m)
oonidatamodel.ExtTLSHandshake.AddTo(m)
archival.ExtHTTP.AddTo(m)
archival.ExtNetevents.AddTo(m)
archival.ExtDNS.AddTo(m)
archival.ExtTCPConnect.AddTo(m)
archival.ExtTLSHandshake.AddTo(m)
}
// fillSummary fills the Summary field used by the UI.
@@ -178,10 +176,6 @@ func (m *Measurer) Run(
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(
ctx, 15*time.Second*time.Duration(len(targets)),
)
defer cancel()
registerExtensions(measurement)
m.measureTargets(ctx, sess, measurement, callbacks, targets)
return nil
@@ -251,7 +245,7 @@ func (m *Measurer) measureTargets(
type resultsCollector struct {
callbacks model.ExperimentCallbacks
completed *atomicx.Int64
flexibleConnect func(context.Context, keytarget) (oonitemplates.Results, error)
flexibleConnect func(context.Context, keytarget) (*measurex.ArchivalMeasurement, *string)
measurement *model.Measurement
mu sync.Mutex
sess model.ExperimentSession
@@ -293,15 +287,16 @@ func maybeSanitize(input TargetResults, kt keytarget) TargetResults {
func (rc *resultsCollector) measureSingleTarget(
ctx context.Context, kt keytarget, total int,
) {
tk, err := rc.flexibleConnect(ctx, kt)
tk, failure := rc.flexibleConnect(ctx, kt)
runtimex.PanicIfNil(tk, "measurex should guarantee non-nil here")
tr := TargetResults{
Agent: "redirect",
Failure: setFailure(err),
NetworkEvents: oonidatamodel.NewNetworkEventsList(tk),
Queries: oonidatamodel.NewDNSQueriesList(tk),
Requests: oonidatamodel.NewRequestList(tk),
TCPConnect: oonidatamodel.NewTCPConnectList(tk),
TLSHandshakes: oonidatamodel.NewTLSHandshakesList(tk),
Failure: failure,
NetworkEvents: tk.NetworkEvents,
Queries: tk.Queries,
Requests: tk.Requests,
TCPConnect: tk.TCPConnect,
TLSHandshakes: tk.TLSHandshakes,
}
tr.fillSummary()
tr = maybeSanitize(tr, kt)
@@ -319,7 +314,7 @@ func (rc *resultsCollector) measureSingleTarget(
}
rc.callbacks.OnProgress(percentage, fmt.Sprintf(
"tor: access %s/%s: %s", kt.maybeTargetAddress(), kt.target.Protocol,
errString(err),
failureString(failure),
))
}
@@ -330,56 +325,48 @@ func maybeScrubbingLogger(input model.Logger, kt keytarget) model.Logger {
return &scrubber.Logger{Logger: input}
}
func (rc *resultsCollector) defaultFlexibleConnect(
ctx context.Context, kt keytarget,
) (tk oonitemplates.Results, err error) {
logger := maybeScrubbingLogger(rc.sess.Logger(), kt)
// defaultFlexibleConnect is the default implementation of the
// rc.flexibleConnect testable function.
//
// Arguments:
//
// - ctx is the context for timeout/cancellation;
//
// - kt contains information about the target.
//
// Returns:
//
// - tk is the measurement, which is always non nil because
// the measurex "easy" API provides this guarantee;
//
// - failure is nil or an OONI failure string.
func (rc *resultsCollector) defaultFlexibleConnect(ctx context.Context,
kt keytarget) (tk *measurex.ArchivalMeasurement, failure *string) {
mx := measurex.NewMeasurerWithDefaultSettings()
mx.Begin = rc.measurement.MeasurementStartTimeSaved
mx.Logger = maybeScrubbingLogger(rc.sess.Logger(), kt)
switch kt.target.Protocol {
case "dir_port":
url := url.URL{
URL := url.URL{
Host: kt.target.Address,
Path: "/tor/status-vote/current/consensus.z",
Scheme: "http",
}
const snapshotsize = 1 << 8 // no need to include all in report
r := oonitemplates.HTTPDo(ctx, oonitemplates.HTTPDoConfig{
Accept: httpheader.Accept(),
AcceptLanguage: httpheader.AcceptLanguage(),
Beginning: rc.measurement.MeasurementStartTimeSaved,
MaxEventsBodySnapSize: snapshotsize,
MaxResponseBodySnapSize: snapshotsize,
Handler: netxlogger.NewHandler(logger),
Method: "GET",
URL: url.String(),
UserAgent: httpheader.UserAgent(),
})
tk, err = r.TestKeys, r.Error
mx.HTTPMaxBodySnapshotSize = snapshotsize
const timeout = 15 * time.Second
return mx.EasyHTTPRoundTripGET(ctx, timeout, URL.String())
case "or_port", "or_port_dirauth":
r := oonitemplates.TLSConnect(ctx, oonitemplates.TLSConnectConfig{
Address: kt.target.Address,
Beginning: rc.measurement.MeasurementStartTimeSaved,
InsecureSkipVerify: true,
Handler: netxlogger.NewHandler(logger),
})
tk, err = r.TestKeys, r.Error
tlsConfig := measurex.NewEasyTLSConfig().InsecureSkipVerify(true)
return mx.EasyTLSConnectAndHandshake(ctx, kt.target.Address, tlsConfig)
case "obfs4":
r := oonitemplates.OBFS4Connect(ctx, oonitemplates.OBFS4ConnectConfig{
Address: kt.target.Address,
Beginning: rc.measurement.MeasurementStartTimeSaved,
Handler: netxlogger.NewHandler(logger),
Params: kt.target.Params,
StateBaseDir: rc.sess.TempDir(),
})
tk, err = r.TestKeys, r.Error
const timeout = 15 * time.Second
return mx.EasyOBFS4ConnectAndHandshake(
ctx, timeout, kt.target.Address, rc.sess.TempDir(),
kt.target.Params)
default:
r := oonitemplates.TCPConnect(ctx, oonitemplates.TCPConnectConfig{
Address: kt.target.Address,
Beginning: rc.measurement.MeasurementStartTimeSaved,
Handler: netxlogger.NewHandler(logger),
})
tk, err = r.TestKeys, r.Error
return mx.EasyTCPConnect(ctx, kt.target.Address)
}
return
}
// NewExperimentMeasurer creates a new ExperimentMeasurer.
@@ -387,18 +374,10 @@ func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
return NewMeasurer(config)
}
func errString(err error) (s string) {
func failureString(failure *string) (s string) {
s = "success"
if err != nil {
s = err.Error()
}
return
}
func setFailure(err error) (s *string) {
if err != nil {
descr := err.Error()
s = &descr
if failure != nil {
s = *failure
}
return
}
+53 -52
View File
@@ -13,9 +13,8 @@ import (
"github.com/apex/log"
"github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/engine/legacy/oonidatamodel"
"github.com/ooni/probe-cli/v3/internal/engine/legacy/oonitemplates"
"github.com/ooni/probe-cli/v3/internal/engine/mockable"
"github.com/ooni/probe-cli/v3/internal/measurex"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/scrubber"
@@ -26,7 +25,7 @@ func TestNewExperimentMeasurer(t *testing.T) {
if measurer.ExperimentName() != "tor" {
t.Fatal("unexpected name")
}
if measurer.ExperimentVersion() != "0.3.0" {
if measurer.ExperimentVersion() != "0.4.0" {
t.Fatal("unexpected version")
}
}
@@ -118,15 +117,15 @@ func TestMeasurerMeasureGood(t *testing.T) {
}
}
var staticPrivateTestingTargetEndpoint = "192.95.36.142:443"
var staticPrivateTestingTargetEndpoint = "209.148.46.65:443"
var staticPrivateTestingTarget = model.OOAPITorTarget{
Address: staticPrivateTestingTargetEndpoint,
Params: map[string][]string{
"cert": {
"qUVQ0srL1JI/vO6V6m/24anYXiJD3QP2HgzUKQtQ7GRqqUvs7P+tG43RtAqdhLOALP7DJQ",
"ssH+9rP8dG2NLDN2XuFw63hIO/9MNNinLmxQDpVa+7kTOa9/m+tGWT1SmSYpQ9uTBGa6Hw",
},
"iat-mode": {"1"},
"iat-mode": {"0"},
},
Protocol: "obfs4",
Source: "bridgedb",
@@ -159,7 +158,7 @@ func TestMeasurerMeasureSanitiseOutput(t *testing.T) {
tk := measurement.TestKeys.(*TestKeys)
entry := tk.Targets[key]
if entry.Failure != nil {
t.Fatal("measurement failed unexpectedly")
t.Fatal("measurement failed unexpectedly", *entry.Failure)
}
if !bytes.Contains(data, []byte(key)) {
t.Fatal("cannot find expected key")
@@ -258,8 +257,8 @@ func TestResultsCollectorMeasureSingleTargetGood(t *testing.T) {
new(model.Measurement),
model.NewPrinterCallbacks(log.Log),
)
rc.flexibleConnect = func(context.Context, keytarget) (oonitemplates.Results, error) {
return oonitemplates.Results{}, nil
rc.flexibleConnect = func(context.Context, keytarget) (*measurex.ArchivalMeasurement, *string) {
return &measurex.ArchivalMeasurement{}, nil
}
rc.measureSingleTarget(
context.Background(), wrapTestingTarget(staticTestingTargets[0]),
@@ -292,8 +291,9 @@ func TestResultsCollectorMeasureSingleTargetWithFailure(t *testing.T) {
new(model.Measurement),
model.NewPrinterCallbacks(log.Log),
)
rc.flexibleConnect = func(context.Context, keytarget) (oonitemplates.Results, error) {
return oonitemplates.Results{}, errors.New("mocked error")
rc.flexibleConnect = func(context.Context, keytarget) (*measurex.ArchivalMeasurement, *string) {
failure := "mocked error"
return &measurex.ArchivalMeasurement{}, &failure
}
rc.measureSingleTarget(
context.Background(), keytarget{
@@ -331,14 +331,14 @@ func TestDefautFlexibleConnectDirPort(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
tk, err := rc.defaultFlexibleConnect(ctx, wrapTestingTarget(staticTestingTargets[1]))
if err == nil {
t.Fatal("expected an error here")
tk, failure := rc.defaultFlexibleConnect(ctx, wrapTestingTarget(staticTestingTargets[1]))
if failure == nil {
t.Fatal("expected a failure here")
}
if !strings.HasSuffix(err.Error(), "interrupted") {
if !strings.HasSuffix(*failure, "interrupted") {
t.Fatal("not the error we expected")
}
if tk.HTTPRequests == nil {
if tk.Requests == nil {
t.Fatal("expected HTTP data here")
}
}
@@ -353,18 +353,18 @@ func TestDefautFlexibleConnectOrPort(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
tk, err := rc.defaultFlexibleConnect(ctx, wrapTestingTarget(staticTestingTargets[2]))
if err == nil {
t.Fatal("expected an error here")
tk, failure := rc.defaultFlexibleConnect(ctx, wrapTestingTarget(staticTestingTargets[2]))
if failure == nil {
t.Fatal("expected a failure here")
}
if err.Error() != "interrupted" {
if *failure != "interrupted" {
t.Fatal("not the error we expected")
}
if tk.Connects == nil {
if tk.TCPConnect == nil {
t.Fatal("expected connects data here")
}
if tk.NetworkEvents == nil {
t.Fatal("expected network events data here")
if tk.NetworkEvents != nil {
t.Fatal("expected no network events data here")
}
}
@@ -378,18 +378,18 @@ func TestDefautFlexibleConnectOBFS4(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
tk, err := rc.defaultFlexibleConnect(ctx, wrapTestingTarget(staticTestingTargets[0]))
if err == nil {
t.Fatal("expected an error here")
tk, failure := rc.defaultFlexibleConnect(ctx, wrapTestingTarget(staticTestingTargets[0]))
if failure == nil {
t.Fatal("expected a failure here")
}
if err.Error() != "interrupted" {
if *failure != "interrupted" {
t.Fatal("not the error we expected")
}
if tk.Connects == nil {
if tk.TCPConnect == nil {
t.Fatal("expected connects data here")
}
if tk.NetworkEvents == nil {
t.Fatal("expected network events data here")
if tk.NetworkEvents != nil {
t.Fatal("expected no network events data here")
}
}
@@ -403,24 +403,25 @@ func TestDefautFlexibleConnectDefault(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
tk, err := rc.defaultFlexibleConnect(ctx, wrapTestingTarget(staticTestingTargets[3]))
if err == nil {
t.Fatal("expected an error here")
tk, failure := rc.defaultFlexibleConnect(ctx, wrapTestingTarget(staticTestingTargets[3]))
if failure == nil {
t.Fatal("expected a failure here")
}
if err.Error() != "interrupted" {
t.Fatalf("not the error we expected: %+v", err)
if *failure != "interrupted" {
t.Fatalf("not the error we expected: %+v", *failure)
}
if tk.Connects == nil {
t.Fatalf("expected connects data here, found: %+v", tk.Connects)
if tk.TCPConnect == nil {
t.Fatalf("expected connects data here, found: %+v", tk.TCPConnect)
}
}
func TestErrString(t *testing.T) {
if errString(nil) != "success" {
func TestFailureString(t *testing.T) {
if failureString(nil) != "success" {
t.Fatal("not working with nil")
}
if errString(errors.New("antani")) != "antani" {
t.Fatal("not working with error")
s := "antani"
if failureString(&s) != "antani" {
t.Fatal("not working with non-nil string")
}
}
@@ -436,8 +437,8 @@ func TestSummary(t *testing.T) {
t.Run("with a TCP connect and nothing else", func(t *testing.T) {
tr := new(TargetResults)
failure := "mocked_error"
tr.TCPConnect = append(tr.TCPConnect, oonidatamodel.TCPConnectEntry{
Status: oonidatamodel.TCPConnectStatus{
tr.TCPConnect = append(tr.TCPConnect, &measurex.ArchivalTCPConnect{
Status: &measurex.ArchivalTCPConnectStatus{
Success: true,
Failure: &failure,
},
@@ -453,8 +454,8 @@ func TestSummary(t *testing.T) {
t.Run("for OBFS4", func(t *testing.T) {
tr := new(TargetResults)
tr.TCPConnect = append(tr.TCPConnect, oonidatamodel.TCPConnectEntry{
Status: oonidatamodel.TCPConnectStatus{
tr.TCPConnect = append(tr.TCPConnect, &measurex.ArchivalTCPConnect{
Status: &measurex.ArchivalTCPConnectStatus{
Success: true,
},
})
@@ -474,16 +475,16 @@ func TestSummary(t *testing.T) {
})
t.Run("for or_port/or_port_dirauth", func(t *testing.T) {
doit := func(targetProtocol string, handshake *oonidatamodel.TLSHandshake) {
doit := func(targetProtocol string, handshake *measurex.ArchivalQUICTLSHandshakeEvent) {
tr := new(TargetResults)
tr.TCPConnect = append(tr.TCPConnect, oonidatamodel.TCPConnectEntry{
Status: oonidatamodel.TCPConnectStatus{
tr.TCPConnect = append(tr.TCPConnect, &measurex.ArchivalTCPConnect{
Status: &measurex.ArchivalTCPConnectStatus{
Success: true,
},
})
tr.TargetProtocol = targetProtocol
if handshake != nil {
tr.TLSHandshakes = append(tr.TLSHandshakes, *handshake)
tr.TLSHandshakes = append(tr.TLSHandshakes, handshake)
}
tr.fillSummary()
if len(tr.Summary) < 1 {
@@ -507,7 +508,7 @@ func TestSummary(t *testing.T) {
}
doit("or_port_dirauth", nil)
doit("or_port", nil)
doit("or_port", &oonidatamodel.TLSHandshake{
doit("or_port", &measurex.ArchivalQUICTLSHandshakeEvent{
Failure: (func() *string {
s := io.EOF.Error()
return &s
@@ -796,10 +797,10 @@ func TestSummaryKeysWorksAsIntended(t *testing.T) {
func TestTargetResultsFillSummaryDirPort(t *testing.T) {
tr := &TargetResults{
TargetProtocol: "dir_port",
TCPConnect: oonidatamodel.TCPConnectList{{
TCPConnect: []*measurex.ArchivalTCPConnect{{
IP: "1.2.3.4",
Port: 443,
Status: oonidatamodel.TCPConnectStatus{
Status: &measurex.ArchivalTCPConnectStatus{
Failure: nil,
},
}},