From 0b4a49190ad7fd62d8aab6829a62b0494730f8f9 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Fri, 8 Jul 2022 14:20:49 +0200 Subject: [PATCH] feat: start sketching out the oonirun package (#842) This diff refactors the ./internal/cmd/miniooni pkg and moves the code for running experiments inside of the ./internal/oonirun pkg. It's the first concrete step towards https://github.com/ooni/probe/issues/2184. --- internal/cmd/miniooni/libminiooni.go | 179 ++++++--------------- internal/cmd/miniooni/main.go | 4 + internal/oonirun/doc.go | 5 + internal/oonirun/experiment.go | 230 +++++++++++++++++++++++++++ internal/oonirun/experiment_test.go | 60 +++++++ internal/oonirun/session.go | 33 ++++ 6 files changed, 377 insertions(+), 134 deletions(-) create mode 100644 internal/oonirun/doc.go create mode 100644 internal/oonirun/experiment.go create mode 100644 internal/oonirun/experiment_test.go create mode 100644 internal/oonirun/session.go diff --git a/internal/cmd/miniooni/libminiooni.go b/internal/cmd/miniooni/libminiooni.go index 2c9d1ac..5eccd4d 100644 --- a/internal/cmd/miniooni/libminiooni.go +++ b/internal/cmd/miniooni/libminiooni.go @@ -1,11 +1,17 @@ package main +// +// Core implementation +// +// TODO(bassosimone): we should eventually merge this file and main.go. We still +// have this file becaused we used to have ./internal/libminiooni. +// + import ( "context" "errors" "fmt" "io" - "math/rand" "net/url" "os" "path" @@ -20,6 +26,8 @@ import ( "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/legacy/assetsdir" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/oonirun" + "github.com/ooni/probe-cli/v3/internal/runtimex" "github.com/ooni/probe-cli/v3/internal/version" "github.com/pborman/getopt/v2" ) @@ -119,20 +127,11 @@ func init() { &globalOptions.Version, "version", 0, "Print version and exit", ) getopt.FlagLong( - &globalOptions.Yes, "yes", 0, "I accept the risk of running OONI", + &globalOptions.Yes, "yes", 'y', + "Assume yes as the answer to all questions", ) } -func fatalIfFalse(cond bool, msg string) { - if !cond { - panic(msg) - } -} - -func fatalIfTrue(cond bool, msg string) { - fatalIfFalse(!cond, msg) -} - // Main is the main function of miniooni. This function parses the command line // options and uses a global state. Use MainWithConfiguration if you want to avoid // using any global state and relying on command line options. @@ -145,8 +144,8 @@ func Main() { fmt.Printf("%s\n", version.Version) os.Exit(0) } - fatalIfFalse(len(getopt.Args()) == 1, "Missing experiment name") - fatalOnError(engine.CheckEmbeddedPsiphonConfig(), "Invalid embedded psiphon config") + runtimex.PanicIfFalse(len(getopt.Args()) == 1, "Missing experiment name") + runtimex.PanicOnError(engine.CheckEmbeddedPsiphonConfig(), "Invalid embedded psiphon config") MainWithConfiguration(getopt.Arg(0), globalOptions) } @@ -158,24 +157,11 @@ func split(s string) (string, string, error) { return v[0], v[1], nil } -func fatalOnError(err error, msg string) { - if err != nil { - log.WithError(err).Warn(msg) - panic(msg) - } -} - -func warnOnError(err error, msg string) { - if err != nil { - log.WithError(err).Warn(msg) - } -} - func mustMakeMapString(input []string) (output map[string]string) { output = make(map[string]string) for _, opt := range input { key, value, err := split(opt) - fatalOnError(err, "cannot split key-value pair") + runtimex.PanicOnError(err, "cannot split key-value pair") output[key] = value } return @@ -185,7 +171,7 @@ func mustMakeMapAny(input []string) (output map[string]any) { output = make(map[string]any) for _, opt := range input { key, value, err := split(opt) - fatalOnError(err, "cannot split key-value pair") + runtimex.PanicOnError(err, "cannot split key-value pair") output[key] = value } return @@ -193,7 +179,7 @@ func mustMakeMapAny(input []string) (output map[string]any) { func mustParseURL(URL string) *url.URL { rv, err := url.Parse(URL) - fatalOnError(err, "cannot parse URL") + runtimex.PanicOnError(err, "cannot parse URL") return rv } @@ -283,17 +269,12 @@ of miniooni, when we will allow a tunnel to use a proxy. // This function will panic in case of a fatal error. It is up to you that // integrate this function to either handle the panic of ignore it. func MainWithConfiguration(experimentName string, currentOptions Options) { - fatalIfTrue(currentOptions.Proxy != "" && currentOptions.Tunnel != "", + runtimex.PanicIfTrue(currentOptions.Proxy != "" && currentOptions.Tunnel != "", tunnelAndProxy) if currentOptions.Tunnel != "" { currentOptions.Proxy = fmt.Sprintf("%s:///", currentOptions.Tunnel) } - ctx := context.Background() - - extraOptions := mustMakeMapAny(currentOptions.ExtraOptions) - annotations := mustMakeMapString(currentOptions.Annotations) - logger := &log.Logger{Level: log.InfoLevel, Handler: &logHandler{Writer: os.Stderr}} if currentOptions.Verbose { logger.Level = log.DebugLevel @@ -303,14 +284,19 @@ func MainWithConfiguration(experimentName string, currentOptions Options) { } log.Log = logger + extraOptions := mustMakeMapAny(currentOptions.ExtraOptions) + annotations := mustMakeMapString(currentOptions.Annotations) + + ctx := context.Background() + //Mon Jan 2 15:04:05 -0700 MST 2006 log.Infof("Current time: %s", time.Now().Format("2006-01-02 15:04:05 MST")) homeDir := gethomedir(currentOptions.HomeDir) - fatalIfFalse(homeDir != "", "home directory is empty") + runtimex.PanicIfFalse(homeDir != "", "home directory is empty") miniooniDir := path.Join(homeDir, ".miniooni") err := os.MkdirAll(miniooniDir, 0700) - fatalOnError(err, "cannot create $HOME/.miniooni directory") + runtimex.PanicOnError(err, "cannot create $HOME/.miniooni directory") // We cleanup the assets files used by versions of ooniprobe // older than v3.9.0, where we started embedding the assets @@ -324,9 +310,9 @@ func MainWithConfiguration(experimentName string, currentOptions Options) { log.Debugf("miniooni state directory: %s", miniooniDir) consentFile := path.Join(miniooniDir, "informed") - fatalOnError(maybeWriteConsentFile(currentOptions.Yes, consentFile), + runtimex.PanicOnError(maybeWriteConsentFile(currentOptions.Yes, consentFile), "cannot write informed consent file") - fatalIfFalse(canOpen(consentFile), riskOfRunningOONI) + runtimex.PanicIfFalse(canOpen(consentFile), riskOfRunningOONI) log.Info("miniooni home directory: $HOME/.miniooni") var proxyURL *url.URL @@ -336,11 +322,11 @@ func MainWithConfiguration(experimentName string, currentOptions Options) { kvstore2dir := filepath.Join(miniooniDir, "kvstore2") kvstore, err := kvstore.NewFS(kvstore2dir) - fatalOnError(err, "cannot create kvstore2 directory") + runtimex.PanicOnError(err, "cannot create kvstore2 directory") tunnelDir := filepath.Join(miniooniDir, "tunnel") err = os.MkdirAll(tunnelDir, 0700) - fatalOnError(err, "cannot create tunnelDir") + runtimex.PanicOnError(err, "cannot create tunnelDir") config := engine.SessionConfig{ KVStore: kvstore, @@ -360,7 +346,7 @@ func MainWithConfiguration(experimentName string, currentOptions Options) { } sess, err := engine.NewSession(ctx, config) - fatalOnError(err, "cannot create measurement session") + runtimex.PanicOnError(err, "cannot create measurement session") defer func() { sess.Close() log.Infof("whole session: recv %s, sent %s", @@ -372,10 +358,10 @@ func MainWithConfiguration(experimentName string, currentOptions Options) { log.Info("Looking up OONI backends; please be patient...") err = sess.MaybeLookupBackends() - fatalOnError(err, "cannot lookup OONI backends") + runtimex.PanicOnError(err, "cannot lookup OONI backends") log.Info("Looking up your location; please be patient...") err = sess.MaybeLookupLocation() - fatalOnError(err, "cannot lookup your location") + runtimex.PanicOnError(err, "cannot lookup your location") log.Debugf("- IP: %s", sess.ProbeIP()) log.Infof("- country: %s", sess.ProbeCC()) log.Infof("- network: %s (%s)", sess.ProbeNetworkName(), sess.ProbeASNString()) @@ -383,95 +369,20 @@ func MainWithConfiguration(experimentName string, currentOptions Options) { log.Infof("- resolver's network: %s (%s)", sess.ResolverNetworkName(), sess.ResolverASNString()) - builder, err := sess.NewExperimentBuilder(experimentName) - fatalOnError(err, "cannot create experiment builder") - - inputLoader := &engine.InputLoader{ - CheckInConfig: &model.OOAPICheckInConfig{ - RunType: model.RunTypeManual, - OnWiFi: true, // meaning: not on 4G - Charging: true, - }, - ExperimentName: experimentName, - InputPolicy: builder.InputPolicy(), - StaticInputs: currentOptions.Inputs, - SourceFiles: currentOptions.InputFilePaths, + // Run OONI experiments as we normally do. + desc := &oonirun.Experiment{ + Annotations: annotations, + ExtraOptions: extraOptions, + Inputs: currentOptions.Inputs, + InputFilePaths: currentOptions.InputFilePaths, + MaxRuntime: currentOptions.MaxRuntime, + Name: experimentName, + NoCollector: currentOptions.NoCollector, + NoJSON: currentOptions.NoJSON, + Random: currentOptions.Random, + ReportFile: currentOptions.ReportFile, Session: sess, } - inputs, err := inputLoader.Load(context.Background()) - fatalOnError(err, "cannot load inputs") - - if currentOptions.Random { - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - rnd.Shuffle(len(inputs), func(i, j int) { - inputs[i], inputs[j] = inputs[j], inputs[i] - }) - } - - err = builder.SetOptionsAny(extraOptions) - fatalOnError(err, "cannot parse extraOptions") - - experiment := builder.NewExperiment() - defer func() { - log.Infof("experiment: recv %s, sent %s", - humanize.SI(experiment.KibiBytesReceived()*1024, "byte"), - humanize.SI(experiment.KibiBytesSent()*1024, "byte"), - ) - }() - - submitter, err := engine.NewSubmitter(ctx, engine.SubmitterConfig{ - Enabled: !currentOptions.NoCollector, - Session: sess, - Logger: log.Log, - }) - fatalOnError(err, "cannot create submitter") - - saver, err := engine.NewSaver(engine.SaverConfig{ - Enabled: !currentOptions.NoJSON, - Experiment: experiment, - FilePath: currentOptions.ReportFile, - Logger: log.Log, - }) - fatalOnError(err, "cannot create saver") - - inputProcessor := &engine.InputProcessor{ - Annotations: annotations, - Experiment: &experimentWrapper{ - child: engine.NewInputProcessorExperimentWrapper(experiment), - total: len(inputs), - }, - Inputs: inputs, - MaxRuntime: time.Duration(currentOptions.MaxRuntime) * time.Second, - Options: currentOptions.ExtraOptions, - Saver: engine.NewInputProcessorSaverWrapper(saver), - Submitter: submitterWrapper{ - child: engine.NewInputProcessorSubmitterWrapper(submitter), - }, - } - err = inputProcessor.Run(ctx) - fatalOnError(err, "inputProcessor.Run failed") -} - -type experimentWrapper struct { - child engine.InputProcessorExperimentWrapper - total int -} - -func (ew *experimentWrapper) MeasureAsync( - ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { - if input != "" { - log.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input) - } - return ew.child.MeasureAsync(ctx, input, idx) -} - -type submitterWrapper struct { - child engine.InputProcessorSubmitterWrapper -} - -func (sw submitterWrapper) Submit(ctx context.Context, idx int, m *model.Measurement) error { - err := sw.child.Submit(ctx, idx, m) - warnOnError(err, "submitting measurement failed") - // policy: we do not stop the loop if measurement submission fails - return nil + err = desc.Run(ctx) + runtimex.PanicOnError(err, "cannot run experiment") } diff --git a/internal/cmd/miniooni/main.go b/internal/cmd/miniooni/main.go index dccef00..75b4e75 100644 --- a/internal/cmd/miniooni/main.go +++ b/internal/cmd/miniooni/main.go @@ -2,6 +2,10 @@ // with a CLI interface similar to MK and OONI Probe v2.x. package main +// +// Main function +// + import ( "fmt" "os" diff --git a/internal/oonirun/doc.go b/internal/oonirun/doc.go new file mode 100644 index 0000000..4e511d7 --- /dev/null +++ b/internal/oonirun/doc.go @@ -0,0 +1,5 @@ +// Package oonirun contains code to run OONI experiments. +// +// This package supports OONI Run v1 and v2 as well as the direct +// creation and instantiation of OONI experiments. +package oonirun diff --git a/internal/oonirun/experiment.go b/internal/oonirun/experiment.go new file mode 100644 index 0000000..10cfdfa --- /dev/null +++ b/internal/oonirun/experiment.go @@ -0,0 +1,230 @@ +package oonirun + +// +// Run experiments. +// + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/humanize" + "github.com/ooni/probe-cli/v3/internal/model" +) + +// Experiment describes an experiment to run. You MUST fill all the fields that +// are marked as MANDATORY, otherwise Experiment.Run will cause panics. +type Experiment struct { + // Annotations contains OPTIONAL Annotations for the experiment. + Annotations map[string]string + + // ExtraOptions contains OPTIONAL extra options for the experiment. + ExtraOptions map[string]any + + // Inputs contains the OPTIONAL experiment Inputs + Inputs []string + + // InputFilePaths contains OPTIONAL files to read inputs from. + InputFilePaths []string + + // MaxRuntime is the OPTIONAL maximum runtime in seconds. + MaxRuntime int64 + + // Name is the MANDATORY experiment name. + Name string + + // NoCollector OPTIONALLY indicates we should not be using any collector. + NoCollector bool + + // NoJSON OPTIONALLY indicates we don't want to save measurements to a JSON file. + NoJSON bool + + // Random OPTIONALLY indicates we should randomize inputs. + Random bool + + // ReportFile is the MANDATORY file in which to save reports, which is only + // used when noJSON is set to false. + ReportFile string + + // Session is the MANDATORY session. + Session Session +} + +// Run runs the given experiment. +func (ed *Experiment) Run(ctx context.Context) error { + + // 1. create experiment builder + builder, err := ed.newExperimentBuilder(ed.Name) + if err != nil { + return err + } + + // 2. create input loader and load input for this experiment + inputLoader := ed.newInputLoader(builder.InputPolicy()) + inputList, err := inputLoader.Load(ctx) + if err != nil { + return err + } + + // 3. randomize input, if needed + if ed.Random { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + rnd.Shuffle(len(inputList), func(i, j int) { + inputList[i], inputList[j] = inputList[j], inputList[i] + }) + } + + // 4. configure experiment's options + if err := builder.SetOptionsAny(ed.ExtraOptions); err != nil { + return err + } + + // 5. construct the experiment instance + experiment := builder.NewExperiment() + logger := ed.Session.Logger() + defer func() { + logger.Infof("experiment: recv %s, sent %s", + humanize.SI(experiment.KibiBytesReceived()*1024, "byte"), + humanize.SI(experiment.KibiBytesSent()*1024, "byte"), + ) + }() + + // 6. create the submitter + submitter, err := ed.newSubmitter(ctx) + if err != nil { + return err + } + + // 7. create the saver + saver, err := ed.newSaver(experiment) + if err != nil { + return err + } + + // 8. create an input processor + inputProcessor := ed.newInputProcessor(experiment, inputList, saver, submitter) + + // 9. process input and generate measurements + return inputProcessor.Run(ctx) +} + +// inputProcessor processes inputs running the given experiment. +type inputProcessor interface { + Run(ctx context.Context) error +} + +// newInputProcessor creates a new inputProcessor instance. +func (ed *Experiment) newInputProcessor(experiment engine.Experiment, + inputList []model.OOAPIURLInfo, saver engine.Saver, submitter engine.Submitter) inputProcessor { + return &engine.InputProcessor{ + Annotations: ed.Annotations, + Experiment: &experimentWrapper{ + child: engine.NewInputProcessorExperimentWrapper(experiment), + logger: ed.Session.Logger(), + total: len(inputList), + }, + Inputs: inputList, + MaxRuntime: time.Duration(ed.MaxRuntime) * time.Second, + Options: experimentOptionsToStringList(ed.ExtraOptions), + Saver: engine.NewInputProcessorSaverWrapper(saver), + Submitter: &experimentSubmitterWrapper{ + child: engine.NewInputProcessorSubmitterWrapper(submitter), + logger: ed.Session.Logger(), + }, + } +} + +// newSaver creates a new engine.Saver instance. +func (ed *Experiment) newSaver(experiment engine.Experiment) (engine.Saver, error) { + return engine.NewSaver(engine.SaverConfig{ + Enabled: !ed.NoJSON, + Experiment: experiment, + FilePath: ed.ReportFile, + Logger: ed.Session.Logger(), + }) +} + +// newSubmitter creates a new engine.Submitter instance. +func (ed *Experiment) newSubmitter(ctx context.Context) (engine.Submitter, error) { + return engine.NewSubmitter(ctx, engine.SubmitterConfig{ + Enabled: !ed.NoCollector, + Session: ed.Session, + Logger: ed.Session.Logger(), + }) +} + +// newExperimentBuilder creates a new engine.ExperimentBuilder for the given experimentName. +func (ed *Experiment) newExperimentBuilder(experimentName string) (engine.ExperimentBuilder, error) { + return ed.Session.NewExperimentBuilder(ed.Name) +} + +// inputLoader loads inputs from local or remote sources. +type inputLoader interface { + Load(ctx context.Context) ([]model.OOAPIURLInfo, error) +} + +// newInputLoader creates a new inputLoader. +func (ed *Experiment) newInputLoader(inputPolicy engine.InputPolicy) inputLoader { + return &engine.InputLoader{ + CheckInConfig: &model.OOAPICheckInConfig{ + RunType: model.RunTypeManual, + OnWiFi: true, // meaning: not on 4G + Charging: true, + }, + ExperimentName: ed.Name, + InputPolicy: inputPolicy, + StaticInputs: ed.Inputs, + SourceFiles: ed.InputFilePaths, + Session: ed.Session, + } +} + +// experimentOptionsToStringList convers the options to []string, which is +// the format with which we include them into a OONI Measurement +func experimentOptionsToStringList(options map[string]any) (out []string) { + for key, value := range options { + out = append(out, fmt.Sprintf("%s=%v", key, value)) + } + return +} + +// experimentWrapper wraps an experiment and logs progress +type experimentWrapper struct { + // child is the child experiment wrapper + child engine.InputProcessorExperimentWrapper + + // logger is the logger to use + logger model.Logger + + // total is the total number of inputs + total int +} + +func (ew *experimentWrapper) MeasureAsync( + ctx context.Context, input string, idx int) (<-chan *model.Measurement, error) { + if input != "" { + ew.logger.Infof("[%d/%d] running with input: %s", idx+1, ew.total, input) + } + return ew.child.MeasureAsync(ctx, input, idx) +} + +// experimentSubmitterWrapper implements a submission policy where we don't +// fail if we cannot submit a measurement +type experimentSubmitterWrapper struct { + // child is the child submitter wrapper + child engine.InputProcessorSubmitterWrapper + + // logger is the logger to use + logger model.Logger +} + +func (sw *experimentSubmitterWrapper) Submit(ctx context.Context, idx int, m *model.Measurement) error { + if err := sw.child.Submit(ctx, idx, m); err != nil { + sw.logger.Warnf("submitting measurement failed: %s", err.Error()) + } + // policy: we do not stop the loop if measurement submission fails + return nil +} diff --git a/internal/oonirun/experiment_test.go b/internal/oonirun/experiment_test.go new file mode 100644 index 0000000..692312f --- /dev/null +++ b/internal/oonirun/experiment_test.go @@ -0,0 +1,60 @@ +package oonirun + +import ( + "context" + "os" + "testing" + "time" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/kvstore" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/version" +) + +// TODO(bassosimone): it would be cool to write unit tests. However, to do that +// we need to ~redesign the engine package for unit-testability. + +func newSession(ctx context.Context, t *testing.T) *engine.Session { + config := engine.SessionConfig{ + AvailableProbeServices: []model.OOAPIService{}, + KVStore: &kvstore.Memory{}, + Logger: model.DiscardLogger, + ProxyURL: nil, + SoftwareName: "miniooni", + SoftwareVersion: version.Version, + TempDir: os.TempDir(), + TorArgs: []string{}, + TorBinary: "", + TunnelDir: "", + } + sess, err := engine.NewSession(ctx, config) + if err != nil { + t.Fatal(err) + } + return sess +} + +func TestExperimentRunWithExample(t *testing.T) { + ctx := context.Background() + desc := &Experiment{ + Annotations: map[string]string{ + "platform": "linux", + }, + ExtraOptions: map[string]any{ + "SleepTime": int64(10 * time.Millisecond), + }, + Inputs: []string{}, + InputFilePaths: []string{}, + MaxRuntime: 0, + Name: "example", + NoCollector: true, + NoJSON: true, + Random: false, + ReportFile: "", + Session: newSession(ctx, t), + } + if err := desc.Run(ctx); err != nil { + t.Fatal(err) + } +} diff --git a/internal/oonirun/session.go b/internal/oonirun/session.go new file mode 100644 index 0000000..9346c5c --- /dev/null +++ b/internal/oonirun/session.go @@ -0,0 +1,33 @@ +package oonirun + +// +// Definition of session. +// +// TODO(bassosimone): we should eventually have a common definition +// of session (which probably means a few distinct definitions?) inside +// the model package as an interface. Until we do that, which seems an +// heavy refactoring right now, this local definition will do. +// + +import ( + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/model" +) + +// Session is the definition of Session used by this package. +type Session interface { + // A Session is also an InputLoaderSession. + engine.InputLoaderSession + + // A Session is also a SubmitterSession. + engine.SubmitterSession + + // DefaultHTTPClient returns the session's default HTTPClient. + DefaultHTTPClient() model.HTTPClient + + // Logger returns the logger used by this Session. + Logger() model.Logger + + // NewExperimentBuilder creates a new engine.ExperimentBuilder. + NewExperimentBuilder(name string) (engine.ExperimentBuilder, error) +}