From 0735e2018f693671658285d12a74360bd607e465 Mon Sep 17 00:00:00 2001 From: kelmenhorst <45046038+kelmenhorst@users.noreply.github.com> Date: Mon, 14 Feb 2022 15:24:36 +0100 Subject: [PATCH] feat: add oonireport client (#682) The oonireport client (re-)uploads a measurement report file. This can be helpful when the measurement was not uploaded at runtime. Usage: `./oonireport upload `, where `` is a json(l) file containing one OONI measurement per line. This pull request refers to https://github.com/ooni/probe/issues/2003 and https://github.com/ooni/probe/issues/950. Co-authored-by: Simone Basso --- internal/cmd/oonireport/oonireport.go | 156 ++++++++++++++++++ internal/cmd/oonireport/oonireport_test.go | 85 ++++++++++ .../cmd/oonireport/testdata/noentries.json | 0 .../oonireport/testdata/testmeasurement.json | 2 + 4 files changed, 243 insertions(+) create mode 100644 internal/cmd/oonireport/oonireport.go create mode 100644 internal/cmd/oonireport/oonireport_test.go create mode 100644 internal/cmd/oonireport/testdata/noentries.json create mode 100644 internal/cmd/oonireport/testdata/testmeasurement.json diff --git a/internal/cmd/oonireport/oonireport.go b/internal/cmd/oonireport/oonireport.go new file mode 100644 index 0000000..cd53d49 --- /dev/null +++ b/internal/cmd/oonireport/oonireport.go @@ -0,0 +1,156 @@ +// Command oonireport uploads reports stored on disk to the OONI collector. +package main + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "os" + "time" + + "github.com/apex/log" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/engine/probeservices" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/runtimex" + "github.com/ooni/probe-cli/v3/internal/version" + "github.com/pborman/getopt/v2" +) + +var startTime = time.Now() + +type logHandler struct { + io.Writer +} + +func (h *logHandler) HandleLog(e *log.Entry) (err error) { + s := fmt.Sprintf("[%14.6f] <%s> %s", time.Since(startTime).Seconds(), e.Level, e.Message) + if len(e.Fields) > 0 { + s += fmt.Sprintf(": %+v", e.Fields) + } + s += "\n" + _, err = h.Writer.Write([]byte(s)) + return +} + +const ( + softwareName = "miniooni" + softwareVersion = version.Version +) + +var ( + path string + control bool +) + +func fatalIfFalse(cond bool, msg string) { + if !cond { + panic(msg) + } +} + +func canOpen(filepath string) bool { + stat, err := os.Stat(filepath) + return err == nil && stat.Mode().IsRegular() +} + +func readLines(path string) []string { + // open measurement file + file, err := os.Open(path) + runtimex.PanicOnError(err, "Open file error.") + defer file.Close() + + scanner := bufio.NewScanner(file) + // the maximum line length should be selected really big + const maxCapacity = 800000 + buf := make([]byte, maxCapacity) + scanner.Buffer(buf, maxCapacity) + + // scan measurement file, one measurement per line + var lines []string + for scanner.Scan() { + line := scanner.Text() + lines = append(lines, line) + } + return lines +} + +// newSession creates a new session +func newSession(ctx context.Context) *engine.Session { + logger := &log.Logger{Level: log.InfoLevel, Handler: &logHandler{Writer: os.Stderr}} + + config := engine.SessionConfig{ + Logger: logger, + SoftwareName: softwareName, + SoftwareVersion: softwareVersion, + } + sess, err := engine.NewSession(ctx, config) + runtimex.PanicOnError(err, "Error when trying to create session.") + return sess +} + +// new Submitter creates a probe services client and submitter +func newSubmitter(sess *engine.Session, ctx context.Context) *probeservices.Submitter { + psc, err := sess.NewProbeServicesClient(ctx) + runtimex.PanicOnError(err, "error occurred while creating client") + submitter := probeservices.NewSubmitter(psc, sess.Logger()) + return submitter +} + +// toMeasurement loads an input string as model.Measurement +func toMeasurement(s string) *model.Measurement { + var mm model.Measurement + err := json.Unmarshal([]byte(s), &mm) + runtimex.PanicOnError(err, "json.Unmarshal error") + return &mm +} + +// submitAll submits the measurements in input. Returns the count of submitted measurements, both +// on success and on error, and the error that occurred (nil on success). +func submitAll(ctx context.Context, lines []string, subm *probeservices.Submitter) (int, error) { + submitted := 0 + for _, line := range lines { + mm := toMeasurement(line) + // submit the measurement + err := subm.Submit(ctx, mm) + if err != nil { + return submitted, err + } + submitted += 1 + } + return submitted, nil +} + +func mainWithArgs(args []string) { + fatalIfFalse(len(args) == 2, "Usage: ./oonireport upload ") + fatalIfFalse(args[0] == "upload", "Unsupported operation") + fatalIfFalse(canOpen(args[1]), "Cannot open measurement file") + + path = args[1] + lines := readLines(path) + + ctx := context.Background() + sess := newSession(ctx) + defer sess.Close() + + submitter := newSubmitter(sess, ctx) + + n, err := submitAll(ctx, lines, submitter) + fmt.Println("Submitted measurements: ", n) + runtimex.PanicOnError(err, "error occurred while submitting") +} + +func main() { + defer func() { + if s := recover(); s != nil { + fmt.Fprintf(os.Stderr, "FATAL: %s\n", s) + } + }() + // parse command line arguments + getopt.Parse() + args := getopt.Args() + mainWithArgs(args) +} diff --git a/internal/cmd/oonireport/oonireport_test.go b/internal/cmd/oonireport/oonireport_test.go new file mode 100644 index 0000000..ae335fb --- /dev/null +++ b/internal/cmd/oonireport/oonireport_test.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "testing" +) + +func TestCanOpen(t *testing.T) { + ok := canOpen("testdata/testmeasurement.json") + if !ok { + t.Fatal("unexpected error") + } +} + +func TestReadLines(t *testing.T) { + lines := readLines("testdata/testmeasurement.json") + if lines == nil { + t.Fatal("unexpected error") + } + if len(lines) != 2 { + t.Fatal("unexpected number of measurements") + } +} + +func TestNewSessionAndSubmitter(t *testing.T) { + ctx := context.Background() + sess := newSession(ctx) + if sess == nil { + t.Fatal("unexpected nil session") + } + subm := newSubmitter(sess, ctx) + if subm == nil { + t.Fatal("unexpected nil submitter") + } +} + +func TestToMeasurement(t *testing.T) { + lines := readLines("testdata/testmeasurement.json") + line := lines[0] + mm := toMeasurement(line) + if mm == nil { + t.Fatal("unexpected error") + } +} + +func TestMainMissingFile(t *testing.T) { + defer func() { + var s interface{} + if s = recover(); s == nil { + t.Fatal("expected a panic here") + } + if s != "Cannot open measurement file" { + t.Fatal("unexpected panic message") + } + }() + mainWithArgs([]string{"upload", "notexist.json"}) +} + +func TestMainEmptyFile(t *testing.T) { + defer func() { + var s interface{} + if s = recover(); s != nil { + t.Fatal("unexpected panic") + } + }() + mainWithArgs([]string{"upload", "testdata/noentries.json"}) +} + +func TestSubmitAllFails(t *testing.T) { + ctx := context.Background() + sess := newSession(ctx) + subm := newSubmitter(sess, ctx) + lines := readLines("testdata/testmeasurement.json") + + ctx, cancel := context.WithCancel(ctx) + cancel() // fail immediately + + n, err := submitAll(ctx, lines, subm) + if err == nil { + t.Fatal("expected an error here") + } + if n != 0 { + t.Fatal("nothing should be submitted here") + } +} diff --git a/internal/cmd/oonireport/testdata/noentries.json b/internal/cmd/oonireport/testdata/noentries.json new file mode 100644 index 0000000..e69de29 diff --git a/internal/cmd/oonireport/testdata/testmeasurement.json b/internal/cmd/oonireport/testdata/testmeasurement.json new file mode 100644 index 0000000..22c386e --- /dev/null +++ b/internal/cmd/oonireport/testdata/testmeasurement.json @@ -0,0 +1,2 @@ +{"annotations":{"architecture":"amd64","engine_name":"ooniprobe-engine","engine_version":"3.14.0-alpha.1","platform":"linux"},"data_format_version":"0.2.0","extensions":{"dnst":0,"httpt":0,"netevents":0,"tcpconnect":0,"tlshandshake":0,"tunnel":0},"input":"ooni.org","measurement_start_time":"2022-02-09 08:44:29","probe_asn":"AS6805","probe_cc":"DE","probe_ip":"127.0.0.1","probe_network_name":"Telefonica Germany GmbH \u0026 Co.OHG","report_id":"","resolver_asn":"AS6805","resolver_ip":"62.109.121.37","resolver_network_name":"Telefonica Germany GmbH \u0026 Co.OHG","software_name":"miniooni","software_version":"3.14.0-alpha.1","test_keys":{"agent":"redirect","failed_operation":"top_level","failure":"unknown_failure: unknown targetURL scheme","network_events":null,"queries":null,"requests":null,"tcp_connect":null,"tls_handshakes":null},"test_name":"urlgetter","test_runtime":0.000180107,"test_start_time":"2022-02-09 08:44:29","test_version":"0.2.0"} +{"annotations":{"architecture":"amd64","engine_name":"ooniprobe-engine","engine_version":"3.14.0-alpha.1","platform":"linux"},"data_format_version":"0.2.0","extensions":{"dnst":0,"httpt":0,"netevents":0,"tcpconnect":0,"tlshandshake":0,"tunnel":0},"input":"google.com","measurement_start_time":"2022-02-09 08:44:45","options":["HTTP3Enabled=true"],"probe_asn":"AS6805","probe_cc":"DE","probe_ip":"127.0.0.1","probe_network_name":"Telefonica Germany GmbH \u0026 Co.OHG","report_id":"","resolver_asn":"AS6805","resolver_ip":"62.109.121.37","resolver_network_name":"Telefonica Germany GmbH \u0026 Co.OHG","software_name":"miniooni","software_version":"3.14.0-alpha.1","test_keys":{"agent":"redirect","failed_operation":"top_level","failure":"unknown_failure: unknown targetURL scheme","network_events":null,"queries":null,"requests":null,"tcp_connect":null,"tls_handshakes":null},"test_name":"urlgetter","test_runtime":0.000223706,"test_start_time":"2022-02-09 08:44:45","test_version":"0.2.0"}