diff --git a/internal/engine/experiment/dht/dht.go b/internal/engine/experiment/dht/dht.go new file mode 100644 index 0000000..207a58b --- /dev/null +++ b/internal/engine/experiment/dht/dht.go @@ -0,0 +1,340 @@ +package dht + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "net" + "net/url" + "time" + + "github.com/anacrolix/dht/v2" + "github.com/ooni/probe-cli/v3/internal/engine/experiment/urlgetter" + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/tracex" +) + +var ( + // errInputIsNotAnURL indicates that input is not an URL + errInputIsNotAnURL = errors.New("input is not an URL") + + // errInvalidScheme indicates that the scheme is invalid + errInvalidScheme = errors.New("scheme must be dht://") + + // errInvalidPort indicates that no port was provided + errInvalidPort = errors.New("no port was provided but dht:// requires explicit port") +) + +const ( + testName = "dht" + testVersion = "0.0.1" +) + +// Config contains the experiment config. +type Config struct{} + +type RuntimeConfig struct { + // nodeaddr IP or domain name + dhtnode string + port string + infohash string +} + +func config(input model.MeasurementTarget) (*RuntimeConfig, error) { + // Bittorrent v2 hybrid test torrent: https://blog.libtorrent.org/2020/09/bittorrent-v2/ + // Has good chances of being seeded years from now + hash := "631a31dd0a46257d5078c0dee4e66e26f73e42ac" + + // TODO: static input from defaultDHTBoostrapNodes() + // input == "" triggers runtime error from the experiment runner + if input == "DUMMY" { + // No requested DHT bootstrap node, let the DHT library try all it knows + return &RuntimeConfig{ + dhtnode: "", + port: "", + infohash: hash, + }, nil + } + + parsed, err := url.Parse(string(input)) + if err != nil { + return nil, fmt.Errorf("%w: %s", errInputIsNotAnURL, err.Error()) + } + if parsed.Scheme != "dht" { + return nil, errInvalidScheme + } + + if parsed.Port() == "" { + // Port is mandatory because DHT bootstrap nodes use different ports + return nil, errInvalidPort + } + + valid_config := RuntimeConfig{ + dhtnode: parsed.Hostname(), + port: parsed.Port(), + infohash: hash, + } + + return &valid_config, nil +} + +// TestKeys contains the experiment results +type TestKeys struct { + Queries []*model.ArchivalDNSLookupResult `json:"queries"` + Runs []*IndividualTestKeys `json:"runs"` + // Used for global failure (DNS resolution) + Failure string `json:"failure"` + // Indicated global or individual run failure + Failed bool `json:"failed"` +} + +func (tk *TestKeys) global_failure(err error) { + tk.Failure = *tracex.NewFailure(err) + tk.Failed = true +} + +func (tk *TestKeys) compute_global_failure() { + if tk.Failure != "" { + tk.Failed = true + return + } + for _, itk := range(tk.Runs) { + if itk.Failure != "" { + tk.Failure = itk.Failure + tk.Failed = true + return + } + } +} + +// Results for a single IP/port combo DHT bootstrap node +// in case the DNS resolves to several IPs, or multiple bootstrap domains were used +type IndividualTestKeys struct { + // Logger, not exported to JSON + logger model.Logger + + // List of IP/port combos tried to boostrap DHT + BootstrapNodes []string `json:"bootstrap_nodes"` + // Number of DHT bootsrap nodes + BootstrapNum int `json:"bootstrap_num"` + // Number of DHT peers contacted + PeersTried uint32 `json:"peers_tried"` + // Number of DHT peers who answered + PeersResponded uint32 `json:"peers_responded"` + // Number of DHT peers found for specific requested infohash + InfohashPeers int `json:"infohash_peers"` + // Individual failure aborting the test run for this address/port combo + Failure string `json:"failure"` +} + +func (itk *IndividualTestKeys) error(err error) { + itk.Failure = *tracex.NewFailure(err) + itk.logger.Warn(itk.Failure) +} + +func NewITK(tk *TestKeys, log model.Logger) *IndividualTestKeys { + itk := new(IndividualTestKeys) + itk.logger = log + tk.Runs = append(tk.Runs, itk) + return itk +} + +type Measurer struct { + // Config contains the experiment settings. If empty we + // will be using default settings. + Config Config + + // Getter is an optional getter to be used for testing. + Getter urlgetter.MultiGetter +} + +// ExperimentName implements ExperimentMeasurer.ExperimentName +func (m Measurer) ExperimentName() string { + return testName +} + +// ExperimentVersion implements ExperimentMeasurer.ExperimentVersion +func (m Measurer) ExperimentVersion() string { + return testVersion +} + +func defaultDHTBoostrapNodes() []string { + return []string{ + "router.utorrent.com:6881", + "router.bittorrent.com:6881", + "dht.transmissionbt.com:6881", + "dht.aelitis.com:6881", + "router.silotis.us:6881", + "dht.libtorrent.org:25401", + "dht.anacrolix.link:42069", + "router.bittorrent.cloud:42069", + } +} + +func DHTServer(bootstrap_nodes []string, itk *IndividualTestKeys) (*dht.Server, bool) { + itk.BootstrapNodes = bootstrap_nodes + itk.BootstrapNum = len(bootstrap_nodes) + + // Starting new DHT client + dhtconf := dht.NewDefaultServerConfig() + dhtconf.QueryResendDelay = func() time.Duration { + return 5 * time.Second + } + + dhtconf.StartingNodes = func() (addrs []dht.Addr, err error) { + for _, addrport := range(bootstrap_nodes) { + udp_addr, err := net.ResolveUDPAddr("udp", addrport) + if err != nil { + return nil, err + } + addrs = append(addrs, dht.NewAddr(udp_addr)) + } + return addrs, nil + } + + dhtsrv, err := dht.NewServer(dhtconf) + if err != nil { + itk.error(err) + return nil, false + } + itk.logger.Infof("Finished starting DHT server with bootstrap nodes: %v", bootstrap_nodes) + return dhtsrv, true +} + +func TestDHTServer(dht *dht.Server, infohash [20]byte, bootstrap_nodes []string, itk *IndividualTestKeys) bool { + announce, err := dht.AnnounceTraversal(infohash) + if err != nil { + itk.error(err) + return false + } + defer announce.Close() + + counter := 0 + for entry := range announce.Peers { + counter += 1 + itk.logger.Debugf("peer %d: %s", counter, entry.NodeInfo.Addr) + } + + stats := announce.TraversalStats() + itk.PeersTried = stats.NumAddrsTried + itk.PeersResponded = stats.NumResponses + itk.InfohashPeers = counter + + if itk.PeersResponded == 0 { + itk.error(errors.New("No DHT peers were found")) + return false + } else { + itk.logger.Infof("Tried %d peers obtained from %d bootstrap nodes. Got response from %d. %d have requested infohash.", itk.PeersTried, itk.BootstrapNum, itk.PeersResponded, itk.InfohashPeers) + } + + return true + +} + +// Run implements ExperimentMeasurer.Run +func (m Measurer) Run( + ctx context.Context, sess model.ExperimentSession, + measurement *model.Measurement, callbacks model.ExperimentCallbacks, +) error { + log := sess.Logger() + trace := measurexlite.NewTrace(0, measurement.MeasurementStartTimeSaved) + + config, err := config(measurement.Input) + if err != nil { + // Invalid input data, we don't even generate report + return err + } + + tk := new(TestKeys) + measurement.TestKeys = tk + + ctx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + // Turn string infohash into 20-bytes array + var infohash [20]byte + copy(infohash[:], config.infohash) + + resolver := trace.NewStdlibResolver(log) + + if config.dhtnode != "" { + // Specific node provided: resolve it + log.Infof("Resolving DNS for %s", config.dhtnode) + resolved_addrs, err := resolver.LookupHost(ctx, config.dhtnode) + tk.Queries = append(tk.Queries, trace.DNSLookupsFromRoundTrip()...) + if err != nil { + tk.global_failure(err) + return nil + } + log.Infof("Finished DNS for %s: %v", config.dhtnode, resolved_addrs) + + for _, addr := range(resolved_addrs) { + + node_addrport := net.JoinHostPort(addr, config.port) + log.Infof("Trying DHT bootstrap node %s", node_addrport) + node_addrports := []string{node_addrport} + + itk := NewITK(tk, log) + + dht, success := DHTServer(node_addrports, itk) + if ! success { + continue + } + + TestDHTServer(dht, infohash, node_addrports, itk) + } + } else { + // Use default DHT bootstrap nodes because none was given by input + resolved_addrports := []string{} + for _, bootstrap_domain := range(defaultDHTBoostrapNodes()) { + // Ignore error because we use static input so panic chance is 0 + host, port, _ := net.SplitHostPort(bootstrap_domain) + log.Infof("Resolving DNS for %s", host) + resolved_addrs, err := resolver.LookupHost(ctx, host) + tk.Queries = append(tk.Queries, trace.DNSLookupsFromRoundTrip()...) + if err != nil { + tk.global_failure(err) + return nil + } + log.Infof("Finished DNS for %s: %v", host, resolved_addrs) + for _, resolved_addr := range(resolved_addrs) { + resolved_addrports = append(resolved_addrports, net.JoinHostPort(resolved_addr, port)) + } + } + log.Infof("Resolved the following bootstrap nodes: %v", resolved_addrports) + + itk := NewITK(tk, log) + dht, success := DHTServer(resolved_addrports, itk) + if success { + TestDHTServer(dht, infohash, resolved_addrports, itk) + } + } + + tk.compute_global_failure() + + return nil +} + +// NewExperimentMeasurer creates a new ExperimentMeasurer. +func NewExperimentMeasurer(config Config) model.ExperimentMeasurer { + return Measurer{Config: config} +} + +// SummaryKeys contains summary keys for this experiment. +// +// Note that this structure is part of the ABI contract with ooniprobe +// therefore we should be careful when changing it. +type SummaryKeys struct { + IsAnomaly bool `json:"-"` +} + +// GetSummaryKeys implements model.ExperimentMeasurer.GetSummaryKeys. +func (m Measurer) GetSummaryKeys(measurement *model.Measurement) (interface{}, error) { + sk := SummaryKeys{IsAnomaly: false} + _, ok := measurement.TestKeys.(*TestKeys) + if !ok { + return sk, errors.New("invalid test keys type") + } + return sk, nil +}