Implement writing using the new single measurement pattern

This commit is contained in:
Arturo Filastò 2020-01-28 11:53:00 +01:00
parent af46a495f4
commit e3d68457b3
4 changed files with 49 additions and 118 deletions

View File

@ -1,12 +1,12 @@
package database package database
import ( import (
"bufio"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"io" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"reflect" "reflect"
"time" "time"
@ -55,44 +55,17 @@ func GetMeasurementJSON(sess sqlbuilder.Database, measurementID int64) (map[stri
log.Errorf("failed to run query %s: %v", req.String(), err) log.Errorf("failed to run query %s: %v", req.String(), err)
return nil, err return nil, err
} }
reportFilePath := measurement.Measurement.ReportFilePath measurementFilePath := measurement.Measurement.MeasurementFilePath.String
// If the url->url is NULL then we are dealing with a single entry // TODO handle the case in which we have MeasurementFilePath == NULL because
// measurement and all we have to do is read the file and return it. // it's a beta measurement
if measurement.URL.URL.Valid == false { b, err := ioutil.ReadFile(measurementFilePath)
b, err := ioutil.ReadFile(reportFilePath)
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &msmtJSON); err != nil {
return nil, err
}
return msmtJSON, nil
}
// When the URL is a string then we need to seek until we reach the
// measurement line in the file that matches the target input
url := measurement.URL.URL.String
file, err := os.Open(reportFilePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer file.Close() if err := json.Unmarshal(b, &msmtJSON); err != nil {
reader := bufio.NewReader(file) return nil, err
for {
line, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if err := json.Unmarshal([]byte(line), &msmtJSON); err != nil {
return nil, err
}
if msmtJSON["input"].(string) == url {
return msmtJSON, nil
}
} }
return nil, errors.New("Could not find measurement") return msmtJSON, nil
} }
// GetResultTestKeys returns a list of TestKeys for a given result // GetResultTestKeys returns a list of TestKeys for a given result
@ -197,15 +170,16 @@ func DeleteResult(sess sqlbuilder.Database, resultID int64) error {
// CreateMeasurement writes the measurement to the database a returns a pointer // CreateMeasurement writes the measurement to the database a returns a pointer
// to the Measurement // to the Measurement
func CreateMeasurement(sess sqlbuilder.Database, reportID sql.NullString, testName string, resultID int64, reportFilePath string, urlID sql.NullInt64) (*Measurement, error) { func CreateMeasurement(sess sqlbuilder.Database, reportID sql.NullString, testName string, measurementDir string, idx int, resultID int64, urlID sql.NullInt64) (*Measurement, error) {
msmtFilePath := filepath.Join(measurementDir, fmt.Sprintf("msmt-%d.json", idx))
msmt := Measurement{ msmt := Measurement{
ReportID: reportID, ReportID: reportID,
TestName: testName, TestName: testName,
ResultID: resultID, ResultID: resultID,
ReportFilePath: reportFilePath, MeasurementFilePath: sql.NullString{String: msmtFilePath, Valid: true},
URLID: urlID, URLID: urlID,
IsFailed: false, IsFailed: false,
IsDone: false, IsDone: false,
// XXX Do we want to have this be part of something else? // XXX Do we want to have this be part of something else?
StartTime: time.Now().UTC(), StartTime: time.Now().UTC(),
TestKeys: "", TestKeys: "",

View File

@ -2,11 +2,8 @@ package database
import ( import (
"database/sql" "database/sql"
"os"
"path/filepath"
"time" "time"
"github.com/ooni/probe-cli/utils/shutil"
"github.com/pkg/errors" "github.com/pkg/errors"
"upper.io/db.v3/lib/sqlbuilder" "upper.io/db.v3/lib/sqlbuilder"
) )
@ -62,9 +59,10 @@ type Measurement struct {
MeasurementID sql.NullInt64 `db:"collector_measurement_id,omitempty"` MeasurementID sql.NullInt64 `db:"collector_measurement_id,omitempty"`
IsAnomaly sql.NullBool `db:"is_anomaly,omitempty"` IsAnomaly sql.NullBool `db:"is_anomaly,omitempty"`
// FIXME we likely want to support JSON. See: https://github.com/upper/db/issues/462 // FIXME we likely want to support JSON. See: https://github.com/upper/db/issues/462
TestKeys string `db:"test_keys"` TestKeys string `db:"test_keys"`
ResultID int64 `db:"result_id"` ResultID int64 `db:"result_id"`
ReportFilePath string `db:"report_file_path"` ReportFilePath sql.NullString `db:"report_file_path,omitempty"`
MeasurementFilePath sql.NullString `db:"measurement_file_path,omitempty"`
} }
// Result model // Result model
@ -150,32 +148,3 @@ func (m *Measurement) UploadSucceeded(sess sqlbuilder.Database) error {
} }
return nil return nil
} }
// AddToResult adds a measurement to a result
func (m *Measurement) AddToResult(sess sqlbuilder.Database, result *Result) error {
var err error
m.ResultID = result.ID
finalPath := filepath.Join(result.MeasurementDir,
filepath.Base(m.ReportFilePath))
// If the finalPath already exists, it means it has already been moved there.
// This happens in multi input reports
if _, err = os.Stat(finalPath); os.IsNotExist(err) {
err := shutil.CopyFile(m.ReportFilePath, finalPath, false)
if err != nil {
return errors.Wrap(err, "copying report file")
}
err = os.Remove(m.ReportFilePath)
if err != nil {
return errors.Wrap(err, "deleting report file")
}
}
m.ReportFilePath = finalPath
err = sess.Collection("measurements").Find("measurement_id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}

View File

@ -63,26 +63,27 @@ func MeasurementItem(msmt database.MeasurementURLNetwork, isFirst bool, isLast b
"is_first": isFirst, "is_first": isFirst,
"is_last": isLast, "is_last": isLast,
"id": msmt.Measurement.ID, "id": msmt.Measurement.ID,
"test_name": msmt.TestName, "test_name": msmt.TestName,
"test_group_name": msmt.Result.TestGroupName, "test_group_name": msmt.Result.TestGroupName,
"start_time": msmt.Measurement.StartTime, "start_time": msmt.Measurement.StartTime,
"test_keys": msmt.TestKeys, "test_keys": msmt.TestKeys,
"network_country_code": msmt.Network.CountryCode, "network_country_code": msmt.Network.CountryCode,
"network_name": msmt.Network.NetworkName, "network_name": msmt.Network.NetworkName,
"asn": msmt.Network.ASN, "asn": msmt.Network.ASN,
"runtime": msmt.Measurement.Runtime, "runtime": msmt.Measurement.Runtime,
"url": msmt.URL.URL.String, "url": msmt.URL.URL.String,
"url_category_code": msmt.URL.CategoryCode.String, "url_category_code": msmt.URL.CategoryCode.String,
"url_country_code": msmt.URL.CountryCode.String, "url_country_code": msmt.URL.CountryCode.String,
"is_anomaly": msmt.IsAnomaly.Bool, "is_anomaly": msmt.IsAnomaly.Bool,
"is_uploaded": msmt.IsUploaded, "is_uploaded": msmt.IsUploaded,
"is_upload_failed": msmt.IsUploadFailed, "is_upload_failed": msmt.IsUploadFailed,
"upload_failure_msg": msmt.UploadFailureMsg.String, "upload_failure_msg": msmt.UploadFailureMsg.String,
"is_failed": msmt.IsFailed, "is_failed": msmt.IsFailed,
"failure_msg": msmt.FailureMsg.String, "failure_msg": msmt.FailureMsg.String,
"is_done": msmt.Measurement.IsDone, "is_done": msmt.Measurement.IsDone,
"report_file_path": msmt.ReportFilePath, "report_file_path": msmt.ReportFilePath.String,
"measurement_file_path": msmt.MeasurementFilePath.String,
}).Info("measurement") }).Info("measurement")
} }

