Implement state tracking in database

This commit is contained in:
Arturo Filastò 2018-03-19 19:28:32 +01:00
parent 25c15dc3ab
commit 9f438ae068
5 changed files with 27 additions and 34 deletions

View File

@ -40,7 +40,7 @@ func init() {
for _, nt := range group.Nettests { for _, nt := range group.Nettests {
log.Debugf("Running test %T", nt) log.Debugf("Running test %T", nt)
msmtPath := filepath.Join(ctx.TempDir, msmtPath := filepath.Join(ctx.TempDir,
fmt.Sprintf("msmt-%s-%T.jsonl", nt, fmt.Sprintf("msmt-%T-%s.jsonl", nt,
time.Now().UTC().Format(time.RFC3339Nano))) time.Now().UTC().Format(time.RFC3339Nano)))
ctl := nettests.NewController(nt, ctx, result, msmtPath) ctl := nettests.NewController(nt, ctx, result, msmtPath)

View File

@ -128,7 +128,7 @@ func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, erro
(name, start_time, (name, start_time,
asn, ip, country, asn, ip, country,
state, failure, report_file, state, failure, report_file,
report_id, input, measurement_id, report_id, input,
result_id) result_id)
VALUES (:name,:start_time, VALUES (:name,:start_time,
:asn,:ip,:country, :asn,:ip,:country,

View File

@ -47,6 +47,7 @@ type Controller struct {
// Init should be called once to initialise the nettest // Init should be called once to initialise the nettest
func (c *Controller) Init(nt *mk.Nettest) error { func (c *Controller) Init(nt *mk.Nettest) error {
log.Debugf("Init: %v", nt) log.Debugf("Init: %v", nt)
c.msmts = make(map[int64]*database.Measurement)
msmtTemplate := database.Measurement{ msmtTemplate := database.Measurement{
ASN: "", ASN: "",
@ -75,8 +76,8 @@ func (c *Controller) Init(nt *mk.Nettest) error {
} }
nt.On("log", func(e mk.Event) { nt.On("log", func(e mk.Event) {
level := e.Value["verbosity"].(string) level := e.Value.LogLevel
msg := e.Value["message"].(string) msg := e.Value.Message
switch level { switch level {
case "ERROR": case "ERROR":
@ -100,23 +101,22 @@ func (c *Controller) Init(nt *mk.Nettest) error {
nt.On("status.report_created", func(e mk.Event) { nt.On("status.report_created", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
msmtTemplate.ReportID = e.Value["report_id"].(string) msmtTemplate.ReportID = e.Value.ReportID
}) })
nt.On("status.geoip_lookup", func(e mk.Event) { nt.On("status.geoip_lookup", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
msmtTemplate.ASN = e.Value["probe_asn"].(string) msmtTemplate.ASN = e.Value.ProbeASN
msmtTemplate.IP = e.Value["probe_ip"].(string) msmtTemplate.IP = e.Value.ProbeIP
msmtTemplate.CountryCode = e.Value["probe_cc"].(string) msmtTemplate.CountryCode = e.Value.ProbeCC
}) })
nt.On("status.measurement_started", func(e mk.Event) { nt.On("status.measurement_started", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
idx := e.Value["idx"].(int64) idx := e.Value.Idx
input := e.Value["input"].(string) msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input)
msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, input)
if err != nil { if err != nil {
log.WithError(err).Error("Failed to create measurement") log.WithError(err).Error("Failed to create measurement")
return return
@ -125,9 +125,7 @@ func (c *Controller) Init(nt *mk.Nettest) error {
}) })
nt.On("status.progress", func(e mk.Event) { nt.On("status.progress", func(e mk.Event) {
perc := e.Value["percentage"].(float64) c.OnProgress(e.Value.Percentage, e.Value.Message)
msg := e.Value["message"].(string)
c.OnProgress(perc, msg)
}) })
nt.On("status.update.*", func(e mk.Event) { nt.On("status.update.*", func(e mk.Event) {
@ -137,36 +135,30 @@ func (c *Controller) Init(nt *mk.Nettest) error {
nt.On("failure.measurement", func(e mk.Event) { nt.On("failure.measurement", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
idx := e.Value["idx"].(int64) c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure)
failure := e.Value["failure"].(string)
c.msmts[idx].Failed(c.Ctx.DB, failure)
}) })
nt.On("failure.measurement_submission", func(e mk.Event) { nt.On("failure.measurement_submission", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
idx := e.Value["idx"].(int64) failure := e.Value.Failure
failure := e.Value["failure"].(string) c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure)
c.msmts[idx].UploadFailed(c.Ctx.DB, failure)
}) })
nt.On("status.measurement_uploaded", func(e mk.Event) { nt.On("status.measurement_uploaded", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
idx := e.Value["idx"].(int64) c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB)
c.msmts[idx].UploadSucceeded(c.Ctx.DB)
}) })
nt.On("status.measurement_done", func(e mk.Event) { nt.On("status.measurement_done", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
idx := e.Value["idx"].(int64) c.msmts[e.Value.Idx].Done(c.Ctx.DB)
c.msmts[idx].Done(c.Ctx.DB)
}) })
nt.On("measurement", func(e mk.Event) { nt.On("measurement", func(e mk.Event) {
idx := e.Value["idx"].(int64) c.OnEntry(e.Value.Idx, e.Value.JSONStr)
c.OnEntry(idx, e.Value["json_str"].(string))
}) })
nt.On("end", func(e mk.Event) { nt.On("end", func(e mk.Event) {
@ -188,7 +180,7 @@ type Entry struct {
// OnEntry should be called every time there is a new entry // OnEntry should be called every time there is a new entry
func (c *Controller) OnEntry(idx int64, jsonStr string) { func (c *Controller) OnEntry(idx int64, jsonStr string) {
log.Debugf("OnEntry: %s", jsonStr) log.Debugf("OnEntry")
var entry Entry var entry Entry
json.Unmarshal([]byte(jsonStr), &entry) json.Unmarshal([]byte(jsonStr), &entry)
@ -197,6 +189,7 @@ func (c *Controller) OnEntry(idx int64, jsonStr string) {
if err != nil { if err != nil {
log.WithError(err).Error("failed to serialize summary") log.WithError(err).Error("failed to serialize summary")
} }
log.Debugf("Fetching: %s %v", idx, c.msmts[idx])
c.msmts[idx].WriteSummary(c.Ctx.DB, string(summaryBytes)) c.msmts[idx].WriteSummary(c.Ctx.DB, string(summaryBytes))
} }

View File

@ -19,9 +19,9 @@ func (d Dash) Run(ctl *nettests.Controller) error {
// DashSummary for the test // DashSummary for the test
// TODO: process 'receiver_data' to provide an array of performance for a chart. // TODO: process 'receiver_data' to provide an array of performance for a chart.
type DashSummary struct { type DashSummary struct {
Latency float32 Latency float64
Bitrate int64 Bitrate int64
Delay float32 Delay float64
} }
// Summary generates a summary for a test run // Summary generates a summary for a test run
@ -29,9 +29,9 @@ func (d Dash) Summary(tk map[string]interface{}) interface{} {
simple := tk["simple"].(map[string]interface{}) simple := tk["simple"].(map[string]interface{})
return DashSummary{ return DashSummary{
Latency: simple["connect_latency"].(float32), Latency: simple["connect_latency"].(float64),
Bitrate: simple["median_bitrate"].(int64), Bitrate: int64(simple["median_bitrate"].(float64)),
Delay: simple["min_playout_delay"].(float32), Delay: simple["min_playout_delay"].(float64),
} }
} }

View File

@ -26,7 +26,7 @@ type NDTSummary struct {
MinRTT int64 MinRTT int64
MSS int64 MSS int64
OutOfOrder int64 OutOfOrder int64
PacketLoss float32 PacketLoss float64
Timeouts int64 Timeouts int64
} }
@ -44,7 +44,7 @@ func (n NDT) Summary(tk map[string]interface{}) interface{} {
MinRTT: advanced["min_rtt"].(int64), MinRTT: advanced["min_rtt"].(int64),
MSS: advanced["mss"].(int64), MSS: advanced["mss"].(int64),
OutOfOrder: advanced["out_of_order"].(int64), OutOfOrder: advanced["out_of_order"].(int64),
PacketLoss: advanced["packet_loss"].(float32), PacketLoss: advanced["packet_loss"].(float64),
Timeouts: advanced["timeouts"].(int64), Timeouts: advanced["timeouts"].(int64),
} }
} }