ooni-probe-cli/internal/engine/session.go

684 lines
22 KiB
Go
Raw Normal View History

package engine
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"sync"
"github.com/ooni/probe-cli/v3/internal/engine/atomicx"
"github.com/ooni/probe-cli/v3/internal/engine/geolocate"
"github.com/ooni/probe-cli/v3/internal/engine/internal/platform"
"github.com/ooni/probe-cli/v3/internal/engine/internal/sessionresolver"
"github.com/ooni/probe-cli/v3/internal/engine/kvstore"
"github.com/ooni/probe-cli/v3/internal/engine/model"
"github.com/ooni/probe-cli/v3/internal/engine/netx"
"github.com/ooni/probe-cli/v3/internal/engine/netx/bytecounter"
"github.com/ooni/probe-cli/v3/internal/engine/probeservices"
"github.com/ooni/probe-cli/v3/internal/engine/tunnel"
"github.com/ooni/probe-cli/v3/internal/version"
)
// SessionConfig contains the Session config
type SessionConfig struct {
AvailableProbeServices []model.Service
KVStore KVStore
Logger model.Logger
ProxyURL *url.URL
SoftwareName string
SoftwareVersion string
TempDir string
TorArgs []string
TorBinary string
// TunnelDir is the directory where we should store
// the state of persistent tunnels. This field is
// optional _unless_ you want to use tunnels. In such
// case, starting a tunnel will fail because there
// is no directory where to store state.
TunnelDir string
}
// Session is a measurement session. It contains shared information
// required to run a measurement session, and it controls the lifecycle
// of such resources. It is not possible to reuse a Session. You MUST
// NOT attempt to use a Session again after Session.Close.
type Session struct {
availableProbeServices []model.Service
availableTestHelpers map[string][]model.Service
byteCounter *bytecounter.Counter
httpDefaultTransport netx.HTTPRoundTripper
kvStore model.KeyValueStore
location *geolocate.Results
logger model.Logger
proxyURL *url.URL
queryProbeServicesCount *atomicx.Int64
resolver *sessionresolver.Resolver
selectedProbeServiceHook func(*model.Service)
selectedProbeService *model.Service
softwareName string
softwareVersion string
tempDir string
// closeOnce allows us to call Close just once.
closeOnce sync.Once
// mu provides mutual exclusion.
mu sync.Mutex
// testLookupLocationContext is a an optional hook for testing
// allowing us to mock LookupLocationContext.
testLookupLocationContext func(ctx context.Context) (*geolocate.Results, error)
// testMaybeLookupBackendsContext is an optional hook for testing
// allowing us to mock MaybeLookupBackendsContext.
testMaybeLookupBackendsContext func(ctx context.Context) error
// testMaybeLookupLocationContext is an optional hook for testing
// allowing us to mock MaybeLookupLocationContext.
testMaybeLookupLocationContext func(ctx context.Context) error
// testNewProbeServicesClientForCheckIn is an optional hook for testing
// allowing us to mock NewProbeServicesClient when calling CheckIn.
testNewProbeServicesClientForCheckIn func(ctx context.Context) (
sessionProbeServicesClientForCheckIn, error)
// torArgs contains the optional arguments for tor that we may need
// to pass to urlgetter when it uses a tor tunnel.
torArgs []string
// torBinary contains the optional path to the tor binary that we
// may need to pass to urlgetter when it uses a tor tunnel.
torBinary string
// tunnel is the optional tunnel that we may be using. It is created
// by NewSession and it is cleaned up by Close.
tunnel tunnel.Tunnel
}
// sessionProbeServicesClientForCheckIn returns the probe services
// client that we should be using for performing the check-in.
type sessionProbeServicesClientForCheckIn interface {
CheckIn(ctx context.Context, config model.CheckInConfig) (*model.CheckInInfo, error)
}
// NewSession creates a new session. This factory function will
// execute the following steps:
//
// 1. Make sure the config is sane, apply reasonable defaults
// where possible, otherwise return an error.
//
// 2. Create a temporary directory.
//
// 3. Create an instance of the session.
//
// 4. If the user requested for a proxy that entails a tunnel (at the
// moment of writing this note, either psiphon or tor), then start the
// requested tunnel and configure it as our proxy.
//
// 5. Create a compound resolver for the session that will attempt
// to use a bunch of DoT/DoH servers before falling back to the system
// resolver if nothing else works (see the sessionresolver pkg). This
// sessionresolver will be using the configured proxy, if any.
//
// 6. Create the default HTTP transport that we should be using when
// we communicate with the OONI backends. This transport will be
// using the configured proxy, if any.
//
// If any of these steps fails, then we cannot create a measurement
// session and we return an error.
func NewSession(ctx context.Context, config SessionConfig) (*Session, error) {
if config.Logger == nil {
return nil, errors.New("Logger is empty")
}
if config.SoftwareName == "" {
return nil, errors.New("SoftwareName is empty")
}
if config.SoftwareVersion == "" {
return nil, errors.New("SoftwareVersion is empty")
}
if config.KVStore == nil {
config.KVStore = kvstore.NewMemoryKeyValueStore()
}
// Implementation note: if config.TempDir is empty, then Go will
// use the temporary directory on the current system. This should
// work on Desktop. We tested that it did also work on iOS, but
// we have also seen on 2020-06-10 that it does not work on Android.
tempDir, err := ioutil.TempDir(config.TempDir, "ooniengine")
if err != nil {
return nil, err
}
sess := &Session{
availableProbeServices: config.AvailableProbeServices,
byteCounter: bytecounter.New(),
kvStore: config.KVStore,
logger: config.Logger,
queryProbeServicesCount: atomicx.NewInt64(),
softwareName: config.SoftwareName,
softwareVersion: config.SoftwareVersion,
tempDir: tempDir,
torArgs: config.TorArgs,
torBinary: config.TorBinary,
}
proxyURL := config.ProxyURL
if proxyURL != nil {
switch proxyURL.Scheme {
case "psiphon", "tor", "fake":
config.Logger.Infof(
"starting '%s' tunnel; please be patient...", proxyURL.Scheme)
tunnel, err := tunnel.Start(ctx, &tunnel.Config{
Name: proxyURL.Scheme,
Session: &sessionTunnelEarlySession{},
TorArgs: config.TorArgs,
TorBinary: config.TorBinary,
TunnelDir: config.TunnelDir,
})
if err != nil {
return nil, err
}
config.Logger.Infof("tunnel '%s' running...", proxyURL.Scheme)
sess.tunnel = tunnel
proxyURL = tunnel.SOCKS5ProxyURL()
}
}
sess.proxyURL = proxyURL
httpConfig := netx.Config{
ByteCounter: sess.byteCounter,
BogonIsError: true,
Logger: sess.logger,
ProxyURL: proxyURL,
}
sess.resolver = &sessionresolver.Resolver{
ByteCounter: sess.byteCounter,
KVStore: config.KVStore,
Logger: sess.logger,
ProxyURL: proxyURL,
}
httpConfig.FullResolver = sess.resolver
sess.httpDefaultTransport = netx.NewHTTPTransport(httpConfig)
return sess, nil
}
// KibiBytesReceived accounts for the KibiBytes received by the HTTP clients
// managed by this session so far, including experiments.
func (s *Session) KibiBytesReceived() float64 {
return s.byteCounter.KibiBytesReceived()
}
// KibiBytesSent is like KibiBytesReceived but for the bytes sent.
func (s *Session) KibiBytesSent() float64 {
return s.byteCounter.KibiBytesSent()
}
// CheckIn calls the check-in API. The input arguments MUST NOT
// be nil. Before querying the API, this function will ensure
// that the config structure does not contain any field that
// SHOULD be initialized and is not initialized. Whenever there
// is a field that is not initialized, we will attempt to set
// a reasonable default value for such a field. This list describes
// the current defaults we'll choose:
//
// - Platform: if empty, set to Session.Platform();
//
// - ProbeASN: if empty, set to Session.ProbeASNString();
//
// - ProbeCC: if empty, set to Session.ProbeCC();
//
// - RunType: if empty, set to "timed";
//
// - SoftwareName: if empty, set to Session.SoftwareName();
//
// - SoftwareVersion: if empty, set to Session.SoftwareVersion();
//
// - WebConnectivity.CategoryCodes: if nil, we will allocate
// an empty array (the API does not like nil).
//
// Because we MAY need to know the current ASN and CC, this
// function MAY call MaybeLookupLocationContext.
//
// The return value is either the check-in response or an error.
func (s *Session) CheckIn(
ctx context.Context, config *model.CheckInConfig) (*model.CheckInInfo, error) {
if err := s.maybeLookupLocationContext(ctx); err != nil {
return nil, err
}
client, err := s.newProbeServicesClientForCheckIn(ctx)
if err != nil {
return nil, err
}
if config.Platform == "" {
config.Platform = s.Platform()
}
if config.ProbeASN == "" {
config.ProbeASN = s.ProbeASNString()
}
if config.ProbeCC == "" {
config.ProbeCC = s.ProbeCC()
}
if config.RunType == "" {
config.RunType = "timed" // most conservative choice
}
if config.SoftwareName == "" {
config.SoftwareName = s.SoftwareName()
}
if config.SoftwareVersion == "" {
config.SoftwareVersion = s.SoftwareVersion()
}
if config.WebConnectivity.CategoryCodes == nil {
config.WebConnectivity.CategoryCodes = []string{}
}
return client.CheckIn(ctx, *config)
}
// maybeLookupLocationContext is a wrapper for MaybeLookupLocationContext that calls
// the configurable testMaybeLookupLocationContext mock, if configured, and the
// real MaybeLookupLocationContext API otherwise.
func (s *Session) maybeLookupLocationContext(ctx context.Context) error {
if s.testMaybeLookupLocationContext != nil {
return s.testMaybeLookupLocationContext(ctx)
}
return s.MaybeLookupLocationContext(ctx)
}
// newProbeServicesClientForCheckIn is a wrapper for NewProbeServicesClientForCheckIn
// that calls the configurable testNewProbeServicesClientForCheckIn mock, if
// configured, and the real NewProbeServicesClient API otherwise.
func (s *Session) newProbeServicesClientForCheckIn(
ctx context.Context) (sessionProbeServicesClientForCheckIn, error) {
if s.testNewProbeServicesClientForCheckIn != nil {
return s.testNewProbeServicesClientForCheckIn(ctx)
}
client, err := s.NewProbeServicesClient(ctx)
if err != nil {
return nil, err
}
return client, nil
}
// Close ensures that we close all the idle connections that the HTTP clients
// we are currently using may have created. It will also remove the temp dir
// that contains data from this session. Not calling this function may likely
// cause memory leaks in your application because of open idle connections,
// as well as excessive usage of disk space.
func (s *Session) Close() error {
s.closeOnce.Do(s.doClose)
return nil
}
// doClose implements Close. This function is called just once.
func (s *Session) doClose() {
s.httpDefaultTransport.CloseIdleConnections()
s.resolver.CloseIdleConnections()
s.logger.Infof("%s", s.resolver.Stats())
if s.tunnel != nil {
s.tunnel.Stop()
}
_ = os.RemoveAll(s.tempDir)
}
// GetTestHelpersByName returns the available test helpers that
// use the specified name, or false if there's none.
func (s *Session) GetTestHelpersByName(name string) ([]model.Service, bool) {
defer s.mu.Unlock()
s.mu.Lock()
services, ok := s.availableTestHelpers[name]
return services, ok
}
// DefaultHTTPClient returns the session's default HTTP client.
func (s *Session) DefaultHTTPClient() *http.Client {
return &http.Client{Transport: s.httpDefaultTransport}
}
// FetchTorTargets fetches tor targets from the API.
func (s *Session) FetchTorTargets(
ctx context.Context, cc string) (map[string]model.TorTarget, error) {
clnt, err := s.NewOrchestraClient(ctx)
if err != nil {
return nil, err
}
return clnt.FetchTorTargets(ctx, cc)
}
// FetchURLList fetches the URL list from the API.
func (s *Session) FetchURLList(
ctx context.Context, config model.URLListConfig) ([]model.URLInfo, error) {
clnt, err := s.NewOrchestraClient(ctx)
if err != nil {
return nil, err
}
return clnt.FetchURLList(ctx, config)
}
// KeyValueStore returns the configured key-value store.
func (s *Session) KeyValueStore() model.KeyValueStore {
return s.kvStore
}
// Logger returns the logger used by the session.
func (s *Session) Logger() model.Logger {
return s.logger
}
// MaybeLookupLocation is a caching location lookup call.
func (s *Session) MaybeLookupLocation() error {
return s.MaybeLookupLocationContext(context.Background())
}
// MaybeLookupBackends is a caching OONI backends lookup call.
func (s *Session) MaybeLookupBackends() error {
return s.MaybeLookupBackendsContext(context.Background())
}
// ErrAlreadyUsingProxy indicates that we cannot create a tunnel with
// a specific name because we already configured a proxy.
var ErrAlreadyUsingProxy = errors.New(
"session: cannot create a new tunnel of this kind: we are already using a proxy",
)
// NewExperimentBuilder returns a new experiment builder
// for the experiment with the given name, or an error if
// there's no such experiment with the given name
func (s *Session) NewExperimentBuilder(name string) (*ExperimentBuilder, error) {
return newExperimentBuilder(s, name)
}
// NewProbeServicesClient creates a new client for talking with the
// OONI probe services. This function will benchmark the available
// probe services, and select the fastest. In case all probe services
// seem to be down, we try again applying circumvention tactics.
// This function will fail IMMEDIATELY if given a cancelled context.
func (s *Session) NewProbeServicesClient(ctx context.Context) (*probeservices.Client, error) {
if ctx.Err() != nil {
return nil, ctx.Err() // helps with testing
}
if err := s.maybeLookupBackendsContext(ctx); err != nil {
return nil, err
}
if err := s.maybeLookupLocationContext(ctx); err != nil {
return nil, err
}
if s.selectedProbeServiceHook != nil {
s.selectedProbeServiceHook(s.selectedProbeService)
}
return probeservices.NewClient(s, *s.selectedProbeService)
}
// NewSubmitter creates a new submitter instance.
func (s *Session) NewSubmitter(ctx context.Context) (Submitter, error) {
psc, err := s.NewProbeServicesClient(ctx)
if err != nil {
return nil, err
}
return probeservices.NewSubmitter(psc, s.Logger()), nil
}
// NewOrchestraClient creates a new orchestra client. This client is registered
// and logged in with the OONI orchestra. An error is returned on failure.
//
// This function is DEPRECATED. New code SHOULD NOT use it. It will eventually
// be made private or entirely removed from the codebase.
func (s *Session) NewOrchestraClient(ctx context.Context) (*probeservices.Client, error) {
clnt, err := s.NewProbeServicesClient(ctx)
if err != nil {
return nil, err
}
return s.initOrchestraClient(ctx, clnt, clnt.MaybeLogin)
}
// Platform returns the current platform. The platform is one of:
//
// - android
// - ios
// - linux
// - macos
// - windows
// - unknown
//
// When running on the iOS simulator, the returned platform is
// macos rather than ios if CGO is disabled. This is a known issue,
// that however should have a very limited impact.
func (s *Session) Platform() string {
return platform.Name()
}
// ProbeASNString returns the probe ASN as a string.
func (s *Session) ProbeASNString() string {
return fmt.Sprintf("AS%d", s.ProbeASN())
}
// ProbeASN returns the probe ASN as an integer.
func (s *Session) ProbeASN() uint {
defer s.mu.Unlock()
s.mu.Lock()
asn := geolocate.DefaultProbeASN
if s.location != nil {
asn = s.location.ASN
}
return asn
}
// ProbeCC returns the probe CC.
func (s *Session) ProbeCC() string {
defer s.mu.Unlock()
s.mu.Lock()
cc := geolocate.DefaultProbeCC
if s.location != nil {
cc = s.location.CountryCode
}
return cc
}
// ProbeNetworkName returns the probe network name.
func (s *Session) ProbeNetworkName() string {
defer s.mu.Unlock()
s.mu.Lock()
nn := geolocate.DefaultProbeNetworkName
if s.location != nil {
nn = s.location.NetworkName
}
return nn
}
// ProbeIP returns the probe IP.
func (s *Session) ProbeIP() string {
defer s.mu.Unlock()
s.mu.Lock()
ip := geolocate.DefaultProbeIP
if s.location != nil {
ip = s.location.ProbeIP
}
return ip
}
// ProxyURL returns the Proxy URL, or nil if not set
func (s *Session) ProxyURL() *url.URL {
return s.proxyURL
}
// ResolverASNString returns the resolver ASN as a string
func (s *Session) ResolverASNString() string {
return fmt.Sprintf("AS%d", s.ResolverASN())
}
// ResolverASN returns the resolver ASN
func (s *Session) ResolverASN() uint {
defer s.mu.Unlock()
s.mu.Lock()
asn := geolocate.DefaultResolverASN
if s.location != nil {
asn = s.location.ResolverASN
}
return asn
}
// ResolverIP returns the resolver IP
func (s *Session) ResolverIP() string {
defer s.mu.Unlock()
s.mu.Lock()
ip := geolocate.DefaultResolverIP
if s.location != nil {
ip = s.location.ResolverIP
}
return ip
}
// ResolverNetworkName returns the resolver network name.
func (s *Session) ResolverNetworkName() string {
defer s.mu.Unlock()
s.mu.Lock()
nn := geolocate.DefaultResolverNetworkName
if s.location != nil {
nn = s.location.ResolverNetworkName
}
return nn
}
// SoftwareName returns the application name.
func (s *Session) SoftwareName() string {
return s.softwareName
}
// SoftwareVersion returns the application version.
func (s *Session) SoftwareVersion() string {
return s.softwareVersion
}
// TempDir returns the temporary directory.
func (s *Session) TempDir() string {
return s.tempDir
}
// TorArgs returns the configured extra args for the tor binary. If not set
// we will not pass in any extra arg. Applies to `-OTunnel=tor` mainly.
func (s *Session) TorArgs() []string {
return s.torArgs
}
// TorBinary returns the configured path to the tor binary. If not set
// we will attempt to use "tor". Applies to `-OTunnel=tor` mainly.
func (s *Session) TorBinary() string {
return s.torBinary
}
// UserAgent constructs the user agent to be used in this session.
func (s *Session) UserAgent() (useragent string) {
useragent += s.softwareName + "/" + s.softwareVersion
useragent += " ooniprobe-engine/" + version.Version
return
}
// getAvailableProbeServicesUnlocked returns the available probe
// services. This function WILL NOT acquire the mu mutex, therefore,
// you MUST ensure you are using it from a locked context.
func (s *Session) getAvailableProbeServicesUnlocked() []model.Service {
if len(s.availableProbeServices) > 0 {
return s.availableProbeServices
}
return probeservices.Default()
}
func (s *Session) initOrchestraClient(
ctx context.Context, clnt *probeservices.Client,
maybeLogin func(ctx context.Context) error,
) (*probeservices.Client, error) {
// The original implementation has as its only use case that we
// were registering and logging in for sending an update regarding
// the probe whereabouts. Yet here in probe-engine, the orchestra
// is currently only used to fetch inputs. For this purpose, we don't
// need to communicate any specific information. The code that will
// perform an update used to be responsible of doing that. Now, we
// are not using orchestra for this purpose anymore.
meta := probeservices.Metadata{
Platform: "miniooni",
ProbeASN: "AS0",
ProbeCC: "ZZ",
SoftwareName: "miniooni",
SoftwareVersion: "0.1.0-dev",
SupportedTests: []string{"web_connectivity"},
}
if err := clnt.MaybeRegister(ctx, meta); err != nil {
return nil, err
}
if err := maybeLogin(ctx); err != nil {
return nil, err
}
return clnt, nil
}
// ErrAllProbeServicesFailed indicates all probe services failed.
var ErrAllProbeServicesFailed = errors.New("all available probe services failed")
// maybeLookupBackendsContext uses testMaybeLookupBackendsContext if
// not nil, otherwise it calls MaybeLookupBackendsContext.
func (s *Session) maybeLookupBackendsContext(ctx context.Context) error {
if s.testMaybeLookupBackendsContext != nil {
return s.testMaybeLookupBackendsContext(ctx)
}
return s.MaybeLookupBackendsContext(ctx)
}
// MaybeLookupBackendsContext is like MaybeLookupBackends but with context.
func (s *Session) MaybeLookupBackendsContext(ctx context.Context) error {
defer s.mu.Unlock()
s.mu.Lock()
if s.selectedProbeService != nil {
return nil
}
s.queryProbeServicesCount.Add(1)
candidates := probeservices.TryAll(ctx, s, s.getAvailableProbeServicesUnlocked())
selected := probeservices.SelectBest(candidates)
if selected == nil {
return ErrAllProbeServicesFailed
}
s.logger.Infof("session: using probe services: %+v", selected.Endpoint)
s.selectedProbeService = &selected.Endpoint
s.availableTestHelpers = selected.TestHelpers
return nil
}
// LookupLocationContext performs a location lookup. If you want memoisation
// of the results, you should use MaybeLookupLocationContext.
func (s *Session) LookupLocationContext(ctx context.Context) (*geolocate.Results, error) {
task := geolocate.Must(geolocate.NewTask(geolocate.Config{
Logger: s.Logger(),
Resolver: s.resolver,
UserAgent: s.UserAgent(),
}))
return task.Run(ctx)
}
// lookupLocationContext calls testLookupLocationContext if set and
// otherwise calls LookupLocationContext.
func (s *Session) lookupLocationContext(ctx context.Context) (*geolocate.Results, error) {
if s.testLookupLocationContext != nil {
return s.testLookupLocationContext(ctx)
}
return s.LookupLocationContext(ctx)
}
// MaybeLookupLocationContext is like MaybeLookupLocation but with a context
// that can be used to interrupt this long running operation. This function
// will fail IMMEDIATELY if given a cancelled context.
func (s *Session) MaybeLookupLocationContext(ctx context.Context) error {
if ctx.Err() != nil {
return ctx.Err() // helps with testing
}
defer s.mu.Unlock()
s.mu.Lock()
if s.location == nil {
location, err := s.lookupLocationContext(ctx)
if err != nil {
return err
}
s.location = location
}
return nil
}
var _ model.ExperimentSession = &Session{}