feat: create tunnel inside NewSession (#286)

* feat: create tunnel inside NewSession

We want to create the tunnel when we create the session. This change
allows us to nicely ignore the problem of creating a tunnel when we
already have a proxy, as well as the problem of locking. Everything is
happening, in fact, inside of the NewSession factory.

Modify miniooni such that --tunnel is just syntactic sugar for
--proxy, at least for now. We want, in the future, to teach the
tunnel to possibly use a socks5 proxy.

Because starting a tunnel is a slow operation, we need a context in
NewSession. This causes a bunch of places to change. Not really a big
deal except we need to propagate the changes.

Make sure that the mobile code can create a new session using a
proxy for all the APIs we support.

Make sure all tests are still green and we don't loose coverage of
the various ways in which this code could be used.

This change is part of https://github.com/ooni/probe/issues/985.

* changes after merge

* fix: only keep tests that can hopefully work

While there, identify other places where we should add more
tests or fix integration tests.

Part of https://github.com/ooni/probe/issues/985
This commit is contained in:
Simone Basso
2021-04-05 15:28:13 +02:00
committed by GitHub
parent a849213b59
commit c5ad5eedeb
18 changed files with 135 additions and 223 deletions
@@ -221,7 +221,7 @@ func TestComputeEndpointStatsDNSIsLying(t *testing.T) {
}
func newsession(t *testing.T) model.ExperimentSession {
sess, err := engine.NewSession(engine.SessionConfig{
sess, err := engine.NewSession(context.Background(), engine.SessionConfig{
AvailableProbeServices: []model.Service{{
Address: "https://ams-pg-test.ooni.org",
Type: "https",
+1 -1
View File
@@ -558,7 +558,7 @@ func TestTransactCannotReadBody(t *testing.T) {
}
func newsession(t *testing.T) model.ExperimentSession {
sess, err := engine.NewSession(engine.SessionConfig{
sess, err := engine.NewSession(context.Background(), engine.SessionConfig{
AvailableProbeServices: []model.Service{{
Address: "https://ams-pg-test.ooni.org",
Type: "https",
@@ -82,6 +82,10 @@ func (g Getter) Get(ctx context.Context) (TestKeys, error) {
return tk, err
}
// TODO(bassosimone): this mechanism where we count breaks tests
// because now tests are not idempotent anymore. Therefore, we
// SHOULD be creating a temporary directory instead.
var (
// tunnelDirCount counts the number of tunnels started by
// the urlgetter package so far.
@@ -654,6 +654,9 @@ func TestGetterIntegrationHTTPSWithTunnel(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
// TODO(bassosimone): this test is broken. It now requires a
// real Session to work as intended. We didn't notice until now
// because integration tests do not run for every PR.
ctx := context.Background()
g := urlgetter.Getter{
Config: urlgetter.Config{
@@ -201,7 +201,7 @@ func TestMeasureWithNoAvailableTestHelpers(t *testing.T) {
}
func newsession(t *testing.T, lookupBackends bool) model.ExperimentSession {
sess, err := engine.NewSession(engine.SessionConfig{
sess, err := engine.NewSession(context.Background(), engine.SessionConfig{
AvailableProbeServices: []model.Service{{
Address: "https://ams-pg-test.ooni.org",
Type: "https",
+1 -1
View File
@@ -14,7 +14,7 @@ func TestInputLoaderInputOrQueryBackendWithNoInput(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess, err := engine.NewSession(engine.SessionConfig{
sess, err := engine.NewSession(context.Background(), engine.SessionConfig{
AvailableProbeServices: []model.Service{{
Address: "https://ams-pg-test.ooni.org/",
Type: "https",
+1 -1
View File
@@ -236,7 +236,7 @@ func TestInputLoaderInputOrQueryBackendWithInput(t *testing.T) {
}
func TestInputLoaderInputOrQueryBackendWithNoInputAndCancelledContext(t *testing.T) {
sess, err := NewSession(SessionConfig{
sess, err := NewSession(context.Background(), SessionConfig{
KVStore: kvstore.NewMemoryKeyValueStore(),
Logger: log.Log,
SoftwareName: "miniooni",
+59 -69
View File
@@ -63,12 +63,6 @@ type Session struct {
softwareName string
softwareVersion string
tempDir string
torArgs []string
torBinary string
tunnelDir string
tunnelMu sync.Mutex
tunnelName string
tunnel tunnel.Tunnel
// closeOnce allows us to call Close just once.
closeOnce sync.Once
@@ -92,6 +86,18 @@ type Session struct {
// allowing us to mock NewProbeServicesClient when calling CheckIn.
testNewProbeServicesClientForCheckIn func(ctx context.Context) (
sessionProbeServicesClientForCheckIn, error)
// torArgs contains the optional arguments for tor that we may need
// to pass to urlgetter when it uses a tor tunnel.
torArgs []string
// torBinary contains the optional path to the tor binary that we
// may need to pass to urlgetter when it uses a tor tunnel.
torBinary string
// tunnel is the optional tunnel that we may be using. It is created
// by NewSession and it is cleaned up by Close.
tunnel tunnel.Tunnel
}
// sessionProbeServicesClientForCheckIn returns the probe services
@@ -100,8 +106,32 @@ type sessionProbeServicesClientForCheckIn interface {
CheckIn(ctx context.Context, config model.CheckInConfig) (*model.CheckInInfo, error)
}
// NewSession creates a new session or returns an error
func NewSession(config SessionConfig) (*Session, error) {
// NewSession creates a new session. This factory function will
// execute the following steps:
//
// 1. Make sure the config is sane, apply reasonable defaults
// where possible, otherwise return an error.
//
// 2. Create a temporary directory.
//
// 3. Create an instance of the session.
//
// 4. If the user requested for a proxy that entails a tunnel (at the
// moment of writing this note, either psiphon or tor), then start the
// requested tunnel and configure it as our proxy.
//
// 5. Create a compound resolver for the session that will attempt
// to use a bunch of DoT/DoH servers before falling back to the system
// resolver if nothing else works (see the sessionresolver pkg). This
// sessionresolver will be using the configured proxy, if any.
//
// 6. Create the default HTTP transport that we should be using when
// we communicate with the OONI backends. This transport will be
// using the configured proxy, if any.
//
// If any of these steps fails, then we cannot create a measurement
// session and we return an error.
func NewSession(ctx context.Context, config SessionConfig) (*Session, error) {
if config.Logger == nil {
return nil, errors.New("Logger is empty")
}
@@ -127,26 +157,43 @@ func NewSession(config SessionConfig) (*Session, error) {
byteCounter: bytecounter.New(),
kvStore: config.KVStore,
logger: config.Logger,
proxyURL: config.ProxyURL,
queryProbeServicesCount: atomicx.NewInt64(),
softwareName: config.SoftwareName,
softwareVersion: config.SoftwareVersion,
tempDir: tempDir,
torArgs: config.TorArgs,
torBinary: config.TorBinary,
tunnelDir: config.TunnelDir,
}
proxyURL := config.ProxyURL
if proxyURL != nil {
switch proxyURL.Scheme {
case "psiphon", "tor":
tunnel, err := tunnel.Start(ctx, &tunnel.Config{
Name: proxyURL.Scheme,
Session: &sessionTunnelEarlySession{},
TorArgs: config.TorArgs,
TorBinary: config.TorBinary,
TunnelDir: config.TunnelDir,
})
if err != nil {
return nil, err
}
sess.tunnel = tunnel
proxyURL = tunnel.SOCKS5ProxyURL()
}
}
sess.proxyURL = proxyURL
httpConfig := netx.Config{
ByteCounter: sess.byteCounter,
BogonIsError: true,
Logger: sess.logger,
ProxyURL: config.ProxyURL,
ProxyURL: proxyURL,
}
sess.resolver = &sessionresolver.Resolver{
ByteCounter: sess.byteCounter,
KVStore: config.KVStore,
Logger: sess.logger,
ProxyURL: config.ProxyURL,
ProxyURL: proxyURL,
}
httpConfig.FullResolver = sess.resolver
sess.httpDefaultTransport = netx.NewHTTPTransport(httpConfig)
@@ -330,63 +377,6 @@ var ErrAlreadyUsingProxy = errors.New(
"session: cannot create a new tunnel of this kind: we are already using a proxy",
)
// MaybeStartTunnel starts the requested tunnel.
//
// This function silently succeeds if we're already using a tunnel with
// the same name or if the requested tunnel name is the empty string. This
// function fails, tho, when we already have a proxy or a tunnel with
// another name and we try to open a tunnel. This function of course also
// fails if we cannot start the requested tunnel. All in all, if you request
// for a tunnel name that is not the empty string and you get a nil error,
// you can be confident that session.ProxyURL() gives you the tunnel URL.
//
// The tunnel will be closed by session.Close().
func (s *Session) MaybeStartTunnel(ctx context.Context, name string) error {
// TODO(bassosimone): see if we can unify tunnelMu and mu.
s.tunnelMu.Lock()
defer s.tunnelMu.Unlock()
if name == "" {
// There is no point in continuing if we know
// we don't need to do anything.
return nil
}
if s.tunnel != nil && s.tunnelName == name {
// We've been asked more than once to start the same tunnel.
return nil
}
if s.proxyURL != nil && name == "" {
// The user configured a proxy and here we're not actually trying
// to start any tunnel since `name` is empty.
// TODO(bassosimone): this if branch is probably useless now
// because we stop above when name is "".
return nil
}
if s.proxyURL != nil || s.tunnel != nil {
// We already have a proxy or we have a different tunnel. Because a tunnel
// sets a proxy, the second check for s.tunnel is for robustness.
return ErrAlreadyUsingProxy
}
s.logger.Infof("starting '%s' tunnel; please be patient...", name)
tunnel, err := tunnel.Start(ctx, &tunnel.Config{
Name: name,
Session: &sessionTunnelEarlySession{},
TorArgs: s.TorArgs(),
TorBinary: s.TorBinary(),
TunnelDir: s.tunnelDir,
})
if err != nil {
return err
}
// Implementation note: tunnel _may_ be NIL here if name is ""
if tunnel == nil {
return nil
}
s.tunnelName = name
s.tunnel = tunnel
s.proxyURL = tunnel.SOCKS5ProxyURL()
return nil
}
// NewExperimentBuilder returns a new experiment builder
// for the experiment with the given name, or an error if
// there's no such experiment with the given name
+9 -128
View File
@@ -75,7 +75,7 @@ func TestNewSessionBuilderGood(t *testing.T) {
}
func newSessionMustFail(t *testing.T, config SessionConfig) {
sess, err := NewSession(config)
sess, err := NewSession(context.Background(), config)
if err == nil {
t.Fatal("expected an error here")
}
@@ -88,7 +88,7 @@ func TestSessionTorArgsTorBinary(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess, err := NewSession(SessionConfig{
sess, err := NewSession(context.Background(), SessionConfig{
AvailableProbeServices: []model.Service{{
Address: "https://ams-pg-test.ooni.org",
Type: "https",
@@ -120,7 +120,7 @@ func TestSessionTorArgsTorBinary(t *testing.T) {
}
func newSessionForTestingNoLookupsWithProxyURL(t *testing.T, URL *url.URL) *Session {
sess, err := NewSession(SessionConfig{
sess, err := NewSession(context.Background(), SessionConfig{
AvailableProbeServices: []model.Service{{
Address: "https://ams-pg-test.ooni.org",
Type: "https",
@@ -336,7 +336,7 @@ func TestGetAvailableProbeServices(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess, err := NewSession(SessionConfig{
sess, err := NewSession(context.Background(), SessionConfig{
Logger: model.DiscardLogger,
SoftwareName: "ooniprobe-engine",
SoftwareVersion: "0.0.1",
@@ -356,7 +356,7 @@ func TestMaybeLookupBackendsFailure(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess, err := NewSession(SessionConfig{
sess, err := NewSession(context.Background(), SessionConfig{
Logger: model.DiscardLogger,
SoftwareName: "ooniprobe-engine",
SoftwareVersion: "0.0.1",
@@ -377,7 +377,7 @@ func TestMaybeLookupTestHelpersIdempotent(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess, err := NewSession(SessionConfig{
sess, err := NewSession(context.Background(), SessionConfig{
Logger: model.DiscardLogger,
SoftwareName: "ooniprobe-engine",
SoftwareVersion: "0.0.1",
@@ -402,7 +402,7 @@ func TestAllProbeServicesUnsupported(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess, err := NewSession(SessionConfig{
sess, err := NewSession(context.Background(), SessionConfig{
Logger: model.DiscardLogger,
SoftwareName: "ooniprobe-engine",
SoftwareVersion: "0.0.1",
@@ -421,127 +421,8 @@ func TestAllProbeServicesUnsupported(t *testing.T) {
}
}
func TestStartTunnelGood(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess := newSessionForTestingNoLookups(t)
defer sess.Close()
ctx := context.Background()
if err := sess.MaybeStartTunnel(ctx, "psiphon"); err != nil {
t.Fatal(err)
}
if err := sess.MaybeStartTunnel(ctx, "psiphon"); err != nil {
t.Fatal(err) // check twice, must be idempotent
}
if sess.ProxyURL() == nil {
t.Fatal("expected non-nil ProxyURL")
}
}
func TestStartTunnelNonexistent(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess := newSessionForTestingNoLookups(t)
defer sess.Close()
ctx := context.Background()
if err := sess.MaybeStartTunnel(ctx, "antani"); err.Error() != "unsupported tunnel" {
t.Fatal("not the error we expected")
}
if sess.ProxyURL() != nil {
t.Fatal("expected nil ProxyURL")
}
}
func TestStartTunnelEmptyString(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess := newSessionForTestingNoLookups(t)
defer sess.Close()
ctx := context.Background()
if sess.MaybeStartTunnel(ctx, "") != nil {
t.Fatal("expected no error here")
}
if sess.ProxyURL() != nil {
t.Fatal("expected nil ProxyURL")
}
}
func TestStartTunnelEmptyStringWithProxy(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
proxyURL := &url.URL{Scheme: "socks5", Host: "127.0.0.1:9050"}
sess := newSessionForTestingNoLookups(t)
sess.proxyURL = proxyURL
defer sess.Close()
ctx := context.Background()
if sess.MaybeStartTunnel(ctx, "") != nil {
t.Fatal("expected no error here")
}
diff := cmp.Diff(proxyURL, sess.ProxyURL())
if diff != "" {
t.Fatal(diff)
}
}
func TestStartTunnelWithAlreadyExistingTunnel(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess := newSessionForTestingNoLookups(t)
defer sess.Close()
ctx := context.Background()
if sess.MaybeStartTunnel(ctx, "psiphon") != nil {
t.Fatal("expected no error here")
}
prev := sess.ProxyURL()
err := sess.MaybeStartTunnel(ctx, "tor")
if !errors.Is(err, ErrAlreadyUsingProxy) {
t.Fatal("expected another error here")
}
cur := sess.ProxyURL()
diff := cmp.Diff(prev, cur)
if diff != "" {
t.Fatal(diff)
}
}
func TestStartTunnelWithAlreadyExistingProxy(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess := newSessionForTestingNoLookups(t)
defer sess.Close()
ctx := context.Background()
orig := &url.URL{Scheme: "socks5", Host: "[::1]:9050"}
sess.proxyURL = orig
err := sess.MaybeStartTunnel(ctx, "psiphon")
if !errors.Is(err, ErrAlreadyUsingProxy) {
t.Fatal("expected another error here")
}
cur := sess.ProxyURL()
diff := cmp.Diff(orig, cur)
if diff != "" {
t.Fatal(diff)
}
}
func TestStartTunnelCanceledContext(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess := newSessionForTestingNoLookups(t)
defer sess.Close()
ctx, cancel := context.WithCancel(context.Background())
cancel() // immediately cancel
err := sess.MaybeStartTunnel(ctx, "psiphon")
if !errors.Is(err, context.Canceled) {
t.Fatal("not the error we expected")
}
}
// TODO(bassosimone): we should write unit/integration tests
// for the new way in which tunnels work.
func TestUserAgentNoProxy(t *testing.T) {
if testing.Short() {
+3 -2
View File
@@ -15,7 +15,7 @@ func TestPsiphonStartWithCancelledContext(t *testing.T) {
// can move it inside of the internal tests.
ctx, cancel := context.WithCancel(context.Background())
cancel() // fail immediately
sess, err := engine.NewSession(engine.SessionConfig{
sess, err := engine.NewSession(ctx, engine.SessionConfig{
Logger: log.Log,
SoftwareName: "miniooni",
SoftwareVersion: "0.1.0-dev",
@@ -41,7 +41,8 @@ func TestPsiphonStartStop(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess, err := engine.NewSession(engine.SessionConfig{
ctx := context.Background()
sess, err := engine.NewSession(ctx, engine.SessionConfig{
Logger: log.Log,
SoftwareName: "ooniprobe-engine",
SoftwareVersion: "0.0.1",