Add support for tracking the is_uploaded status in the results table (#312)
* Add support for tracking the is_uploaded status in the results table * Add unit tests for measurement upload status This implements: https://github.com/ooni/probe/issues/1457 * Update cmd/ooniprobe/internal/database/actions.go Co-authored-by: Simone Basso <bassosimone@gmail.com>
This commit is contained in:
parent
764293795e
commit
ac7d7dc8a3
|
@ -83,6 +83,7 @@ func init() {
|
||||||
MeasurementAnomalyCount: 0,
|
MeasurementAnomalyCount: 0,
|
||||||
TestKeys: "{}", // FIXME this used to be Summary we probably need to use a list now
|
TestKeys: "{}", // FIXME this used to be Summary we probably need to use a list now
|
||||||
Done: result.IsDone,
|
Done: result.IsDone,
|
||||||
|
IsUploaded: result.IsUploaded,
|
||||||
DataUsageUp: result.DataUsageUp,
|
DataUsageUp: result.DataUsageUp,
|
||||||
DataUsageDown: result.DataUsageDown,
|
DataUsageDown: result.DataUsageDown,
|
||||||
})
|
})
|
||||||
|
|
|
@ -57,7 +57,7 @@ func GetMeasurementJSON(sess sqlbuilder.Database, measurementID int64) (map[stri
|
||||||
log.Errorf("failed to run query %s: %v", req.String(), err)
|
log.Errorf("failed to run query %s: %v", req.String(), err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if measurement.IsUploaded {
|
if measurement.Measurement.IsUploaded {
|
||||||
// TODO(bassosimone): this should be a function exposed by probe-engine
|
// TODO(bassosimone): this should be a function exposed by probe-engine
|
||||||
reportID := measurement.Measurement.ReportID.String
|
reportID := measurement.Measurement.ReportID.String
|
||||||
measurementURL := &url.URL{
|
measurementURL := &url.URL{
|
||||||
|
@ -204,6 +204,46 @@ func DeleteResult(sess sqlbuilder.Database, resultID int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateUploadedStatus will check if all the measurements inside of a given result set have been uploaded and if so will set the is_uploaded flag to true
|
||||||
|
func UpdateUploadedStatus(sess sqlbuilder.Database, result *Result) error {
|
||||||
|
tx, err := sess.NewTx(nil)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("failed to create transaction")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadedTotal := UploadedTotalCount{}
|
||||||
|
req := tx.Select(
|
||||||
|
db.Raw("SUM(measurements.measurement_is_uploaded)"),
|
||||||
|
db.Raw("COUNT(*)"),
|
||||||
|
).From("results").
|
||||||
|
Join("measurements").On("measurements.result_id = results.result_id").
|
||||||
|
Where("results.result_id = ?", result.ID)
|
||||||
|
|
||||||
|
err = req.One(&uploadedTotal)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("failed to retrieve total vs uploaded counts")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if uploadedTotal.UploadedCount == uploadedTotal.TotalCount {
|
||||||
|
result.IsUploaded = true
|
||||||
|
} else {
|
||||||
|
result.IsUploaded = false
|
||||||
|
}
|
||||||
|
err = tx.Collection("results").Find("result_id", result.ID).Update(result)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("failed to update result")
|
||||||
|
return errors.Wrap(err, "updating result")
|
||||||
|
}
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("Failed to write to the results table")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// CreateMeasurement writes the measurement to the database a returns a pointer
|
// CreateMeasurement writes the measurement to the database a returns a pointer
|
||||||
// to the Measurement
|
// to the Measurement
|
||||||
func CreateMeasurement(sess sqlbuilder.Database, reportID sql.NullString, testName string, measurementDir string, idx int, resultID int64, urlID sql.NullInt64) (*Measurement, error) {
|
func CreateMeasurement(sess sqlbuilder.Database, reportID sql.NullString, testName string, measurementDir string, idx int, resultID int64, urlID sql.NullInt64) (*Measurement, error) {
|
||||||
|
|
|
@ -86,15 +86,36 @@ func TestMeasurementWorkflow(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
m1.IsUploaded = true
|
||||||
|
err = sess.Collection("measurements").Find("measurement_id", m1.ID).Update(m1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
var m2 Measurement
|
var m2 Measurement
|
||||||
err = sess.Collection("measurements").Find("measurement_id", m1.ID).One(&m2)
|
err = sess.Collection("measurements").Find("measurement_id", m1.ID).One(&m2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
m2.IsUploaded = false
|
||||||
|
err = sess.Collection("measurements").Find("measurement_id", m2.ID).Update(m2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
if m2.ResultID != m1.ResultID {
|
if m2.ResultID != m1.ResultID {
|
||||||
t.Error("result_id mismatch")
|
t.Error("result_id mismatch")
|
||||||
}
|
}
|
||||||
|
err = UpdateUploadedStatus(sess, result)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var r Result
|
||||||
|
err = sess.Collection("measurements").Find("result_id", result.ID).One(&r)
|
||||||
|
if r.IsUploaded == true {
|
||||||
|
t.Error("result should be marked as not uploaded")
|
||||||
|
}
|
||||||
|
|
||||||
done, incomplete, err := ListResults(sess)
|
done, incomplete, err := ListResults(sess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -166,6 +187,7 @@ func TestDeleteResult(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if m2.ResultID != m1.ResultID {
|
if m2.ResultID != m1.ResultID {
|
||||||
t.Error("result_id mismatch")
|
t.Error("result_id mismatch")
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
-- +migrate Down
|
||||||
|
-- +migrate StatementBegin
|
||||||
|
|
||||||
|
ALTER TABLE `results`
|
||||||
|
DROP COLUMN result_is_uploaded;
|
||||||
|
|
||||||
|
-- +migrate StatementEnd
|
||||||
|
|
||||||
|
-- +migrate Up
|
||||||
|
-- +migrate StatementBegin
|
||||||
|
|
||||||
|
ALTER TABLE `results`
|
||||||
|
ADD COLUMN result_is_uploaded TINYINT(1) DEFAULT 1 NOT NULL;
|
||||||
|
|
||||||
|
-- +migrate StatementEnd
|
|
@ -15,6 +15,12 @@ type ResultNetwork struct {
|
||||||
Network `db:",inline"`
|
Network `db:",inline"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UploadedTotalCount is the count of the measurements which have been uploaded vs the total measurements in a given result set
|
||||||
|
type UploadedTotalCount struct {
|
||||||
|
UploadedCount int64 `db:",inline"`
|
||||||
|
TotalCount int64 `db:",inline"`
|
||||||
|
}
|
||||||
|
|
||||||
// MeasurementURLNetwork is used for the JOIN between Measurement and URL
|
// MeasurementURLNetwork is used for the JOIN between Measurement and URL
|
||||||
type MeasurementURLNetwork struct {
|
type MeasurementURLNetwork struct {
|
||||||
Measurement `db:",inline"`
|
Measurement `db:",inline"`
|
||||||
|
@ -74,6 +80,7 @@ type Result struct {
|
||||||
Runtime float64 `db:"result_runtime"` // Runtime is expressed in fractional seconds
|
Runtime float64 `db:"result_runtime"` // Runtime is expressed in fractional seconds
|
||||||
IsViewed bool `db:"result_is_viewed"`
|
IsViewed bool `db:"result_is_viewed"`
|
||||||
IsDone bool `db:"result_is_done"`
|
IsDone bool `db:"result_is_done"`
|
||||||
|
IsUploaded bool `db:"result_is_uploaded"`
|
||||||
DataUsageUp float64 `db:"result_data_usage_up"`
|
DataUsageUp float64 `db:"result_data_usage_up"`
|
||||||
DataUsageDown float64 `db:"result_data_usage_down"`
|
DataUsageDown float64 `db:"result_data_usage_down"`
|
||||||
MeasurementDir string `db:"measurement_dir"`
|
MeasurementDir string `db:"measurement_dir"`
|
||||||
|
|
|
@ -214,7 +214,7 @@ func (c *Controller) Run(builder *engine.ExperimentBuilder, inputs []string) err
|
||||||
return errors.Wrap(err, "failed to add test keys to summary")
|
return errors.Wrap(err, "failed to add test keys to summary")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
database.UpdateUploadedStatus(c.Probe.DB(), c.res)
|
||||||
log.Debugf("status.end")
|
log.Debugf("status.end")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,7 +78,7 @@ func MeasurementItem(msmt database.MeasurementURLNetwork, isFirst bool, isLast b
|
||||||
"url_category_code": msmt.URL.CategoryCode.String,
|
"url_category_code": msmt.URL.CategoryCode.String,
|
||||||
"url_country_code": msmt.URL.CountryCode.String,
|
"url_country_code": msmt.URL.CountryCode.String,
|
||||||
"is_anomaly": msmt.IsAnomaly.Bool,
|
"is_anomaly": msmt.IsAnomaly.Bool,
|
||||||
"is_uploaded": msmt.IsUploaded,
|
"is_uploaded": msmt.Measurement.IsUploaded,
|
||||||
"is_upload_failed": msmt.IsUploadFailed,
|
"is_upload_failed": msmt.IsUploadFailed,
|
||||||
"upload_failure_msg": msmt.UploadFailureMsg.String,
|
"upload_failure_msg": msmt.UploadFailureMsg.String,
|
||||||
"is_failed": msmt.IsFailed,
|
"is_failed": msmt.IsFailed,
|
||||||
|
@ -102,6 +102,7 @@ type ResultItemData struct {
|
||||||
NetworkName string
|
NetworkName string
|
||||||
ASN uint
|
ASN uint
|
||||||
Done bool
|
Done bool
|
||||||
|
IsUploaded bool
|
||||||
DataUsageDown float64
|
DataUsageDown float64
|
||||||
DataUsageUp float64
|
DataUsageUp float64
|
||||||
Index int
|
Index int
|
||||||
|
@ -123,6 +124,7 @@ func ResultItem(result ResultItemData) {
|
||||||
"asn": result.ASN,
|
"asn": result.ASN,
|
||||||
"runtime": result.Runtime,
|
"runtime": result.Runtime,
|
||||||
"is_done": result.Done,
|
"is_done": result.Done,
|
||||||
|
"is_uploaded": result.IsUploaded,
|
||||||
"data_usage_down": result.DataUsageDown,
|
"data_usage_down": result.DataUsageDown,
|
||||||
"data_usage_up": result.DataUsageUp,
|
"data_usage_up": result.DataUsageUp,
|
||||||
"index": result.Index,
|
"index": result.Index,
|
||||||
|
|
Loading…
Reference in New Issue
Block a user