feat: add oonireport client (#682)

The oonireport client (re-)uploads a measurement report file. This can be helpful when the measurement was not uploaded at runtime.

Usage: `./oonireport upload <file>`, where `<file>` is a json(l) file containing one OONI measurement per line.

This pull request refers to https://github.com/ooni/probe/issues/2003 and https://github.com/ooni/probe/issues/950.

Co-authored-by: Simone Basso <bassosimone@gmail.com>
This commit is contained in:
kelmenhorst 2022-02-14 15:24:36 +01:00 committed by GitHub
parent fbae9ddece
commit 0735e2018f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 243 additions and 0 deletions

View File

@ -0,0 +1,156 @@
// Command oonireport uploads reports stored on disk to the OONI collector.
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"time"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/engine/probeservices"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/runtimex"
"github.com/ooni/probe-cli/v3/internal/version"
"github.com/pborman/getopt/v2"
)
var startTime = time.Now()
type logHandler struct {
io.Writer
}
func (h *logHandler) HandleLog(e *log.Entry) (err error) {
s := fmt.Sprintf("[%14.6f] <%s> %s", time.Since(startTime).Seconds(), e.Level, e.Message)
if len(e.Fields) > 0 {
s += fmt.Sprintf(": %+v", e.Fields)
}
s += "\n"
_, err = h.Writer.Write([]byte(s))
return
}
const (
softwareName = "miniooni"
softwareVersion = version.Version
)
var (
path string
control bool
)
func fatalIfFalse(cond bool, msg string) {
if !cond {
panic(msg)
}
}
func canOpen(filepath string) bool {
stat, err := os.Stat(filepath)
return err == nil && stat.Mode().IsRegular()
}
func readLines(path string) []string {
// open measurement file
file, err := os.Open(path)
runtimex.PanicOnError(err, "Open file error.")
defer file.Close()
scanner := bufio.NewScanner(file)
// the maximum line length should be selected really big
const maxCapacity = 800000
buf := make([]byte, maxCapacity)
scanner.Buffer(buf, maxCapacity)
// scan measurement file, one measurement per line
var lines []string
for scanner.Scan() {
line := scanner.Text()
lines = append(lines, line)
}
return lines
}
// newSession creates a new session
func newSession(ctx context.Context) *engine.Session {
logger := &log.Logger{Level: log.InfoLevel, Handler: &logHandler{Writer: os.Stderr}}
config := engine.SessionConfig{
Logger: logger,
SoftwareName: softwareName,
SoftwareVersion: softwareVersion,
}
sess, err := engine.NewSession(ctx, config)
runtimex.PanicOnError(err, "Error when trying to create session.")
return sess
}
// new Submitter creates a probe services client and submitter
func newSubmitter(sess *engine.Session, ctx context.Context) *probeservices.Submitter {
psc, err := sess.NewProbeServicesClient(ctx)
runtimex.PanicOnError(err, "error occurred while creating client")
submitter := probeservices.NewSubmitter(psc, sess.Logger())
return submitter
}
// toMeasurement loads an input string as model.Measurement
func toMeasurement(s string) *model.Measurement {
var mm model.Measurement
err := json.Unmarshal([]byte(s), &mm)
runtimex.PanicOnError(err, "json.Unmarshal error")
return &mm
}
// submitAll submits the measurements in input. Returns the count of submitted measurements, both
// on success and on error, and the error that occurred (nil on success).
func submitAll(ctx context.Context, lines []string, subm *probeservices.Submitter) (int, error) {
submitted := 0
for _, line := range lines {
mm := toMeasurement(line)
// submit the measurement
err := subm.Submit(ctx, mm)
if err != nil {
return submitted, err
}
submitted += 1
}
return submitted, nil
}
func mainWithArgs(args []string) {
fatalIfFalse(len(args) == 2, "Usage: ./oonireport upload <file>")
fatalIfFalse(args[0] == "upload", "Unsupported operation")
fatalIfFalse(canOpen(args[1]), "Cannot open measurement file")
path = args[1]
lines := readLines(path)
ctx := context.Background()
sess := newSession(ctx)
defer sess.Close()
submitter := newSubmitter(sess, ctx)
n, err := submitAll(ctx, lines, submitter)
fmt.Println("Submitted measurements: ", n)
runtimex.PanicOnError(err, "error occurred while submitting")
}
func main() {
defer func() {
if s := recover(); s != nil {
fmt.Fprintf(os.Stderr, "FATAL: %s\n", s)
}
}()
// parse command line arguments
getopt.Parse()
args := getopt.Args()
mainWithArgs(args)
}

