diff --git a/Gopkg.lock b/Gopkg.lock index 5e758c1..ebb8fda 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -24,11 +24,7 @@ [[projects]] name = "github.com/apex/log" - packages = [ - ".", - "handlers/cli", - "handlers/json" - ] + packages = ["."] revision = "0296d6eb16bb28f8a0c55668affcf4876dc269be" version = "v1.0.0" @@ -93,7 +89,7 @@ branch = "master" name = "github.com/measurement-kit/go-measurement-kit" packages = ["."] - revision = "bc9d9a377259df26dd4d86c9dcc0953c92dde23b" + revision = "408b54290aef34398a50353e7a66618574d0f075" [[projects]] branch = "master" @@ -181,6 +177,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "6b04c53f65567785f5fc4ea3563d11c7744c450d1a0c45c0b8047ad10bf766ea" + inputs-digest = "03c2415c162eb74c1efe403a53e129a83d3612e0b21ac4ec8ea88dd27ea3f145" solver-name = "gps-cdcl" solver-version = 1 diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index 5013c0b..77252bd 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -28,7 +28,7 @@ func init() { result, err := database.CreateResult(ctx.DB, database.Result{ Name: *nettestGroup, - StartTime: time.Now().UTC(), // XXX get this from MK + StartTime: time.Now().UTC(), }) if err != nil { log.Errorf("DB result error: %s", err) @@ -37,14 +37,17 @@ func init() { for _, nt := range group.Nettests { ctl := nettests.NewController(ctx, result) - nt.Run(ctl) + if err := nt.Run(ctl); err != nil { + log.WithError(err).Errorf("Failed to run %s", group.Label) + return err + } // XXX // 1. Generate the summary // 2. Link the measurement to the Result (this should probably happen in // the nettest class) // 3. Update the summary of the result and the other metadata in the db } - result.Update(ctx.DB) + // result.Update(ctx.DB) return nil }) } diff --git a/internal/database/models.go b/internal/database/models.go index 28888cc..c32bcca 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -27,6 +27,11 @@ type Measurement struct { ResultID string `db:"result_id"` } +// SetGeoIPInfo for the Measurement +func (m *Measurement) SetGeoIPInfo() error { + return nil +} + // CreateMeasurement writes the measurement to the database a returns a pointer // to the Measurement func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { @@ -53,35 +58,47 @@ func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { return &m, nil } -// Update the measurement in the database -func (r Measurement) Update(db *sqlx.DB) error { - // XXX implement me - return nil -} - // Result model type Result struct { ID int64 `db:"id"` Name string `db:"name"` StartTime time.Time `db:"start_time"` - EndTime time.Time `db:"end_time"` + Runtime float64 `db:"runtime"` // Runtime is expressed in Microseconds Summary string `db:"summary"` // XXX this should be JSON Done bool `db:"done"` DataUsageUp int64 `db:"data_usage_up"` DataUsageDown int64 `db:"data_usage_down"` + + started time.Time } -// Update the Result in the database -func (r Result) Update(db *sqlx.DB) error { - log.Debugf("Updating result %v", r) - _, err := db.NamedExec(`UPDATE results SET - (name, start_time, end_time, summary, done, data_usage_up, data_usage_down) = - (:name, :start_time, :end_time, :summary, :done, :data_usage_up, :data_usage_down) - WHERE id = :id`, r) +// Started marks the Result as having started +func (r *Result) Started(db *sqlx.DB) error { + r.started = time.Now() + return nil +} +// Finished marks the result as done and sets the runtime +func (r *Result) Finished(db *sqlx.DB) error { + if r.Done == true || r.Runtime != 0 { + return errors.New("Result is already finished") + } + r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond) + r.Done = true + + res, err := db.NamedExec(`UPDATE results + SET done = true, runtime = :runtime + WHERE id = :id`, r) if err != nil { return errors.Wrap(err, "updating result") } + count, err := res.RowsAffected() + if err != nil { + return errors.Wrap(err, "updating result") + } + if count != 1 { + return errors.New("inconsistent update count") + } return nil } @@ -98,7 +115,7 @@ func CreateResult(db *sqlx.DB, r Result) (*Result, error) { } id, err := res.LastInsertId() if err != nil { - return nil, errors.Wrap(err, "creating measurement") + return nil, errors.Wrap(err, "creating result") } r.ID = id return &r, nil diff --git a/nettests/nettests.go b/nettests/nettests.go index 2793728..3e97249 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -49,39 +49,73 @@ func (c *Controller) Init(nt *mk.Nettest) { // XXX GeoIPCountryPath: "", - GeoASNPath: "", - OutputPath: "", - CaBundlePath: "", + GeoIPASNPath: "", + OutputPath: "/tmp/measurement.jsonl", + CaBundlePath: "/etc/ssl/cert.pem", } - nt.RegisterEventHandler(func(event interface{}) { - e := event.(map[string]interface{}) - if e["type"].(string) == "LOG" { - msg := e["message"].(string) - switch level := e["verbosity"].(string); level { - case "ERROR": - log.Error(msg) - case "INFO": - log.Info(msg) - default: - log.Debug(msg) - } - } else { - log.WithFields(log.Fields{ - "key": "event", - "value": e, - }).Info("got event") + nt.On("log", func(e mk.Event) { + level := e.Value["verbosity"].(string) + msg := e.Value["message"].(string) + + switch level { + case "ERROR": + log.Error(msg) + case "INFO": + log.Info(msg) + default: + log.Debug(msg) } + }) + + nt.On("status.queued", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("status.started", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("status.report_created", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("status.geoip_lookup", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("status.progress", func(e mk.Event) { + perc := e.Value["percentage"].(float64) + msg := e.Value["message"].(string) + c.OnProgress(perc, msg) + }) + + nt.On("status.update.*", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("failure.measurement", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("failure.report_submission", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("measurement", func(e mk.Event) { + c.OnEntry(e.Value["json_str"].(string)) + }) + } // OnProgress should be called when a new progress event is available. -func (c *Controller) OnProgress(perc float32, msg string) { +func (c *Controller) OnProgress(perc float64, msg string) { log.Debugf("OnProgress: %f - %s", perc, msg) } // OnEntry should be called every time there is a new entry -func (c *Controller) OnEntry(entry string) { - log.Debugf("OnEntry: %s", entry) +func (c *Controller) OnEntry(jsonStr string) { + log.Debugf("OnEntry: %s", jsonStr) } // MKStart is the interface for the mk.Nettest Start() function