From 9f438ae068b752dfb17e108e1caa1b7608c6689f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 19 Mar 2018 19:28:32 +0100 Subject: [PATCH] Implement state tracking in database --- internal/cli/run/run.go | 2 +- internal/database/models.go | 2 +- nettests/nettests.go | 43 +++++++++++++++--------------------- nettests/performance/dash.go | 10 ++++----- nettests/performance/ndt.go | 4 ++-- 5 files changed, 27 insertions(+), 34 deletions(-) diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index d142cdc..0621a8d 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -40,7 +40,7 @@ func init() { for _, nt := range group.Nettests { log.Debugf("Running test %T", nt) msmtPath := filepath.Join(ctx.TempDir, - fmt.Sprintf("msmt-%s-%T.jsonl", nt, + fmt.Sprintf("msmt-%T-%s.jsonl", nt, time.Now().UTC().Format(time.RFC3339Nano))) ctl := nettests.NewController(nt, ctx, result, msmtPath) diff --git a/internal/database/models.go b/internal/database/models.go index fd98e36..4216dfc 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -128,7 +128,7 @@ func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, erro (name, start_time, asn, ip, country, state, failure, report_file, - report_id, input, measurement_id, + report_id, input, result_id) VALUES (:name,:start_time, :asn,:ip,:country, diff --git a/nettests/nettests.go b/nettests/nettests.go index 007f639..b268682 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -47,6 +47,7 @@ type Controller struct { // Init should be called once to initialise the nettest func (c *Controller) Init(nt *mk.Nettest) error { log.Debugf("Init: %v", nt) + c.msmts = make(map[int64]*database.Measurement) msmtTemplate := database.Measurement{ ASN: "", @@ -75,8 +76,8 @@ func (c *Controller) Init(nt *mk.Nettest) error { } nt.On("log", func(e mk.Event) { - level := e.Value["verbosity"].(string) - msg := e.Value["message"].(string) + level := e.Value.LogLevel + msg := e.Value.Message switch level { case "ERROR": @@ -100,23 +101,22 @@ func (c *Controller) Init(nt *mk.Nettest) error { nt.On("status.report_created", func(e mk.Event) { log.Debugf("%s", e.Key) - msmtTemplate.ReportID = e.Value["report_id"].(string) + msmtTemplate.ReportID = e.Value.ReportID }) nt.On("status.geoip_lookup", func(e mk.Event) { log.Debugf("%s", e.Key) - msmtTemplate.ASN = e.Value["probe_asn"].(string) - msmtTemplate.IP = e.Value["probe_ip"].(string) - msmtTemplate.CountryCode = e.Value["probe_cc"].(string) + msmtTemplate.ASN = e.Value.ProbeASN + msmtTemplate.IP = e.Value.ProbeIP + msmtTemplate.CountryCode = e.Value.ProbeCC }) nt.On("status.measurement_started", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - input := e.Value["input"].(string) - msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, input) + idx := e.Value.Idx + msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input) if err != nil { log.WithError(err).Error("Failed to create measurement") return @@ -125,9 +125,7 @@ func (c *Controller) Init(nt *mk.Nettest) error { }) nt.On("status.progress", func(e mk.Event) { - perc := e.Value["percentage"].(float64) - msg := e.Value["message"].(string) - c.OnProgress(perc, msg) + c.OnProgress(e.Value.Percentage, e.Value.Message) }) nt.On("status.update.*", func(e mk.Event) { @@ -137,36 +135,30 @@ func (c *Controller) Init(nt *mk.Nettest) error { nt.On("failure.measurement", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - failure := e.Value["failure"].(string) - c.msmts[idx].Failed(c.Ctx.DB, failure) + c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure) }) nt.On("failure.measurement_submission", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - failure := e.Value["failure"].(string) - c.msmts[idx].UploadFailed(c.Ctx.DB, failure) + failure := e.Value.Failure + c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure) }) nt.On("status.measurement_uploaded", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - c.msmts[idx].UploadSucceeded(c.Ctx.DB) + c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB) }) nt.On("status.measurement_done", func(e mk.Event) { log.Debugf("%s", e.Key) - idx := e.Value["idx"].(int64) - c.msmts[idx].Done(c.Ctx.DB) + c.msmts[e.Value.Idx].Done(c.Ctx.DB) }) nt.On("measurement", func(e mk.Event) { - idx := e.Value["idx"].(int64) - c.OnEntry(idx, e.Value["json_str"].(string)) + c.OnEntry(e.Value.Idx, e.Value.JSONStr) }) nt.On("end", func(e mk.Event) { @@ -188,7 +180,7 @@ type Entry struct { // OnEntry should be called every time there is a new entry func (c *Controller) OnEntry(idx int64, jsonStr string) { - log.Debugf("OnEntry: %s", jsonStr) + log.Debugf("OnEntry") var entry Entry json.Unmarshal([]byte(jsonStr), &entry) @@ -197,6 +189,7 @@ func (c *Controller) OnEntry(idx int64, jsonStr string) { if err != nil { log.WithError(err).Error("failed to serialize summary") } + log.Debugf("Fetching: %s %v", idx, c.msmts[idx]) c.msmts[idx].WriteSummary(c.Ctx.DB, string(summaryBytes)) } diff --git a/nettests/performance/dash.go b/nettests/performance/dash.go index d64efc8..b3e3a87 100644 --- a/nettests/performance/dash.go +++ b/nettests/performance/dash.go @@ -19,9 +19,9 @@ func (d Dash) Run(ctl *nettests.Controller) error { // DashSummary for the test // TODO: process 'receiver_data' to provide an array of performance for a chart. type DashSummary struct { - Latency float32 + Latency float64 Bitrate int64 - Delay float32 + Delay float64 } // Summary generates a summary for a test run @@ -29,9 +29,9 @@ func (d Dash) Summary(tk map[string]interface{}) interface{} { simple := tk["simple"].(map[string]interface{}) return DashSummary{ - Latency: simple["connect_latency"].(float32), - Bitrate: simple["median_bitrate"].(int64), - Delay: simple["min_playout_delay"].(float32), + Latency: simple["connect_latency"].(float64), + Bitrate: int64(simple["median_bitrate"].(float64)), + Delay: simple["min_playout_delay"].(float64), } } diff --git a/nettests/performance/ndt.go b/nettests/performance/ndt.go index c6425ed..082c245 100644 --- a/nettests/performance/ndt.go +++ b/nettests/performance/ndt.go @@ -26,7 +26,7 @@ type NDTSummary struct { MinRTT int64 MSS int64 OutOfOrder int64 - PacketLoss float32 + PacketLoss float64 Timeouts int64 } @@ -44,7 +44,7 @@ func (n NDT) Summary(tk map[string]interface{}) interface{} { MinRTT: advanced["min_rtt"].(int64), MSS: advanced["mss"].(int64), OutOfOrder: advanced["out_of_order"].(int64), - PacketLoss: advanced["packet_loss"].(float32), + PacketLoss: advanced["packet_loss"].(float64), Timeouts: advanced["timeouts"].(int64), } }