From e0bb4000e97869fb73eb0ab0af888e545672ca0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Tue, 20 Mar 2018 12:38:33 +0100 Subject: [PATCH] Implement most of the measurement & result DB workflow --- internal/cli/run/run.go | 10 ++--- internal/database/models.go | 90 ++++++++++++++++++++++++++++--------- nettests/groups/groups.go | 29 +++++++++--- nettests/nettests.go | 43 ++++++++++-------- 4 files changed, 120 insertions(+), 52 deletions(-) diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index 0621a8d..2de0c9d 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -48,14 +48,10 @@ func init() { log.WithError(err).Errorf("Failed to run %s", group.Label) return err } - // XXX - // 1. Generate the summary - // 2. Link the measurement to the Result (this should probably happen in - // the nettest class) - // 3. Update the summary of the result and the other metadata in the db - // 4. Move the msmtPath into the final location ~/.ooni/msmts/ } - // result.Update(ctx.DB) + if err = result.Finished(ctx.DB); err != nil { + return err + } return nil }) } diff --git a/internal/database/models.go b/internal/database/models.go index 4216dfc..01d44cf 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -1,10 +1,14 @@ package database import ( + "fmt" + "os" + "path/filepath" "time" "github.com/apex/log" "github.com/jmoiron/sqlx" + ooni "github.com/openobservatory/gooni" "github.com/pkg/errors" ) @@ -30,7 +34,7 @@ type Measurement struct { ID int64 `db:"id"` Name string `db:"name"` StartTime time.Time `db:"start_time"` - Runtime float64 `db:"runtime"` + Runtime float64 `db:"runtime"` // Fractional number of seconds Summary string `db:"summary"` // XXX this should be JSON ASN string `db:"asn"` IP string `db:"ip"` @@ -65,10 +69,12 @@ func (m *Measurement) Failed(db *sqlx.DB, failure string) error { // Done marks the measurement as completed func (m *Measurement) Done(db *sqlx.DB) error { + runtime := time.Now().UTC().Sub(m.StartTime) + m.Runtime = runtime.Seconds() m.State = "done" err := UpdateOne(db, `UPDATE measurements - SET state = :state + SET state = :state, runtime = :runtime WHERE id = :id`, m) if err != nil { return errors.Wrap(err, "updating measurement") @@ -116,6 +122,27 @@ func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error { return nil } +//AddToResult adds a measurement to a result +func (m *Measurement) AddToResult(db *sqlx.DB, result *Result) error { + m.ResultID = result.ID + finalPath := filepath.Join(result.MeasurementDir, + filepath.Base(m.ReportFilePath)) + + err := os.Rename(m.ReportFilePath, finalPath) + if err != nil { + return errors.Wrap(err, "moving report file") + } + m.ReportFilePath = finalPath + + err = UpdateOne(db, `UPDATE measurements + SET result_id = :result_id, report_file = :report_file + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + // CreateMeasurement writes the measurement to the database a returns a pointer // to the Measurement func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, error) { @@ -149,22 +176,15 @@ func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, erro // Result model type Result struct { - ID int64 `db:"id"` - Name string `db:"name"` - StartTime time.Time `db:"start_time"` - Runtime float64 `db:"runtime"` // Runtime is expressed in Microseconds - Summary string `db:"summary"` // XXX this should be JSON - Done bool `db:"done"` - DataUsageUp int64 `db:"data_usage_up"` - DataUsageDown int64 `db:"data_usage_down"` - - started time.Time -} - -// Started marks the Result as having started -func (r *Result) Started(db *sqlx.DB) error { - r.started = time.Now() - return nil + ID int64 `db:"id"` + Name string `db:"name"` + StartTime time.Time `db:"start_time"` + Runtime float64 `db:"runtime"` // Runtime is expressed in fractional seconds + Summary string `db:"summary"` // XXX this should be JSON + Done bool `db:"done"` + DataUsageUp int64 `db:"data_usage_up"` + DataUsageDown int64 `db:"data_usage_down"` + MeasurementDir string `db:"measurement_dir"` } // Finished marks the result as done and sets the runtime @@ -172,22 +192,50 @@ func (r *Result) Finished(db *sqlx.DB) error { if r.Done == true || r.Runtime != 0 { return errors.New("Result is already finished") } - r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond) + r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds() r.Done = true + // XXX add in here functionality to compute the summary err := UpdateOne(db, `UPDATE results - SET done = true, runtime = :runtime + SET done = :done, runtime = :runtime WHERE id = :id`, r) if err != nil { - return errors.Wrap(err, "updating result") + return errors.Wrap(err, "updating finished result") } return nil } +// MakeResultsPath creates and returns a directory for the result +func MakeResultsPath(r *Result) (string, error) { + home, err := ooni.GetOONIHome() + if err != nil { + return "", errors.Wrap(err, "default measurements path") + } + p := filepath.Join(home, "msmts", + fmt.Sprintf("%s-%s", r.Name, r.StartTime.Format(time.RFC3339Nano))) + + // If the path already exists, this is a problem. It should not clash, because + // we are using nanosecond precision for the starttime. + if _, e := os.Stat(p); e == nil { + return "", errors.New("results path already exists") + } + err = os.MkdirAll(p, 0700) + if err != nil { + return "", err + } + return p, nil +} + // CreateResult writes the Result to the database a returns a pointer // to the Result func CreateResult(db *sqlx.DB, r Result) (*Result, error) { log.Debugf("Creating result %v", r) + + p, err := MakeResultsPath(&r) + if err != nil { + return nil, err + } + r.MeasurementDir = p res, err := db.NamedExec(`INSERT INTO results (name, start_time) VALUES (:name,:start_time)`, diff --git a/nettests/groups/groups.go b/nettests/groups/groups.go index a76f867..e10b6b2 100644 --- a/nettests/groups/groups.go +++ b/nettests/groups/groups.go @@ -6,27 +6,46 @@ import ( "github.com/openobservatory/gooni/nettests/websites" ) +// NettestGroup base structure +type NettestGroup struct { + Label string + Nettests []nettests.Nettest + Summary func(s string) string +} + // NettestGroups that can be run by the user -var NettestGroups = map[string]nettests.NettestGroup{ - "websites": nettests.NettestGroup{ +var NettestGroups = map[string]NettestGroup{ + "websites": NettestGroup{ Label: "Websites", Nettests: []nettests.Nettest{ websites.WebConnectivity{}, }, + Summary: func(s string) string { + return "{}" + }, }, - "performance": nettests.NettestGroup{ + "performance": NettestGroup{ Label: "Performance", Nettests: []nettests.Nettest{ performance.Dash{}, performance.NDT{}, }, + Summary: func(s string) string { + return "{}" + }, }, - "middleboxes": nettests.NettestGroup{ + "middleboxes": NettestGroup{ Label: "Middleboxes", Nettests: []nettests.Nettest{}, + Summary: func(s string) string { + return "{}" + }, }, - "im": nettests.NettestGroup{ + "im": NettestGroup{ Label: "Instant Messaging", Nettests: []nettests.Nettest{}, + Summary: func(s string) string { + return "{}" + }, }, } diff --git a/nettests/nettests.go b/nettests/nettests.go index b268682..ad189b2 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -7,6 +7,7 @@ import ( "github.com/measurement-kit/go-measurement-kit" ooni "github.com/openobservatory/gooni" "github.com/openobservatory/gooni/internal/cli/version" + "github.com/openobservatory/gooni/internal/colors" "github.com/openobservatory/gooni/internal/database" ) @@ -17,13 +18,6 @@ type Nettest interface { LogSummary(string) error } -// NettestGroup base structure -type NettestGroup struct { - Label string - Nettests []Nettest - Summary func(s string) string -} - // NewController creates a nettest controller func NewController(nt Nettest, ctx *ooni.Context, res *database.Result, msmtPath string) *Controller { return &Controller{ @@ -41,7 +35,7 @@ type Controller struct { res *database.Result nt Nettest msmts map[int64]*database.Measurement - msmtPath string + msmtPath string // XXX maybe we can drop this and just use a temporary file } // Init should be called once to initialise the nettest @@ -105,7 +99,7 @@ func (c *Controller) Init(nt *mk.Nettest) error { }) nt.On("status.geoip_lookup", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) msmtTemplate.ASN = e.Value.ProbeASN msmtTemplate.IP = e.Value.ProbeIP @@ -113,7 +107,7 @@ func (c *Controller) Init(nt *mk.Nettest) error { }) nt.On("status.measurement_started", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) idx := e.Value.Idx msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input) @@ -125,44 +119,55 @@ func (c *Controller) Init(nt *mk.Nettest) error { }) nt.On("status.progress", func(e mk.Event) { + log.Debugf(colors.Red(e.Key)) c.OnProgress(e.Value.Percentage, e.Value.Message) }) nt.On("status.update.*", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) }) nt.On("failure.measurement", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure) }) nt.On("failure.measurement_submission", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) failure := e.Value.Failure c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure) }) nt.On("status.measurement_uploaded", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) - c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB) + if err := c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB); err != nil { + log.WithError(err).Error("failed to mark msmt as uploaded") + } }) nt.On("status.measurement_done", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) - c.msmts[e.Value.Idx].Done(c.Ctx.DB) + if err := c.msmts[e.Value.Idx].Done(c.Ctx.DB); err != nil { + log.WithError(err).Error("failed to mark msmt as done") + } }) nt.On("measurement", func(e mk.Event) { c.OnEntry(e.Value.Idx, e.Value.JSONStr) }) - nt.On("end", func(e mk.Event) { - log.Debugf("end") + nt.On("status.end", func(e mk.Event) { + log.Debugf("status.end") + for idx, msmt := range c.msmts { + log.Debugf("adding msmt#%d to result", idx) + if err := msmt.AddToResult(c.Ctx.DB, c.res); err != nil { + log.WithError(err).Error("failed to add to result") + } + } }) return nil