From e2499dc4b0462fb802ba910a83b69b66df2ad3ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 19 Mar 2018 16:23:30 +0100 Subject: [PATCH 01/14] Keep track of the measurement state in the database --- internal/cli/run/run.go | 2 +- internal/database/models.go | 110 +++++++++++++++++++++++++++++++----- nettests/nettests.go | 87 +++++++++++++++++++++++++--- 3 files changed, 176 insertions(+), 23 deletions(-) diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index d65e019..d142cdc 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -43,7 +43,7 @@ func init() { fmt.Sprintf("msmt-%s-%T.jsonl", nt, time.Now().UTC().Format(time.RFC3339Nano))) - ctl := nettests.NewController(ctx, result, msmtPath) + ctl := nettests.NewController(nt, ctx, result, msmtPath) if err := nt.Run(ctl); err != nil { log.WithError(err).Errorf("Failed to run %s", group.Label) return err diff --git a/internal/database/models.go b/internal/database/models.go index c32bcca..fd98e36 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -8,23 +8,41 @@ import ( "github.com/pkg/errors" ) +// UpdateOne will run the specified update query and check that it only affected one row +func UpdateOne(db *sqlx.DB, query string, arg interface{}) error { + res, err := db.NamedExec(query, arg) + + if err != nil { + return errors.Wrap(err, "updating table") + } + count, err := res.RowsAffected() + if err != nil { + return errors.Wrap(err, "updating table") + } + if count != 1 { + return errors.New("inconsistent update count") + } + return nil +} + // Measurement model type Measurement struct { ID int64 `db:"id"` Name string `db:"name"` StartTime time.Time `db:"start_time"` - EndTime time.Time `db:"end_time"` + Runtime float64 `db:"runtime"` Summary string `db:"summary"` // XXX this should be JSON - ASN int64 `db:"asn"` + ASN string `db:"asn"` IP string `db:"ip"` CountryCode string `db:"country"` State string `db:"state"` Failure string `db:"failure"` + UploadFailure string `db:"upload_failure"` + Uploaded bool `db:"uploaded"` ReportFilePath string `db:"report_file"` ReportID string `db:"report_id"` Input string `db:"input"` - MeasurementID string `db:"measurement_id"` - ResultID string `db:"result_id"` + ResultID int64 `db:"result_id"` } // SetGeoIPInfo for the Measurement @@ -32,12 +50,83 @@ func (m *Measurement) SetGeoIPInfo() error { return nil } +// Failed writes the error string to the measurement +func (m *Measurement) Failed(db *sqlx.DB, failure string) error { + m.Failure = failure + + err := UpdateOne(db, `UPDATE measurements + SET failure = :failure, state = :state + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// Done marks the measurement as completed +func (m *Measurement) Done(db *sqlx.DB) error { + m.State = "done" + + err := UpdateOne(db, `UPDATE measurements + SET state = :state + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// UploadFailed writes the error string for the upload failure to the measurement +func (m *Measurement) UploadFailed(db *sqlx.DB, failure string) error { + m.UploadFailure = failure + m.Uploaded = false + + err := UpdateOne(db, `UPDATE measurements + SET upload_failure = :upload_failure + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// UploadSucceeded writes the error string for the upload failure to the measurement +func (m *Measurement) UploadSucceeded(db *sqlx.DB) error { + m.Uploaded = true + + err := UpdateOne(db, `UPDATE measurements + SET uploaded = :uploaded + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// WriteSummary writes the summary to the measurement +func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error { + m.Summary = summary + + err := UpdateOne(db, `UPDATE measurements + SET summary = :summary + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + // CreateMeasurement writes the measurement to the database a returns a pointer // to the Measurement -func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { +func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, error) { + // XXX Do we want to have this be part of something else? + m.StartTime = time.Now().UTC() + m.Input = i + m.State = "active" + res, err := db.NamedExec(`INSERT INTO measurements (name, start_time, - summary, asn, ip, country, + asn, ip, country, state, failure, report_file, report_id, input, measurement_id, result_id) @@ -86,19 +175,12 @@ func (r *Result) Finished(db *sqlx.DB) error { r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond) r.Done = true - res, err := db.NamedExec(`UPDATE results + err := UpdateOne(db, `UPDATE results SET done = true, runtime = :runtime WHERE id = :id`, r) if err != nil { return errors.Wrap(err, "updating result") } - count, err := res.RowsAffected() - if err != nil { - return errors.Wrap(err, "updating result") - } - if count != 1 { - return errors.New("inconsistent update count") - } return nil } diff --git a/nettests/nettests.go b/nettests/nettests.go index 9ce9262..007f639 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -1,6 +1,8 @@ package nettests import ( + "encoding/json" + "github.com/apex/log" "github.com/measurement-kit/go-measurement-kit" ooni "github.com/openobservatory/gooni" @@ -23,11 +25,12 @@ type NettestGroup struct { } // NewController creates a nettest controller -func NewController(ctx *ooni.Context, res *database.Result, msmtPath string) *Controller { +func NewController(nt Nettest, ctx *ooni.Context, res *database.Result, msmtPath string) *Controller { return &Controller{ - ctx, - res, - msmtPath, + Ctx: ctx, + nt: nt, + res: res, + msmtPath: msmtPath, } } @@ -36,6 +39,8 @@ func NewController(ctx *ooni.Context, res *database.Result, msmtPath string) *Co type Controller struct { Ctx *ooni.Context res *database.Result + nt Nettest + msmts map[int64]*database.Measurement msmtPath string } @@ -43,6 +48,16 @@ type Controller struct { func (c *Controller) Init(nt *mk.Nettest) error { log.Debugf("Init: %v", nt) + msmtTemplate := database.Measurement{ + ASN: "", + IP: "", + CountryCode: "", + ReportID: "", + Name: nt.Name, + ResultID: c.res.ID, + ReportFilePath: c.msmtPath, + } + log.Debugf("OutputPath: %s", c.msmtPath) nt.Options = mk.NettestOptions{ IncludeIP: c.Ctx.Config.Sharing.IncludeIP, @@ -84,10 +99,29 @@ func (c *Controller) Init(nt *mk.Nettest) error { nt.On("status.report_created", func(e mk.Event) { log.Debugf("%s", e.Key) + + msmtTemplate.ReportID = e.Value["report_id"].(string) }) nt.On("status.geoip_lookup", func(e mk.Event) { log.Debugf("%s", e.Key) + + msmtTemplate.ASN = e.Value["probe_asn"].(string) + msmtTemplate.IP = e.Value["probe_ip"].(string) + msmtTemplate.CountryCode = e.Value["probe_cc"].(string) + }) + + nt.On("status.measurement_started", func(e mk.Event) { + log.Debugf("%s", e.Key) + + idx := e.Value["idx"].(int64) + input := e.Value["input"].(string) + msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, input) + if err != nil { + log.WithError(err).Error("Failed to create measurement") + return + } + c.msmts[idx] = msmt }) nt.On("status.progress", func(e mk.Event) { @@ -102,18 +136,41 @@ func (c *Controller) Init(nt *mk.Nettest) error { nt.On("failure.measurement", func(e mk.Event) { log.Debugf("%s", e.Key) + + idx := e.Value["idx"].(int64) + failure := e.Value["failure"].(string) + c.msmts[idx].Failed(c.Ctx.DB, failure) }) - nt.On("failure.report_submission", func(e mk.Event) { + nt.On("failure.measurement_submission", func(e mk.Event) { log.Debugf("%s", e.Key) + + idx := e.Value["idx"].(int64) + failure := e.Value["failure"].(string) + c.msmts[idx].UploadFailed(c.Ctx.DB, failure) + }) + + nt.On("status.measurement_uploaded", func(e mk.Event) { + log.Debugf("%s", e.Key) + + idx := e.Value["idx"].(int64) + c.msmts[idx].UploadSucceeded(c.Ctx.DB) + }) + + nt.On("status.measurement_done", func(e mk.Event) { + log.Debugf("%s", e.Key) + + idx := e.Value["idx"].(int64) + c.msmts[idx].Done(c.Ctx.DB) }) nt.On("measurement", func(e mk.Event) { - c.OnEntry(e.Value["json_str"].(string)) + idx := e.Value["idx"].(int64) + c.OnEntry(idx, e.Value["json_str"].(string)) }) nt.On("end", func(e mk.Event) { - c.OnEntry(e.Value["json_str"].(string)) + log.Debugf("end") }) return nil @@ -124,9 +181,23 @@ func (c *Controller) OnProgress(perc float64, msg string) { log.Debugf("OnProgress: %f - %s", perc, msg) } +// Entry is an opaque measurement entry +type Entry struct { + TestKeys map[string]interface{} `json:"test_keys"` +} + // OnEntry should be called every time there is a new entry -func (c *Controller) OnEntry(jsonStr string) { +func (c *Controller) OnEntry(idx int64, jsonStr string) { log.Debugf("OnEntry: %s", jsonStr) + + var entry Entry + json.Unmarshal([]byte(jsonStr), &entry) + summary := c.nt.Summary(entry.TestKeys) + summaryBytes, err := json.Marshal(summary) + if err != nil { + log.WithError(err).Error("failed to serialize summary") + } + c.msmts[idx].WriteSummary(c.Ctx.DB, string(summaryBytes)) } // MKStart is the interface for the mk.Nettest Start() function From 9e51faddbf0f09c06b94c6a8c383b92251915b7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 19 Mar 2018 19:28:07 +0100 Subject: [PATCH 02/14] Bump MK version --- Gopkg.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Gopkg.lock b/Gopkg.lock index ff5baa2..9a5b79e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -89,7 +89,7 @@ branch = "master" name = "github.com/measurement-kit/go-measurement-kit" packages = ["."] - revision = "ae6643546db7c99bbc6ecb37b86d04baec04d295" + revision = "6ae2401a8e498a90ccdd3edbda1841add079b70e" [[projects]] branch = "master" From 25c15dc3abd380d64cf7dd7da5b1fcf801c34bdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 19 Mar 2018 19:28:22 +0100 Subject: [PATCH 03/14] Edit SQL creation --- data/migrations/1_create_msmt_results.sql | 7 +++--- internal/bindata/bindata.go | 27 ++++++++++++----------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/data/migrations/1_create_msmt_results.sql b/data/migrations/1_create_msmt_results.sql index dbe2d39..0110441 100644 --- a/data/migrations/1_create_msmt_results.sql +++ b/data/migrations/1_create_msmt_results.sql @@ -24,18 +24,19 @@ CREATE TABLE `measurements` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` VARCHAR(255), `start_time` DATETIME, - `end_time` DATETIME, + `runtime` INTEGER, `summary` JSON, `ip` VARCHAR(255), - `asn` INTEGER, + `asn` VARCHAR(16), `country` VARCHAR(2), `network_name` VARCHAR(255), `state` TEXT, `failure` VARCHAR(255), + `upload_failure` VARCHAR(255), + `uploaded` TINYINT(1), `report_file` VARCHAR(255), `report_id` VARCHAR(255), `input` VARCHAR(255), - `measurement_id` VARCHAR(255), `result_id` INTEGER REFERENCES `results` (`id`) ON DELETE SET NULL ON UPDATE CASCADE ); diff --git a/internal/bindata/bindata.go b/internal/bindata/bindata.go index e18f02c..f8fba7e 100644 --- a/internal/bindata/bindata.go +++ b/internal/bindata/bindata.go @@ -130,19 +130,20 @@ func bindataDataDefaultconfigjson() (*asset, error) { } var _bindataDataMigrations1createmsmtresultssql = []byte( - "\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x93\x4f\x6f\xf2\x30\x0c\xc6\xef\xfd\x14\x3e\x82\xde\x97\xc3\x26\x71" + - "\xe2\x14\x5a\x6f\xeb\x56\x52\x94\xa6\xd3\x38\xb5\xd1\x1a\x50\x34\x9a\x56\x69\x22\xb4\x6f\x3f\x85\x7f\x2a\xac\x70" + - "\xde\xd5\xbf\xc7\x8e\xfd\xd8\x99\x4c\xe0\x5f\xad\x36\x46\x58\x09\x51\xb3\xd3\x41\x3f\x90\x59\x61\x65\x2d\xb5\x9d" + - "\xcb\x8d\xd2\x41\x10\xb1\x74\x09\x9c\xcc\x13\x84\xd2\xc8\xce\x6d\x6d\x57\xce\x2e\xa2\xb5\x14\x9d\x33\xfb\x1c\x8f" + - "\x86\xab\xa1\xae\x2e\x49\xde\xde\x7d\x36\x64\x48\x38\x5e\x3f\x0c\xa3\x00\x00\xa0\x54\x55\x09\x31\xe5\xf8\x8c\x0c" + - "\x96\x2c\x5e\x10\xb6\x82\x37\x5c\x01\xc9\x79\x1a\xd3\x90\xe1\x02\x29\xff\x7f\xd0\x6a\x51\xcb\x12\xde\x09\x0b\x5f" + - "\x08\x1b\x3d\x4e\xa7\xe3\x23\xe8\xac\x30\xb6\xb0\xca\xe3\x88\x70\xe4\xf1\x02\x8f\x48\xea\x6a\x18\x74\xae\xae\x85" + - "\xf9\x2e\xe1\x35\x4b\xe9\x31\x56\x35\x5a\x96\xc0\x63\xba\x8a\x29\x1f\x3d\x9c\xca\x57\xc2\x8a\xc2\x75\x62\x23\x0b" + - "\xd7\x9e\xdb\xfd\x0d\xab\x66\xa7\xcf\x38\x18\xcf\xae\x67\xbf\xb0\xf7\x4f\x1a\xa0\xda\xc1\xf2\xa2\xd3\xd7\x63\x7f" + - "\x36\x4e\x5b\x9f\x7d\x96\x9f\xc4\x5a\xda\x5d\x63\xbe\x8a\x7b\xcd\x5a\x6f\x33\x7e\x9c\xe6\x5a\x0b\xb5\x75\x66\x58" + - "\x6d\x64\xdb\x18\x5b\xac\xd5\xf6\x2e\xf7\x36\x0e\x50\xa5\x5b\x67\x07\x49\x6f\x19\xb7\x92\x0f\xb7\x5a\xf4\x37\xc4" + - "\xf0\x09\x19\xd2\x10\xb3\xfe\x29\xfb\x25\x8e\x21\xa5\x10\x61\x82\x1c\x21\x43\x0e\x34\x4f\x12\x1f\xca\x97\xde\x77" + - "\x08\x49\x16\x92\x08\xf7\x57\x71\xf3\x5b\xfd\x04\x00\x00\xff\xff\xc4\x16\x0a\x20\xcf\x03\x00\x00") + "\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x93\xc1\x6e\xdb\x30\x0c\x86\xef\x7e\x0a\x1e\x13\x6c\x3d\x74\x40\x77" + + "\xe9\x49\xb5\xb9\xcd\x9b\x23\x07\xb2\x3c\x2c\x27\x5b\x98\xd4\x40\x98\x2d\x1b\xb2\x84\x60\x6f\x3f\xa8\xb5\xb3\xb8" + + "\x53\xb3\x5b\xaf\xff\x47\x52\x24\x7f\xea\xe6\x06\xde\xf5\xfa\x68\x85\x53\x90\x0d\x27\x93\x5c\x0a\x95\x13\x4e\xf5" + + "\xca\xb8\x07\x75\xd4\x26\x49\x32\x56\xee\x81\x93\x87\x02\xa1\xb5\x6a\xf2\x9d\x9b\xda\xfb\x95\xda\x2b\x31\x79\xfb" + + "\x94\x13\x50\xbc\x1a\x1a\xb9\x26\xf5\x78\xf5\xd9\x94\x21\xe1\xf8\xf2\x61\xd8\x24\x00\x00\xad\x96\x2d\xe4\x94\xe3" + + "\x67\x64\xb0\x67\xf9\x8e\xb0\x03\x7c\xc3\x03\x90\x9a\x97\x39\x4d\x19\xee\x90\xf2\xf7\xcf\xb1\x46\xf4\xaa\x85\xef" + + "\x84\xa5\x5f\x08\xdb\x7c\xb8\xbb\xdb\xce\x60\x72\xc2\xba\xc6\xe9\x80\x33\xc2\x91\xe7\x3b\x9c\x91\x32\x32\x0e\x26" + + "\xdf\xf7\xc2\xfe\x6e\xe1\x6b\x55\xd2\x59\x93\x83\x51\x2d\xf0\x9c\x1e\x72\xca\x37\xb7\x4b\x79\x29\x9c\x68\xfc\x24" + + "\x8e\xaa\xf1\xe3\xb9\xdd\x7f\xa1\x1c\x4e\xe6\x8c\x93\xed\xfd\xcb\xd9\x57\xeb\x7d\xab\x05\x58\x6f\x9e\xf5\x75\xdb" + + "\xb1\xf1\xf5\x18\x2d\x2e\x26\xf3\x57\xbf\xfd\xb8\xc8\x3f\x07\x6f\x5c\xa8\x70\x4e\x59\x88\x51\xee\x34\xd8\x5f\xcd" + + "\xb5\x76\x5d\x58\x34\xfe\x58\x26\x7b\x14\xba\xf3\x36\x1e\xed\xc7\x6e\x10\xb2\xf9\x7f\x88\x92\x11\xf3\xac\x1a\x07" + + "\xeb\x9a\x47\xdd\xc5\x53\x67\x1e\x6c\x88\x50\x6d\x46\xef\x5e\xc9\x0b\x87\xdc\x5c\xda\xc7\xf0\x13\x32\xa4\x29\x56" + + "\x97\x77\x1e\x1c\xde\x42\x49\x21\xc3\x02\x39\x42\x85\x1c\x68\x5d\x14\x41\xaa\xf7\xc1\x2d\x48\x49\x95\x92\x0c\x9f" + + "\x4e\xe6\xd5\x3f\xf7\x27\x00\x00\xff\xff\xb1\x3d\xbb\xc8\xec\x03\x00\x00") func bindataDataMigrations1createmsmtresultssqlBytes() ([]byte, error) { return bindataRead( From 9f438ae068b752dfb17e108e1caa1b7608c6689f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 19 Mar 2018 19:28:32 +0100 Subject: [PATCH 04/14] Implement state tracking in database --- internal/cli/run/run.go | 2 +- internal/database/models.go | 2 +- nettests/nettests.go | 43 +++++++++++++++--------------------- nettests/performance/dash.go | 10 ++++----- nettests/performance/ndt.go | 4 ++-- 5 files changed, 27 insertions(+), 34 deletions(-) diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index d142cdc..0621a8d 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -40,7 +40,7 @@ func init() { for _, nt := range group.Nettests { log.Debugf("Running test %T", nt) msmtPath := filepath.Join(ctx.TempDir, - fmt.Sprintf("msmt-%s-%T.jsonl", nt, + fmt.Sprintf("msmt-%T-%s.jsonl", nt, time.Now().UTC().Format(time.RFC3339Nano))) ctl := nettests.NewController(nt, ctx, result, msmtPath) diff --git a/internal/database/models.go b/internal/database/models.go index fd98e36..4216dfc 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -128,7 +128,7 @@ func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, erro (name, start_time, asn, ip, country, state, failure, report_file, - report_id, input, measurement_id, + report_id, input, result_id) VALUES (:name,:start_time, :asn,:ip,:country, diff --git a/nettests/nettests.go b/nettests/nettests.go index 007f639..b268682 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -47,6 +47,7 @@ type Controller struct { // Init should be called once to initialise the nettest func (c *Controller) Init(nt *mk.Nettest) error { log.Debugf("Init: %v", nt) + c.msmts = make(map[int64]*database.Measurement) msmtTemplate := database.Measurement{ ASN: "", @@ -75,8 +76,8 @@ func (c *Controller) Init(nt *mk.Nettest) error { } nt.On("log", func(e mk.Event) { - level := e.Value["verbosity"].(string) - msg := e.Value["message"].(string) + level := e.Value.LogLevel + msg := e.Value.Message switch level { case "ERROR": @@ -100,23 +101,22 @@ func (c *Controller) Init(nt *mk.Nettest) error { nt.On("status.report_created", func(e mk.Event) { log.Debugf("%s", e.Key) - msmtTemplate.ReportID = e.Value["report_id"].(string) + msmtTemplate.ReportID = e.Value.ReportID }) nt.On("status.geoip_lookup", func(e mk.Event) { log.Debugf("%s", e.Key) - msmtTemplate.ASN = e.Value["probe_asn"].(string) - msmtTemplate.IP = e.Value["probe_ip"].(string) - msmtTemplate.CountryCode = e.Value["probe_cc"].(string) + msmtTemplate.ASN = e.Value.ProbeASN + msmtTemplate.IP = e.Value.ProbeIP + msmtTemplate.CountryCode = e.Value.ProbeCC }) nt.On("status.measurement_started", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - input := e.Value["input"].(string) - msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, input) + idx := e.Value.Idx + msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input) if err != nil { log.WithError(err).Error("Failed to create measurement") return @@ -125,9 +125,7 @@ func (c *Controller) Init(nt *mk.Nettest) error { }) nt.On("status.progress", func(e mk.Event) { - perc := e.Value["percentage"].(float64) - msg := e.Value["message"].(string) - c.OnProgress(perc, msg) + c.OnProgress(e.Value.Percentage, e.Value.Message) }) nt.On("status.update.*", func(e mk.Event) { @@ -137,36 +135,30 @@ func (c *Controller) Init(nt *mk.Nettest) error { nt.On("failure.measurement", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - failure := e.Value["failure"].(string) - c.msmts[idx].Failed(c.Ctx.DB, failure) + c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure) }) nt.On("failure.measurement_submission", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - failure := e.Value["failure"].(string) - c.msmts[idx].UploadFailed(c.Ctx.DB, failure) + failure := e.Value.Failure + c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure) }) nt.On("status.measurement_uploaded", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - c.msmts[idx].UploadSucceeded(c.Ctx.DB) + c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB) }) nt.On("status.measurement_done", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - c.msmts[idx].Done(c.Ctx.DB) + c.msmts[e.Value.Idx].Done(c.Ctx.DB) }) nt.On("measurement", func(e mk.Event) { - idx := e.Value["idx"].(int64) - c.OnEntry(idx, e.Value["json_str"].(string)) + c.OnEntry(e.Value.Idx, e.Value.JSONStr) }) nt.On("end", func(e mk.Event) { @@ -188,7 +180,7 @@ type Entry struct { // OnEntry should be called every time there is a new entry func (c *Controller) OnEntry(idx int64, jsonStr string) { - log.Debugf("OnEntry: %s", jsonStr) + log.Debugf("OnEntry") var entry Entry json.Unmarshal([]byte(jsonStr), &entry) @@ -197,6 +189,7 @@ func (c *Controller) OnEntry(idx int64, jsonStr string) { if err != nil { log.WithError(err).Error("failed to serialize summary") } + log.Debugf("Fetching: %s %v", idx, c.msmts[idx]) c.msmts[idx].WriteSummary(c.Ctx.DB, string(summaryBytes)) } diff --git a/nettests/performance/dash.go b/nettests/performance/dash.go index d64efc8..b3e3a87 100644 --- a/nettests/performance/dash.go +++ b/nettests/performance/dash.go @@ -19,9 +19,9 @@ func (d Dash) Run(ctl *nettests.Controller) error { // DashSummary for the test // TODO: process 'receiver_data' to provide an array of performance for a chart. type DashSummary struct { - Latency float32 + Latency float64 Bitrate int64 - Delay float32 + Delay float64 } // Summary generates a summary for a test run @@ -29,9 +29,9 @@ func (d Dash) Summary(tk map[string]interface{}) interface{} { simple := tk["simple"].(map[string]interface{}) return DashSummary{ - Latency: simple["connect_latency"].(float32), - Bitrate: simple["median_bitrate"].(int64), - Delay: simple["min_playout_delay"].(float32), + Latency: simple["connect_latency"].(float64), + Bitrate: int64(simple["median_bitrate"].(float64)), + Delay: simple["min_playout_delay"].(float64), } } diff --git a/nettests/performance/ndt.go b/nettests/performance/ndt.go index c6425ed..082c245 100644 --- a/nettests/performance/ndt.go +++ b/nettests/performance/ndt.go @@ -26,7 +26,7 @@ type NDTSummary struct { MinRTT int64 MSS int64 OutOfOrder int64 - PacketLoss float32 + PacketLoss float64 Timeouts int64 } @@ -44,7 +44,7 @@ func (n NDT) Summary(tk map[string]interface{}) interface{} { MinRTT: advanced["min_rtt"].(int64), MSS: advanced["mss"].(int64), OutOfOrder: advanced["out_of_order"].(int64), - PacketLoss: advanced["packet_loss"].(float32), + PacketLoss: advanced["packet_loss"].(float64), Timeouts: advanced["timeouts"].(int64), } } From 74420f5f16712933f1d7bdfc7b7a22fd102bbef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 19 Mar 2018 19:31:17 +0100 Subject: [PATCH 05/14] Go considers the default type of JSON float64 --- nettests/performance/ndt.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/nettests/performance/ndt.go b/nettests/performance/ndt.go index 082c245..0a53f77 100644 --- a/nettests/performance/ndt.go +++ b/nettests/performance/ndt.go @@ -36,16 +36,16 @@ func (n NDT) Summary(tk map[string]interface{}) interface{} { advanced := tk["advanced"].(map[string]interface{}) return NDTSummary{ - Upload: simple["upload"].(int64), - Download: simple["download"].(int64), - Ping: simple["ping"].(int64), - MaxRTT: advanced["max_rtt"].(int64), - AvgRTT: advanced["avg_rtt"].(int64), - MinRTT: advanced["min_rtt"].(int64), - MSS: advanced["mss"].(int64), - OutOfOrder: advanced["out_of_order"].(int64), + Upload: int64(simple["upload"].(float64)), + Download: int64(simple["download"].(float64)), + Ping: int64(simple["ping"].(float64)), + MaxRTT: int64(advanced["max_rtt"].(float64)), + AvgRTT: int64(advanced["avg_rtt"].(float64)), + MinRTT: int64(advanced["min_rtt"].(float64)), + MSS: int64(advanced["mss"].(float64)), + OutOfOrder: int64(advanced["out_of_order"].(float64)), PacketLoss: advanced["packet_loss"].(float64), - Timeouts: advanced["timeouts"].(int64), + Timeouts: int64(advanced["timeouts"].(float64)), } } From 541e2c0c1be0644819ce38d48f3cbc449511634b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Tue, 20 Mar 2018 12:38:21 +0100 Subject: [PATCH 06/14] Update schema --- data/migrations/1_create_msmt_results.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data/migrations/1_create_msmt_results.sql b/data/migrations/1_create_msmt_results.sql index 0110441..6e65b0b 100644 --- a/data/migrations/1_create_msmt_results.sql +++ b/data/migrations/1_create_msmt_results.sql @@ -13,7 +13,7 @@ CREATE TABLE `results` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` VARCHAR(255), `start_time` DATETIME, - `end_time` DATETIME, + `runtime` REAL, `summary` JSON, `done` TINYINT(1), `data_usage_up` INTEGER, @@ -24,7 +24,7 @@ CREATE TABLE `measurements` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` VARCHAR(255), `start_time` DATETIME, - `runtime` INTEGER, + `runtime` REAL, `summary` JSON, `ip` VARCHAR(255), `asn` VARCHAR(16), From 6a70fe7da1eaed4d22410779528b3df5fd524060 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Tue, 20 Mar 2018 12:38:28 +0100 Subject: [PATCH 07/14] Update bindata --- internal/bindata/bindata.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/internal/bindata/bindata.go b/internal/bindata/bindata.go index f8fba7e..45ee45f 100644 --- a/internal/bindata/bindata.go +++ b/internal/bindata/bindata.go @@ -130,20 +130,20 @@ func bindataDataDefaultconfigjson() (*asset, error) { } var _bindataDataMigrations1createmsmtresultssql = []byte( - "\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x93\xc1\x6e\xdb\x30\x0c\x86\xef\x7e\x0a\x1e\x13\x6c\x3d\x74\x40\x77" + - "\xe9\x49\xb5\xb9\xcd\x9b\x23\x07\xb2\x3c\x2c\x27\x5b\x98\xd4\x40\x98\x2d\x1b\xb2\x84\x60\x6f\x3f\xa8\xb5\xb3\xb8" + - "\x53\xb3\x5b\xaf\xff\x47\x52\x24\x7f\xea\xe6\x06\xde\xf5\xfa\x68\x85\x53\x90\x0d\x27\x93\x5c\x0a\x95\x13\x4e\xf5" + - "\xca\xb8\x07\x75\xd4\x26\x49\x32\x56\xee\x81\x93\x87\x02\xa1\xb5\x6a\xf2\x9d\x9b\xda\xfb\x95\xda\x2b\x31\x79\xfb" + - "\x94\x13\x50\xbc\x1a\x1a\xb9\x26\xf5\x78\xf5\xd9\x94\x21\xe1\xf8\xf2\x61\xd8\x24\x00\x00\xad\x96\x2d\xe4\x94\xe3" + - "\x67\x64\xb0\x67\xf9\x8e\xb0\x03\x7c\xc3\x03\x90\x9a\x97\x39\x4d\x19\xee\x90\xf2\xf7\xcf\xb1\x46\xf4\xaa\x85\xef" + - "\x84\xa5\x5f\x08\xdb\x7c\xb8\xbb\xdb\xce\x60\x72\xc2\xba\xc6\xe9\x80\x33\xc2\x91\xe7\x3b\x9c\x91\x32\x32\x0e\x26" + - "\xdf\xf7\xc2\xfe\x6e\xe1\x6b\x55\xd2\x59\x93\x83\x51\x2d\xf0\x9c\x1e\x72\xca\x37\xb7\x4b\x79\x29\x9c\x68\xfc\x24" + - "\x8e\xaa\xf1\xe3\xb9\xdd\x7f\xa1\x1c\x4e\xe6\x8c\x93\xed\xfd\xcb\xd9\x57\xeb\x7d\xab\x05\x58\x6f\x9e\xf5\x75\xdb" + - "\xb1\xf1\xf5\x18\x2d\x2e\x26\xf3\x57\xbf\xfd\xb8\xc8\x3f\x07\x6f\x5c\xa8\x70\x4e\x59\x88\x51\xee\x34\xd8\x5f\xcd" + - "\xb5\x76\x5d\x58\x34\xfe\x58\x26\x7b\x14\xba\xf3\x36\x1e\xed\xc7\x6e\x10\xb2\xf9\x7f\x88\x92\x11\xf3\xac\x1a\x07" + - "\xeb\x9a\x47\xdd\xc5\x53\x67\x1e\x6c\x88\x50\x6d\x46\xef\x5e\xc9\x0b\x87\xdc\x5c\xda\xc7\xf0\x13\x32\xa4\x29\x56" + - "\x97\x77\x1e\x1c\xde\x42\x49\x21\xc3\x02\x39\x42\x85\x1c\x68\x5d\x14\x41\xaa\xf7\xc1\x2d\x48\x49\x95\x92\x0c\x9f" + - "\x4e\xe6\xd5\x3f\xf7\x27\x00\x00\xff\xff\xb1\x3d\xbb\xc8\xec\x03\x00\x00") + "\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x93\x31\xef\xda\x30\x10\xc5\xf7\x7c\x8a\x1b\x41\x2d\x03\x95\xe8\xc2" + + "\x64\x92\x6b\x9b\x36\x38\xc8\x71\xaa\x32\x25\x56\x63\x90\xd5\xc4\x89\x1c\x5b\xa8\xdf\xbe\x32\x24\x14\x68\xa0\xeb" + + "\x7f\x7d\xbf\xbb\x67\x9f\xdf\x79\xb1\x80\x77\x8d\x3a\x1a\x61\x25\x44\xed\x49\x07\xb7\x42\x66\x85\x95\x8d\xd4\x76" + + "\x23\x8f\x4a\x07\x41\xc4\xd2\x1d\x70\xb2\x49\x10\x4a\x23\x7b\x57\xdb\xbe\x5c\xdf\xa9\x8d\x14\xbd\x33\xe7\x1e\x8f" + + "\xa6\xdd\x50\x57\xf7\x24\xef\x5e\x1e\x1b\x32\x24\x1c\x1f\x0f\x86\x59\x00\x00\x50\xaa\xaa\x84\x98\x72\xfc\x8c\x0c" + + "\x76\x2c\xde\x12\xb6\x87\x6f\xb8\x07\x92\xf3\x34\xa6\x21\xc3\x2d\x52\xfe\xfe\x52\xab\x45\x23\x4b\xf8\x4e\x58\xf8" + + "\x85\xb0\xd9\x87\xd5\x6a\x3e\x80\xde\x0a\x63\x0b\xab\x3c\x8e\x08\x47\x1e\x6f\x71\x40\xc6\xe9\x8b\xce\x90\x24\x63" + + "\xb9\x6b\x1a\x61\x7e\x97\xf0\x35\x4b\xe9\xa0\x55\xad\x96\x25\xf0\x98\xee\x63\xca\x67\xcb\xd1\xb9\x12\x56\x14\xae" + + "\x17\x47\x59\xb8\xee\x7a\xd3\x7f\x61\xd5\x9e\xf4\x15\x07\xf3\xf5\xe3\xd8\x77\x2f\xfb\xd6\x66\x57\xdd\xa4\xb3\xe8" + + "\xf5\x5f\x7d\xf9\x71\x94\x7f\xb6\x4e\x5b\xef\x70\x6d\x19\x89\x96\xf6\xd4\x9a\x5f\xc5\xab\xbb\x5a\xff\xca\xf8\x63" + + "\x1c\xeb\x20\x54\xed\xcc\x74\xb5\xeb\xea\x56\x54\xc5\xff\x4b\x64\x35\x91\x9c\x91\x5d\x6b\x6c\x71\x50\xf5\x74\xeb" + + "\xc0\x7d\x06\x13\x54\xe9\xce\xd9\x27\x7d\x7e\x81\x8b\xdb\xec\x18\x7e\x42\x86\x34\xc4\xec\x76\xbf\x7d\xbc\x73\x48" + + "\x29\x44\x98\x20\x47\xc8\x90\x03\xcd\x93\xc4\x4b\xf9\xce\x47\x05\x21\xc9\x42\x12\xe1\x79\x5f\x9e\xfe\xb5\x3f\x01" + + "\x00\x00\xff\xff\x29\xbd\x69\x4f\xe4\x03\x00\x00") func bindataDataMigrations1createmsmtresultssqlBytes() ([]byte, error) { return bindataRead( From e0bb4000e97869fb73eb0ab0af888e545672ca0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Tue, 20 Mar 2018 12:38:33 +0100 Subject: [PATCH 08/14] Implement most of the measurement & result DB workflow --- internal/cli/run/run.go | 10 ++--- internal/database/models.go | 90 ++++++++++++++++++++++++++++--------- nettests/groups/groups.go | 29 +++++++++--- nettests/nettests.go | 43 ++++++++++-------- 4 files changed, 120 insertions(+), 52 deletions(-) diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index 0621a8d..2de0c9d 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -48,14 +48,10 @@ func init() { log.WithError(err).Errorf("Failed to run %s", group.Label) return err } - // XXX - // 1. Generate the summary - // 2. Link the measurement to the Result (this should probably happen in - // the nettest class) - // 3. Update the summary of the result and the other metadata in the db - // 4. Move the msmtPath into the final location ~/.ooni/msmts/ } - // result.Update(ctx.DB) + if err = result.Finished(ctx.DB); err != nil { + return err + } return nil }) } diff --git a/internal/database/models.go b/internal/database/models.go index 4216dfc..01d44cf 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -1,10 +1,14 @@ package database import ( + "fmt" + "os" + "path/filepath" "time" "github.com/apex/log" "github.com/jmoiron/sqlx" + ooni "github.com/openobservatory/gooni" "github.com/pkg/errors" ) @@ -30,7 +34,7 @@ type Measurement struct { ID int64 `db:"id"` Name string `db:"name"` StartTime time.Time `db:"start_time"` - Runtime float64 `db:"runtime"` + Runtime float64 `db:"runtime"` // Fractional number of seconds Summary string `db:"summary"` // XXX this should be JSON ASN string `db:"asn"` IP string `db:"ip"` @@ -65,10 +69,12 @@ func (m *Measurement) Failed(db *sqlx.DB, failure string) error { // Done marks the measurement as completed func (m *Measurement) Done(db *sqlx.DB) error { + runtime := time.Now().UTC().Sub(m.StartTime) + m.Runtime = runtime.Seconds() m.State = "done" err := UpdateOne(db, `UPDATE measurements - SET state = :state + SET state = :state, runtime = :runtime WHERE id = :id`, m) if err != nil { return errors.Wrap(err, "updating measurement") @@ -116,6 +122,27 @@ func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error { return nil } +//AddToResult adds a measurement to a result +func (m *Measurement) AddToResult(db *sqlx.DB, result *Result) error { + m.ResultID = result.ID + finalPath := filepath.Join(result.MeasurementDir, + filepath.Base(m.ReportFilePath)) + + err := os.Rename(m.ReportFilePath, finalPath) + if err != nil { + return errors.Wrap(err, "moving report file") + } + m.ReportFilePath = finalPath + + err = UpdateOne(db, `UPDATE measurements + SET result_id = :result_id, report_file = :report_file + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + // CreateMeasurement writes the measurement to the database a returns a pointer // to the Measurement func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, error) { @@ -149,22 +176,15 @@ func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, erro // Result model type Result struct { - ID int64 `db:"id"` - Name string `db:"name"` - StartTime time.Time `db:"start_time"` - Runtime float64 `db:"runtime"` // Runtime is expressed in Microseconds - Summary string `db:"summary"` // XXX this should be JSON - Done bool `db:"done"` - DataUsageUp int64 `db:"data_usage_up"` - DataUsageDown int64 `db:"data_usage_down"` - - started time.Time -} - -// Started marks the Result as having started -func (r *Result) Started(db *sqlx.DB) error { - r.started = time.Now() - return nil + ID int64 `db:"id"` + Name string `db:"name"` + StartTime time.Time `db:"start_time"` + Runtime float64 `db:"runtime"` // Runtime is expressed in fractional seconds + Summary string `db:"summary"` // XXX this should be JSON + Done bool `db:"done"` + DataUsageUp int64 `db:"data_usage_up"` + DataUsageDown int64 `db:"data_usage_down"` + MeasurementDir string `db:"measurement_dir"` } // Finished marks the result as done and sets the runtime @@ -172,22 +192,50 @@ func (r *Result) Finished(db *sqlx.DB) error { if r.Done == true || r.Runtime != 0 { return errors.New("Result is already finished") } - r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond) + r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds() r.Done = true + // XXX add in here functionality to compute the summary err := UpdateOne(db, `UPDATE results - SET done = true, runtime = :runtime + SET done = :done, runtime = :runtime WHERE id = :id`, r) if err != nil { - return errors.Wrap(err, "updating result") + return errors.Wrap(err, "updating finished result") } return nil } +// MakeResultsPath creates and returns a directory for the result +func MakeResultsPath(r *Result) (string, error) { + home, err := ooni.GetOONIHome() + if err != nil { + return "", errors.Wrap(err, "default measurements path") + } + p := filepath.Join(home, "msmts", + fmt.Sprintf("%s-%s", r.Name, r.StartTime.Format(time.RFC3339Nano))) + + // If the path already exists, this is a problem. It should not clash, because + // we are using nanosecond precision for the starttime. + if _, e := os.Stat(p); e == nil { + return "", errors.New("results path already exists") + } + err = os.MkdirAll(p, 0700) + if err != nil { + return "", err + } + return p, nil +} + // CreateResult writes the Result to the database a returns a pointer // to the Result func CreateResult(db *sqlx.DB, r Result) (*Result, error) { log.Debugf("Creating result %v", r) + + p, err := MakeResultsPath(&r) + if err != nil { + return nil, err + } + r.MeasurementDir = p res, err := db.NamedExec(`INSERT INTO results (name, start_time) VALUES (:name,:start_time)`, diff --git a/nettests/groups/groups.go b/nettests/groups/groups.go index a76f867..e10b6b2 100644 --- a/nettests/groups/groups.go +++ b/nettests/groups/groups.go @@ -6,27 +6,46 @@ import ( "github.com/openobservatory/gooni/nettests/websites" ) +// NettestGroup base structure +type NettestGroup struct { + Label string + Nettests []nettests.Nettest + Summary func(s string) string +} + // NettestGroups that can be run by the user -var NettestGroups = map[string]nettests.NettestGroup{ - "websites": nettests.NettestGroup{ +var NettestGroups = map[string]NettestGroup{ + "websites": NettestGroup{ Label: "Websites", Nettests: []nettests.Nettest{ websites.WebConnectivity{}, }, + Summary: func(s string) string { + return "{}" + }, }, - "performance": nettests.NettestGroup{ + "performance": NettestGroup{ Label: "Performance", Nettests: []nettests.Nettest{ performance.Dash{}, performance.NDT{}, }, + Summary: func(s string) string { + return "{}" + }, }, - "middleboxes": nettests.NettestGroup{ + "middleboxes": NettestGroup{ Label: "Middleboxes", Nettests: []nettests.Nettest{}, + Summary: func(s string) string { + return "{}" + }, }, - "im": nettests.NettestGroup{ + "im": NettestGroup{ Label: "Instant Messaging", Nettests: []nettests.Nettest{}, + Summary: func(s string) string { + return "{}" + }, }, } diff --git a/nettests/nettests.go b/nettests/nettests.go index b268682..ad189b2 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -7,6 +7,7 @@ import ( "github.com/measurement-kit/go-measurement-kit" ooni "github.com/openobservatory/gooni" "github.com/openobservatory/gooni/internal/cli/version" + "github.com/openobservatory/gooni/internal/colors" "github.com/openobservatory/gooni/internal/database" ) @@ -17,13 +18,6 @@ type Nettest interface { LogSummary(string) error } -// NettestGroup base structure -type NettestGroup struct { - Label string - Nettests []Nettest - Summary func(s string) string -} - // NewController creates a nettest controller func NewController(nt Nettest, ctx *ooni.Context, res *database.Result, msmtPath string) *Controller { return &Controller{ @@ -41,7 +35,7 @@ type Controller struct { res *database.Result nt Nettest msmts map[int64]*database.Measurement - msmtPath string + msmtPath string // XXX maybe we can drop this and just use a temporary file } // Init should be called once to initialise the nettest @@ -105,7 +99,7 @@ func (c *Controller) Init(nt *mk.Nettest) error { }) nt.On("status.geoip_lookup", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) msmtTemplate.ASN = e.Value.ProbeASN msmtTemplate.IP = e.Value.ProbeIP @@ -113,7 +107,7 @@ func (c *Controller) Init(nt *mk.Nettest) error { }) nt.On("status.measurement_started", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) idx := e.Value.Idx msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input) @@ -125,44 +119,55 @@ func (c *Controller) Init(nt *mk.Nettest) error { }) nt.On("status.progress", func(e mk.Event) { + log.Debugf(colors.Red(e.Key)) c.OnProgress(e.Value.Percentage, e.Value.Message) }) nt.On("status.update.*", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) }) nt.On("failure.measurement", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure) }) nt.On("failure.measurement_submission", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) failure := e.Value.Failure c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure) }) nt.On("status.measurement_uploaded", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) - c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB) + if err := c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB); err != nil { + log.WithError(err).Error("failed to mark msmt as uploaded") + } }) nt.On("status.measurement_done", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) - c.msmts[e.Value.Idx].Done(c.Ctx.DB) + if err := c.msmts[e.Value.Idx].Done(c.Ctx.DB); err != nil { + log.WithError(err).Error("failed to mark msmt as done") + } }) nt.On("measurement", func(e mk.Event) { c.OnEntry(e.Value.Idx, e.Value.JSONStr) }) - nt.On("end", func(e mk.Event) { - log.Debugf("end") + nt.On("status.end", func(e mk.Event) { + log.Debugf("status.end") + for idx, msmt := range c.msmts { + log.Debugf("adding msmt#%d to result", idx) + if err := msmt.AddToResult(c.Ctx.DB, c.res); err != nil { + log.WithError(err).Error("failed to add to result") + } + } }) return nil From a747b76ecf748a2ea0696b765bd206c7baaadeaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Tue, 20 Mar 2018 12:43:34 +0100 Subject: [PATCH 09/14] Add some todo items to the readme --- Readme.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/Readme.md b/Readme.md index cda215a..e79c916 100644 --- a/Readme.md +++ b/Readme.md @@ -33,3 +33,27 @@ was built as ``` CGO_LDFLAGS="-L/path/to/measurement-kit/.libs/" CGO_CFLAGS="-I/path/to/measurement-kit/include" make build ``` + +## Todo + +* Add support for generating the Result Summary based on the measurements. I + would imagine this would lookup the summary of every measurement that we care + about in the DB and based on that generate the summary with a function that + lives inside of the groups definition. + +* Add support for outputing structured logging messages, while tests are + running, to be consumed by the desktop app + +* Add support for the missing tests, namely: +- HTTP Invalid Request Line +- HTTP Header Field Manipulation +- Facebook Messenger +- Telegram +- WhatsApp +- WebConnectivity + +* Fix issue with the informed consent being bypassed on first run + +* Finish the config file implementation + +* Add support for listing results in the CLI From ce0e077175fb7d55f680297321d59d4ff33cd51f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Tue, 20 Mar 2018 14:19:19 +0100 Subject: [PATCH 10/14] Add hooks for generating result summaries --- internal/cli/run/run.go | 4 +-- internal/database/models.go | 38 +++++++++++++++++++++++--- nettests/groups/groups.go | 54 ++++++++++++++++++++++++++++++------- nettests/performance/ndt.go | 12 ++++----- 4 files changed, 88 insertions(+), 20 deletions(-) diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index 2de0c9d..10f71e3 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -44,12 +44,12 @@ func init() { time.Now().UTC().Format(time.RFC3339Nano))) ctl := nettests.NewController(nt, ctx, result, msmtPath) - if err := nt.Run(ctl); err != nil { + if err = nt.Run(ctl); err != nil { log.WithError(err).Errorf("Failed to run %s", group.Label) return err } } - if err = result.Finished(ctx.DB); err != nil { + if err = result.Finished(ctx.DB, group.Summary); err != nil { return err } return nil diff --git a/internal/database/models.go b/internal/database/models.go index 01d44cf..258b4b1 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -12,6 +12,12 @@ import ( "github.com/pkg/errors" ) +// ResultSummaryFunc is the function used to generate result summaries +type ResultSummaryFunc func(SummaryMap) (string, error) + +// SummaryMap contains a mapping from test name to serialized summary for it +type SummaryMap map[string]string + // UpdateOne will run the specified update query and check that it only affected one row func UpdateOne(db *sqlx.DB, query string, arg interface{}) error { res, err := db.NamedExec(query, arg) @@ -187,17 +193,43 @@ type Result struct { MeasurementDir string `db:"measurement_dir"` } +// MakeSummaryMap return a mapping of test names to summaries for the given +// result +func MakeSummaryMap(db *sqlx.DB, r *Result) (SummaryMap, error) { + summaryMap := SummaryMap{} + + msmts := []Measurement{} + // XXX maybe we only want to select some of the columns + err := db.Select(&msmts, "SELECT name, summary FROM measurements WHERE result_id = $1", r.ID) + if err != nil { + return nil, errors.Wrap(err, "failed to get measurements") + } + for _, msmt := range msmts { + summaryMap[msmt.Name] = msmt.Summary + } + return summaryMap, nil +} + // Finished marks the result as done and sets the runtime -func (r *Result) Finished(db *sqlx.DB) error { +func (r *Result) Finished(db *sqlx.DB, makeSummary ResultSummaryFunc) error { if r.Done == true || r.Runtime != 0 { return errors.New("Result is already finished") } r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds() r.Done = true // XXX add in here functionality to compute the summary + summaryMap, err := MakeSummaryMap(db, r) + if err != nil { + return err + } - err := UpdateOne(db, `UPDATE results - SET done = :done, runtime = :runtime + r.Summary, err = makeSummary(summaryMap) + if err != nil { + return err + } + + err = UpdateOne(db, `UPDATE results + SET done = :done, runtime = :runtime, summary = :summary WHERE id = :id`, r) if err != nil { return errors.Wrap(err, "updating finished result") diff --git a/nettests/groups/groups.go b/nettests/groups/groups.go index e10b6b2..5cc5861 100644 --- a/nettests/groups/groups.go +++ b/nettests/groups/groups.go @@ -1,6 +1,10 @@ package groups import ( + "encoding/json" + + "github.com/apex/log" + "github.com/openobservatory/gooni/internal/database" "github.com/openobservatory/gooni/nettests" "github.com/openobservatory/gooni/nettests/performance" "github.com/openobservatory/gooni/nettests/websites" @@ -10,7 +14,15 @@ import ( type NettestGroup struct { Label string Nettests []nettests.Nettest - Summary func(s string) string + Summary database.ResultSummaryFunc +} + +// PerformanceSummary is the result summary for a performance test +type PerformanceSummary struct { + Upload int64 + Download int64 + Ping float64 + Bitrate int64 } // NettestGroups that can be run by the user @@ -20,8 +32,8 @@ var NettestGroups = map[string]NettestGroup{ Nettests: []nettests.Nettest{ websites.WebConnectivity{}, }, - Summary: func(s string) string { - return "{}" + Summary: func(m database.SummaryMap) (string, error) { + return "{}", nil }, }, "performance": NettestGroup{ @@ -30,22 +42,46 @@ var NettestGroups = map[string]NettestGroup{ performance.Dash{}, performance.NDT{}, }, - Summary: func(s string) string { - return "{}" + Summary: func(m database.SummaryMap) (string, error) { + var ( + err error + ndtSummary performance.NDTSummary + dashSummary performance.DashSummary + summary PerformanceSummary + ) + err = json.Unmarshal([]byte(m["Dash"]), &dashSummary) + if err != nil { + log.WithError(err).Error("failed to unmarshal Dash summary") + return "", err + } + err = json.Unmarshal([]byte(m["Ndt"]), &ndtSummary) + if err != nil { + log.WithError(err).Error("failed to unmarshal Dash summary") + return "", err + } + summary.Bitrate = dashSummary.Bitrate + summary.Download = ndtSummary.Download + summary.Upload = ndtSummary.Upload + summary.Ping = ndtSummary.AvgRTT + summaryBytes, err := json.Marshal(summary) + if err != nil { + return "", err + } + return string(summaryBytes), nil }, }, "middleboxes": NettestGroup{ Label: "Middleboxes", Nettests: []nettests.Nettest{}, - Summary: func(s string) string { - return "{}" + Summary: func(m database.SummaryMap) (string, error) { + return "{}", nil }, }, "im": NettestGroup{ Label: "Instant Messaging", Nettests: []nettests.Nettest{}, - Summary: func(s string) string { - return "{}" + Summary: func(m database.SummaryMap) (string, error) { + return "{}", nil }, }, } diff --git a/nettests/performance/ndt.go b/nettests/performance/ndt.go index 0a53f77..4c8159d 100644 --- a/nettests/performance/ndt.go +++ b/nettests/performance/ndt.go @@ -21,9 +21,9 @@ type NDTSummary struct { Upload int64 Download int64 Ping int64 - MaxRTT int64 - AvgRTT int64 - MinRTT int64 + MaxRTT float64 + AvgRTT float64 + MinRTT float64 MSS int64 OutOfOrder int64 PacketLoss float64 @@ -39,9 +39,9 @@ func (n NDT) Summary(tk map[string]interface{}) interface{} { Upload: int64(simple["upload"].(float64)), Download: int64(simple["download"].(float64)), Ping: int64(simple["ping"].(float64)), - MaxRTT: int64(advanced["max_rtt"].(float64)), - AvgRTT: int64(advanced["avg_rtt"].(float64)), - MinRTT: int64(advanced["min_rtt"].(float64)), + MaxRTT: advanced["max_rtt"].(float64), + AvgRTT: advanced["avg_rtt"].(float64), + MinRTT: advanced["min_rtt"].(float64), MSS: int64(advanced["mss"].(float64)), OutOfOrder: int64(advanced["out_of_order"].(float64)), PacketLoss: advanced["packet_loss"].(float64), From 268b4ce5bb3fd8d3282acf86fa872db5a012a3b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Tue, 20 Mar 2018 14:19:45 +0100 Subject: [PATCH 11/14] Remove item from todo --- Readme.md | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Readme.md b/Readme.md index e79c916..618a6b9 100644 --- a/Readme.md +++ b/Readme.md @@ -36,11 +36,6 @@ CGO_LDFLAGS="-L/path/to/measurement-kit/.libs/" CGO_CFLAGS="-I/path/to/measureme ## Todo -* Add support for generating the Result Summary based on the measurements. I - would imagine this would lookup the summary of every measurement that we care - about in the DB and based on that generate the summary with a function that - lives inside of the groups definition. - * Add support for outputing structured logging messages, while tests are running, to be consumed by the desktop app From bd6c5b20d09b09f54ab1aa0a286a285b7ce45da1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Tue, 20 Mar 2018 14:31:05 +0100 Subject: [PATCH 12/14] Minor style fixes --- internal/database/models.go | 2 +- nettests/groups/groups.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/database/models.go b/internal/database/models.go index 258b4b1..d575df6 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -128,7 +128,7 @@ func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error { return nil } -//AddToResult adds a measurement to a result +// AddToResult adds a measurement to a result func (m *Measurement) AddToResult(db *sqlx.DB, result *Result) error { m.ResultID = result.ID finalPath := filepath.Join(result.MeasurementDir, diff --git a/nettests/groups/groups.go b/nettests/groups/groups.go index 5cc5861..091c7b3 100644 --- a/nettests/groups/groups.go +++ b/nettests/groups/groups.go @@ -56,7 +56,7 @@ var NettestGroups = map[string]NettestGroup{ } err = json.Unmarshal([]byte(m["Ndt"]), &ndtSummary) if err != nil { - log.WithError(err).Error("failed to unmarshal Dash summary") + log.WithError(err).Error("failed to unmarshal NDT summary") return "", err } summary.Bitrate = dashSummary.Bitrate From 58e452ea4e9c0932c43ae4eb2e9289ee2351c796 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Wed, 21 Mar 2018 12:18:45 +0100 Subject: [PATCH 13/14] Add support for more tests - Implement middlebox tests & summary generator - Implement IM tests (summary generator missing) --- nettests/groups/groups.go | 50 +++++++++++++++++-- nettests/im/facebook_messenger.go | 36 +++++++++++++ nettests/im/telegram.go | 38 ++++++++++++++ nettests/im/whatsapp.go | 40 +++++++++++++++ .../http_header_field_manipulation.go | 43 ++++++++++++++++ .../middlebox/http_invalid_request_line.go | 36 +++++++++++++ 6 files changed, 238 insertions(+), 5 deletions(-) create mode 100644 nettests/im/facebook_messenger.go create mode 100644 nettests/im/telegram.go create mode 100644 nettests/im/whatsapp.go create mode 100644 nettests/middlebox/http_header_field_manipulation.go create mode 100644 nettests/middlebox/http_invalid_request_line.go diff --git a/nettests/groups/groups.go b/nettests/groups/groups.go index 091c7b3..9a1df41 100644 --- a/nettests/groups/groups.go +++ b/nettests/groups/groups.go @@ -6,6 +6,8 @@ import ( "github.com/apex/log" "github.com/openobservatory/gooni/internal/database" "github.com/openobservatory/gooni/nettests" + "github.com/openobservatory/gooni/nettests/im" + "github.com/openobservatory/gooni/nettests/middlebox" "github.com/openobservatory/gooni/nettests/performance" "github.com/openobservatory/gooni/nettests/websites" ) @@ -25,6 +27,16 @@ type PerformanceSummary struct { Bitrate int64 } +// MiddleboxSummary is the summary for the middlebox tests +type MiddleboxSummary struct { + Detected bool +} + +// IMSummary is the summary for the im tests +type IMSummary struct { + Detected bool +} + // NettestGroups that can be run by the user var NettestGroups = map[string]NettestGroup{ "websites": NettestGroup{ @@ -71,15 +83,43 @@ var NettestGroups = map[string]NettestGroup{ }, }, "middleboxes": NettestGroup{ - Label: "Middleboxes", - Nettests: []nettests.Nettest{}, + Label: "Middleboxes", + Nettests: []nettests.Nettest{ + middlebox.HTTPInvalidRequestLine{}, + middlebox.HTTPHeaderFieldManipulation{}, + }, Summary: func(m database.SummaryMap) (string, error) { - return "{}", nil + var ( + err error + hhfmSummary middlebox.HTTPHeaderFieldManipulationSummary + hirlSummary middlebox.HTTPInvalidRequestLineSummary + summary MiddleboxSummary + ) + err = json.Unmarshal([]byte(m["HttpHeaderFieldManipulation"]), &hhfmSummary) + if err != nil { + log.WithError(err).Error("failed to unmarshal hhfm summary") + return "", err + } + err = json.Unmarshal([]byte(m["HttpInvalidRequestLine"]), &hirlSummary) + if err != nil { + log.WithError(err).Error("failed to unmarshal hirl summary") + return "", err + } + summary.Detected = hirlSummary.Tampering == true || hhfmSummary.Tampering == true + summaryBytes, err := json.Marshal(summary) + if err != nil { + return "", err + } + return string(summaryBytes), nil }, }, "im": NettestGroup{ - Label: "Instant Messaging", - Nettests: []nettests.Nettest{}, + Label: "Instant Messaging", + Nettests: []nettests.Nettest{ + im.FacebookMessenger{}, + im.Telegram{}, + im.WhatsApp{}, + }, Summary: func(m database.SummaryMap) (string, error) { return "{}", nil }, diff --git a/nettests/im/facebook_messenger.go b/nettests/im/facebook_messenger.go new file mode 100644 index 0000000..bb7fcac --- /dev/null +++ b/nettests/im/facebook_messenger.go @@ -0,0 +1,36 @@ +package im + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// FacebookMessenger test implementation +type FacebookMessenger struct { +} + +// Run starts the test +func (h FacebookMessenger) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("FacebookMessenger") + ctl.Init(mknt) + return mknt.Run() +} + +// FacebookMessengerSummary for the test +type FacebookMessengerSummary struct { + DNSBlocking bool + TCPBlocking bool +} + +// Summary generates a summary for a test run +func (h FacebookMessenger) Summary(tk map[string]interface{}) interface{} { + return FacebookMessengerSummary{ + DNSBlocking: tk["facebook_dns_blocking"].(bool), + TCPBlocking: tk["facebook_tcp_blocking"].(bool), + } +} + +// LogSummary writes the summary to the standard output +func (h FacebookMessenger) LogSummary(s string) error { + return nil +} diff --git a/nettests/im/telegram.go b/nettests/im/telegram.go new file mode 100644 index 0000000..503d55c --- /dev/null +++ b/nettests/im/telegram.go @@ -0,0 +1,38 @@ +package im + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// Telegram test implementation +type Telegram struct { +} + +// Run starts the test +func (h Telegram) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("Telegram") + ctl.Init(mknt) + return mknt.Run() +} + +// TelegramSummary for the test +type TelegramSummary struct { + HTTPBlocking bool + TCPBlocking bool + WebBlocking bool +} + +// Summary generates a summary for a test run +func (h Telegram) Summary(tk map[string]interface{}) interface{} { + return TelegramSummary{ + TCPBlocking: tk["telegram_tcp_blocking"].(bool) == true, + HTTPBlocking: tk["telegram_http_blocking"].(bool) == true, + WebBlocking: tk["telegram_web_status"].(string) == "blocked", + } +} + +// LogSummary writes the summary to the standard output +func (h Telegram) LogSummary(s string) error { + return nil +} diff --git a/nettests/im/whatsapp.go b/nettests/im/whatsapp.go new file mode 100644 index 0000000..5dc26ba --- /dev/null +++ b/nettests/im/whatsapp.go @@ -0,0 +1,40 @@ +package im + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// WhatsApp test implementation +type WhatsApp struct { +} + +// Run starts the test +func (h WhatsApp) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("Whatsapp") + ctl.Init(mknt) + return mknt.Run() +} + +// WhatsAppSummary for the test +type WhatsAppSummary struct { + RegistrationServerBlocking bool + WebBlocking bool + EndpointsBlocking bool +} + +// Summary generates a summary for a test run +func (h WhatsApp) Summary(tk map[string]interface{}) interface{} { + const blk = "blocked" + + return WhatsAppSummary{ + RegistrationServerBlocking: tk["registration_server_status"].(string) == blk, + WebBlocking: tk["whatsapp_web_status"].(string) == blk, + EndpointsBlocking: tk["whatsapp_endpoints_status"].(string) == blk, + } +} + +// LogSummary writes the summary to the standard output +func (h WhatsApp) LogSummary(s string) error { + return nil +} diff --git a/nettests/middlebox/http_header_field_manipulation.go b/nettests/middlebox/http_header_field_manipulation.go new file mode 100644 index 0000000..c416985 --- /dev/null +++ b/nettests/middlebox/http_header_field_manipulation.go @@ -0,0 +1,43 @@ +package middlebox + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// HTTPHeaderFieldManipulation test implementation +type HTTPHeaderFieldManipulation struct { +} + +// Run starts the test +func (h HTTPHeaderFieldManipulation) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("HttpHeaderFieldManipulation") + ctl.Init(mknt) + return mknt.Run() +} + +// HTTPHeaderFieldManipulationSummary for the test +type HTTPHeaderFieldManipulationSummary struct { + Tampering bool +} + +// Summary generates a summary for a test run +func (h HTTPHeaderFieldManipulation) Summary(tk map[string]interface{}) interface{} { + tampering := false + for _, v := range tk["tampering"].(map[string]interface{}) { + t, ok := v.(bool) + // Ignore non booleans in the tampering map + if ok && t == true { + tampering = true + } + } + + return HTTPHeaderFieldManipulationSummary{ + Tampering: tampering, + } +} + +// LogSummary writes the summary to the standard output +func (h HTTPHeaderFieldManipulation) LogSummary(s string) error { + return nil +} diff --git a/nettests/middlebox/http_invalid_request_line.go b/nettests/middlebox/http_invalid_request_line.go new file mode 100644 index 0000000..8300fa1 --- /dev/null +++ b/nettests/middlebox/http_invalid_request_line.go @@ -0,0 +1,36 @@ +package middlebox + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// HTTPInvalidRequestLine test implementation +type HTTPInvalidRequestLine struct { +} + +// Run starts the test +func (h HTTPInvalidRequestLine) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("HttpInvalidRequestLine") + ctl.Init(mknt) + return mknt.Run() +} + +// HTTPInvalidRequestLineSummary for the test +type HTTPInvalidRequestLineSummary struct { + Tampering bool +} + +// Summary generates a summary for a test run +func (h HTTPInvalidRequestLine) Summary(tk map[string]interface{}) interface{} { + tampering := tk["tampering"].(bool) + + return HTTPInvalidRequestLineSummary{ + Tampering: tampering, + } +} + +// LogSummary writes the summary to the standard output +func (h HTTPInvalidRequestLine) LogSummary(s string) error { + return nil +} From e852713ed7122e7ab9106a437d44d738031c1337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Thu, 22 Mar 2018 15:22:29 +0100 Subject: [PATCH 14/14] Add functions for structured progress logging --- internal/cli/run/run.go | 7 ++++++- internal/log/handlers/cli/cli.go | 33 +++++++++++++++++++++++++++----- internal/output/output.go | 14 ++++++++++++++ nettests/groups/groups.go | 2 +- nettests/nettests.go | 5 +++++ 5 files changed, 54 insertions(+), 7 deletions(-) create mode 100644 internal/output/output.go diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index 10f71e3..c5368ba 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -1,6 +1,7 @@ package run import ( + "errors" "fmt" "path/filepath" "time" @@ -25,7 +26,11 @@ func init() { log.Errorf("%s", err) return err } - group := groups.NettestGroups[*nettestGroup] + group, ok := groups.NettestGroups[*nettestGroup] + if !ok { + log.Errorf("No test group named %s", *nettestGroup) + return errors.New("invalid test group name") + } log.Debugf("Running test group %s", group.Label) result, err := database.CreateResult(ctx.DB, database.Result{ diff --git a/internal/log/handlers/cli/cli.go b/internal/log/handlers/cli/cli.go index 10d7fba..7e137ba 100644 --- a/internal/log/handlers/cli/cli.go +++ b/internal/log/handlers/cli/cli.go @@ -60,15 +60,25 @@ func New(w io.Writer) *Handler { } } -// HandleLog implements log.Handler. -func (h *Handler) HandleLog(e *log.Entry) error { +// TypedLog is used for handling special "typed" logs to the CLI +func (h *Handler) TypedLog(t string, e *log.Entry) error { + switch t { + case "progress": + // XXX replace this with something more fancy like https://github.com/tj/go-progress + fmt.Fprintf(h.Writer, "%.1f%% [%s]: %s", e.Fields.Get("percentage").(float64)*100, e.Fields.Get("key"), e.Message) + fmt.Fprintln(h.Writer) + return nil + default: + return h.DefaultLog(e) + } +} + +// DefaultLog is the default way of printing out logs +func (h *Handler) DefaultLog(e *log.Entry) error { color := Colors[e.Level] level := Strings[e.Level] names := e.Fields.Names() - h.mu.Lock() - defer h.mu.Unlock() - color.Fprintf(h.Writer, "%s %-25s", bold.Sprintf("%*s", h.Padding+1, level), e.Message) for _, name := range names { @@ -82,3 +92,16 @@ func (h *Handler) HandleLog(e *log.Entry) error { return nil } + +// HandleLog implements log.Handler. +func (h *Handler) HandleLog(e *log.Entry) error { + h.mu.Lock() + defer h.mu.Unlock() + + t, isTyped := e.Fields["type"].(string) + if isTyped { + return h.TypedLog(t, e) + } + + return h.DefaultLog(e) +} diff --git a/internal/output/output.go b/internal/output/output.go new file mode 100644 index 0000000..bf76672 --- /dev/null +++ b/internal/output/output.go @@ -0,0 +1,14 @@ +package output + +import ( + "github.com/apex/log" +) + +// Progress logs a progress type event +func Progress(key string, perc float64, msg string) { + log.WithFields(log.Fields{ + "type": "progress", + "key": key, + "percentage": perc, + }).Info(msg) +} diff --git a/nettests/groups/groups.go b/nettests/groups/groups.go index 9a1df41..2eb0440 100644 --- a/nettests/groups/groups.go +++ b/nettests/groups/groups.go @@ -82,7 +82,7 @@ var NettestGroups = map[string]NettestGroup{ return string(summaryBytes), nil }, }, - "middleboxes": NettestGroup{ + "middlebox": NettestGroup{ Label: "Middleboxes", Nettests: []nettests.Nettest{ middlebox.HTTPInvalidRequestLine{}, diff --git a/nettests/nettests.go b/nettests/nettests.go index ad189b2..bc1cae1 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -2,6 +2,7 @@ package nettests import ( "encoding/json" + "fmt" "github.com/apex/log" "github.com/measurement-kit/go-measurement-kit" @@ -9,6 +10,7 @@ import ( "github.com/openobservatory/gooni/internal/cli/version" "github.com/openobservatory/gooni/internal/colors" "github.com/openobservatory/gooni/internal/database" + "github.com/openobservatory/gooni/internal/output" ) // Nettest interface. Every Nettest should implement this. @@ -176,6 +178,9 @@ func (c *Controller) Init(nt *mk.Nettest) error { // OnProgress should be called when a new progress event is available. func (c *Controller) OnProgress(perc float64, msg string) { log.Debugf("OnProgress: %f - %s", perc, msg) + + key := fmt.Sprintf("%T", c.nt) + output.Progress(key, perc, msg) } // Entry is an opaque measurement entry