Update the measurement, network and url creation to the new schema

This commit is contained in:
Arturo Filastò 2018-09-07 12:55:27 +02:00
parent 71ed0e969f
commit 35bd334cfc
6 changed files with 138 additions and 80 deletions

View File

@ -3,9 +3,7 @@ package run
import ( import (
"errors" "errors"
"fmt" "fmt"
"path/filepath"
"strings" "strings"
"time"
"github.com/alecthomas/kingpin" "github.com/alecthomas/kingpin"
"github.com/apex/log" "github.com/apex/log"
@ -14,7 +12,6 @@ import (
"github.com/ooni/probe-cli/internal/database" "github.com/ooni/probe-cli/internal/database"
"github.com/ooni/probe-cli/nettests" "github.com/ooni/probe-cli/nettests"
"github.com/ooni/probe-cli/nettests/groups" "github.com/ooni/probe-cli/nettests/groups"
"github.com/ooni/probe-cli/utils"
) )
func init() { func init() {
@ -55,24 +52,13 @@ func init() {
return err return err
} }
network := database.Network{ network, err := database.CreateNetwork(ctx.DB, ctx.Location)
ASN: ctx.Location.ASN,
CountryCode: ctx.Location.CountryCode,
NetworkName: ctx.Location.NetworkName,
IP: ctx.Location.IP,
}
newID, err := ctx.DB.Collection("networks").Insert(network)
if err != nil { if err != nil {
log.WithError(err).Error("Failed to create the network row") log.WithError(err).Error("Failed to create the network row")
return nil return nil
} }
network.ID = newID.(int64)
result, err := database.CreateResult(ctx.DB, ctx.Home, database.Result{ result, err := database.CreateResult(ctx.DB, ctx.Home, *nettestGroup, network.ID)
TestGroupName: *nettestGroup,
StartTime: time.Now().UTC(),
NetworkID: network.ID,
})
if err != nil { if err != nil {
log.Errorf("DB result error: %s", err) log.Errorf("DB result error: %s", err)
return err return err
@ -80,16 +66,13 @@ func init() {
for _, nt := range group.Nettests { for _, nt := range group.Nettests {
log.Debugf("Running test %T", nt) log.Debugf("Running test %T", nt)
msmtPath := filepath.Join(ctx.TempDir, ctl := nettests.NewController(nt, ctx, result)
fmt.Sprintf("msmt-%T-%s.jsonl", nt,
time.Now().UTC().Format(utils.ResultTimestamp)))
ctl := nettests.NewController(nt, ctx, result, msmtPath)
if err = nt.Run(ctl); err != nil { if err = nt.Run(ctl); err != nil {
log.WithError(err).Errorf("Failed to run %s", group.Label) log.WithError(err).Errorf("Failed to run %s", group.Label)
return err return err
} }
} }
if err = result.Finished(ctx.DB, group.Summary); err != nil { if err = result.Finished(ctx.DB, group.Summary); err != nil {
return err return err
} }

View File

@ -1,6 +1,7 @@
package database package database
import ( import (
"database/sql"
"time" "time"
"github.com/apex/log" "github.com/apex/log"
@ -117,41 +118,65 @@ func ListResults(db sqlbuilder.Database) ([]*Result, []*Result, error) {
// CreateMeasurement writes the measurement to the database a returns a pointer // CreateMeasurement writes the measurement to the database a returns a pointer
// to the Measurement // to the Measurement
func CreateMeasurement(sess sqlbuilder.Database, m Measurement, i string) (*Measurement, error) { func CreateMeasurement(sess sqlbuilder.Database, reportID sql.NullString, testName string, resultID int64, reportFilePath string, urlID sql.NullInt64) (*Measurement, error) {
col := sess.Collection("measurements") msmt := Measurement{
ReportID: reportID,
TestName: testName,
ResultID: resultID,
ReportFilePath: reportFilePath,
URLID: urlID,
// XXX Do we want to have this be part of something else? // XXX Do we want to have this be part of something else?
m.StartTime = time.Now().UTC() StartTime: time.Now().UTC(),
m.TestKeys = "" TestKeys: "",
}
// XXX insert also the URL and stuff newID, err := sess.Collection("measurements").Insert(msmt)
//m.Input = i
//m.State = "active"
newID, err := col.Insert(m)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "creating measurement") return nil, errors.Wrap(err, "creating measurement")
} }
m.ID = newID.(int64) msmt.ID = newID.(int64)
return &m, nil return &msmt, nil
} }
// CreateResult writes the Result to the database a returns a pointer // CreateResult writes the Result to the database a returns a pointer
// to the Result // to the Result
func CreateResult(sess sqlbuilder.Database, homePath string, r Result) (*Result, error) { func CreateResult(sess sqlbuilder.Database, homePath string, testGroupName string, networkID int64) (*Result, error) {
log.Debugf("Creating result %v", r) startTime := time.Now().UTC()
col := sess.Collection("results") p, err := utils.MakeResultsDir(homePath, testGroupName, startTime)
p, err := utils.MakeResultsDir(homePath, r.TestGroupName, r.StartTime)
if err != nil { if err != nil {
return nil, err return nil, err
} }
r.MeasurementDir = p
newID, err := col.Insert(r) result := Result{
TestGroupName: testGroupName,
StartTime: startTime,
NetworkID: networkID,
}
result.MeasurementDir = p
log.Debugf("Creating result %v", result)
newID, err := sess.Collection("results").Insert(result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "creating result") return nil, errors.Wrap(err, "creating result")
} }
r.ID = newID.(int64) result.ID = newID.(int64)
return &r, nil return &result, nil
}
// CreateNetwork will create a new network in the network table
func CreateNetwork(sess sqlbuilder.Database, location *utils.LocationInfo) (*Network, error) {
network := Network{
ASN: location.ASN,
CountryCode: location.CountryCode,
NetworkName: location.NetworkName,
IP: location.IP,
}
newID, err := sess.Collection("networks").Insert(network)
if err != nil {
return nil, err
}
network.ID = newID.(int64)
return &network, nil
} }

View File

@ -5,7 +5,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
"time"
) )
func TestMeasurementWorkflow(t *testing.T) { func TestMeasurementWorkflow(t *testing.T) {
@ -25,21 +24,18 @@ func TestMeasurementWorkflow(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
result, err := CreateResult(sess, tmpdir, Result{ result, err := CreateResult(sess, tmpdir, "websites", 0)
TestGroupName: "websites",
StartTime: time.Now().UTC(),
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
msmtTemplate := Measurement{ reportID := sql.NullString{String: "", Valid: false}
ReportID: sql.NullString{String: "", Valid: false}, testName := "antani"
TestName: "antani", resultID := result.ID
ResultID: result.ID, reportFilePath := tmpdir
ReportFilePath: tmpdir, urlID := sql.NullInt64{Int64: 0, Valid: false}
}
m1, err := CreateMeasurement(sess, msmtTemplate, "") m1, err := CreateMeasurement(sess, reportID, testName, resultID, reportFilePath, urlID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -23,7 +23,7 @@ type Network struct {
// URL represents URLs from the testing lists // URL represents URLs from the testing lists
type URL struct { type URL struct {
ID int64 `db:"id"` ID int64 `db:"id"`
URL int64 `db:"url"` URL string `db:"url"`
CategoryCode string `db:"category_code"` CategoryCode string `db:"category_code"`
CountryCode string `db:"country_code"` CountryCode string `db:"country_code"`
} }
@ -42,7 +42,7 @@ type Measurement struct {
UploadFailureMsg sql.NullString `db:"upload_failure_msg,omitempty"` UploadFailureMsg sql.NullString `db:"upload_failure_msg,omitempty"`
IsRerun bool `db:"is_rerun"` IsRerun bool `db:"is_rerun"`
ReportID sql.NullString `db:"report_id,omitempty"` ReportID sql.NullString `db:"report_id,omitempty"`
URLID string `db:"url_id"` // Used to reference URL URLID sql.NullInt64 `db:"url_id,omitempty"` // Used to reference URL
MeasurementID sql.NullInt64 `db:"measurement_id,omitempty"` MeasurementID sql.NullInt64 `db:"measurement_id,omitempty"`
IsAnomaly sql.NullBool `db:"is_anomaly,omitempty"` IsAnomaly sql.NullBool `db:"is_anomaly,omitempty"`
// FIXME we likely want to support JSON. See: https://github.com/upper/db/issues/462 // FIXME we likely want to support JSON. See: https://github.com/upper/db/issues/462

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/apex/log" "github.com/apex/log"
"github.com/fatih/color" "github.com/fatih/color"
@ -24,7 +25,10 @@ type Nettest interface {
} }
// NewController creates a nettest controller // NewController creates a nettest controller
func NewController(nt Nettest, ctx *ooni.Context, res *database.Result, msmtPath string) *Controller { func NewController(nt Nettest, ctx *ooni.Context, res *database.Result) *Controller {
msmtPath := filepath.Join(ctx.TempDir,
fmt.Sprintf("msmt-%T-%s.jsonl", nt,
time.Now().UTC().Format(utils.ResultTimestamp)))
return &Controller{ return &Controller{
Ctx: ctx, Ctx: ctx,
nt: nt, nt: nt,
@ -41,6 +45,7 @@ type Controller struct {
nt Nettest nt Nettest
msmts map[int64]*database.Measurement msmts map[int64]*database.Measurement
msmtPath string // XXX maybe we can drop this and just use a temporary file msmtPath string // XXX maybe we can drop this and just use a temporary file
inputIdxMap map[int64]int64 // Used to map mk idx to database id
} }
func getCaBundlePath() string { func getCaBundlePath() string {
@ -51,6 +56,11 @@ func getCaBundlePath() string {
return "/etc/ssl/cert.pem" return "/etc/ssl/cert.pem"
} }
func (c *Controller) SetInputIdxMap(inputIdxMap map[int64]int64) error {
c.inputIdxMap = inputIdxMap
return nil
}
// Init should be called once to initialise the nettest // Init should be called once to initialise the nettest
func (c *Controller) Init(nt *mk.Nettest) error { func (c *Controller) Init(nt *mk.Nettest) error {
log.Debugf("Init: %v", nt) log.Debugf("Init: %v", nt)
@ -58,12 +68,11 @@ func (c *Controller) Init(nt *mk.Nettest) error {
c.msmts = make(map[int64]*database.Measurement) c.msmts = make(map[int64]*database.Measurement)
msmtTemplate := database.Measurement{ // These values are shared by every measurement
ReportID: sql.NullString{String: "", Valid: false}, reportID := sql.NullString{String: "", Valid: false}
TestName: nt.Name, testName := nt.Name
ResultID: c.res.ID, resultID := c.res.ID
ReportFilePath: c.msmtPath, reportFilePath := c.msmtPath
}
// This is to workaround homedirs having UTF-8 characters in them. // This is to workaround homedirs having UTF-8 characters in them.
// See: https://github.com/measurement-kit/measurement-kit/issues/1635 // See: https://github.com/measurement-kit/measurement-kit/issues/1635
@ -157,7 +166,7 @@ func (c *Controller) Init(nt *mk.Nettest) error {
nt.On("status.report_created", func(e mk.Event) { nt.On("status.report_created", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
msmtTemplate.ReportID = sql.NullString{String: e.Value.ReportID, Valid: true} reportID = sql.NullString{String: e.Value.ReportID, Valid: true}
}) })
nt.On("status.geoip_lookup", func(e mk.Event) { nt.On("status.geoip_lookup", func(e mk.Event) {
@ -175,7 +184,11 @@ func (c *Controller) Init(nt *mk.Nettest) error {
log.Debugf(color.RedString(e.Key)) log.Debugf(color.RedString(e.Key))
idx := e.Value.Idx idx := e.Value.Idx
msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input) urlID := sql.NullInt64{Int64: 0, Valid: false}
if c.inputIdxMap != nil {
urlID = sql.NullInt64{Int64: c.inputIdxMap[idx], Valid: true}
}
msmt, err := database.CreateMeasurement(c.Ctx.DB, reportID, testName, resultID, reportFilePath, urlID)
if err != nil { if err != nil {
log.WithError(err).Error("Failed to create measurement") log.WithError(err).Error("Failed to create measurement")
return return

View File

@ -6,7 +6,9 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"github.com/apex/log"
"github.com/measurement-kit/go-measurement-kit" "github.com/measurement-kit/go-measurement-kit"
"github.com/ooni/probe-cli/internal/database"
"github.com/ooni/probe-cli/nettests" "github.com/ooni/probe-cli/nettests"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -14,6 +16,7 @@ import (
// URLInfo contains the URL and the citizenlab category code for that URL // URLInfo contains the URL and the citizenlab category code for that URL
type URLInfo struct { type URLInfo struct {
URL string `json:"url"` URL string `json:"url"`
CountryCode string `json:"country_code"`
CategoryCode string `json:"category_code"` CategoryCode string `json:"category_code"`
} }
@ -24,10 +27,11 @@ type URLResponse struct {
const orchestrateBaseURL = "https://events.proteus.test.ooni.io" const orchestrateBaseURL = "https://events.proteus.test.ooni.io"
func lookupURLs(ctl *nettests.Controller) ([]string, error) { func lookupURLs(ctl *nettests.Controller) ([]string, map[int64]int64, error) {
var ( var (
parsed = new(URLResponse) parsed = new(URLResponse)
urls []string urls []string
urlIDMap map[int64]int64
) )
// XXX pass in the configuration for category codes // XXX pass in the configuration for category codes
reqURL := fmt.Sprintf("%s/api/v1/urls?probe_cc=%s", reqURL := fmt.Sprintf("%s/api/v1/urls?probe_cc=%s",
@ -36,22 +40,58 @@ func lookupURLs(ctl *nettests.Controller) ([]string, error) {
resp, err := http.Get(reqURL) resp, err := http.Get(reqURL)
if err != nil { if err != nil {
return urls, errors.Wrap(err, "failed to perform request") return urls, urlIDMap, errors.Wrap(err, "failed to perform request")
} }
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return urls, errors.Wrap(err, "failed to read response body") return urls, urlIDMap, errors.Wrap(err, "failed to read response body")
} }
err = json.Unmarshal([]byte(body), &parsed) err = json.Unmarshal([]byte(body), &parsed)
if err != nil { if err != nil {
return urls, errors.Wrap(err, "failed to parse json") return urls, urlIDMap, errors.Wrap(err, "failed to parse json")
} }
for _, url := range parsed.Results { for idx, url := range parsed.Results {
var urlID int64
res, err := ctl.Ctx.DB.Update("urls").Set(
"url", url.URL,
"category_code", url.CategoryCode,
"country_code", url.CountryCode,
).Where("url = ? AND country_code = ?", url.URL, url.CountryCode).Exec()
if err != nil {
log.Error("Failed to write to the URL table")
} else {
affected, err := res.RowsAffected()
if err != nil {
log.Error("Failed to get affected row count")
} else if affected == 0 {
newID, err := ctl.Ctx.DB.Collection("urls").Insert(
database.URL{
URL: url.URL,
CategoryCode: url.CategoryCode,
CountryCode: url.CountryCode,
})
if err != nil {
log.Error("Failed to insert into the URLs table")
}
urlID = newID.(int64)
} else {
lastID, err := res.LastInsertId()
if err != nil {
log.Error("failed to get URL ID")
}
urlID = lastID
}
}
urlIDMap[int64(idx)] = urlID
urls = append(urls, url.URL) urls = append(urls, url.URL)
} }
return urls, nil return urls, urlIDMap, nil
} }
// WebConnectivity test implementation // WebConnectivity test implementation
@ -63,10 +103,11 @@ func (n WebConnectivity) Run(ctl *nettests.Controller) error {
nt := mk.NewNettest("WebConnectivity") nt := mk.NewNettest("WebConnectivity")
ctl.Init(nt) ctl.Init(nt)
urls, err := lookupURLs(ctl) urls, urlIDMap, err := lookupURLs(ctl)
if err != nil { if err != nil {
return err return err
} }
ctl.SetInputIdxMap(urlIDMap)
nt.Options.Inputs = urls nt.Options.Inputs = urls
return nt.Run() return nt.Run()