Fix MK integration

This commit is contained in:
Arturo Filastò 2018-03-08 13:46:21 +01:00
parent a0bf7fb832
commit 9cf8008c01
3 changed files with 99 additions and 38 deletions

View File

@ -28,7 +28,7 @@ func init() {
result, err := database.CreateResult(ctx.DB, database.Result{ result, err := database.CreateResult(ctx.DB, database.Result{
Name: *nettestGroup, Name: *nettestGroup,
StartTime: time.Now().UTC(), // XXX get this from MK StartTime: time.Now().UTC(),
}) })
if err != nil { if err != nil {
log.Errorf("%s", err) log.Errorf("%s", err)
@ -37,14 +37,17 @@ func init() {
for _, nt := range group.Nettests { for _, nt := range group.Nettests {
ctl := nettests.NewController(ctx, result) ctl := nettests.NewController(ctx, result)
nt.Run(ctl) if err := nt.Run(ctl); err != nil {
log.WithError(err).Errorf("Failed to run %s", group.Label)
return err
}
// XXX // XXX
// 1. Generate the summary // 1. Generate the summary
// 2. Link the measurement to the Result (this should probably happen in // 2. Link the measurement to the Result (this should probably happen in
// the nettest class) // the nettest class)
// 3. Update the summary of the result and the other metadata in the db // 3. Update the summary of the result and the other metadata in the db
} }
result.Update(ctx.DB) // result.Update(ctx.DB)
return nil return nil
}) })
} }

View File

@ -27,6 +27,11 @@ type Measurement struct {
ResultID string `db:"result_id"` ResultID string `db:"result_id"`
} }
// SetGeoIPInfo for the Measurement
func (m *Measurement) SetGeoIPInfo() error {
return nil
}
// CreateMeasurement writes the measurement to the database a returns a pointer // CreateMeasurement writes the measurement to the database a returns a pointer
// to the Measurement // to the Measurement
func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) {
@ -53,27 +58,47 @@ func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) {
return &m, nil return &m, nil
} }
// Update the measurement in the database
func (r Measurement) Update(db *sqlx.DB) error {
// XXX implement me
return nil
}
// Result model // Result model
type Result struct { type Result struct {
ID int64 `db:"id"` ID int64 `db:"id"`
Name string `db:"name"` Name string `db:"name"`
StartTime time.Time `db:"start_time"` StartTime time.Time `db:"start_time"`
EndTime time.Time `db:"end_time"` Runtime float64 `db:"runtime"` // Runtime is expressed in Microseconds
Summary string `db:"summary"` // XXX this should be JSON Summary string `db:"summary"` // XXX this should be JSON
Done bool `db:"done"` Done bool `db:"done"`
DataUsageUp int64 `db:"data_usage_up"` DataUsageUp int64 `db:"data_usage_up"`
DataUsageDown int64 `db:"data_usage_down"` DataUsageDown int64 `db:"data_usage_down"`
started time.Time
} }
// Update the Result in the database // Started marks the Result as having started
func (r Result) Update(db *sqlx.DB) error { func (r *Result) Started(db *sqlx.DB) error {
// XXX implement me r.started = time.Now()
return nil
}
// Finished marks the result as done and sets the runtime
func (r *Result) Finished(db *sqlx.DB) error {
if r.Done == true || r.Runtime != 0 {
return errors.New("Result is already finished")
}
r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond)
r.Done = true
res, err := db.NamedExec(`UPDATE results
SET done = true, runtime = :runtime
WHERE id = :id`, r)
if err != nil {
return errors.Wrap(err, "updating result")
}
count, err := res.RowsAffected()
if err != nil {
return errors.Wrap(err, "updating result")
}
if count != 1 {
return errors.New("inconsistent update count")
}
return nil return nil
} }
@ -90,7 +115,7 @@ func CreateResult(db *sqlx.DB, r Result) (*Result, error) {
} }
id, err := res.LastInsertId() id, err := res.LastInsertId()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "creating measurement") return nil, errors.Wrap(err, "creating result")
} }
r.ID = id r.ID = id
return &r, nil return &r, nil

View File

@ -38,7 +38,6 @@ type Controller struct {
// Init should be called once to initialise the nettest // Init should be called once to initialise the nettest
func (c *Controller) Init(nt *mk.Nettest) { func (c *Controller) Init(nt *mk.Nettest) {
log.Debugf("Init: %s", nt)
nt.Options = mk.NettestOptions{ nt.Options = mk.NettestOptions{
IncludeIP: c.ctx.Config.Sharing.IncludeIP, IncludeIP: c.ctx.Config.Sharing.IncludeIP,
IncludeASN: c.ctx.Config.Sharing.IncludeASN, IncludeASN: c.ctx.Config.Sharing.IncludeASN,
@ -49,39 +48,73 @@ func (c *Controller) Init(nt *mk.Nettest) {
// XXX // XXX
GeoIPCountryPath: "", GeoIPCountryPath: "",
GeoASNPath: "", GeoIPASNPath: "",
OutputPath: "", OutputPath: "/tmp/measurement.jsonl",
CaBundlePath: "", CaBundlePath: "/etc/ssl/cert.pem",
} }
nt.RegisterEventHandler(func(event interface{}) { nt.On("log", func(e mk.Event) {
e := event.(map[string]interface{}) level := e.Value["verbosity"].(string)
if e["type"].(string) == "LOG" { msg := e.Value["message"].(string)
msg := e["message"].(string)
switch level := e["verbosity"].(string); level { switch level {
case "ERROR": case "ERROR":
log.Error(msg) log.Error(msg)
case "INFO": case "INFO":
log.Info(msg) log.Info(msg)
default: default:
log.Debug(msg) log.Debug(msg)
}
} else {
log.WithFields(log.Fields{
"key": "event",
"value": e,
}).Info("got event")
} }
}) })
nt.On("status.queued", func(e mk.Event) {
log.Debugf("%s", e.Key)
})
nt.On("status.started", func(e mk.Event) {
log.Debugf("%s", e.Key)
})
nt.On("status.report_created", func(e mk.Event) {
log.Debugf("%s", e.Key)
})
nt.On("status.geoip_lookup", func(e mk.Event) {
log.Debugf("%s", e.Key)
})
nt.On("status.progress", func(e mk.Event) {
perc := e.Value["percentage"].(float64)
msg := e.Value["message"].(string)
c.OnProgress(perc, msg)
})
nt.On("status.update.*", func(e mk.Event) {
log.Debugf("%s", e.Key)
})
nt.On("failure.measurement", func(e mk.Event) {
log.Debugf("%s", e.Key)
})
nt.On("failure.report_submission", func(e mk.Event) {
log.Debugf("%s", e.Key)
})
nt.On("measurement", func(e mk.Event) {
c.OnEntry(e.Value["json_str"].(string))
})
} }
// OnProgress should be called when a new progress event is available. // OnProgress should be called when a new progress event is available.
func (c *Controller) OnProgress(perc float32, msg string) { func (c *Controller) OnProgress(perc float64, msg string) {
log.Debugf("OnProgress: %f - %s", perc, msg) log.Debugf("OnProgress: %f - %s", perc, msg)
} }
// OnEntry should be called every time there is a new entry // OnEntry should be called every time there is a new entry
func (c *Controller) OnEntry(entry string) { func (c *Controller) OnEntry(jsonStr string) {
log.Debugf("OnEntry: %s", entry) log.Debugf("OnEntry: %s", jsonStr)
} }
// MKStart is the interface for the mk.Nettest Start() function // MKStart is the interface for the mk.Nettest Start() function