Implement most of the measurement & result DB workflow
This commit is contained in:
parent
6a70fe7da1
commit
e0bb4000e9
|
@ -48,14 +48,10 @@ func init() {
|
||||||
log.WithError(err).Errorf("Failed to run %s", group.Label)
|
log.WithError(err).Errorf("Failed to run %s", group.Label)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// XXX
|
|
||||||
// 1. Generate the summary
|
|
||||||
// 2. Link the measurement to the Result (this should probably happen in
|
|
||||||
// the nettest class)
|
|
||||||
// 3. Update the summary of the result and the other metadata in the db
|
|
||||||
// 4. Move the msmtPath into the final location ~/.ooni/msmts/
|
|
||||||
}
|
}
|
||||||
// result.Update(ctx.DB)
|
if err = result.Finished(ctx.DB); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,14 @@
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/apex/log"
|
"github.com/apex/log"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
|
ooni "github.com/openobservatory/gooni"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,7 +34,7 @@ type Measurement struct {
|
||||||
ID int64 `db:"id"`
|
ID int64 `db:"id"`
|
||||||
Name string `db:"name"`
|
Name string `db:"name"`
|
||||||
StartTime time.Time `db:"start_time"`
|
StartTime time.Time `db:"start_time"`
|
||||||
Runtime float64 `db:"runtime"`
|
Runtime float64 `db:"runtime"` // Fractional number of seconds
|
||||||
Summary string `db:"summary"` // XXX this should be JSON
|
Summary string `db:"summary"` // XXX this should be JSON
|
||||||
ASN string `db:"asn"`
|
ASN string `db:"asn"`
|
||||||
IP string `db:"ip"`
|
IP string `db:"ip"`
|
||||||
|
@ -65,10 +69,12 @@ func (m *Measurement) Failed(db *sqlx.DB, failure string) error {
|
||||||
|
|
||||||
// Done marks the measurement as completed
|
// Done marks the measurement as completed
|
||||||
func (m *Measurement) Done(db *sqlx.DB) error {
|
func (m *Measurement) Done(db *sqlx.DB) error {
|
||||||
|
runtime := time.Now().UTC().Sub(m.StartTime)
|
||||||
|
m.Runtime = runtime.Seconds()
|
||||||
m.State = "done"
|
m.State = "done"
|
||||||
|
|
||||||
err := UpdateOne(db, `UPDATE measurements
|
err := UpdateOne(db, `UPDATE measurements
|
||||||
SET state = :state
|
SET state = :state, runtime = :runtime
|
||||||
WHERE id = :id`, m)
|
WHERE id = :id`, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "updating measurement")
|
return errors.Wrap(err, "updating measurement")
|
||||||
|
@ -116,6 +122,27 @@ func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//AddToResult adds a measurement to a result
|
||||||
|
func (m *Measurement) AddToResult(db *sqlx.DB, result *Result) error {
|
||||||
|
m.ResultID = result.ID
|
||||||
|
finalPath := filepath.Join(result.MeasurementDir,
|
||||||
|
filepath.Base(m.ReportFilePath))
|
||||||
|
|
||||||
|
err := os.Rename(m.ReportFilePath, finalPath)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "moving report file")
|
||||||
|
}
|
||||||
|
m.ReportFilePath = finalPath
|
||||||
|
|
||||||
|
err = UpdateOne(db, `UPDATE measurements
|
||||||
|
SET result_id = :result_id, report_file = :report_file
|
||||||
|
WHERE id = :id`, m)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "updating measurement")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// CreateMeasurement writes the measurement to the database a returns a pointer
|
// CreateMeasurement writes the measurement to the database a returns a pointer
|
||||||
// to the Measurement
|
// to the Measurement
|
||||||
func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, error) {
|
func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, error) {
|
||||||
|
@ -149,22 +176,15 @@ func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, erro
|
||||||
|
|
||||||
// Result model
|
// Result model
|
||||||
type Result struct {
|
type Result struct {
|
||||||
ID int64 `db:"id"`
|
ID int64 `db:"id"`
|
||||||
Name string `db:"name"`
|
Name string `db:"name"`
|
||||||
StartTime time.Time `db:"start_time"`
|
StartTime time.Time `db:"start_time"`
|
||||||
Runtime float64 `db:"runtime"` // Runtime is expressed in Microseconds
|
Runtime float64 `db:"runtime"` // Runtime is expressed in fractional seconds
|
||||||
Summary string `db:"summary"` // XXX this should be JSON
|
Summary string `db:"summary"` // XXX this should be JSON
|
||||||
Done bool `db:"done"`
|
Done bool `db:"done"`
|
||||||
DataUsageUp int64 `db:"data_usage_up"`
|
DataUsageUp int64 `db:"data_usage_up"`
|
||||||
DataUsageDown int64 `db:"data_usage_down"`
|
DataUsageDown int64 `db:"data_usage_down"`
|
||||||
|
MeasurementDir string `db:"measurement_dir"`
|
||||||
started time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// Started marks the Result as having started
|
|
||||||
func (r *Result) Started(db *sqlx.DB) error {
|
|
||||||
r.started = time.Now()
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finished marks the result as done and sets the runtime
|
// Finished marks the result as done and sets the runtime
|
||||||
|
@ -172,22 +192,50 @@ func (r *Result) Finished(db *sqlx.DB) error {
|
||||||
if r.Done == true || r.Runtime != 0 {
|
if r.Done == true || r.Runtime != 0 {
|
||||||
return errors.New("Result is already finished")
|
return errors.New("Result is already finished")
|
||||||
}
|
}
|
||||||
r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond)
|
r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds()
|
||||||
r.Done = true
|
r.Done = true
|
||||||
|
// XXX add in here functionality to compute the summary
|
||||||
|
|
||||||
err := UpdateOne(db, `UPDATE results
|
err := UpdateOne(db, `UPDATE results
|
||||||
SET done = true, runtime = :runtime
|
SET done = :done, runtime = :runtime
|
||||||
WHERE id = :id`, r)
|
WHERE id = :id`, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "updating result")
|
return errors.Wrap(err, "updating finished result")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MakeResultsPath creates and returns a directory for the result
|
||||||
|
func MakeResultsPath(r *Result) (string, error) {
|
||||||
|
home, err := ooni.GetOONIHome()
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.Wrap(err, "default measurements path")
|
||||||
|
}
|
||||||
|
p := filepath.Join(home, "msmts",
|
||||||
|
fmt.Sprintf("%s-%s", r.Name, r.StartTime.Format(time.RFC3339Nano)))
|
||||||
|
|
||||||
|
// If the path already exists, this is a problem. It should not clash, because
|
||||||
|
// we are using nanosecond precision for the starttime.
|
||||||
|
if _, e := os.Stat(p); e == nil {
|
||||||
|
return "", errors.New("results path already exists")
|
||||||
|
}
|
||||||
|
err = os.MkdirAll(p, 0700)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
// CreateResult writes the Result to the database a returns a pointer
|
// CreateResult writes the Result to the database a returns a pointer
|
||||||
// to the Result
|
// to the Result
|
||||||
func CreateResult(db *sqlx.DB, r Result) (*Result, error) {
|
func CreateResult(db *sqlx.DB, r Result) (*Result, error) {
|
||||||
log.Debugf("Creating result %v", r)
|
log.Debugf("Creating result %v", r)
|
||||||
|
|
||||||
|
p, err := MakeResultsPath(&r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.MeasurementDir = p
|
||||||
res, err := db.NamedExec(`INSERT INTO results
|
res, err := db.NamedExec(`INSERT INTO results
|
||||||
(name, start_time)
|
(name, start_time)
|
||||||
VALUES (:name,:start_time)`,
|
VALUES (:name,:start_time)`,
|
||||||
|
|
|
@ -6,27 +6,46 @@ import (
|
||||||
"github.com/openobservatory/gooni/nettests/websites"
|
"github.com/openobservatory/gooni/nettests/websites"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NettestGroup base structure
|
||||||
|
type NettestGroup struct {
|
||||||
|
Label string
|
||||||
|
Nettests []nettests.Nettest
|
||||||
|
Summary func(s string) string
|
||||||
|
}
|
||||||
|
|
||||||
// NettestGroups that can be run by the user
|
// NettestGroups that can be run by the user
|
||||||
var NettestGroups = map[string]nettests.NettestGroup{
|
var NettestGroups = map[string]NettestGroup{
|
||||||
"websites": nettests.NettestGroup{
|
"websites": NettestGroup{
|
||||||
Label: "Websites",
|
Label: "Websites",
|
||||||
Nettests: []nettests.Nettest{
|
Nettests: []nettests.Nettest{
|
||||||
websites.WebConnectivity{},
|
websites.WebConnectivity{},
|
||||||
},
|
},
|
||||||
|
Summary: func(s string) string {
|
||||||
|
return "{}"
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"performance": nettests.NettestGroup{
|
"performance": NettestGroup{
|
||||||
Label: "Performance",
|
Label: "Performance",
|
||||||
Nettests: []nettests.Nettest{
|
Nettests: []nettests.Nettest{
|
||||||
performance.Dash{},
|
performance.Dash{},
|
||||||
performance.NDT{},
|
performance.NDT{},
|
||||||
},
|
},
|
||||||
|
Summary: func(s string) string {
|
||||||
|
return "{}"
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"middleboxes": nettests.NettestGroup{
|
"middleboxes": NettestGroup{
|
||||||
Label: "Middleboxes",
|
Label: "Middleboxes",
|
||||||
Nettests: []nettests.Nettest{},
|
Nettests: []nettests.Nettest{},
|
||||||
|
Summary: func(s string) string {
|
||||||
|
return "{}"
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"im": nettests.NettestGroup{
|
"im": NettestGroup{
|
||||||
Label: "Instant Messaging",
|
Label: "Instant Messaging",
|
||||||
Nettests: []nettests.Nettest{},
|
Nettests: []nettests.Nettest{},
|
||||||
|
Summary: func(s string) string {
|
||||||
|
return "{}"
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/measurement-kit/go-measurement-kit"
|
"github.com/measurement-kit/go-measurement-kit"
|
||||||
ooni "github.com/openobservatory/gooni"
|
ooni "github.com/openobservatory/gooni"
|
||||||
"github.com/openobservatory/gooni/internal/cli/version"
|
"github.com/openobservatory/gooni/internal/cli/version"
|
||||||
|
"github.com/openobservatory/gooni/internal/colors"
|
||||||
"github.com/openobservatory/gooni/internal/database"
|
"github.com/openobservatory/gooni/internal/database"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,13 +18,6 @@ type Nettest interface {
|
||||||
LogSummary(string) error
|
LogSummary(string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NettestGroup base structure
|
|
||||||
type NettestGroup struct {
|
|
||||||
Label string
|
|
||||||
Nettests []Nettest
|
|
||||||
Summary func(s string) string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewController creates a nettest controller
|
// NewController creates a nettest controller
|
||||||
func NewController(nt Nettest, ctx *ooni.Context, res *database.Result, msmtPath string) *Controller {
|
func NewController(nt Nettest, ctx *ooni.Context, res *database.Result, msmtPath string) *Controller {
|
||||||
return &Controller{
|
return &Controller{
|
||||||
|
@ -41,7 +35,7 @@ type Controller struct {
|
||||||
res *database.Result
|
res *database.Result
|
||||||
nt Nettest
|
nt Nettest
|
||||||
msmts map[int64]*database.Measurement
|
msmts map[int64]*database.Measurement
|
||||||
msmtPath string
|
msmtPath string // XXX maybe we can drop this and just use a temporary file
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init should be called once to initialise the nettest
|
// Init should be called once to initialise the nettest
|
||||||
|
@ -105,7 +99,7 @@ func (c *Controller) Init(nt *mk.Nettest) error {
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("status.geoip_lookup", func(e mk.Event) {
|
nt.On("status.geoip_lookup", func(e mk.Event) {
|
||||||
log.Debugf("%s", e.Key)
|
log.Debugf(colors.Red(e.Key))
|
||||||
|
|
||||||
msmtTemplate.ASN = e.Value.ProbeASN
|
msmtTemplate.ASN = e.Value.ProbeASN
|
||||||
msmtTemplate.IP = e.Value.ProbeIP
|
msmtTemplate.IP = e.Value.ProbeIP
|
||||||
|
@ -113,7 +107,7 @@ func (c *Controller) Init(nt *mk.Nettest) error {
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("status.measurement_started", func(e mk.Event) {
|
nt.On("status.measurement_started", func(e mk.Event) {
|
||||||
log.Debugf("%s", e.Key)
|
log.Debugf(colors.Red(e.Key))
|
||||||
|
|
||||||
idx := e.Value.Idx
|
idx := e.Value.Idx
|
||||||
msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input)
|
msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input)
|
||||||
|
@ -125,44 +119,55 @@ func (c *Controller) Init(nt *mk.Nettest) error {
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("status.progress", func(e mk.Event) {
|
nt.On("status.progress", func(e mk.Event) {
|
||||||
|
log.Debugf(colors.Red(e.Key))
|
||||||
c.OnProgress(e.Value.Percentage, e.Value.Message)
|
c.OnProgress(e.Value.Percentage, e.Value.Message)
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("status.update.*", func(e mk.Event) {
|
nt.On("status.update.*", func(e mk.Event) {
|
||||||
log.Debugf("%s", e.Key)
|
log.Debugf(colors.Red(e.Key))
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("failure.measurement", func(e mk.Event) {
|
nt.On("failure.measurement", func(e mk.Event) {
|
||||||
log.Debugf("%s", e.Key)
|
log.Debugf(colors.Red(e.Key))
|
||||||
|
|
||||||
c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure)
|
c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure)
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("failure.measurement_submission", func(e mk.Event) {
|
nt.On("failure.measurement_submission", func(e mk.Event) {
|
||||||
log.Debugf("%s", e.Key)
|
log.Debugf(colors.Red(e.Key))
|
||||||
|
|
||||||
failure := e.Value.Failure
|
failure := e.Value.Failure
|
||||||
c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure)
|
c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure)
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("status.measurement_uploaded", func(e mk.Event) {
|
nt.On("status.measurement_uploaded", func(e mk.Event) {
|
||||||
log.Debugf("%s", e.Key)
|
log.Debugf(colors.Red(e.Key))
|
||||||
|
|
||||||
c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB)
|
if err := c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB); err != nil {
|
||||||
|
log.WithError(err).Error("failed to mark msmt as uploaded")
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("status.measurement_done", func(e mk.Event) {
|
nt.On("status.measurement_done", func(e mk.Event) {
|
||||||
log.Debugf("%s", e.Key)
|
log.Debugf(colors.Red(e.Key))
|
||||||
|
|
||||||
c.msmts[e.Value.Idx].Done(c.Ctx.DB)
|
if err := c.msmts[e.Value.Idx].Done(c.Ctx.DB); err != nil {
|
||||||
|
log.WithError(err).Error("failed to mark msmt as done")
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("measurement", func(e mk.Event) {
|
nt.On("measurement", func(e mk.Event) {
|
||||||
c.OnEntry(e.Value.Idx, e.Value.JSONStr)
|
c.OnEntry(e.Value.Idx, e.Value.JSONStr)
|
||||||
})
|
})
|
||||||
|
|
||||||
nt.On("end", func(e mk.Event) {
|
nt.On("status.end", func(e mk.Event) {
|
||||||
log.Debugf("end")
|
log.Debugf("status.end")
|
||||||
|
for idx, msmt := range c.msmts {
|
||||||
|
log.Debugf("adding msmt#%d to result", idx)
|
||||||
|
if err := msmt.AddToResult(c.Ctx.DB, c.res); err != nil {
|
||||||
|
log.WithError(err).Error("failed to add to result")
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue
Block a user