diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index 1fe3b1f..7a6e439 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -28,7 +28,7 @@ func init() { result, err := database.CreateResult(ctx.DB, database.Result{ Name: *nettestGroup, - StartTime: time.Now().UTC(), // XXX get this from MK + StartTime: time.Now().UTC(), }) if err != nil { log.Errorf("%s", err) @@ -37,14 +37,17 @@ func init() { for _, nt := range group.Nettests { ctl := nettests.NewController(ctx, result) - nt.Run(ctl) + if err := nt.Run(ctl); err != nil { + log.WithError(err).Errorf("Failed to run %s", group.Label) + return err + } // XXX // 1. Generate the summary // 2. Link the measurement to the Result (this should probably happen in // the nettest class) // 3. Update the summary of the result and the other metadata in the db } - result.Update(ctx.DB) + // result.Update(ctx.DB) return nil }) } diff --git a/internal/database/models.go b/internal/database/models.go index c0a2787..6b197af 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -27,6 +27,11 @@ type Measurement struct { ResultID string `db:"result_id"` } +// SetGeoIPInfo for the Measurement +func (m *Measurement) SetGeoIPInfo() error { + return nil +} + // CreateMeasurement writes the measurement to the database a returns a pointer // to the Measurement func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { @@ -53,27 +58,47 @@ func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { return &m, nil } -// Update the measurement in the database -func (r Measurement) Update(db *sqlx.DB) error { - // XXX implement me - return nil -} - // Result model type Result struct { ID int64 `db:"id"` Name string `db:"name"` StartTime time.Time `db:"start_time"` - EndTime time.Time `db:"end_time"` + Runtime float64 `db:"runtime"` // Runtime is expressed in Microseconds Summary string `db:"summary"` // XXX this should be JSON Done bool `db:"done"` DataUsageUp int64 `db:"data_usage_up"` DataUsageDown int64 `db:"data_usage_down"` + + started time.Time } -// Update the Result in the database -func (r Result) Update(db *sqlx.DB) error { - // XXX implement me +// Started marks the Result as having started +func (r *Result) Started(db *sqlx.DB) error { + r.started = time.Now() + return nil +} + +// Finished marks the result as done and sets the runtime +func (r *Result) Finished(db *sqlx.DB) error { + if r.Done == true || r.Runtime != 0 { + return errors.New("Result is already finished") + } + r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond) + r.Done = true + + res, err := db.NamedExec(`UPDATE results + SET done = true, runtime = :runtime + WHERE id = :id`, r) + if err != nil { + return errors.Wrap(err, "updating result") + } + count, err := res.RowsAffected() + if err != nil { + return errors.Wrap(err, "updating result") + } + if count != 1 { + return errors.New("inconsistent update count") + } return nil } @@ -90,7 +115,7 @@ func CreateResult(db *sqlx.DB, r Result) (*Result, error) { } id, err := res.LastInsertId() if err != nil { - return nil, errors.Wrap(err, "creating measurement") + return nil, errors.Wrap(err, "creating result") } r.ID = id return &r, nil diff --git a/nettests/nettests.go b/nettests/nettests.go index 0d7e208..0fbf50f 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -38,7 +38,6 @@ type Controller struct { // Init should be called once to initialise the nettest func (c *Controller) Init(nt *mk.Nettest) { - log.Debugf("Init: %s", nt) nt.Options = mk.NettestOptions{ IncludeIP: c.ctx.Config.Sharing.IncludeIP, IncludeASN: c.ctx.Config.Sharing.IncludeASN, @@ -49,39 +48,73 @@ func (c *Controller) Init(nt *mk.Nettest) { // XXX GeoIPCountryPath: "", - GeoASNPath: "", - OutputPath: "", - CaBundlePath: "", + GeoIPASNPath: "", + OutputPath: "/tmp/measurement.jsonl", + CaBundlePath: "/etc/ssl/cert.pem", } - nt.RegisterEventHandler(func(event interface{}) { - e := event.(map[string]interface{}) - if e["type"].(string) == "LOG" { - msg := e["message"].(string) - switch level := e["verbosity"].(string); level { - case "ERROR": - log.Error(msg) - case "INFO": - log.Info(msg) - default: - log.Debug(msg) - } - } else { - log.WithFields(log.Fields{ - "key": "event", - "value": e, - }).Info("got event") + nt.On("log", func(e mk.Event) { + level := e.Value["verbosity"].(string) + msg := e.Value["message"].(string) + + switch level { + case "ERROR": + log.Error(msg) + case "INFO": + log.Info(msg) + default: + log.Debug(msg) } + }) + + nt.On("status.queued", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("status.started", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("status.report_created", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("status.geoip_lookup", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("status.progress", func(e mk.Event) { + perc := e.Value["percentage"].(float64) + msg := e.Value["message"].(string) + c.OnProgress(perc, msg) + }) + + nt.On("status.update.*", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("failure.measurement", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("failure.report_submission", func(e mk.Event) { + log.Debugf("%s", e.Key) + }) + + nt.On("measurement", func(e mk.Event) { + c.OnEntry(e.Value["json_str"].(string)) + }) + } // OnProgress should be called when a new progress event is available. -func (c *Controller) OnProgress(perc float32, msg string) { +func (c *Controller) OnProgress(perc float64, msg string) { log.Debugf("OnProgress: %f - %s", perc, msg) } // OnEntry should be called every time there is a new entry -func (c *Controller) OnEntry(entry string) { - log.Debugf("OnEntry: %s", entry) +func (c *Controller) OnEntry(jsonStr string) { + log.Debugf("OnEntry: %s", jsonStr) } // MKStart is the interface for the mk.Nettest Start() function