ooni-probe-cli/internal/engine/experiment/dht/dht.go

350 lines
9.6 KiB
Go

package dht
import (
"context"
"fmt"
"github.com/pkg/errors"
"net"
"net/url"
"time"
"github.com/anacrolix/dht/v2"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/urlgetter"
"github.com/ooni/probe-cli/v3/internal/measurexlite"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/tracex"
)
var (
// errNoInputProvided indicates no input was passed
errNoInputProvided = errors.New("no input provided")
// errInputIsNotAnURL indicates that input is not an URL
errInputIsNotAnURL = errors.New("input is not an URL")
// errInvalidScheme indicates that the scheme is invalid
errInvalidScheme = errors.New("scheme must be dht://")
// errMissingPort indicates that no port was provided
errMissingPort = errors.New("no port was provided but dht:// requires explicit port")
)
const (
testName = "dht"
testVersion = "0.0.1"
)
// Config contains the experiment config.
type Config struct{}
type runtimeConfig struct {
// nodeaddr IP or domain name
dhtnode string
port string
infohash string
}
func config(input model.MeasurementTarget) (*runtimeConfig, error) {
// Bittorrent v2 hybrid test torrent: https://blog.libtorrent.org/2020/09/bittorrent-v2/
// Has good chances of being seeded years from now
hash := "631a31dd0a46257d5078c0dee4e66e26f73e42ac"
if input == "" {
return nil, errNoInputProvided
}
// TODO: static input from defaultDHTBoostrapNodes()
// input == "" triggers runtime error from the experiment runner
if input == "DUMMY" {
// No requested DHT bootstrap node, let the DHT library try all it knows
return &runtimeConfig{
dhtnode: "",
port: "",
infohash: hash,
}, nil
}
parsed, err := url.Parse(string(input))
if err != nil {
return nil, fmt.Errorf("%w: %s", errInputIsNotAnURL, err.Error())
}
if parsed.Scheme != "dht" {
return nil, errInvalidScheme
}
if parsed.Port() == "" {
// Port is mandatory because DHT bootstrap nodes use different ports
return nil, errMissingPort
}
validConfig := runtimeConfig{
dhtnode: parsed.Hostname(),
port: parsed.Port(),
infohash: hash,
}
return &validConfig, nil
}
// TestKeys contains the experiment results
type TestKeys struct {
Queries []*model.ArchivalDNSLookupResult `json:"queries"`
Runs []*IndividualTestKeys `json:"runs"`
// Used for global failure (DNS resolution)
Failure string `json:"failure"`
}
func (tk *TestKeys) failure(err error) {
tk.Failure = *tracex.NewFailure(err)
}
func (tk *TestKeys) computeFailure() {
if tk.Failure != "" {
return
}
for _, itk := range tk.Runs {
if itk.Failure != "" {
tk.Failure = itk.Failure
return
}
}
}
// IndividualTestKeys indicate results for a single IP/port combo DHT bootstrap node
// in case the DNS resolves to several IPs, or multiple bootstrap domains were used
type IndividualTestKeys struct {
// Logger, not exported to JSON
logger model.Logger
// List of IP/port combos tried to boostrap DHT
BootstrapNodes []string `json:"bootstrap_nodes"`
// Number of DHT bootsrap nodes
BootstrapNum int `json:"bootstrap_num"`
// Number of DHT peers contacted
PeersTriedNum uint32 `json:"peers_tried_num"`
// Number of DHT peers who answered
PeersRespondedNum uint32 `json:"peers_responded_num"`
// Number of DHT peers found for specific requested infohash
InfohashPeersNum int `json:"infohash_peers_num"`
// Actual DHT peers found for requested infohash
InfohashPeers []string `json:"infohash_peers"`
// Individual failure aborting the test run for this address/port combo
Failure string `json:"failure"`
}
func (itk *IndividualTestKeys) error(err error) {
itk.Failure = *tracex.NewFailure(err)
itk.logger.Warn(itk.Failure)
}
func newITK(tk *TestKeys, log model.Logger) *IndividualTestKeys {
itk := new(IndividualTestKeys)
itk.logger = log
tk.Runs = append(tk.Runs, itk)
return itk
}
// Measurer performs the measurement.
type Measurer struct {
// Config contains the experiment settings. If empty we
// will be using default settings.
Config Config
// Getter is an optional getter to be used for testing.
Getter urlgetter.MultiGetter
}
// ExperimentName implements ExperimentMeasurer.ExperimentName
func (m Measurer) ExperimentName() string {
return testName
}
// ExperimentVersion implements ExperimentMeasurer.ExperimentVersion
func (m Measurer) ExperimentVersion() string {
return testVersion
}
func defaultDHTBoostrapNodes() []string {
return []string{
"router.utorrent.com:6881",
"router.bittorrent.com:6881",
"dht.transmissionbt.com:6881",
"dht.aelitis.com:6881",
"router.silotis.us:6881",
"dht.libtorrent.org:25401",
"dht.anacrolix.link:42069",
"router.bittorrent.cloud:42069",
}
}
// Server starts a DHT server with a list of bootstrap nodes and stores
// failure cases inside a IndividualTestKeys
func Server(bootstrapNodes []string, itk *IndividualTestKeys) (*dht.Server, bool) {
itk.BootstrapNodes = bootstrapNodes
itk.BootstrapNum = len(bootstrapNodes)
// Starting new DHT client
dhtconf := dht.NewDefaultServerConfig()
dhtconf.QueryResendDelay = func() time.Duration {
return 10 * time.Second
}
dhtconf.StartingNodes = func() (addrs []dht.Addr, err error) {
for _, addrport := range bootstrapNodes {
udpAddr, err := net.ResolveUDPAddr("udp", addrport)
if err != nil {
return nil, err
}
addrs = append(addrs, dht.NewAddr(udpAddr))
}
return addrs, nil
}
dhtsrv, err := dht.NewServer(dhtconf)
if err != nil {
itk.error(err)
return nil, false
}
itk.logger.Infof("Finished starting DHT server with bootstrap nodes: %v", bootstrapNodes)
return dhtsrv, true
}
func testServer(dht *dht.Server, infohash [20]byte, itk *IndividualTestKeys) bool {
announce, err := dht.AnnounceTraversal(infohash)
if err != nil {
itk.error(err)
return false
}
defer announce.Close()
counter := 0
for entry := range announce.Peers {
counter++
itk.InfohashPeers = append(itk.InfohashPeers, entry.NodeInfo.Addr.String())
itk.logger.Debugf("peer %d: %s", counter, entry.NodeInfo.Addr)
}
stats := announce.TraversalStats()
itk.PeersTriedNum = stats.NumAddrsTried
itk.PeersRespondedNum = stats.NumResponses
itk.InfohashPeersNum = counter
if itk.PeersRespondedNum == 0 {
itk.error(errors.New("No DHT peers were found"))
return false
}
itk.logger.Infof("Tried %d peers obtained from %d bootstrap nodes. Got response from %d. %d have requested infohash.", itk.PeersTriedNum, itk.BootstrapNum, itk.PeersRespondedNum, itk.InfohashPeersNum)
return true
}
// Run implements ExperimentMeasurer.Run
func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error {
//ctx context.Context, sess model.ExperimentSession,
//measurement *model.Measurement, callbacks model.ExperimentCallbacks,
//) error {
sess := args.Session
measurement := args.Measurement
log := sess.Logger()
trace := measurexlite.NewTrace(0, measurement.MeasurementStartTimeSaved)
resolver := trace.NewStdlibResolver(log)
config, err := config(measurement.Input)
if err != nil {
// Invalid input data, we don't even generate report
return err
}
tk := new(TestKeys)
measurement.TestKeys = tk
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
// Turn string infohash into 20-bytes array
var infohash [20]byte
copy(infohash[:], config.infohash)
if config.dhtnode != "" {
// Specific node provided: resolve it
log.Infof("Resolving DNS for %s", config.dhtnode)
resolvedAddrs, err := resolver.LookupHost(ctx, config.dhtnode)
tk.Queries = append(tk.Queries, trace.DNSLookupsFromRoundTrip()...)
if err != nil {
tk.failure(err)
return nil
}
log.Infof("Finished DNS for %s: %v", config.dhtnode, resolvedAddrs)
for _, addr := range resolvedAddrs {
nodeAddrport := net.JoinHostPort(addr, config.port)
log.Infof("Trying DHT bootstrap node %s", nodeAddrport)
nodeAddrports := []string{nodeAddrport}
itk := newITK(tk, log)
dht, success := Server(nodeAddrports, itk)
if !success {
continue
}
testServer(dht, infohash, itk)
}
} else {
// Use default DHT bootstrap nodes because none was given by input
resolvedAddrports := []string{}
for _, bootstrapDomain := range defaultDHTBoostrapNodes() {
// Ignore error because we use static input so panic chance is 0
host, port, _ := net.SplitHostPort(bootstrapDomain)
log.Infof("Resolving DNS for %s", host)
resolvedAddrs, err := resolver.LookupHost(ctx, host)
tk.Queries = append(tk.Queries, trace.DNSLookupsFromRoundTrip()...)
if err != nil {
tk.failure(err)
return nil
}
log.Infof("Finished DNS for %s: %v", host, resolvedAddrs)
for _, resolvedAddr := range resolvedAddrs {
resolvedAddrports = append(resolvedAddrports, net.JoinHostPort(resolvedAddr, port))
}
}
log.Infof("Resolved the following bootstrap nodes: %v", resolvedAddrports)
itk := newITK(tk, log)
dht, success := Server(resolvedAddrports, itk)
if success {
testServer(dht, infohash, itk)
}
}
tk.computeFailure()
return nil
}
// NewExperimentMeasurer creates a new ExperimentMeasurer.
func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
return Measurer{Config: config}
}
// SummaryKeys contains summary keys for this experiment.
//
// Note that this structure is part of the ABI contract with ooniprobe
// therefore we should be careful when changing it.
type SummaryKeys struct {
IsAnomaly bool `json:"-"`
}
// GetSummaryKeys implements model.ExperimentMeasurer.GetSummaryKeys.
func (m Measurer) GetSummaryKeys(measurement *model.Measurement) (interface{}, error) {
sk := SummaryKeys{IsAnomaly: false}
_, ok := measurement.TestKeys.(*TestKeys)
if !ok {
return sk, errors.New("invalid test keys type")
}
return sk, nil
}