View File

@ -3,7 +3,6 @@ package nettests
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"path/filepath"
"time" "time"
"github.com/apex/log" "github.com/apex/log"
@ -11,7 +10,6 @@ import (
ooni "github.com/ooni/probe-cli" ooni "github.com/ooni/probe-cli"
"github.com/ooni/probe-cli/internal/database" "github.com/ooni/probe-cli/internal/database"
"github.com/ooni/probe-cli/internal/output" "github.com/ooni/probe-cli/internal/output"
"github.com/ooni/probe-cli/utils"
engine "github.com/ooni/probe-engine" engine "github.com/ooni/probe-engine"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -25,14 +23,10 @@ type Nettest interface {
// NewController creates a nettest controller // NewController creates a nettest controller
func NewController(nt Nettest, ctx *ooni.Context, res *database.Result) *Controller { func NewController(nt Nettest, ctx *ooni.Context, res *database.Result) *Controller {
msmtPath := filepath.Join(ctx.TempDir,
fmt.Sprintf("msmt-%T-%s.jsonl", nt,
time.Now().UTC().Format(utils.ResultTimestamp)))
return &Controller{ return &Controller{
Ctx: ctx, Ctx: ctx,
nt: nt, nt: nt,
res: res, res: res,
msmtPath: msmtPath,
} }
} }
@ -46,7 +40,6 @@ type Controller struct {
ntIndex int ntIndex int
ntStartTime time.Time // used to calculate the eta ntStartTime time.Time // used to calculate the eta
msmts map[int64]*database.Measurement msmts map[int64]*database.Measurement
msmtPath string // XXX maybe we can drop this and just use a temporary file
inputIdxMap map[int64]int64 // Used to map mk idx to database id inputIdxMap map[int64]int64 // Used to map mk idx to database id
// numInputs is the total number of inputs // numInputs is the total number of inputs
@ -91,7 +84,6 @@ func (c *Controller) Run(builder *engine.ExperimentBuilder, inputs []string) err
log.Debug(color.RedString("status.queued")) log.Debug(color.RedString("status.queued"))
log.Debug(color.RedString("status.started")) log.Debug(color.RedString("status.started"))
log.Debugf("OutputPath: %s", c.msmtPath)
if c.Ctx.Config.Sharing.UploadResults { if c.Ctx.Config.Sharing.UploadResults {
if err := exp.OpenReport(); err != nil { if err := exp.OpenReport(); err != nil {
@ -118,8 +110,9 @@ func (c *Controller) Run(builder *engine.ExperimentBuilder, inputs []string) err
if c.inputIdxMap != nil { if c.inputIdxMap != nil {
urlID = sql.NullInt64{Int64: c.inputIdxMap[idx64], Valid: true} urlID = sql.NullInt64{Int64: c.inputIdxMap[idx64], Valid: true}
} }
msmt, err := database.CreateMeasurement( msmt, err := database.CreateMeasurement(
c.Ctx.DB, reportID, exp.Name(), resultID, c.msmtPath, urlID, c.Ctx.DB, reportID, exp.Name(), c.res.MeasurementDir, idx, resultID, urlID,
) )
if err != nil { if err != nil {
return errors.Wrap(err, "failed to create measurement") return errors.Wrap(err, "failed to create measurement")
@ -149,7 +142,7 @@ func (c *Controller) Run(builder *engine.ExperimentBuilder, inputs []string) err
} }
} }
if err := exp.SaveMeasurement(measurement, c.msmtPath); err != nil { if err := exp.SaveMeasurement(measurement, msmt.MeasurementFilePath.String); err != nil {
return errors.Wrap(err, "failed to save measurement on disk") return errors.Wrap(err, "failed to save measurement on disk")
} }
if err := c.msmts[idx64].Done(c.Ctx.DB); err != nil { if err := c.msmts[idx64].Done(c.Ctx.DB); err != nil {
@ -178,12 +171,6 @@ func (c *Controller) Run(builder *engine.ExperimentBuilder, inputs []string) err
} }
log.Debugf("status.end") log.Debugf("status.end")
for idx, msmt := range c.msmts {
log.Debugf("adding msmt#%d to result", idx)
if err := msmt.AddToResult(c.Ctx.DB, c.res); err != nil {
return errors.Wrap(err, "failed to add to result")
}
}
return nil return nil
} }