diff --git a/cmd/ooniprobe/internal/cli/list/list.go b/cmd/ooniprobe/internal/cli/list/list.go index 4ade056..05f45b6 100644 --- a/cmd/ooniprobe/internal/cli/list/list.go +++ b/cmd/ooniprobe/internal/cli/list/list.go @@ -83,6 +83,7 @@ func init() { MeasurementAnomalyCount: 0, TestKeys: "{}", // FIXME this used to be Summary we probably need to use a list now Done: result.IsDone, + IsUploaded: result.IsUploaded, DataUsageUp: result.DataUsageUp, DataUsageDown: result.DataUsageDown, }) diff --git a/cmd/ooniprobe/internal/database/actions.go b/cmd/ooniprobe/internal/database/actions.go index 8ec6fe0..5a9e2dd 100644 --- a/cmd/ooniprobe/internal/database/actions.go +++ b/cmd/ooniprobe/internal/database/actions.go @@ -57,7 +57,7 @@ func GetMeasurementJSON(sess sqlbuilder.Database, measurementID int64) (map[stri log.Errorf("failed to run query %s: %v", req.String(), err) return nil, err } - if measurement.IsUploaded { + if measurement.Measurement.IsUploaded { // TODO(bassosimone): this should be a function exposed by probe-engine reportID := measurement.Measurement.ReportID.String measurementURL := &url.URL{ @@ -204,6 +204,46 @@ func DeleteResult(sess sqlbuilder.Database, resultID int64) error { return nil } +// UpdateUploadedStatus will check if all the measurements inside of a given result set have been uploaded and if so will set the is_uploaded flag to true +func UpdateUploadedStatus(sess sqlbuilder.Database, result *Result) error { + tx, err := sess.NewTx(nil) + if err != nil { + log.WithError(err).Error("failed to create transaction") + return err + } + + uploadedTotal := UploadedTotalCount{} + req := tx.Select( + db.Raw("SUM(measurements.measurement_is_uploaded)"), + db.Raw("COUNT(*)"), + ).From("results"). + Join("measurements").On("measurements.result_id = results.result_id"). + Where("results.result_id = ?", result.ID) + + err = req.One(&uploadedTotal) + if err != nil { + log.WithError(err).Error("failed to retrieve total vs uploaded counts") + return err + } + if uploadedTotal.UploadedCount == uploadedTotal.TotalCount { + result.IsUploaded = true + } else { + result.IsUploaded = false + } + err = tx.Collection("results").Find("result_id", result.ID).Update(result) + if err != nil { + log.WithError(err).Error("failed to update result") + return errors.Wrap(err, "updating result") + } + err = tx.Commit() + if err != nil { + log.WithError(err).Error("Failed to write to the results table") + return err + } + + return nil +} + // CreateMeasurement writes the measurement to the database a returns a pointer // to the Measurement func CreateMeasurement(sess sqlbuilder.Database, reportID sql.NullString, testName string, measurementDir string, idx int, resultID int64, urlID sql.NullInt64) (*Measurement, error) { diff --git a/cmd/ooniprobe/internal/database/actions_test.go b/cmd/ooniprobe/internal/database/actions_test.go index 902a100..ea0bace 100644 --- a/cmd/ooniprobe/internal/database/actions_test.go +++ b/cmd/ooniprobe/internal/database/actions_test.go @@ -86,15 +86,36 @@ func TestMeasurementWorkflow(t *testing.T) { if err != nil { t.Fatal(err) } + m1.IsUploaded = true + err = sess.Collection("measurements").Find("measurement_id", m1.ID).Update(m1) + if err != nil { + t.Fatal(err) + } var m2 Measurement err = sess.Collection("measurements").Find("measurement_id", m1.ID).One(&m2) if err != nil { t.Fatal(err) } + m2.IsUploaded = false + err = sess.Collection("measurements").Find("measurement_id", m2.ID).Update(m2) + if err != nil { + t.Fatal(err) + } + if m2.ResultID != m1.ResultID { t.Error("result_id mismatch") } + err = UpdateUploadedStatus(sess, result) + if err != nil { + t.Fatal(err) + } + + var r Result + err = sess.Collection("measurements").Find("result_id", result.ID).One(&r) + if r.IsUploaded == true { + t.Error("result should be marked as not uploaded") + } done, incomplete, err := ListResults(sess) if err != nil { @@ -166,6 +187,7 @@ func TestDeleteResult(t *testing.T) { if err != nil { t.Fatal(err) } + if m2.ResultID != m1.ResultID { t.Error("result_id mismatch") } diff --git a/cmd/ooniprobe/internal/database/migrations/3_results_is_uploaded.sql b/cmd/ooniprobe/internal/database/migrations/3_results_is_uploaded.sql new file mode 100644 index 0000000..94cc775 --- /dev/null +++ b/cmd/ooniprobe/internal/database/migrations/3_results_is_uploaded.sql @@ -0,0 +1,15 @@ +-- +migrate Down +-- +migrate StatementBegin + +ALTER TABLE `results` +DROP COLUMN result_is_uploaded; + +-- +migrate StatementEnd + +-- +migrate Up +-- +migrate StatementBegin + +ALTER TABLE `results` +ADD COLUMN result_is_uploaded TINYINT(1) DEFAULT 1 NOT NULL; + +-- +migrate StatementEnd \ No newline at end of file diff --git a/cmd/ooniprobe/internal/database/models.go b/cmd/ooniprobe/internal/database/models.go index 8356741..a9ec835 100644 --- a/cmd/ooniprobe/internal/database/models.go +++ b/cmd/ooniprobe/internal/database/models.go @@ -15,6 +15,12 @@ type ResultNetwork struct { Network `db:",inline"` } +// UploadedTotalCount is the count of the measurements which have been uploaded vs the total measurements in a given result set +type UploadedTotalCount struct { + UploadedCount int64 `db:",inline"` + TotalCount int64 `db:",inline"` +} + // MeasurementURLNetwork is used for the JOIN between Measurement and URL type MeasurementURLNetwork struct { Measurement `db:",inline"` @@ -74,6 +80,7 @@ type Result struct { Runtime float64 `db:"result_runtime"` // Runtime is expressed in fractional seconds IsViewed bool `db:"result_is_viewed"` IsDone bool `db:"result_is_done"` + IsUploaded bool `db:"result_is_uploaded"` DataUsageUp float64 `db:"result_data_usage_up"` DataUsageDown float64 `db:"result_data_usage_down"` MeasurementDir string `db:"measurement_dir"` diff --git a/cmd/ooniprobe/internal/nettests/nettests.go b/cmd/ooniprobe/internal/nettests/nettests.go index ff9d987..faf653b 100644 --- a/cmd/ooniprobe/internal/nettests/nettests.go +++ b/cmd/ooniprobe/internal/nettests/nettests.go @@ -214,7 +214,7 @@ func (c *Controller) Run(builder *engine.ExperimentBuilder, inputs []string) err return errors.Wrap(err, "failed to add test keys to summary") } } - + database.UpdateUploadedStatus(c.Probe.DB(), c.res) log.Debugf("status.end") return nil } diff --git a/cmd/ooniprobe/internal/output/output.go b/cmd/ooniprobe/internal/output/output.go index 245f7d0..3c21268 100644 --- a/cmd/ooniprobe/internal/output/output.go +++ b/cmd/ooniprobe/internal/output/output.go @@ -78,7 +78,7 @@ func MeasurementItem(msmt database.MeasurementURLNetwork, isFirst bool, isLast b "url_category_code": msmt.URL.CategoryCode.String, "url_country_code": msmt.URL.CountryCode.String, "is_anomaly": msmt.IsAnomaly.Bool, - "is_uploaded": msmt.IsUploaded, + "is_uploaded": msmt.Measurement.IsUploaded, "is_upload_failed": msmt.IsUploadFailed, "upload_failure_msg": msmt.UploadFailureMsg.String, "is_failed": msmt.IsFailed, @@ -102,6 +102,7 @@ type ResultItemData struct { NetworkName string ASN uint Done bool + IsUploaded bool DataUsageDown float64 DataUsageUp float64 Index int @@ -123,6 +124,7 @@ func ResultItem(result ResultItemData) { "asn": result.ASN, "runtime": result.Runtime, "is_done": result.Done, + "is_uploaded": result.IsUploaded, "data_usage_down": result.DataUsageDown, "data_usage_up": result.DataUsageUp, "index": result.Index,