Start integrating upper/db as a new ORM

This commit is contained in:
Arturo Filastò 2018-09-05 18:40:37 +02:00
parent ec459a7f15
commit ff2f973523
9 changed files with 317 additions and 307 deletions

19
Gopkg.lock generated
View File

@ -146,9 +146,26 @@
revision = "c87af80f3cc5036b55b83d77171e156791085e2e"
version = "v1.7.1"
[[projects]]
name = "upper.io/db.v3"
packages = [
".",
"internal/cache",
"internal/cache/hashstructure",
"internal/immutable",
"internal/sqladapter",
"internal/sqladapter/compat",
"internal/sqladapter/exql",
"lib/reflectx",
"lib/sqlbuilder",
"sqlite"
]
revision = "199d13d76c7cfba05ea0327375056fdabc8bea80"
version = "v3.5.4"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "b2f5c39222a1fb405e3f48d2ae3b4758757fe708e12dbd23743c19135e225579"
inputs-digest = "bb552d1e6530dab8cdd5cc7a6c60bf2b9afbe77e6ea20a31b6d60fc44ad05e26"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -65,3 +65,7 @@ required = ["github.com/shuLhan/go-bindata/go-bindata"]
[[constraint]]
branch = "master"
name = "github.com/getsentry/raven-go"
[[constraint]]
name = "upper.io/db.v3"
version = "3.5.4"

View File

@ -45,13 +45,13 @@ func init() {
ID: result.ID,
Index: idx,
TotalCount: len(incompleteResults),
Name: result.Name,
Name: result.TestGroupName,
StartTime: result.StartTime,
NetworkName: result.NetworkName,
Country: result.Country,
ASN: result.ASN,
Summary: result.Summary,
Done: result.Done,
NetworkName: "FIXME", //result.NetworkName,
Country: "FIXME", //result.Country,
ASN: "FIXME", //result.ASN,
Summary: "{}", //result.Summary,
Done: result.IsDone,
DataUsageUp: result.DataUsageUp,
DataUsageDown: result.DataUsageDown,
})
@ -65,18 +65,19 @@ func init() {
ID: result.ID,
Index: idx,
TotalCount: len(doneResults),
Name: result.Name,
Name: result.TestGroupName,
StartTime: result.StartTime,
NetworkName: result.NetworkName,
Country: result.Country,
ASN: result.ASN,
Summary: result.Summary,
Done: result.Done,
NetworkName: "FIXME", //result.NetworkName,
Country: "FIXME", //result.Country,
ASN: "FIXME", //result.ASN,
Summary: "{}", //result.Summary,
Done: result.IsDone,
DataUsageUp: result.DataUsageUp,
DataUsageDown: result.DataUsageDown,
})
resultSummary.TotalTests++
netCount[result.ASN]++
// FIXME
// netCount[result.ASN]++
resultSummary.TotalDataUsageUp += result.DataUsageUp
resultSummary.TotalDataUsageDown += result.DataUsageDown
}

View File

