ooni-probe-cli/internal/engine/experiment/tor/tor.go

422 lines
13 KiB
Go

// Package tor contains the tor experiment.
//
// Spec: https://github.com/ooni/spec/blob/master/nettests/ts-023-tor.md
package tor
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"sync"
"time"
"github.com/ooni/probe-cli/v3/internal/atomicx"
"github.com/ooni/probe-cli/v3/internal/engine/netx/archival"
"github.com/ooni/probe-cli/v3/internal/measurex"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
"github.com/ooni/probe-cli/v3/internal/scrubber"
)
const (
// parallelism is the number of parallel threads we use for this experiment
parallelism = 2
// testName is the name of this experiment
testName = "tor"
// testVersion is the version of this experiment
testVersion = "0.4.0"
)
// Config contains the experiment config.
type Config struct{}
// Summary contains a summary of what happened.
type Summary struct {
Failure *string `json:"failure"`
}
// TargetResults contains the results of measuring a target.
type TargetResults struct {
Agent string `json:"agent"`
Failure *string `json:"failure"`
NetworkEvents []*measurex.ArchivalNetworkEvent `json:"network_events"`
Queries []*measurex.ArchivalDNSLookupEvent `json:"queries"`
Requests []*measurex.ArchivalHTTPRoundTripEvent `json:"requests"`
Summary map[string]Summary `json:"summary"`
TargetAddress string `json:"target_address"`
TargetName string `json:"target_name,omitempty"`
TargetProtocol string `json:"target_protocol"`
TargetSource string `json:"target_source,omitempty"`
TCPConnect []*measurex.ArchivalTCPConnect `json:"tcp_connect"`
TLSHandshakes []*measurex.ArchivalQUICTLSHandshakeEvent `json:"tls_handshakes"`
// Only for testing. We don't care about this field otherwise. We
// cannot make this private because otherwise the IP address sanitizer
// is going to panic over a private field.
DirPortCount int `json:"-"`
}
func registerExtensions(m *model.Measurement) {
archival.ExtHTTP.AddTo(m)
archival.ExtNetevents.AddTo(m)
archival.ExtDNS.AddTo(m)
archival.ExtTCPConnect.AddTo(m)
archival.ExtTLSHandshake.AddTo(m)
}
// fillSummary fills the Summary field used by the UI.
func (tr *TargetResults) fillSummary() {
tr.Summary = make(map[string]Summary)
if len(tr.TCPConnect) < 1 {
return
}
tr.Summary[netxlite.ConnectOperation] = Summary{
Failure: tr.TCPConnect[0].Status.Failure,
}
switch tr.TargetProtocol {
case "dir_port":
// The UI currently doesn't care about this protocol
// as long as drawing a table is concerned.
tr.DirPortCount++
case "obfs4":
// We currently only perform an OBFS4 handshake, hence
// the final Failure is the handshake result
tr.Summary["handshake"] = Summary{
Failure: tr.Failure,
}
case "or_port_dirauth", "or_port":
if len(tr.TLSHandshakes) < 1 {
return
}
tr.Summary["handshake"] = Summary{
Failure: tr.TLSHandshakes[0].Failure,
}
}
}
// TestKeys contains tor test keys.
type TestKeys struct {
DirPortTotal int64 `json:"dir_port_total"`
DirPortAccessible int64 `json:"dir_port_accessible"`
OBFS4Total int64 `json:"obfs4_total"`
OBFS4Accessible int64 `json:"obfs4_accessible"`
ORPortDirauthTotal int64 `json:"or_port_dirauth_total"`
ORPortDirauthAccessible int64 `json:"or_port_dirauth_accessible"`
ORPortTotal int64 `json:"or_port_total"`
ORPortAccessible int64 `json:"or_port_accessible"`
Targets map[string]TargetResults `json:"targets"`
}
func (tk *TestKeys) fillToplevelKeys() {
for _, value := range tk.Targets {
switch value.TargetProtocol {
case "dir_port":
tk.DirPortTotal++
if value.Failure == nil {
tk.DirPortAccessible++
}
case "obfs4":
tk.OBFS4Total++
if value.Failure == nil {
tk.OBFS4Accessible++
}
case "or_port_dirauth":
tk.ORPortDirauthTotal++
if value.Failure == nil {
tk.ORPortDirauthAccessible++
}
case "or_port":
tk.ORPortTotal++
if value.Failure == nil {
tk.ORPortAccessible++
}
}
}
}
// Measurer performs the measurement.
type Measurer struct {
config Config
fetchTorTargets func(ctx context.Context, sess model.ExperimentSession, cc string) (map[string]model.OOAPITorTarget, error)
}
// NewMeasurer creates a new Measurer
func NewMeasurer(config Config) *Measurer {
return &Measurer{
config: config,
fetchTorTargets: func(ctx context.Context, sess model.ExperimentSession, cc string) (map[string]model.OOAPITorTarget, error) {
return sess.FetchTorTargets(ctx, cc)
},
}
}
// ExperimentName implements ExperimentMeasurer.ExperiExperimentName.
func (m *Measurer) ExperimentName() string {
return testName
}
// ExperimentVersion implements ExperimentMeasurer.ExperimentVersion.
func (m *Measurer) ExperimentVersion() string {
return testVersion
}
// Run implements ExperimentMeasurer.Run.
func (m *Measurer) Run(
ctx context.Context,
sess model.ExperimentSession,
measurement *model.Measurement,
callbacks model.ExperimentCallbacks,
) error {
targets, err := m.gimmeTargets(ctx, sess)
if err != nil {
return err // fail the measurement if we cannot get any target
}
registerExtensions(measurement)
m.measureTargets(ctx, sess, measurement, callbacks, targets)
return nil
}
func (m *Measurer) gimmeTargets(
ctx context.Context, sess model.ExperimentSession,
) (map[string]model.OOAPITorTarget, error) {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return m.fetchTorTargets(ctx, sess, sess.ProbeCC())
}
// keytarget contains a key and the related target
type keytarget struct {
key string
target model.OOAPITorTarget
}
// private returns whether a target is private. We consider private
// every target coming from a non-empty data source.
func (kt keytarget) private() bool {
return kt.target.Source != ""
}
// maybeTargetAddress returns the target address if the target is
// not private, otherwise it returns `"[scrubbed]""`.
func (kt keytarget) maybeTargetAddress() (address string) {
address = "[scrubbed]"
if !kt.private() {
address = kt.target.Address
}
return
}
func (m *Measurer) measureTargets(
ctx context.Context,
sess model.ExperimentSession,
measurement *model.Measurement,
callbacks model.ExperimentCallbacks,
targets map[string]model.OOAPITorTarget,
) {
// run measurements in parallel
var waitgroup sync.WaitGroup
rc := newResultsCollector(sess, measurement, callbacks)
waitgroup.Add(len(targets))
workch := make(chan keytarget)
for i := 0; i < parallelism; i++ {
go func(ch <-chan keytarget, total int) {
for kt := range ch {
rc.measureSingleTarget(ctx, kt, total)
waitgroup.Done()
}
}(workch, len(targets))
}
for key, target := range targets {
workch <- keytarget{key: key, target: target}
}
close(workch)
waitgroup.Wait()
// fill the measurement entry
testkeys := &TestKeys{Targets: rc.targetresults}
testkeys.fillToplevelKeys()
measurement.TestKeys = testkeys
}
type resultsCollector struct {
callbacks model.ExperimentCallbacks
completed *atomicx.Int64
flexibleConnect func(context.Context, keytarget) (*measurex.ArchivalMeasurement, *string)
measurement *model.Measurement
mu sync.Mutex
sess model.ExperimentSession
targetresults map[string]TargetResults
}
func newResultsCollector(
sess model.ExperimentSession,
measurement *model.Measurement,
callbacks model.ExperimentCallbacks,
) *resultsCollector {
rc := &resultsCollector{
callbacks: callbacks,
completed: &atomicx.Int64{},
measurement: measurement,
sess: sess,
targetresults: make(map[string]TargetResults),
}
rc.flexibleConnect = rc.defaultFlexibleConnect
return rc
}
func maybeSanitize(input TargetResults, kt keytarget) TargetResults {
if !kt.private() {
return input
}
data, err := json.Marshal(input)
runtimex.PanicOnError(err, "json.Marshal should not fail here")
// Implementation note: here we are using a strict scrubbing policy where
// we remove all IP _endpoints_, mainly for convenience, because we already
// have a well tested implementation that does that.
data = []byte(scrubber.Scrub(string(data)))
var out TargetResults
err = json.Unmarshal(data, &out)
runtimex.PanicOnError(err, "json.Unmarshal should not fail here")
return out
}
func (rc *resultsCollector) measureSingleTarget(
ctx context.Context, kt keytarget, total int,
) {
tk, failure := rc.flexibleConnect(ctx, kt)
runtimex.PanicIfNil(tk, "measurex should guarantee non-nil here")
tr := TargetResults{
Agent: "redirect",
Failure: failure,
NetworkEvents: tk.NetworkEvents,
Queries: tk.Queries,
Requests: tk.Requests,
TCPConnect: tk.TCPConnect,
TLSHandshakes: tk.TLSHandshakes,
}
tr.fillSummary()
tr = maybeSanitize(tr, kt)
rc.mu.Lock()
tr.TargetAddress = kt.maybeTargetAddress()
tr.TargetName = kt.target.Name
tr.TargetProtocol = kt.target.Protocol
tr.TargetSource = kt.target.Source
rc.targetresults[kt.key] = tr
rc.mu.Unlock()
sofar := rc.completed.Add(1)
percentage := 0.0
if total > 0 {
percentage = float64(sofar) / float64(total)
}
rc.callbacks.OnProgress(percentage, fmt.Sprintf(
"tor: access %s/%s: %s", kt.maybeTargetAddress(), kt.target.Protocol,
failureString(failure),
))
}
func maybeScrubbingLogger(input model.Logger, kt keytarget) model.Logger {
if !kt.private() {
return input
}
return &scrubber.Logger{Logger: input}
}
// defaultFlexibleConnect is the default implementation of the
// rc.flexibleConnect testable function.
//
// Arguments:
//
// - ctx is the context for timeout/cancellation;
//
// - kt contains information about the target.
//
// Returns:
//
// - tk is the measurement, which is always non nil because
// the measurex "easy" API provides this guarantee;
//
// - failure is nil or an OONI failure string.
func (rc *resultsCollector) defaultFlexibleConnect(ctx context.Context,
kt keytarget) (tk *measurex.ArchivalMeasurement, failure *string) {
mx := measurex.NewMeasurerWithDefaultSettings()
mx.Begin = rc.measurement.MeasurementStartTimeSaved
mx.Logger = maybeScrubbingLogger(rc.sess.Logger(), kt)
switch kt.target.Protocol {
case "dir_port":
URL := url.URL{
Host: kt.target.Address,
Path: "/tor/status-vote/current/consensus.z",
Scheme: "http",
}
const snapshotsize = 1 << 8 // no need to include all in report
mx.HTTPMaxBodySnapshotSize = snapshotsize
const timeout = 15 * time.Second
return mx.EasyHTTPRoundTripGET(ctx, timeout, URL.String())
case "or_port", "or_port_dirauth":
tlsConfig := measurex.NewEasyTLSConfig().InsecureSkipVerify(true)
return mx.EasyTLSConnectAndHandshake(ctx, kt.target.Address, tlsConfig)
case "obfs4":
const timeout = 15 * time.Second
return mx.EasyOBFS4ConnectAndHandshake(
ctx, timeout, kt.target.Address, rc.sess.TempDir(),
kt.target.Params)
default:
return mx.EasyTCPConnect(ctx, kt.target.Address)
}
}
// NewExperimentMeasurer creates a new ExperimentMeasurer.
func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
return NewMeasurer(config)
}
func failureString(failure *string) (s string) {
s = "success"
if failure != nil {
s = *failure
}
return
}
// SummaryKeys contains summary keys for this experiment.
//
// Note that this structure is part of the ABI contract with ooniprobe
// therefore we should be careful when changing it.
type SummaryKeys struct {
DirPortTotal int64 `json:"dir_port_total"`
DirPortAccessible int64 `json:"dir_port_accessible"`
OBFS4Total int64 `json:"obfs4_total"`
OBFS4Accessible int64 `json:"obfs4_accessible"`
ORPortDirauthTotal int64 `json:"or_port_dirauth_total"`
ORPortDirauthAccessible int64 `json:"or_port_dirauth_accessible"`
ORPortTotal int64 `json:"or_port_total"`
ORPortAccessible int64 `json:"or_port_accessible"`
IsAnomaly bool `json:"-"`
}
// GetSummaryKeys implements model.ExperimentMeasurer.GetSummaryKeys.
func (m Measurer) GetSummaryKeys(measurement *model.Measurement) (interface{}, error) {
sk := SummaryKeys{IsAnomaly: false}
tk, ok := measurement.TestKeys.(*TestKeys)
if !ok {
return sk, errors.New("invalid test keys type")
}
sk.DirPortTotal = tk.DirPortTotal
sk.DirPortAccessible = tk.DirPortAccessible
sk.OBFS4Total = tk.OBFS4Total
sk.OBFS4Accessible = tk.OBFS4Accessible
sk.ORPortDirauthTotal = tk.ORPortDirauthTotal
sk.ORPortDirauthAccessible = tk.ORPortDirauthAccessible
sk.ORPortTotal = tk.ORPortTotal
sk.ORPortAccessible = tk.ORPortAccessible
sk.IsAnomaly = ((sk.DirPortAccessible <= 0 && sk.DirPortTotal > 0) ||
(sk.OBFS4Accessible <= 0 && sk.OBFS4Total > 0) ||
(sk.ORPortDirauthAccessible <= 0 && sk.ORPortDirauthTotal > 0) ||
(sk.ORPortAccessible <= 0 && sk.ORPortTotal > 0))
return sk, nil
}