View File

@ -0,0 +1,85 @@
package main
import (
"context"
"testing"
)
func TestCanOpen(t *testing.T) {
ok := canOpen("testdata/testmeasurement.json")
if !ok {
t.Fatal("unexpected error")
}
}
func TestReadLines(t *testing.T) {
lines := readLines("testdata/testmeasurement.json")
if lines == nil {
t.Fatal("unexpected error")
}
if len(lines) != 2 {
t.Fatal("unexpected number of measurements")
}
}
func TestNewSessionAndSubmitter(t *testing.T) {
ctx := context.Background()
sess := newSession(ctx)
if sess == nil {
t.Fatal("unexpected nil session")
}
subm := newSubmitter(sess, ctx)
if subm == nil {
t.Fatal("unexpected nil submitter")
}
}
func TestToMeasurement(t *testing.T) {
lines := readLines("testdata/testmeasurement.json")
line := lines[0]
mm := toMeasurement(line)
if mm == nil {
t.Fatal("unexpected error")
}
}
func TestMainMissingFile(t *testing.T) {
defer func() {
var s interface{}
if s = recover(); s == nil {
t.Fatal("expected a panic here")
}
if s != "Cannot open measurement file" {
t.Fatal("unexpected panic message")
}
}()
mainWithArgs([]string{"upload", "notexist.json"})
}
func TestMainEmptyFile(t *testing.T) {
defer func() {
var s interface{}
if s = recover(); s != nil {
t.Fatal("unexpected panic")
}
}()
mainWithArgs([]string{"upload", "testdata/noentries.json"})
}
func TestSubmitAllFails(t *testing.T) {
ctx := context.Background()
sess := newSession(ctx)
subm := newSubmitter(sess, ctx)
lines := readLines("testdata/testmeasurement.json")
ctx, cancel := context.WithCancel(ctx)
cancel() // fail immediately
n, err := submitAll(ctx, lines, subm)
if err == nil {
t.Fatal("expected an error here")
}
if n != 0 {
t.Fatal("nothing should be submitted here")
}
}

View File

View File

@ -0,0 +1,2 @@
{"annotations":{"architecture":"amd64","engine_name":"ooniprobe-engine","engine_version":"3.14.0-alpha.1","platform":"linux"},"data_format_version":"0.2.0","extensions":{"dnst":0,"httpt":0,"netevents":0,"tcpconnect":0,"tlshandshake":0,"tunnel":0},"input":"ooni.org","measurement_start_time":"2022-02-09 08:44:29","probe_asn":"AS6805","probe_cc":"DE","probe_ip":"127.0.0.1","probe_network_name":"Telefonica Germany GmbH \u0026 Co.OHG","report_id":"","resolver_asn":"AS6805","resolver_ip":"62.109.121.37","resolver_network_name":"Telefonica Germany GmbH \u0026 Co.OHG","software_name":"miniooni","software_version":"3.14.0-alpha.1","test_keys":{"agent":"redirect","failed_operation":"top_level","failure":"unknown_failure: unknown targetURL scheme","network_events":null,"queries":null,"requests":null,"tcp_connect":null,"tls_handshakes":null},"test_name":"urlgetter","test_runtime":0.000180107,"test_start_time":"2022-02-09 08:44:29","test_version":"0.2.0"}
{"annotations":{"architecture":"amd64","engine_name":"ooniprobe-engine","engine_version":"3.14.0-alpha.1","platform":"linux"},"data_format_version":"0.2.0","extensions":{"dnst":0,"httpt":0,"netevents":0,"tcpconnect":0,"tlshandshake":0,"tunnel":0},"input":"google.com","measurement_start_time":"2022-02-09 08:44:45","options":["HTTP3Enabled=true"],"probe_asn":"AS6805","probe_cc":"DE","probe_ip":"127.0.0.1","probe_network_name":"Telefonica Germany GmbH \u0026 Co.OHG","report_id":"","resolver_asn":"AS6805","resolver_ip":"62.109.121.37","resolver_network_name":"Telefonica Germany GmbH \u0026 Co.OHG","software_name":"miniooni","software_version":"3.14.0-alpha.1","test_keys":{"agent":"redirect","failed_operation":"top_level","failure":"unknown_failure: unknown targetURL scheme","network_events":null,"queries":null,"requests":null,"tcp_connect":null,"tls_handshakes":null},"test_name":"urlgetter","test_runtime":0.000223706,"test_start_time":"2022-02-09 08:44:45","test_version":"0.2.0"}