@ -0,0 +1,156 @@
package database
import (
"time"
"github.com/apex/log"
"github.com/ooni/probe-cli/utils"
"github.com/pkg/errors"
"upper.io/db.v3/lib/sqlbuilder"
)
// ListMeasurements given a result ID
func ListMeasurements(db sqlbuilder.Database, resultID int64) ([]*Measurement, error) {
measurements := []*Measurement{}
/*
FIXME
rows, err := db.Query(`SELECT id, name,
start_time, runtime,
country,
asn,
summary,
input
FROM measurements
WHERE result_id = ?
ORDER BY start_time;`, resultID)
if err != nil {
return measurements, errors.Wrap(err, "failed to get measurement list")
}
for rows.Next() {
msmt := Measurement{}
err = rows.Scan(&msmt.ID, &msmt.Name,
&msmt.StartTime, &msmt.Runtime,
&msmt.CountryCode,
&msmt.ASN,
&msmt.Summary, &msmt.Input,
//&result.DataUsageUp, &result.DataUsageDown)
)
if err != nil {
log.WithError(err).Error("failed to fetch a row")
continue
}
measurements = append(measurements, &msmt)
}
*/
return measurements, nil
}
// ListResults return the list of results
func ListResults(db sqlbuilder.Database) ([]*Result, []*Result, error) {
doneResults := []*Result{}
incompleteResults := []*Result{}
/*
FIXME
rows, err := db.Query(`SELECT id, name,
start_time, runtime,
network_name, country,
asn,
summary, done
FROM results
WHERE done = 1
ORDER BY start_time;`)
if err != nil {
return doneResults, incompleteResults, errors.Wrap(err, "failed to get result done list")
}
for rows.Next() {
result := Result{}
err = rows.Scan(&result.ID, &result.Name,
&result.StartTime, &result.Runtime,
&result.NetworkName, &result.Country,
&result.ASN,
&result.Summary, &result.Done,
//&result.DataUsageUp, &result.DataUsageDown)
)
if err != nil {
log.WithError(err).Error("failed to fetch a row")
continue
}
doneResults = append(doneResults, &result)
}
*/
/*
FIXME
rows, err := db.Query(`SELECT
id, name,
start_time,
network_name, country,
asn
FROM results
WHERE done != 1
ORDER BY start_time;`)
if err != nil {
return doneResults, incompleteResults, errors.Wrap(err, "failed to get result done list")
}
*/
/*
for rows.Next() {
result := Result{Done: false}
err = rows.Scan(&result.ID, &result.Name, &result.StartTime,
&result.NetworkName, &result.Country,
&result.ASN)
if err != nil {
log.WithError(err).Error("failed to fetch a row")
continue
}
incompleteResults = append(incompleteResults, &result)
}
*/
return doneResults, incompleteResults, nil
}
// CreateMeasurement writes the measurement to the database a returns a pointer
// to the Measurement
func CreateMeasurement(sess sqlbuilder.Database, m Measurement, i string) (*Measurement, error) {
col := sess.Collection("measurements")
// XXX Do we want to have this be part of something else?
m.StartTime = time.Now().UTC()
// XXX insert also the URL and stuff
//m.Input = i
//m.State = "active"
newID, err := col.Insert(m)
if err != nil {
return nil, errors.Wrap(err, "creating measurement")
}
m.ID = newID.(int64)
return &m, nil
}
// CreateResult writes the Result to the database a returns a pointer
// to the Result
func CreateResult(sess sqlbuilder.Database, homePath string, r Result) (*Result, error) {
log.Debugf("Creating result %v", r)
col := sess.Collection("results")
p, err := utils.MakeResultsDir(homePath, r.TestGroupName, r.StartTime)
if err != nil {
return nil, err
}
r.MeasurementDir = p
newID, err := col.Insert(r)
if err != nil {
return nil, errors.Wrap(err, "creating result")
}
r.ID = newID.(int64)
return &r, nil
}

View File

