refactor(ooniprobe): migrate database to internal (#979)

See https://github.com/ooni/probe/issues/2352

Co-authored-by: decfox <decfox@github.com>
Co-authored-by: Simone Basso <bassosimone@gmail.com>
This commit is contained in:
DecFox
2022-11-15 15:05:30 +05:30
committed by GitHub
parent d6def35286
commit 6b01264373
20 changed files with 53 additions and 56 deletions
+1 -1
View File
@@ -6,8 +6,8 @@ import (
"github.com/alecthomas/kingpin"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/cli/root"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/output"
"github.com/ooni/probe-cli/v3/internal/database"
)
func init() {
+1 -1
View File
@@ -8,7 +8,7 @@ import (
"github.com/alecthomas/kingpin"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/cli/root"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/internal/database"
"github.com/upper/db/v4"
)
+1 -1
View File
@@ -4,8 +4,8 @@ import (
"github.com/alecthomas/kingpin"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/cli/root"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/output"
"github.com/ooni/probe-cli/v3/internal/database"
)
func init() {
-367
View File
@@ -1,367 +0,0 @@
package database
import (
"database/sql"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
"time"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/enginex"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/utils"
"github.com/pkg/errors"
"github.com/upper/db/v4"
)
// ListMeasurements given a result ID
func ListMeasurements(sess db.Session, resultID int64) ([]MeasurementURLNetwork, error) {
measurements := []MeasurementURLNetwork{}
req := sess.SQL().Select(
db.Raw("networks.*"),
db.Raw("urls.*"),
db.Raw("measurements.*"),
db.Raw("results.*"),
).From("results").
Join("measurements").On("results.result_id = measurements.result_id").
Join("networks").On("results.network_id = networks.network_id").
LeftJoin("urls").On("urls.url_id = measurements.url_id").
OrderBy("measurements.measurement_start_time").
Where("results.result_id = ?", resultID)
if err := req.All(&measurements); err != nil {
log.Errorf("failed to run query %s: %v", req.String(), err)
return measurements, err
}
return measurements, nil
}
// GetMeasurementJSON returns a map[string]interface{} given a database and a measurementID
func GetMeasurementJSON(sess db.Session, measurementID int64) (map[string]interface{}, error) {
var (
measurement MeasurementURLNetwork
msmtJSON map[string]interface{}
)
req := sess.SQL().Select(
db.Raw("urls.*"),
db.Raw("measurements.*"),
).From("measurements").
LeftJoin("urls").On("urls.url_id = measurements.url_id").
Where("measurements.measurement_id= ?", measurementID)
if err := req.One(&measurement); err != nil {
log.Errorf("failed to run query %s: %v", req.String(), err)
return nil, err
}
if measurement.Measurement.IsUploaded {
// TODO(bassosimone): this should be a function exposed by probe-engine
reportID := measurement.Measurement.ReportID.String
measurementURL := &url.URL{
Scheme: "https",
Host: "api.ooni.io",
Path: "/api/v1/raw_measurement",
}
query := url.Values{}
query.Add("report_id", reportID)
if measurement.URL.URL.Valid == true {
query.Add("input", measurement.URL.URL.String)
}
measurementURL.RawQuery = query.Encode()
log.Debugf("using %s", measurementURL.String())
resp, err := http.Get(measurementURL.String())
if err != nil {
log.Errorf("failed to fetch the measurement %s %s", reportID, measurement.URL.URL.String)
return nil, err
}
defer resp.Body.Close()
if err := json.NewDecoder(resp.Body).Decode(&msmtJSON); err != nil {
log.Error("failed to unmarshal the measurement_json")
return nil, err
}
return msmtJSON, nil
}
// MeasurementFilePath might be NULL because the measurement from a
// 3.0.0-beta install
if measurement.Measurement.MeasurementFilePath.Valid == false {
log.Error("invalid measurement_file_path")
log.Error("backup your OONI_HOME and run `ooniprobe reset`")
return nil, errors.New("cannot access measurement file")
}
measurementFilePath := measurement.Measurement.MeasurementFilePath.String
b, err := os.ReadFile(measurementFilePath)
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &msmtJSON); err != nil {
log.Error("failed to unmarshal the measurement_json")
log.Error("backup your OONI_HOME and run `ooniprobe reset`")
return nil, err
}
return msmtJSON, nil
}
// ListResults return the list of results
func ListResults(sess db.Session) ([]ResultNetwork, []ResultNetwork, error) {
doneResults := []ResultNetwork{}
incompleteResults := []ResultNetwork{}
req := sess.SQL().Select(
db.Raw("networks.network_name"),
db.Raw("networks.network_type"),
db.Raw("networks.ip"),
db.Raw("networks.asn"),
db.Raw("networks.network_country_code"),
db.Raw("results.result_id"),
db.Raw("results.test_group_name"),
db.Raw("results.result_start_time"),
db.Raw("results.network_id"),
db.Raw("results.result_is_viewed"),
db.Raw("results.result_runtime"),
db.Raw("results.result_is_done"),
db.Raw("results.result_is_uploaded"),
db.Raw("results.result_data_usage_up"),
db.Raw("results.result_data_usage_down"),
db.Raw("results.measurement_dir"),
db.Raw("COUNT(CASE WHEN measurements.is_anomaly = TRUE THEN 1 END) as anomaly_count"),
db.Raw("COUNT() as total_count"),
// The test_keys column are concanetated with the "|" character as a separator.
// We consider this to be safe since we only really care about values of the
// performance test_keys where the values are all numbers and none of the keys
// contain the "|" character.
db.Raw("group_concat(test_keys, '|') as test_keys"),
).From("results").
Join("networks").On("results.network_id = networks.network_id").
Join("measurements").On("measurements.result_id = results.result_id").
OrderBy("results.result_start_time").
GroupBy(
db.Raw("networks.network_name"),
db.Raw("networks.network_type"),
db.Raw("networks.ip"),
db.Raw("networks.asn"),
db.Raw("networks.network_country_code"),
db.Raw("results.result_id"),
db.Raw("results.test_group_name"),
db.Raw("results.result_start_time"),
db.Raw("results.network_id"),
db.Raw("results.result_is_viewed"),
db.Raw("results.result_runtime"),
db.Raw("results.result_is_done"),
db.Raw("results.result_is_uploaded"),
db.Raw("results.result_data_usage_up"),
db.Raw("results.result_data_usage_down"),
db.Raw("results.measurement_dir"),
)
if err := req.Where("result_is_done = true").All(&doneResults); err != nil {
return doneResults, incompleteResults, errors.Wrap(err, "failed to get result done list")
}
if err := req.Where("result_is_done = false").All(&incompleteResults); err != nil {
return doneResults, incompleteResults, errors.Wrap(err, "failed to get result done list")
}
return doneResults, incompleteResults, nil
}
// DeleteResult will delete a particular result and the relative measurement on
// disk.
func DeleteResult(sess db.Session, resultID int64) error {
var result Result
res := sess.Collection("results").Find("result_id", resultID)
if err := res.One(&result); err != nil {
if err == db.ErrNoMoreRows {
return err
}
log.WithError(err).Error("error in obtaining the result")
return err
}
if err := res.Delete(); err != nil {
log.WithError(err).Error("failed to delete the result directory")
return err
}
os.RemoveAll(result.MeasurementDir)
return nil
}
// UpdateUploadedStatus will check if all the measurements inside of a given result set have been uploaded and if so will set the is_uploaded flag to true
func UpdateUploadedStatus(sess db.Session, result *Result) error {
err := sess.Tx(func(tx db.Session) error {
uploadedTotal := UploadedTotalCount{}
req := tx.SQL().Select(
db.Raw("SUM(measurements.measurement_is_uploaded)"),
db.Raw("COUNT(*)"),
).From("results").
Join("measurements").On("measurements.result_id = results.result_id").
Where("results.result_id = ?", result.ID)
err := req.One(&uploadedTotal)
if err != nil {
log.WithError(err).Error("failed to retrieve total vs uploaded counts")
return err
}
if uploadedTotal.UploadedCount == uploadedTotal.TotalCount {
result.IsUploaded = true
} else {
result.IsUploaded = false
}
err = tx.Collection("results").Find("result_id", result.ID).Update(result)
if err != nil {
log.WithError(err).Error("failed to update result")
return errors.Wrap(err, "updating result")
}
return nil
})
if err != nil {
log.WithError(err).Error("Failed to write to the results table")
return err
}
return nil
}
// CreateMeasurement writes the measurement to the database a returns a pointer
// to the Measurement
func CreateMeasurement(sess db.Session, reportID sql.NullString, testName string, measurementDir string, idx int, resultID int64, urlID sql.NullInt64) (*Measurement, error) {
// TODO we should look into generating this file path in a more robust way.
// If there are two identical test_names in the same test group there is
// going to be a clash of test_name
msmtFilePath := filepath.Join(measurementDir, fmt.Sprintf("msmt-%s-%d.json", testName, idx))
msmt := Measurement{
ReportID: reportID,
TestName: testName,
ResultID: resultID,
MeasurementFilePath: sql.NullString{String: msmtFilePath, Valid: true},
URLID: urlID,
IsFailed: false,
IsDone: false,
// XXX Do we want to have this be part of something else?
StartTime: time.Now().UTC(),
TestKeys: "",
}
newID, err := sess.Collection("measurements").Insert(msmt)
if err != nil {
return nil, errors.Wrap(err, "creating measurement")
}
msmt.ID = newID.ID().(int64)
return &msmt, nil
}
// CreateResult writes the Result to the database a returns a pointer
// to the Result
func CreateResult(sess db.Session, homePath string, testGroupName string, networkID int64) (*Result, error) {
startTime := time.Now().UTC()
p, err := utils.MakeResultsDir(homePath, testGroupName, startTime)
if err != nil {
return nil, err
}
result := Result{
TestGroupName: testGroupName,
StartTime: startTime,
NetworkID: networkID,
}
result.MeasurementDir = p
log.Debugf("Creating result %v", result)
newID, err := sess.Collection("results").Insert(result)
if err != nil {
return nil, errors.Wrap(err, "creating result")
}
result.ID = newID.ID().(int64)
return &result, nil
}
// CreateNetwork will create a new network in the network table
func CreateNetwork(sess db.Session, loc enginex.LocationProvider) (*Network, error) {
network := Network{
ASN: loc.ProbeASN(),
CountryCode: loc.ProbeCC(),
NetworkName: loc.ProbeNetworkName(),
// On desktop we consider it to always be wifi
NetworkType: "wifi",
IP: loc.ProbeIP(),
}
newID, err := sess.Collection("networks").Insert(network)
if err != nil {
return nil, err
}
network.ID = newID.ID().(int64)
return &network, nil
}
// CreateOrUpdateURL will create a new URL entry to the urls table if it doesn't
// exists, otherwise it will update the category code of the one already in
// there.
func CreateOrUpdateURL(sess db.Session, urlStr string, categoryCode string, countryCode string) (int64, error) {
var url URL
err := sess.Tx(func(tx db.Session) error {
res := tx.Collection("urls").Find(
db.Cond{"url": urlStr, "url_country_code": countryCode},
)
err := res.One(&url)
if err == db.ErrNoMoreRows {
url = URL{
URL: sql.NullString{String: urlStr, Valid: true},
CategoryCode: sql.NullString{String: categoryCode, Valid: true},
CountryCode: sql.NullString{String: countryCode, Valid: true},
}
newID, insErr := tx.Collection("urls").Insert(url)
if insErr != nil {
log.Error("Failed to insert into the URLs table")
return insErr
}
url.ID = sql.NullInt64{Int64: newID.ID().(int64), Valid: true}
} else if err != nil {
log.WithError(err).Error("Failed to get single result")
return err
} else {
url.CategoryCode = sql.NullString{String: categoryCode, Valid: true}
res.Update(url)
}
return nil
})
if err != nil {
log.WithError(err).Error("Failed to write to the URL table")
return 0, err
}
return url.ID.Int64, nil
}
// AddTestKeys writes the summary to the measurement
func AddTestKeys(sess db.Session, msmt *Measurement, tk interface{}) error {
var (
isAnomaly bool
isAnomalyValid bool
)
tkBytes, err := json.Marshal(tk)
if err != nil {
log.WithError(err).Error("failed to serialize summary")
}
// This is necessary so that we can extract from the the opaque testKeys just
// the IsAnomaly field of bool type.
// Maybe generics are not so bad after-all, heh golang?
isAnomalyValue := reflect.ValueOf(tk).FieldByName("IsAnomaly")
if isAnomalyValue.IsValid() == true && isAnomalyValue.Kind() == reflect.Bool {
isAnomaly = isAnomalyValue.Bool()
isAnomalyValid = true
}
msmt.TestKeys = string(tkBytes)
msmt.IsAnomaly = sql.NullBool{Bool: isAnomaly, Valid: isAnomalyValid}
err = sess.Collection("measurements").Find("measurement_id", msmt.ID).Update(msmt)
if err != nil {
log.WithError(err).Error("failed to update measurement")
return errors.Wrap(err, "updating measurement")
}
return nil
}
@@ -1,397 +0,0 @@
package database
import (
"database/sql"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/upper/db/v4"
)
type locationInfo struct {
asn uint
countryCode string
ip string
networkName string
resolverIP string
}
func (lp *locationInfo) ProbeASN() uint {
return lp.asn
}
func (lp *locationInfo) ProbeASNString() string {
return fmt.Sprintf("AS%d", lp.asn)
}
func (lp *locationInfo) ProbeCC() string {
return lp.countryCode
}
func (lp *locationInfo) ProbeIP() string {
return lp.ip
}
func (lp *locationInfo) ProbeNetworkName() string {
return lp.networkName
}
func (lp *locationInfo) ResolverIP() string {
return lp.resolverIP
}
func TestMeasurementWorkflow(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "dbtest")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())
tmpdir, err := ioutil.TempDir("", "oonitest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
sess, err := Connect(tmpfile.Name())
if err != nil {
t.Fatal(err)
}
location := locationInfo{
asn: 0,
countryCode: "IT",
networkName: "Unknown",
}
network, err := CreateNetwork(sess, &location)
if err != nil {
t.Fatal(err)
}
result, err := CreateResult(sess, tmpdir, "websites", network.ID)
if err != nil {
t.Fatal(err)
}
reportID := sql.NullString{String: "", Valid: false}
testName := "antani"
resultID := result.ID
msmtFilePath := tmpdir
urlID := sql.NullInt64{Int64: 0, Valid: false}
m1, err := CreateMeasurement(sess, reportID, testName, msmtFilePath, 0, resultID, urlID)
if err != nil {
t.Fatal(err)
}
m1.IsUploaded = true
m1.IsAnomaly = sql.NullBool{Valid: true, Bool: false}
err = sess.Collection("measurements").Find("measurement_id", m1.ID).Update(m1)
if err != nil {
t.Fatal(err)
}
m2, err := CreateMeasurement(sess, reportID, testName, msmtFilePath, 0, resultID, urlID)
if err != nil {
t.Fatal(err)
}
m2.IsUploaded = false
m2.IsAnomaly = sql.NullBool{Valid: true, Bool: true}
err = sess.Collection("measurements").Find("measurement_id", m2.ID).Update(m2)
if err != nil {
t.Fatal(err)
}
if m2.ResultID != m1.ResultID {
t.Error("result_id mismatch")
}
err = UpdateUploadedStatus(sess, result)
if err != nil {
t.Fatal(err)
}
result.Finished(sess)
var r Result
err = sess.Collection("measurements").Find("result_id", result.ID).One(&r)
if err != nil {
t.Fatal(err)
}
if r.IsUploaded == true {
t.Error("result should be marked as not uploaded")
}
done, incomplete, err := ListResults(sess)
if err != nil {
t.Fatal(err)
}
if len(incomplete) != 0 {
t.Error("there should be 0 incomplete result")
}
if len(done) != 1 {
t.Error("there should be 1 done result")
}
if done[0].TotalCount != 2 {
t.Error("there should be a total of 2 measurements in the result")
}
if done[0].AnomalyCount != 1 {
t.Error("there should be a total of 1 anomalies in the result")
}
msmts, err := ListMeasurements(sess, resultID)
if err != nil {
t.Fatal(err)
}
if msmts[0].Network.NetworkType != "wifi" {
t.Error("network_type should be wifi")
}
}
func TestDeleteResult(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "dbtest")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())
tmpdir, err := ioutil.TempDir("", "oonitest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
sess, err := Connect(tmpfile.Name())
if err != nil {
t.Fatal(err)
}
location := locationInfo{
asn: 0,
countryCode: "IT",
networkName: "Unknown",
}
network, err := CreateNetwork(sess, &location)
if err != nil {
t.Fatal(err)
}
result, err := CreateResult(sess, tmpdir, "websites", network.ID)
if err != nil {
t.Fatal(err)
}
reportID := sql.NullString{String: "", Valid: false}
testName := "antani"
resultID := result.ID
msmtFilePath := tmpdir
urlID := sql.NullInt64{Int64: 0, Valid: false}
m1, err := CreateMeasurement(sess, reportID, testName, msmtFilePath, 0, resultID, urlID)
if err != nil {
t.Fatal(err)
}
var m2 Measurement
err = sess.Collection("measurements").Find("measurement_id", m1.ID).One(&m2)
if err != nil {
t.Fatal(err)
}
if m2.ResultID != m1.ResultID {
t.Error("result_id mismatch")
}
err = DeleteResult(sess, resultID)
if err != nil {
t.Fatal(err)
}
totalResults, err := sess.Collection("results").Find().Count()
if err != nil {
t.Fatal(err)
}
totalMeasurements, err := sess.Collection("measurements").Find().Count()
if err != nil {
t.Fatal(err)
}
if totalResults != 0 {
t.Fatal("results should be zero")
}
if totalMeasurements != 0 {
t.Fatal("measurements should be zero")
}
err = DeleteResult(sess, 20)
if err != db.ErrNoMoreRows {
t.Fatal(err)
}
}
func TestNetworkCreate(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "dbtest")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())
sess, err := Connect(tmpfile.Name())
if err != nil {
t.Fatal(err)
}
l1 := locationInfo{
asn: 2,
countryCode: "IT",
networkName: "Antaninet",
}
l2 := locationInfo{
asn: 3,
countryCode: "IT",
networkName: "Fufnet",
}
_, err = CreateNetwork(sess, &l1)
if err != nil {
t.Fatal(err)
}
_, err = CreateNetwork(sess, &l2)
if err != nil {
t.Fatal(err)
}
}
func TestURLCreation(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "dbtest")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())
sess, err := Connect(tmpfile.Name())
if err != nil {
t.Fatal(err)
}
newID1, err := CreateOrUpdateURL(sess, "https://google.com", "GMB", "XX")
if err != nil {
t.Fatal(err)
}
newID2, err := CreateOrUpdateURL(sess, "https://google.com", "SRCH", "XX")
if err != nil {
t.Fatal(err)
}
newID3, err := CreateOrUpdateURL(sess, "https://facebook.com", "GRP", "XX")
if err != nil {
t.Fatal(err)
}
newID4, err := CreateOrUpdateURL(sess, "https://facebook.com", "GMP", "XX")
if err != nil {
t.Fatal(err)
}
newID5, err := CreateOrUpdateURL(sess, "https://google.com", "SRCH", "XX")
if err != nil {
t.Fatal(err)
}
if newID2 != newID1 {
t.Error("inserting the same URL with different category code should produce the same result")
}
if newID3 == newID1 {
t.Error("inserting different URL should produce different ids")
}
if newID4 != newID3 {
t.Error("inserting the same URL with different category code should produce the same result")
}
if newID5 != newID1 {
t.Error("the ID of google should still be the same")
}
}
func TestPerformanceTestKeys(t *testing.T) {
var tk PerformanceTestKeys
ndtS := "{\"download\":100.0,\"upload\":20.0,\"ping\":2.2}"
dashS := "{\"median_bitrate\":102.0}"
if err := json.Unmarshal([]byte(ndtS), &tk); err != nil {
t.Fatal("failed to parse ndtS")
}
if err := json.Unmarshal([]byte(dashS), &tk); err != nil {
t.Fatal("failed to parse dashS")
}
if tk.Bitrate != 102.0 {
t.Fatalf("error Bitrate %f", tk.Bitrate)
}
if tk.Download != 100.0 {
t.Fatalf("error Download %f", tk.Download)
}
}
func TestGetMeasurementJSON(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "dbtest")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())
tmpdir, err := ioutil.TempDir("", "oonitest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
sess, err := Connect(tmpfile.Name())
if err != nil {
t.Fatal(err)
}
location := locationInfo{
asn: 0,
countryCode: "IT",
networkName: "Unknown",
}
network, err := CreateNetwork(sess, &location)
if err != nil {
t.Fatal(err)
}
result, err := CreateResult(sess, tmpdir, "websites", network.ID)
if err != nil {
t.Fatal(err)
}
reportID := sql.NullString{String: "20210111T085144Z_ndt_RU_3216_n1_qMVnP0PTX7ObUSmD", Valid: true}
testName := "antani"
resultID := result.ID
msmtFilePath := tmpdir
urlID := sql.NullInt64{Int64: 0, Valid: false}
msmt, err := CreateMeasurement(sess, reportID, testName, msmtFilePath, 0, resultID, urlID)
if err != nil {
t.Fatal(err)
}
msmt.IsUploaded = true
err = sess.Collection("measurements").Find("measurement_id", msmt.ID).Update(msmt)
if err != nil {
t.Fatal(err)
}
tk, err := GetMeasurementJSON(sess, msmt.ID)
if err != nil {
t.Fatal(err)
}
if tk["probe_asn"] != "AS3216" {
t.Error("inconsistent measurement downloaded")
}
}
@@ -1,72 +0,0 @@
package database
import (
"context"
"database/sql"
"embed"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/internal/netxlite"
migrate "github.com/rubenv/sql-migrate"
"github.com/upper/db/v4"
"github.com/upper/db/v4/adapter/sqlite"
)
//go:embed migrations/*.sql
var efs embed.FS
func readAsset(path string) ([]byte, error) {
filep, err := efs.Open(path)
if err != nil {
return nil, err
}
return netxlite.ReadAllContext(context.Background(), filep)
}
func readAssetDir(path string) ([]string, error) {
var out []string
lst, err := efs.ReadDir(path)
if err != nil {
return nil, err
}
for _, e := range lst {
out = append(out, e.Name())
}
return out, nil
}
// RunMigrations runs the database migrations
func RunMigrations(sess *sql.DB) error {
log.Debugf("running migrations")
migrations := &migrate.AssetMigrationSource{
Asset: readAsset,
AssetDir: readAssetDir,
Dir: "migrations",
}
n, err := migrate.Exec(sess, "sqlite3", migrations, migrate.Up)
if err != nil {
return err
}
log.Debugf("performed %d migrations", n)
return nil
}
// Connect to the database
func Connect(path string) (sess db.Session, err error) {
settings := sqlite.ConnectionURL{
Database: path,
Options: map[string]string{"_foreign_keys": "1"},
}
sess, err = sqlite.Open(settings)
if err != nil {
log.WithError(err).Error("failed to open the DB")
return nil, err
}
err = RunMigrations(sess.Driver().(*sql.DB))
if err != nil {
log.WithError(err).Error("failed to run DB migration")
return nil, err
}
return sess, err
}
@@ -1,32 +0,0 @@
package database
import (
"io/ioutil"
"os"
"testing"
"github.com/apex/log"
)
func TestConnect(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "dbtest")
if err != nil {
t.Error(err)
}
defer os.Remove(tmpfile.Name())
sess, err := Connect(tmpfile.Name())
if err != nil {
t.Error(err)
}
colls, err := sess.Collections()
if err != nil {
t.Error(err)
}
if len(colls) < 1 {
log.Fatal("missing tables")
}
}
@@ -1,168 +0,0 @@
-- +migrate Down
-- +migrate StatementBegin
DROP TABLE `results`;
DROP TABLE `measurements`;
DROP TABLE `urls`;
DROP TABLE `networks`;
-- +migrate StatementEnd
-- +migrate Up
-- +migrate StatementBegin
CREATE TABLE `urls` (
`url_id` INTEGER PRIMARY KEY AUTOINCREMENT,
`url` VARCHAR(255) NOT NULL, -- XXX is this long enough?
`category_code` VARCHAR(5) NOT NULL, -- The citizenlab category code for the
-- site. We use the string NONE to denote
-- no known category code.
`url_country_code` VARCHAR(2) NOT NULL -- The two letter country code which this
-- URL belongs to
);
-- We create a separate table for networks for 2 reasons:
-- 1. For some of the views where need the total number of measured networks,
-- it's going to be much more efficient to just lookup the count of rows in this
-- table.
-- 2. (most important) We want to avoid duplicating a bunch of information that
-- is going to be common to several networks the user is on.
-- Example:
-- We may wish to add to this table the location from of the probe from the GPS
-- or add support for allowing the user to "correct" a misclassified measurement
-- or distinguishing between wifi and mobile.
CREATE TABLE `networks` (
`network_id` INTEGER PRIMARY KEY AUTOINCREMENT,
`network_name` VARCHAR(255) NOT NULL, -- String name representing the network_name which by default is populated based
-- on the ASN.
-- We use a separate key to reference the rows in
-- this tables, because we may wish to "enrich"
-- this with more data in the future.
`network_type` VARCHAR(16) NOT NULL, -- One of wifi, mobile
`ip` VARCHAR(40) NOT NULL, -- Stores a string representation of an ipv4 or ipv6 address.
-- The longest ip is an ipv6 address like:
-- 0000:0000:0000:0000:0000:0000:0000:0000,
-- which is 39 chars.
`asn` INT(4) NOT NULL,
`network_country_code` VARCHAR(2) NOT NULL -- The two letter country code
);
CREATE TABLE `results` (
`result_id` INTEGER PRIMARY KEY AUTOINCREMENT,
-- This can be one of "websites", "im", "performance", "middlebox".
`test_group_name` VARCHAR(16) NOT NULL,
-- We use a different start_time and runtime, because we want to also have
-- data to measure the overhead of creating a report and other factors that
-- go into the test.
-- That is to say: `SUM(runtime) FROM measurements` will always be <=
-- `runtime FROM results` (most times <)
`result_start_time` DATETIME NOT NULL,
`result_runtime` REAL,
-- Used to indicate if the user has seen this result
`result_is_viewed` TINYINT(1) NOT NULL,
-- This is a flag used to indicate if the result is done or is currently running.
`result_is_done` TINYINT(1) NOT NULL,
`result_data_usage_up` REAL NOT NULL,
`result_data_usage_down` REAL NOT NULL,
-- It's probably reasonable to set the maximum length to 260 as this is the
-- maximum length of file paths on windows.
`measurement_dir` VARCHAR(260) NOT NULL,
`network_id` INTEGER NOT NULL,
CONSTRAINT `fk_network_id`
FOREIGN KEY(`network_id`)
REFERENCES `networks`(`network_id`)
);
CREATE TABLE `measurements` (
`measurement_id` INTEGER PRIMARY KEY AUTOINCREMENT,
-- This can be one of:
-- facebook_messenger
-- telegram
-- whatsapp
-- http_header_field_manipulation
-- http_invalid_request_line
-- dash
-- ndt
`test_name` VARCHAR(64) NOT NULL,
`measurement_start_time` DATETIME NOT NULL,
`measurement_runtime` REAL NOT NULL,
-- Note for golang: we used to have state be one of `done` and `active`, so
-- this is equivalent to done being true or false.
-- `state` TEXT,
`measurement_is_done` TINYINT(1) NOT NULL,
-- The reason to have a dedicated is_uploaded flag, instead of just using
-- is_upload_failed, is that we may not have uploaded the measurement due
-- to a setting.
`measurement_is_uploaded` TINYINT(1) NOT NULL,
-- This is the measurement failed to run and the user should be offerred to
-- re-run it.
`measurement_is_failed` TINYINT(1) NOT NULL,
`measurement_failure_msg` VARCHAR(255),
`measurement_is_upload_failed` TINYINT(1) NOT NULL,
`measurement_upload_failure_msg` VARCHAR(255),
-- Is used to indicate that this particular measurement has been re-run and
-- therefore the UI can take this into account to either hide it from the
-- result view or at the very least disable the ability to re-run it.
-- XXX do we also want to have a reference to the re-run measurement?
`measurement_is_rerun` TINYINT(1) NOT NULL,
-- This is the server-side report_id returned by the collector. By using
-- report_id & input, you can query the api to fetch this measurement.
-- Ex.
-- GET https://api.ooni.io/api/v1/measurements?input=$INPUT&report_id=$REPORT_ID
-- Extract the first item from the `result[]` list and then fetch:
-- `measurement_url` to get the JSON of this measurement row.
-- These two values (`report_id`, `input`) are useful to fetch a
-- measurement that has already been processed by the pipeline, to
-- implement cleanup of already uploaded measurements.
`report_id` VARCHAR(255), -- This can be NULL when no report file has been
-- created.
`url_id` INTEGER,
-- This is not yet a feature of the collector, but we are planning to add
-- this at some point in the near future.
-- See: https://github.com/ooni/pipeline/blob/master/docs/ooni-uuid.md &
-- https://github.com/ooni/pipeline/issues/48
`collector_measurement_id` INT(64),
-- This indicates in the case of a websites test, that a site is likely
-- blocked, or for an IM test if the IM tests says the app is likely
-- blocked, or if a middlebox was detected.
-- You can `JOIN` a `COUNT()` of this value in the results view to get a count of
-- blocked sites or blocked IM apps
`is_anomaly` TINYINT(1),
-- This is an opaque JSON structure, where we store some of the test_keys
-- we need for the measurement details views and some result views (ex. the
-- upload/download speed of NDT, the reason for blocking of a site,
-- etc.)
`test_keys` JSON NOT NULL,
-- The cross table reference to JOIN the two tables together.
`result_id` INTEGER NOT NULL,
-- This is a variable used internally to track the path to the on-disk
-- measurements.json. It may make sense to write one file per entry by
-- hooking MK and preventing it from writing to a file on disk which may
-- have many measurements per file.
`report_file_path` VARCHAR(260) NOT NULL,
CONSTRAINT `fk_result_id`
FOREIGN KEY (`result_id`)
REFERENCES `results`(`result_id`)
ON DELETE CASCADE, -- If we delete a result we also want
-- all the measurements to be deleted as well.
FOREIGN KEY (`url_id`) REFERENCES `urls`(`url_id`)
);
-- +migrate StatementEnd
@@ -1,164 +0,0 @@
-- +migrate Down
-- +migrate StatementBegin
PRAGMA foreign_keys=off;
ALTER TABLE `measurements` RENAME TO `_measurements_new`;
CREATE TABLE `measurements` (
`measurement_id` INTEGER PRIMARY KEY AUTOINCREMENT,
`test_name` VARCHAR(64) NOT NULL,
`measurement_start_time` DATETIME NOT NULL,
`measurement_runtime` REAL NOT NULL,
`measurement_is_done` TINYINT(1) NOT NULL,
`measurement_is_uploaded` TINYINT(1) NOT NULL,
`measurement_is_failed` TINYINT(1) NOT NULL,
`measurement_failure_msg` VARCHAR(255),
`measurement_is_upload_failed` TINYINT(1) NOT NULL,
`measurement_upload_failure_msg` VARCHAR(255),
`measurement_is_rerun` TINYINT(1) NOT NULL,
`report_id` VARCHAR(255),
`url_id` INTEGER,
`collector_measurement_id` INT(64),
`is_anomaly` TINYINT(1),
`test_keys` JSON NOT NULL,
`result_id` INTEGER NOT NULL,
`report_file_path` VARCHAR(260) NOT NULL,
CONSTRAINT `fk_result_id`
FOREIGN KEY (`result_id`)
REFERENCES `results`(`result_id`)
ON DELETE CASCADE,
FOREIGN KEY (`url_id`) REFERENCES `urls`(`url_id`)
);
INSERT INTO measurements (
`measurement_id`,
`test_name`,
`measurement_start_time`,
`measurement_runtime`,
`measurement_is_done`,
`measurement_is_uploaded`,
`measurement_is_failed`,
`measurement_failure_msg`,
`measurement_is_upload_failed`,
`measurement_upload_failure_msg`,
`measurement_is_rerun`,
`report_id`,
`url_id`,
`collector_measurement_id`,
`is_anomaly`,
`test_keys`,
`result_id`,
`report_file_path`
)
SELECT `measurement_id`,
`test_name`,
`measurement_start_time`,
`measurement_runtime`,
`measurement_is_done`,
`measurement_is_uploaded`,
`measurement_is_failed`,
`measurement_failure_msg`,
`measurement_is_upload_failed`,
`measurement_upload_failure_msg`,
`measurement_is_rerun`,
`report_id`,
`url_id`,
`collector_measurement_id`,
`is_anomaly`,
`test_keys`,
`result_id`,
`report_file_path`
FROM _measurements_new;
DROP TABLE _measurements_new;
PRAGMA foreign_keys=on;
-- +migrate StatementEnd
-- +migrate Up
-- +migrate StatementBegin
PRAGMA foreign_keys=off;
-- SQLite3 does not support adding columns or dropping constraints, so we need
-- to re-create the table and copy the data over.
ALTER TABLE `measurements` RENAME TO `_measurements_old`;
CREATE TABLE `measurements` (
`measurement_id` INTEGER PRIMARY KEY AUTOINCREMENT,
`test_name` VARCHAR(64) NOT NULL,
`measurement_start_time` DATETIME NOT NULL,
`measurement_runtime` REAL NOT NULL,
`measurement_is_done` TINYINT(1) NOT NULL,
`measurement_is_uploaded` TINYINT(1) NOT NULL,
`measurement_is_failed` TINYINT(1) NOT NULL,
`measurement_failure_msg` VARCHAR(255),
`measurement_is_upload_failed` TINYINT(1) NOT NULL,
`measurement_upload_failure_msg` VARCHAR(255),
`measurement_is_rerun` TINYINT(1) NOT NULL,
`report_id` VARCHAR(255),
`url_id` INTEGER,
`collector_measurement_id` INT(64),
`is_anomaly` TINYINT(1),
`test_keys` JSON NOT NULL,
`result_id` INTEGER NOT NULL,
`report_file_path` VARCHAR(260),
`measurement_file_path` TEXT,
CONSTRAINT `fk_result_id`
FOREIGN KEY (`result_id`)
REFERENCES `results`(`result_id`)
ON DELETE CASCADE,
FOREIGN KEY (`url_id`) REFERENCES `urls`(`url_id`)
);
INSERT INTO measurements (
`measurement_id`,
`test_name`,
`measurement_start_time`,
`measurement_runtime`,
`measurement_is_done`,
`measurement_is_uploaded`,
`measurement_is_failed`,
`measurement_failure_msg`,
`measurement_is_upload_failed`,
`measurement_upload_failure_msg`,
`measurement_is_rerun`,
`report_id`,
`url_id`,
`collector_measurement_id`,
`is_anomaly`,
`test_keys`,
`result_id`,
`report_file_path`,
`measurement_file_path`
)
SELECT `measurement_id`,
`test_name`,
`measurement_start_time`,
`measurement_runtime`,
`measurement_is_done`,
`measurement_is_uploaded`,
`measurement_is_failed`,
`measurement_failure_msg`,
`measurement_is_upload_failed`,
`measurement_upload_failure_msg`,
`measurement_is_rerun`,
`report_id`,
`url_id`,
`collector_measurement_id`,
`is_anomaly`,
`test_keys`,
`result_id`,
`report_file_path`,
NULL
FROM _measurements_old;
DROP TABLE _measurements_old;
PRAGMA foreign_keys=on;
-- +migrate StatementEnd
@@ -1,15 +0,0 @@
-- +migrate Down
-- +migrate StatementBegin
ALTER TABLE `results`
DROP COLUMN result_is_uploaded;
-- +migrate StatementEnd
-- +migrate Up
-- +migrate StatementBegin
ALTER TABLE `results`
ADD COLUMN result_is_uploaded TINYINT(1) DEFAULT 1 NOT NULL;
-- +migrate StatementEnd
-160
View File
@@ -1,160 +0,0 @@
package database
import (
"database/sql"
"time"
"github.com/pkg/errors"
"github.com/upper/db/v4"
)
// ResultNetwork is used to represent the structure made from the JOIN
// between the results and networks tables.
type ResultNetwork struct {
Result `db:",inline"`
Network `db:",inline"`
AnomalyCount uint64 `db:"anomaly_count"`
TotalCount uint64 `db:"total_count"`
TestKeys string `db:"test_keys"`
}
// UploadedTotalCount is the count of the measurements which have been uploaded vs the total measurements in a given result set
type UploadedTotalCount struct {
UploadedCount int64 `db:",inline"`
TotalCount int64 `db:",inline"`
}
// MeasurementURLNetwork is used for the JOIN between Measurement and URL
type MeasurementURLNetwork struct {
Measurement `db:",inline"`
Network `db:",inline"`
Result `db:",inline"`
URL `db:",inline"`
}
// Network represents a network tested by the user
type Network struct {
ID int64 `db:"network_id,omitempty"`
NetworkName string `db:"network_name"`
NetworkType string `db:"network_type"`
IP string `db:"ip"`
ASN uint `db:"asn"`
CountryCode string `db:"network_country_code"`
}
// URL represents URLs from the testing lists
type URL struct {
ID sql.NullInt64 `db:"url_id,omitempty"`
URL sql.NullString `db:"url"`
CategoryCode sql.NullString `db:"category_code"`
CountryCode sql.NullString `db:"url_country_code"`
}
// Measurement model
type Measurement struct {
ID int64 `db:"measurement_id,omitempty"`
TestName string `db:"test_name"`
StartTime time.Time `db:"measurement_start_time"`
Runtime float64 `db:"measurement_runtime"` // Fractional number of seconds
IsDone bool `db:"measurement_is_done"`
IsUploaded bool `db:"measurement_is_uploaded"`
IsFailed bool `db:"measurement_is_failed"`
FailureMsg sql.NullString `db:"measurement_failure_msg,omitempty"`
IsUploadFailed bool `db:"measurement_is_upload_failed"`
UploadFailureMsg sql.NullString `db:"measurement_upload_failure_msg,omitempty"`
IsRerun bool `db:"measurement_is_rerun"`
ReportID sql.NullString `db:"report_id,omitempty"`
URLID sql.NullInt64 `db:"url_id,omitempty"` // Used to reference URL
MeasurementID sql.NullInt64 `db:"collector_measurement_id,omitempty"`
IsAnomaly sql.NullBool `db:"is_anomaly,omitempty"`
// FIXME we likely want to support JSON. See: https://github.com/upper/db/issues/462
TestKeys string `db:"test_keys"`
ResultID int64 `db:"result_id"`
ReportFilePath sql.NullString `db:"report_file_path,omitempty"`
MeasurementFilePath sql.NullString `db:"measurement_file_path,omitempty"`
}
// Result model
type Result struct {
ID int64 `db:"result_id,omitempty"`
TestGroupName string `db:"test_group_name"`
StartTime time.Time `db:"result_start_time"`
NetworkID int64 `db:"network_id"` // Used to include a Network
Runtime float64 `db:"result_runtime"` // Runtime is expressed in fractional seconds
IsViewed bool `db:"result_is_viewed"`
IsDone bool `db:"result_is_done"`
IsUploaded bool `db:"result_is_uploaded"`
DataUsageUp float64 `db:"result_data_usage_up"`
DataUsageDown float64 `db:"result_data_usage_down"`
MeasurementDir string `db:"measurement_dir"`
}
// PerformanceTestKeys is the result summary for a performance test
type PerformanceTestKeys struct {
Upload float64 `json:"upload"`
Download float64 `json:"download"`
Ping float64 `json:"ping"`
Bitrate float64 `json:"median_bitrate"`
}
// Finished marks the result as done and sets the runtime
func (r *Result) Finished(sess db.Session) error {
if r.IsDone == true || r.Runtime != 0 {
return errors.New("Result is already finished")
}
r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds()
r.IsDone = true
err := sess.Collection("results").Find("result_id", r.ID).Update(r)
if err != nil {
return errors.Wrap(err, "updating finished result")
}
return nil
}
// Failed writes the error string to the measurement
func (m *Measurement) Failed(sess db.Session, failure string) error {
m.FailureMsg = sql.NullString{String: failure, Valid: true}
m.IsFailed = true
err := sess.Collection("measurements").Find("measurement_id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// Done marks the measurement as completed
func (m *Measurement) Done(sess db.Session) error {
runtime := time.Now().UTC().Sub(m.StartTime)
m.Runtime = runtime.Seconds()
m.IsDone = true
err := sess.Collection("measurements").Find("measurement_id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// UploadFailed writes the error string for the upload failure to the measurement
func (m *Measurement) UploadFailed(sess db.Session, failure string) error {
m.UploadFailureMsg = sql.NullString{String: failure, Valid: true}
m.IsUploaded = false
err := sess.Collection("measurements").Find("measurement_id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// UploadSucceeded writes the error string for the upload failure to the measurement
func (m *Measurement) UploadSucceeded(sess db.Session) error {
m.IsUploaded = true
err := sess.Collection("measurements").Find("measurement_id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
-22
View File
@@ -1,22 +0,0 @@
// Package enginex contains ooni/probe-engine extensions.
package enginex
import (
"github.com/apex/log"
)
// Logger is the logger used by the engine.
var Logger = log.WithFields(log.Fields{
"type": "engine",
})
// LocationProvider is an interface that returns the current location. The
// github.com/ooni/probe-cli/v3/internal/engine/session.Session implements it.
type LocationProvider interface {
ProbeASN() uint
ProbeASNString() string
ProbeCC() string
ProbeIP() string
ProbeNetworkName() string
ResolverIP() string
}
@@ -8,8 +8,8 @@ import (
"time"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/utils"
"github.com/ooni/probe-cli/v3/internal/database"
)
func formatSpeed(speed float64) string {
+1 -1
View File
@@ -8,9 +8,9 @@ import (
"github.com/apex/log"
"github.com/fatih/color"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/output"
"github.com/ooni/probe-cli/v3/internal/database"
engine "github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/pkg/errors"
@@ -7,8 +7,8 @@ import (
"path"
"testing"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni"
"github.com/ooni/probe-cli/v3/internal/database"
"github.com/ooni/probe-cli/v3/internal/model"
)
+1 -1
View File
@@ -7,8 +7,8 @@ import (
"time"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni"
"github.com/ooni/probe-cli/v3/internal/database"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/pkg/errors"
)
+7 -3
View File
@@ -11,10 +11,9 @@ import (
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/config"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/enginex"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/utils"
"github.com/ooni/probe-cli/v3/internal/atomicx"
"github.com/ooni/probe-cli/v3/internal/database"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/kvstore"
"github.com/ooni/probe-cli/v3/internal/legacy/assetsdir"
@@ -26,6 +25,11 @@ import (
// DefaultSoftwareName is the default software name.
const DefaultSoftwareName = "ooniprobe-cli"
// logger is the logger used by the engine.
var logger = log.WithFields(log.Fields{
"type": "engine",
})
// ProbeCLI is the OONI Probe CLI context.
type ProbeCLI interface {
Config() *config.Config
@@ -231,7 +235,7 @@ func (p *Probe) NewSession(ctx context.Context, runType model.RunType) (*engine.
}
return engine.NewSession(ctx, engine.SessionConfig{
KVStore: kvstore,
Logger: enginex.Logger,
Logger: logger,
SoftwareName: softwareName,
SoftwareVersion: p.softwareVersion,
TempDir: p.tempDir,
+1 -1
View File
@@ -8,7 +8,7 @@ import (
"github.com/apex/log"
"github.com/mitchellh/go-wordwrap"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/database"
"github.com/ooni/probe-cli/v3/internal/database"
)
// MeasurementJSON prints the JSON of a measurement
+1 -26
View File
@@ -1,11 +1,9 @@
package utils
import (
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/utils/homedir"
)
@@ -53,26 +51,6 @@ func FileExists(path string) bool {
return err == nil && stat.Mode().IsRegular()
}
// ResultTimestamp is a windows friendly timestamp
const ResultTimestamp = "2006-01-02T150405.999999999Z0700"
// MakeResultsDir creates and returns a directory for the result
func MakeResultsDir(home string, name string, ts time.Time) (string, error) {
p := filepath.Join(home, "msmts",
fmt.Sprintf("%s-%s", name, ts.Format(ResultTimestamp)))
// If the path already exists, this is a problem. It should not clash, because
// we are using nanosecond precision for the starttime.
if _, e := os.Stat(p); e == nil {
return "", errors.New("results path already exists")
}
err := os.MkdirAll(p, 0700)
if err != nil {
return "", err
}
return p, nil
}
// GetOONIHome returns the path to the OONI Home
func GetOONIHome() (string, error) {
if ooniHome := os.Getenv("OONI_HOME"); ooniHome != "" {
@@ -96,8 +74,5 @@ func DidLegacyInformedConsent() bool {
}
path := filepath.Join(filepath.Join(home, ".ooni"), "initialized")
if FileExists(path) {
return true
}
return false
return FileExists(path)
}