@ -1,22 +1,24 @@
package database
import (
"database/sql"
"github.com/apex/log"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3" // this is needed to load the sqlite3 driver
"github.com/ooni/probe-cli/internal/bindata"
migrate "github.com/rubenv/sql-migrate"
"upper.io/db.v3/lib/sqlbuilder"
"upper.io/db.v3/sqlite"
)
// RunMigrations runs the database migrations
func RunMigrations(db *sqlx.DB) error {
func RunMigrations(db *sql.DB) error {
log.Debugf("running migrations")
migrations := &migrate.AssetMigrationSource{
Asset: bindata.Asset,
AssetDir: bindata.AssetDir,
Dir: "data/migrations",
}
n, err := migrate.Exec(db.DB, "sqlite3", migrations, migrate.Up)
n, err := migrate.Exec(db, "sqlite3", migrations, migrate.Up)
if err != nil {
return err
}
@ -25,15 +27,15 @@ func RunMigrations(db *sqlx.DB) error {
}
// Connect to the database
func Connect(path string) (db *sqlx.DB, err error) {
db, err = sqlx.Connect("sqlite3", path)
if err != nil {
return
func Connect(path string) (db sqlbuilder.Database, err error) {
settings := sqlite.ConnectionURL{
Database: path,
}
sess, err := sqlite.Open(settings)
err = RunMigrations(db)
err = RunMigrations(sess.Driver().(*sql.DB))
if err != nil {
db = nil
}
return
return sess, err
}

View File

@ -0,0 +1,31 @@
package database
import (
"io/ioutil"
"os"
"testing"
"github.com/apex/log"
)
func TestConnect(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "dbtest")
if err != nil {
t.Error(err)
}
sess, err := Connect(tmpfile.Name())
if err != nil {
t.Error(err)
}
colls, err := sess.Collections()
if err != nil {
t.Error(err)
}
if len(colls) < 1 {
log.Fatal("missing tables")
}
defer os.Remove(tmpfile.Name())
}

View File

@ -5,84 +5,76 @@ import (
"path/filepath"
"time"
"github.com/apex/log"
"github.com/jmoiron/sqlx"
"github.com/ooni/probe-cli/nettests/summary"
"github.com/ooni/probe-cli/utils"
"github.com/pkg/errors"
"upper.io/db.v3/lib/sqlbuilder"
)
// UpdateOne will run the specified update query and check that it only affected one row
func UpdateOne(db *sqlx.DB, query string, arg interface{}) error {
res, err := db.NamedExec(query, arg)
if err != nil {
return errors.Wrap(err, "updating table")
}
count, err := res.RowsAffected()
if err != nil {
return errors.Wrap(err, "updating table")
}
if count != 1 {
return errors.New("inconsistent update count")
}
return nil
// Network represents a network tested by the user
type Network struct {
ID int64 `db:"id"`
NetworkName string `db:"network_name"`
IP string `db:"ip"`
ASN int `db:"asn"`
CountryCode string `db:"country_code"`
}
// ListMeasurements given a result ID
func ListMeasurements(db *sqlx.DB, resultID int64) ([]*Measurement, error) {
measurements := []*Measurement{}
rows, err := db.Query(`SELECT id, name,
start_time, runtime,
country,
asn,
summary,
input
FROM measurements
WHERE result_id = ?
ORDER BY start_time;`, resultID)
if err != nil {
return measurements, errors.Wrap(err, "failed to get measurement list")
}
for rows.Next() {
msmt := Measurement{}
err = rows.Scan(&msmt.ID, &msmt.Name,
&msmt.StartTime, &msmt.Runtime,
&msmt.CountryCode,
&msmt.ASN,
&msmt.Summary, &msmt.Input,
//&result.DataUsageUp, &result.DataUsageDown)
)
if err != nil {
log.WithError(err).Error("failed to fetch a row")
continue
}
measurements = append(measurements, &msmt)
}
return measurements, nil
type URL struct {
ID int64 `db:"id"`
URL int64 `db:"url"`
CategoryCode string `db:"category_code"`
CountryCode string `db:"country_code"`
}
// Measurement model
type Measurement struct {
ID int64 `db:"id"`
TestName string `db:"test_name"`
StartTime time.Time `db:"start_time"`
Runtime float64 `db:"runtime"` // Fractional number of seconds
NetworkID int64 `db:"network_id"` // Used to include a Network
IsDone bool `db:"is_done"`
IsUploaded bool `db:"is_uploaded"`
IsFailed string `db:"is_failed"`
FailureMsg string `db:"failure_msg"`
IsUploadFailed bool `db:"is_upload_failed"`
UploadFailureMsg string `db:"upload_failure_msg"`
IsRerun bool `db:"is_rerun"`
ReportID string `db:"report_id"`
URLID string `db:"url_id"` // Used to reference URL
MeasurementID int64 `db:"measurement_id"`
IsAnomaly bool `db:"is_anomaly"`
TestKeys struct{} `db:"test_keys"`
ResultID int64 `db:"result_id"`
ReportFilePath string `db:"report_file_path"`
}
// Result model
type Result struct {
ID int64 `db:"id"`
Name string `db:"name"`
TestGroupName string `db:"test_group_name"`
StartTime time.Time `db:"start_time"`
Runtime float64 `db:"runtime"` // Fractional number of seconds
Summary string `db:"summary"` // XXX this should be JSON
ASN string `db:"asn"`
IP string `db:"ip"`
CountryCode string `db:"country"`
State string `db:"state"`
Failure string `db:"failure"`
UploadFailure string `db:"upload_failure"`
Uploaded bool `db:"uploaded"`
ReportFilePath string `db:"report_file"`
ReportID string `db:"report_id"`
Input string `db:"input"`
ResultID int64 `db:"result_id"`
Runtime float64 `db:"runtime"` // Runtime is expressed in fractional seconds
IsViewed bool `db:"is_viewed"`
IsDone bool `db:"is_done"`
DataUsageUp int64 `db:"data_usage_up"`
DataUsageDown int64 `db:"data_usage_down"`
MeasurementDir string `db:"measurement_dir"`
}
// Finished marks the result as done and sets the runtime
func (r *Result) Finished(sess sqlbuilder.Database, makeSummary summary.ResultSummaryFunc) error {
if r.IsDone == true || r.Runtime != 0 {
return errors.New("Result is already finished")
}
r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds()
r.IsDone = true
err := sess.Collection("results").Find("id", r.ID).Update(r)
if err != nil {
return errors.Wrap(err, "updating finished result")
}
return nil
}
// SetGeoIPInfo for the Measurement
@ -91,12 +83,9 @@ func (m *Measurement) SetGeoIPInfo() error {
}
// Failed writes the error string to the measurement
func (m *Measurement) Failed(db *sqlx.DB, failure string) error {
m.Failure = failure
err := UpdateOne(db, `UPDATE measurements
SET failure = :failure, state = :state
WHERE id = :id`, m)
func (m *Measurement) Failed(sess sqlbuilder.Database, failure string) error {
m.FailureMsg = failure
err := sess.Collection("measurements").Find("id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
@ -104,14 +93,12 @@ func (m *Measurement) Failed(db *sqlx.DB, failure string) error {
}
// Done marks the measurement as completed
func (m *Measurement) Done(db *sqlx.DB) error {
func (m *Measurement) Done(sess sqlbuilder.Database) error {
runtime := time.Now().UTC().Sub(m.StartTime)
m.Runtime = runtime.Seconds()
m.State = "done"
m.IsDone = true
err := UpdateOne(db, `UPDATE measurements
SET state = :state, runtime = :runtime
WHERE id = :id`, m)
err := sess.Collection("measurements").Find("id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
@ -119,13 +106,11 @@ func (m *Measurement) Done(db *sqlx.DB) error {
}
// UploadFailed writes the error string for the upload failure to the measurement
func (m *Measurement) UploadFailed(db *sqlx.DB, failure string) error {
m.UploadFailure = failure
m.Uploaded = false
func (m *Measurement) UploadFailed(sess sqlbuilder.Database, failure string) error {
m.UploadFailureMsg = failure
m.IsUploaded = false
err := UpdateOne(db, `UPDATE measurements
SET upload_failure = :upload_failure
WHERE id = :id`, m)
err := sess.Collection("measurements").Find("id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
@ -133,12 +118,10 @@ func (m *Measurement) UploadFailed(db *sqlx.DB, failure string) error {
}
// UploadSucceeded writes the error string for the upload failure to the measurement
func (m *Measurement) UploadSucceeded(db *sqlx.DB) error {
m.Uploaded = true
func (m *Measurement) UploadSucceeded(sess sqlbuilder.Database) error {
m.IsUploaded = true
err := UpdateOne(db, `UPDATE measurements
SET uploaded = :uploaded
WHERE id = :id`, m)
err := sess.Collection("measurements").Find("id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
@ -146,12 +129,10 @@ func (m *Measurement) UploadSucceeded(db *sqlx.DB) error {
}
// WriteSummary writes the summary to the measurement
func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error {
m.Summary = summary
func (m *Measurement) WriteSummary(sess sqlbuilder.Database, summary string) error {
// XXX remove m.Summary = summary
err := UpdateOne(db, `UPDATE measurements
SET summary = :summary
WHERE id = :id`, m)
err := sess.Collection("measurements").Find("id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
@ -159,7 +140,7 @@ func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error {
}
// AddToResult adds a measurement to a result
func (m *Measurement) AddToResult(db *sqlx.DB, result *Result) error {
func (m *Measurement) AddToResult(sess sqlbuilder.Database, result *Result) error {
var err error
m.ResultID = result.ID
@ -176,191 +157,9 @@ func (m *Measurement) AddToResult(db *sqlx.DB, result *Result) error {
}
m.ReportFilePath = finalPath
err = UpdateOne(db, `UPDATE measurements
SET result_id = :result_id, report_file = :report_file
WHERE id = :id`, m)
err = sess.Collection("measurements").Find("id", m.ID).Update(m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// CreateMeasurement writes the measurement to the database a returns a pointer
// to the Measurement
func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, error) {
// XXX Do we want to have this be part of something else?
m.StartTime = time.Now().UTC()
m.Input = i
m.State = "active"
res, err := db.NamedExec(`INSERT INTO measurements
(name, start_time,
asn, ip, country,
state, failure, report_file,
report_id, input,
result_id)
VALUES (:name,:start_time,
:asn,:ip,:country,
:state,:failure,:report_file,
:report_id,:input,
:result_id)`,
m)
if err != nil {
return nil, errors.Wrap(err, "creating measurement")
}
id, err := res.LastInsertId()
if err != nil {
return nil, errors.Wrap(err, "creating measurement")
}
m.ID = id
return &m, nil
}
// Result model
type Result struct {
ID int64 `db:"id"`
Name string `db:"name"`
StartTime time.Time `db:"start_time"`
Country string `db:"country"`
ASN string `db:"asn"`
NetworkName string `db:"network_name"`
Runtime float64 `db:"runtime"` // Runtime is expressed in fractional seconds
Summary string `db:"summary"` // XXX this should be JSON
Done bool `db:"done"`
DataUsageUp int64 `db:"data_usage_up"`
DataUsageDown int64 `db:"data_usage_down"`
MeasurementDir string `db:"measurement_dir"`
}
// ListResults return the list of results
func ListResults(db *sqlx.DB) ([]*Result, []*Result, error) {
doneResults := []*Result{}
incompleteResults := []*Result{}
rows, err := db.Query(`SELECT id, name,
start_time, runtime,
network_name, country,
asn,
summary, done
FROM results
WHERE done = 1
ORDER BY start_time;`)
if err != nil {
return doneResults, incompleteResults, errors.Wrap(err, "failed to get result done list")
}
for rows.Next() {
result := Result{}
err = rows.Scan(&result.ID, &result.Name,
&result.StartTime, &result.Runtime,
&result.NetworkName, &result.Country,
&result.ASN,
&result.Summary, &result.Done,
//&result.DataUsageUp, &result.DataUsageDown)
)
if err != nil {
log.WithError(err).Error("failed to fetch a row")
continue
}
doneResults = append(doneResults, &result)
}
rows, err = db.Query(`SELECT
id, name,
start_time,
network_name, country,
asn
FROM results
WHERE done != 1
ORDER BY start_time;`)
if err != nil {
return doneResults, incompleteResults, errors.Wrap(err, "failed to get result done list")
}
for rows.Next() {
result := Result{Done: false}
err = rows.Scan(&result.ID, &result.Name, &result.StartTime,
&result.NetworkName, &result.Country,
&result.ASN)
if err != nil {
log.WithError(err).Error("failed to fetch a row")
continue
}
incompleteResults = append(incompleteResults, &result)
}
return doneResults, incompleteResults, nil
}
// MakeSummaryMap return a mapping of test names to summaries for the given
// result
func MakeSummaryMap(db *sqlx.DB, r *Result) (summary.SummaryMap, error) {
summaryMap := summary.SummaryMap{}
msmts := []Measurement{}
// XXX maybe we only want to select some of the columns
err := db.Select(&msmts, "SELECT name, summary FROM measurements WHERE result_id = $1", r.ID)
if err != nil {
return nil, errors.Wrap(err, "failed to get measurements")
}
for _, msmt := range msmts {
val, ok := summaryMap[msmt.Name]
if ok {
summaryMap[msmt.Name] = append(val, msmt.Summary)
} else {
summaryMap[msmt.Name] = []string{msmt.Summary}
}
}
return summaryMap, nil
}
// Finished marks the result as done and sets the runtime
func (r *Result) Finished(db *sqlx.DB, makeSummary summary.ResultSummaryFunc) error {
if r.Done == true || r.Runtime != 0 {
return errors.New("Result is already finished")
}
r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds()
r.Done = true
// XXX add in here functionality to compute the summary
summaryMap, err := MakeSummaryMap(db, r)
if err != nil {
return err
}
r.Summary, err = makeSummary(summaryMap)
if err != nil {
return err
}
err = UpdateOne(db, `UPDATE results
SET done = :done, runtime = :runtime, summary = :summary
WHERE id = :id`, r)
if err != nil {
return errors.Wrap(err, "updating finished result")
}
return nil
}
// CreateResult writes the Result to the database a returns a pointer
// to the Result
func CreateResult(db *sqlx.DB, homePath string, r Result) (*Result, error) {
log.Debugf("Creating result %v", r)
p, err := utils.MakeResultsDir(homePath, r.Name, r.StartTime)
if err != nil {
return nil, err
}
r.MeasurementDir = p
res, err := db.NamedExec(`INSERT INTO results
(name, start_time, country, network_name, asn)
VALUES (:name,:start_time,:country,:network_name,:asn)`,
r)
if err != nil {
return nil, errors.Wrap(err, "creating result")
}
id, err := res.LastInsertId()
if err != nil {
return nil, errors.Wrap(err, "creating result")
}
r.ID = id
return &r, nil
}

View File

@ -58,11 +58,8 @@ func (c *Controller) Init(nt *mk.Nettest) error {
c.msmts = make(map[int64]*database.Measurement)
msmtTemplate := database.Measurement{
ASN: "",
IP: "",
CountryCode: "",
ReportID: "",
Name: nt.Name,
TestName: nt.Name,
ResultID: c.res.ID,
ReportFilePath: c.msmtPath,
}
@ -165,9 +162,12 @@ func (c *Controller) Init(nt *mk.Nettest) error {
nt.On("status.geoip_lookup", func(e mk.Event) {
log.Debugf(color.RedString(e.Key))
/* FIXME
Put this into the network table
msmtTemplate.ASN = e.Value.ProbeASN
msmtTemplate.IP = e.Value.ProbeIP
msmtTemplate.CountryCode = e.Value.ProbeCC
*/
})
nt.On("status.measurement_start", func(e mk.Event) {

View File

@ -6,7 +6,6 @@ import (
"path"
"github.com/apex/log"
"github.com/jmoiron/sqlx"
"github.com/ooni/probe-cli/config"
"github.com/ooni/probe-cli/internal/bindata"
"github.com/ooni/probe-cli/internal/database"
@ -14,6 +13,7 @@ import (
"github.com/ooni/probe-cli/internal/onboard"
"github.com/ooni/probe-cli/utils"
"github.com/pkg/errors"
"upper.io/db.v3/lib/sqlbuilder"
)
const Version = "3.0.0-dev.0"
@ -21,7 +21,7 @@ const Version = "3.0.0-dev.0"
// Context for OONI Probe
type Context struct {
Config *config.Config
DB *sqlx.DB
DB sqlbuilder.Database
Location *utils.LocationInfo
Home string