Merge pull request #7 from OpenObservatory/feature/measurement-state

Keep track of the measurement state in the database
This commit is contained in:
Arturo Filastò 2018-03-22 14:51:44 +00:00 committed by GitHub
commit 09bb67ab1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 705 additions and 122 deletions

2
Gopkg.lock generated
View File

@ -89,7 +89,7 @@
branch = "master" branch = "master"
name = "github.com/measurement-kit/go-measurement-kit" name = "github.com/measurement-kit/go-measurement-kit"
packages = ["."] packages = ["."]
revision = "ae6643546db7c99bbc6ecb37b86d04baec04d295" revision = "6ae2401a8e498a90ccdd3edbda1841add079b70e"
[[projects]] [[projects]]
branch = "master" branch = "master"

View File

@ -33,3 +33,22 @@ was built as
``` ```
CGO_LDFLAGS="-L/path/to/measurement-kit/.libs/" CGO_CFLAGS="-I/path/to/measurement-kit/include" make build CGO_LDFLAGS="-L/path/to/measurement-kit/.libs/" CGO_CFLAGS="-I/path/to/measurement-kit/include" make build
``` ```
## Todo
* Add support for outputing structured logging messages, while tests are
running, to be consumed by the desktop app
* Add support for the missing tests, namely:
- HTTP Invalid Request Line
- HTTP Header Field Manipulation
- Facebook Messenger
- Telegram
- WhatsApp
- WebConnectivity
* Fix issue with the informed consent being bypassed on first run
* Finish the config file implementation
* Add support for listing results in the CLI

View File

@ -13,7 +13,7 @@ CREATE TABLE `results` (
`id` INTEGER PRIMARY KEY AUTOINCREMENT, `id` INTEGER PRIMARY KEY AUTOINCREMENT,
`name` VARCHAR(255), `name` VARCHAR(255),
`start_time` DATETIME, `start_time` DATETIME,
`end_time` DATETIME, `runtime` REAL,
`summary` JSON, `summary` JSON,
`done` TINYINT(1), `done` TINYINT(1),
`data_usage_up` INTEGER, `data_usage_up` INTEGER,
@ -24,18 +24,19 @@ CREATE TABLE `measurements` (
`id` INTEGER PRIMARY KEY AUTOINCREMENT, `id` INTEGER PRIMARY KEY AUTOINCREMENT,
`name` VARCHAR(255), `name` VARCHAR(255),
`start_time` DATETIME, `start_time` DATETIME,
`end_time` DATETIME, `runtime` REAL,
`summary` JSON, `summary` JSON,
`ip` VARCHAR(255), `ip` VARCHAR(255),
`asn` INTEGER, `asn` VARCHAR(16),
`country` VARCHAR(2), `country` VARCHAR(2),
`network_name` VARCHAR(255), `network_name` VARCHAR(255),
`state` TEXT, `state` TEXT,
`failure` VARCHAR(255), `failure` VARCHAR(255),
`upload_failure` VARCHAR(255),
`uploaded` TINYINT(1),
`report_file` VARCHAR(255), `report_file` VARCHAR(255),
`report_id` VARCHAR(255), `report_id` VARCHAR(255),
`input` VARCHAR(255), `input` VARCHAR(255),
`measurement_id` VARCHAR(255),
`result_id` INTEGER REFERENCES `results` (`id`) ON DELETE SET NULL ON UPDATE CASCADE `result_id` INTEGER REFERENCES `results` (`id`) ON DELETE SET NULL ON UPDATE CASCADE
); );

View File

@ -130,19 +130,20 @@ func bindataDataDefaultconfigjson() (*asset, error) {
} }
var _bindataDataMigrations1createmsmtresultssql = []byte( var _bindataDataMigrations1createmsmtresultssql = []byte(
"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x93\x4f\x6f\xf2\x30\x0c\xc6\xef\xfd\x14\x3e\x82\xde\x97\xc3\x26\x71" + "\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x93\x31\xef\xda\x30\x10\xc5\xf7\x7c\x8a\x1b\x41\x2d\x03\x95\xe8\xc2" +
"\xe2\x14\x5a\x6f\xeb\x56\x52\x94\xa6\xd3\x38\xb5\xd1\x1a\x50\x34\x9a\x56\x69\x22\xb4\x6f\x3f\x85\x7f\x2a\xac\x70" + "\x64\x92\x6b\x9b\x36\x38\xc8\x71\xaa\x32\x25\x56\x63\x90\xd5\xc4\x89\x1c\x5b\xa8\xdf\xbe\x32\x24\x14\x68\xa0\xeb" +
"\xde\xd5\xbf\xc7\x8e\xfd\xd8\x99\x4c\xe0\x5f\xad\x36\x46\x58\x09\x51\xb3\xd3\x41\x3f\x90\x59\x61\x65\x2d\xb5\x9d" + "\x7f\x7d\xbf\xbb\x67\x9f\xdf\x79\xb1\x80\x77\x8d\x3a\x1a\x61\x25\x44\xed\x49\x07\xb7\x42\x66\x85\x95\x8d\xd4\x76" +
"\xcb\x8d\xd2\x41\x10\xb1\x74\x09\x9c\xcc\x13\x84\xd2\xc8\xce\x6d\x6d\x57\xce\x2e\xa2\xb5\x14\x9d\x33\xfb\x1c\x8f" + "\x23\x8f\x4a\x07\x41\xc4\xd2\x1d\x70\xb2\x49\x10\x4a\x23\x7b\x57\xdb\xbe\x5c\xdf\xa9\x8d\x14\xbd\x33\xe7\x1e\x8f" +
"\x86\xab\xa1\xae\x2e\x49\xde\xde\x7d\x36\x64\x48\x38\x5e\x3f\x0c\xa3\x00\x00\xa0\x54\x55\x09\x31\xe5\xf8\x8c\x0c" + "\xa6\xdd\x50\x57\xf7\x24\xef\x5e\x1e\x1b\x32\x24\x1c\x1f\x0f\x86\x59\x00\x00\x50\xaa\xaa\x84\x98\x72\xfc\x8c\x0c" +
"\x96\x2c\x5e\x10\xb6\x82\x37\x5c\x01\xc9\x79\x1a\xd3\x90\xe1\x02\x29\xff\x7f\xd0\x6a\x51\xcb\x12\xde\x09\x0b\x5f" + "\x76\x2c\xde\x12\xb6\x87\x6f\xb8\x07\x92\xf3\x34\xa6\x21\xc3\x2d\x52\xfe\xfe\x52\xab\x45\x23\x4b\xf8\x4e\x58\xf8" +
"\x08\x1b\x3d\x4e\xa7\xe3\x23\xe8\xac\x30\xb6\xb0\xca\xe3\x88\x70\xe4\xf1\x02\x8f\x48\xea\x6a\x18\x74\xae\xae\x85" + "\x85\xb0\xd9\x87\xd5\x6a\x3e\x80\xde\x0a\x63\x0b\xab\x3c\x8e\x08\x47\x1e\x6f\x71\x40\xc6\xe9\x8b\xce\x90\x24\x63" +
"\xf9\x2e\xe1\x35\x4b\xe9\x31\x56\x35\x5a\x96\xc0\x63\xba\x8a\x29\x1f\x3d\x9c\xca\x57\xc2\x8a\xc2\x75\x62\x23\x0b" + "\xb9\x6b\x1a\x61\x7e\x97\xf0\x35\x4b\xe9\xa0\x55\xad\x96\x25\xf0\x98\xee\x63\xca\x67\xcb\xd1\xb9\x12\x56\x14\xae" +
"\xd7\x9e\xdb\xfd\x0d\xab\x66\xa7\xcf\x38\x18\xcf\xae\x67\xbf\xb0\xf7\x4f\x1a\xa0\xda\xc1\xf2\xa2\xd3\xd7\x63\x7f" + "\x17\x47\x59\xb8\xee\x7a\xd3\x7f\x61\xd5\x9e\xf4\x15\x07\xf3\xf5\xe3\xd8\x77\x2f\xfb\xd6\x66\x57\xdd\xa4\xb3\xe8" +
"\x36\x4e\x5b\x9f\x7d\x96\x9f\xc4\x5a\xda\x5d\x63\xbe\x8a\x7b\xcd\x5a\x6f\x33\x7e\x9c\xe6\x5a\x0b\xb5\x75\x66\x58" + "\xf5\x5f\x7d\xf9\x71\x94\x7f\xb6\x4e\x5b\xef\x70\x6d\x19\x89\x96\xf6\xd4\x9a\x5f\xc5\xab\xbb\x5a\xff\xca\xf8\x63" +
"\x6d\x64\xdb\x18\x5b\xac\xd5\xf6\x2e\xf7\x36\x0e\x50\xa5\x5b\x67\x07\x49\x6f\x19\xb7\x92\x0f\xb7\x5a\xf4\x37\xc4" + "\x1c\xeb\x20\x54\xed\xcc\x74\xb5\xeb\xea\x56\x54\xc5\xff\x4b\x64\x35\x91\x9c\x91\x5d\x6b\x6c\x71\x50\xf5\x74\xeb" +
"\xf0\x09\x19\xd2\x10\xb3\xfe\x29\xfb\x25\x8e\x21\xa5\x10\x61\x82\x1c\x21\x43\x0e\x34\x4f\x12\x1f\xca\x97\xde\x77" + "\xc0\x7d\x06\x13\x54\xe9\xce\xd9\x27\x7d\x7e\x81\x8b\xdb\xec\x18\x7e\x42\x86\x34\xc4\xec\x76\xbf\x7d\xbc\x73\x48" +
"\x08\x49\x16\x92\x08\xf7\x57\x71\xf3\x5b\xfd\x04\x00\x00\xff\xff\xc4\x16\x0a\x20\xcf\x03\x00\x00") "\x29\x44\x98\x20\x47\xc8\x90\x03\xcd\x93\xc4\x4b\xf9\xce\x47\x05\x21\xc9\x42\x12\xe1\x79\x5f\x9e\xfe\xb5\x3f\x01" +
"\x00\x00\xff\xff\x29\xbd\x69\x4f\xe4\x03\x00\x00")
func bindataDataMigrations1createmsmtresultssqlBytes() ([]byte, error) { func bindataDataMigrations1createmsmtresultssqlBytes() ([]byte, error) {
return bindataRead( return bindataRead(

View File

@ -1,6 +1,7 @@
package run package run
import ( import (
"errors"
"fmt" "fmt"
"path/filepath" "path/filepath"
"time" "time"
@ -25,7 +26,11 @@ func init() {
log.Errorf("%s", err) log.Errorf("%s", err)
return err return err
} }
group := groups.NettestGroups[*nettestGroup] group, ok := groups.NettestGroups[*nettestGroup]
if !ok {
log.Errorf("No test group named %s", *nettestGroup)
return errors.New("invalid test group name")
}
log.Debugf("Running test group %s", group.Label) log.Debugf("Running test group %s", group.Label)
result, err := database.CreateResult(ctx.DB, database.Result{ result, err := database.CreateResult(ctx.DB, database.Result{
@ -40,22 +45,18 @@ func init() {
for _, nt := range group.Nettests { for _, nt := range group.Nettests {
log.Debugf("Running test %T", nt) log.Debugf("Running test %T", nt)
msmtPath := filepath.Join(ctx.TempDir, msmtPath := filepath.Join(ctx.TempDir,
fmt.Sprintf("msmt-%s-%T.jsonl", nt, fmt.Sprintf("msmt-%T-%s.jsonl", nt,
time.Now().UTC().Format(time.RFC3339Nano))) time.Now().UTC().Format(time.RFC3339Nano)))
ctl := nettests.NewController(ctx, result, msmtPath) ctl := nettests.NewController(nt, ctx, result, msmtPath)
if err := nt.Run(ctl); err != nil { if err = nt.Run(ctl); err != nil {
log.WithError(err).Errorf("Failed to run %s", group.Label) log.WithError(err).Errorf("Failed to run %s", group.Label)
return err return err
} }
// XXX
// 1. Generate the summary
// 2. Link the measurement to the Result (this should probably happen in
// the nettest class)
// 3. Update the summary of the result and the other metadata in the db
// 4. Move the msmtPath into the final location ~/.ooni/msmts/
} }
// result.Update(ctx.DB) if err = result.Finished(ctx.DB, group.Summary); err != nil {
return err
}
return nil return nil
}) })
} }

View File

@ -1,30 +1,58 @@
package database package database
import ( import (
"fmt"
"os"
"path/filepath"
"time" "time"
"github.com/apex/log" "github.com/apex/log"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
ooni "github.com/openobservatory/gooni"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// ResultSummaryFunc is the function used to generate result summaries
type ResultSummaryFunc func(SummaryMap) (string, error)
// SummaryMap contains a mapping from test name to serialized summary for it
type SummaryMap map[string]string
// UpdateOne will run the specified update query and check that it only affected one row
func UpdateOne(db *sqlx.DB, query string, arg interface{}) error {
res, err := db.NamedExec(query, arg)
if err != nil {
return errors.Wrap(err, "updating table")
}
count, err := res.RowsAffected()
if err != nil {
return errors.Wrap(err, "updating table")
}
if count != 1 {
return errors.New("inconsistent update count")
}
return nil
}
// Measurement model // Measurement model
type Measurement struct { type Measurement struct {
ID int64 `db:"id"` ID int64 `db:"id"`
Name string `db:"name"` Name string `db:"name"`
StartTime time.Time `db:"start_time"` StartTime time.Time `db:"start_time"`
EndTime time.Time `db:"end_time"` Runtime float64 `db:"runtime"` // Fractional number of seconds
Summary string `db:"summary"` // XXX this should be JSON Summary string `db:"summary"` // XXX this should be JSON
ASN int64 `db:"asn"` ASN string `db:"asn"`
IP string `db:"ip"` IP string `db:"ip"`
CountryCode string `db:"country"` CountryCode string `db:"country"`
State string `db:"state"` State string `db:"state"`
Failure string `db:"failure"` Failure string `db:"failure"`
UploadFailure string `db:"upload_failure"`
Uploaded bool `db:"uploaded"`
ReportFilePath string `db:"report_file"` ReportFilePath string `db:"report_file"`
ReportID string `db:"report_id"` ReportID string `db:"report_id"`
Input string `db:"input"` Input string `db:"input"`
MeasurementID string `db:"measurement_id"` ResultID int64 `db:"result_id"`
ResultID string `db:"result_id"`
} }
// SetGeoIPInfo for the Measurement // SetGeoIPInfo for the Measurement
@ -32,14 +60,108 @@ func (m *Measurement) SetGeoIPInfo() error {
return nil return nil
} }
// Failed writes the error string to the measurement
func (m *Measurement) Failed(db *sqlx.DB, failure string) error {
m.Failure = failure
err := UpdateOne(db, `UPDATE measurements
SET failure = :failure, state = :state
WHERE id = :id`, m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// Done marks the measurement as completed
func (m *Measurement) Done(db *sqlx.DB) error {
runtime := time.Now().UTC().Sub(m.StartTime)
m.Runtime = runtime.Seconds()
m.State = "done"
err := UpdateOne(db, `UPDATE measurements
SET state = :state, runtime = :runtime
WHERE id = :id`, m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// UploadFailed writes the error string for the upload failure to the measurement
func (m *Measurement) UploadFailed(db *sqlx.DB, failure string) error {
m.UploadFailure = failure
m.Uploaded = false
err := UpdateOne(db, `UPDATE measurements
SET upload_failure = :upload_failure
WHERE id = :id`, m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// UploadSucceeded writes the error string for the upload failure to the measurement
func (m *Measurement) UploadSucceeded(db *sqlx.DB) error {
m.Uploaded = true
err := UpdateOne(db, `UPDATE measurements
SET uploaded = :uploaded
WHERE id = :id`, m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// WriteSummary writes the summary to the measurement
func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error {
m.Summary = summary
err := UpdateOne(db, `UPDATE measurements
SET summary = :summary
WHERE id = :id`, m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// AddToResult adds a measurement to a result
func (m *Measurement) AddToResult(db *sqlx.DB, result *Result) error {
m.ResultID = result.ID
finalPath := filepath.Join(result.MeasurementDir,
filepath.Base(m.ReportFilePath))
err := os.Rename(m.ReportFilePath, finalPath)
if err != nil {
return errors.Wrap(err, "moving report file")
}
m.ReportFilePath = finalPath
err = UpdateOne(db, `UPDATE measurements
SET result_id = :result_id, report_file = :report_file
WHERE id = :id`, m)
if err != nil {
return errors.Wrap(err, "updating measurement")
}
return nil
}
// CreateMeasurement writes the measurement to the database a returns a pointer // CreateMeasurement writes the measurement to the database a returns a pointer
// to the Measurement // to the Measurement
func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, error) {
// XXX Do we want to have this be part of something else?
m.StartTime = time.Now().UTC()
m.Input = i
m.State = "active"
res, err := db.NamedExec(`INSERT INTO measurements res, err := db.NamedExec(`INSERT INTO measurements
(name, start_time, (name, start_time,
summary, asn, ip, country, asn, ip, country,
state, failure, report_file, state, failure, report_file,
report_id, input, measurement_id, report_id, input,
result_id) result_id)
VALUES (:name,:start_time, VALUES (:name,:start_time,
:asn,:ip,:country, :asn,:ip,:country,
@ -63,49 +185,89 @@ type Result struct {
ID int64 `db:"id"` ID int64 `db:"id"`
Name string `db:"name"` Name string `db:"name"`
StartTime time.Time `db:"start_time"` StartTime time.Time `db:"start_time"`
Runtime float64 `db:"runtime"` // Runtime is expressed in Microseconds Runtime float64 `db:"runtime"` // Runtime is expressed in fractional seconds
Summary string `db:"summary"` // XXX this should be JSON Summary string `db:"summary"` // XXX this should be JSON
Done bool `db:"done"` Done bool `db:"done"`
DataUsageUp int64 `db:"data_usage_up"` DataUsageUp int64 `db:"data_usage_up"`
DataUsageDown int64 `db:"data_usage_down"` DataUsageDown int64 `db:"data_usage_down"`
MeasurementDir string `db:"measurement_dir"`
started time.Time
} }
// Started marks the Result as having started // MakeSummaryMap return a mapping of test names to summaries for the given
func (r *Result) Started(db *sqlx.DB) error { // result
r.started = time.Now() func MakeSummaryMap(db *sqlx.DB, r *Result) (SummaryMap, error) {
return nil summaryMap := SummaryMap{}
msmts := []Measurement{}
// XXX maybe we only want to select some of the columns
err := db.Select(&msmts, "SELECT name, summary FROM measurements WHERE result_id = $1", r.ID)
if err != nil {
return nil, errors.Wrap(err, "failed to get measurements")
}
for _, msmt := range msmts {
summaryMap[msmt.Name] = msmt.Summary
}
return summaryMap, nil
} }
// Finished marks the result as done and sets the runtime // Finished marks the result as done and sets the runtime
func (r *Result) Finished(db *sqlx.DB) error { func (r *Result) Finished(db *sqlx.DB, makeSummary ResultSummaryFunc) error {
if r.Done == true || r.Runtime != 0 { if r.Done == true || r.Runtime != 0 {
return errors.New("Result is already finished") return errors.New("Result is already finished")
} }
r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond) r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds()
r.Done = true r.Done = true
// XXX add in here functionality to compute the summary
summaryMap, err := MakeSummaryMap(db, r)
if err != nil {
return err
}
res, err := db.NamedExec(`UPDATE results r.Summary, err = makeSummary(summaryMap)
SET done = true, runtime = :runtime if err != nil {
return err
}
err = UpdateOne(db, `UPDATE results
SET done = :done, runtime = :runtime, summary = :summary
WHERE id = :id`, r) WHERE id = :id`, r)
if err != nil { if err != nil {
return errors.Wrap(err, "updating result") return errors.Wrap(err, "updating finished result")
}
count, err := res.RowsAffected()
if err != nil {
return errors.Wrap(err, "updating result")
}
if count != 1 {
return errors.New("inconsistent update count")
} }
return nil return nil
} }
// MakeResultsPath creates and returns a directory for the result
func MakeResultsPath(r *Result) (string, error) {
home, err := ooni.GetOONIHome()
if err != nil {
return "", errors.Wrap(err, "default measurements path")
}
p := filepath.Join(home, "msmts",
fmt.Sprintf("%s-%s", r.Name, r.StartTime.Format(time.RFC3339Nano)))
// If the path already exists, this is a problem. It should not clash, because
// we are using nanosecond precision for the starttime.
if _, e := os.Stat(p); e == nil {
return "", errors.New("results path already exists")
}
err = os.MkdirAll(p, 0700)
if err != nil {
return "", err
}
return p, nil
}
// CreateResult writes the Result to the database a returns a pointer // CreateResult writes the Result to the database a returns a pointer
// to the Result // to the Result
func CreateResult(db *sqlx.DB, r Result) (*Result, error) { func CreateResult(db *sqlx.DB, r Result) (*Result, error) {
log.Debugf("Creating result %v", r) log.Debugf("Creating result %v", r)
p, err := MakeResultsPath(&r)
if err != nil {
return nil, err
}
r.MeasurementDir = p
res, err := db.NamedExec(`INSERT INTO results res, err := db.NamedExec(`INSERT INTO results
(name, start_time) (name, start_time)
VALUES (:name,:start_time)`, VALUES (:name,:start_time)`,

View File

@ -60,15 +60,25 @@ func New(w io.Writer) *Handler {
} }
} }
// HandleLog implements log.Handler. // TypedLog is used for handling special "typed" logs to the CLI
func (h *Handler) HandleLog(e *log.Entry) error { func (h *Handler) TypedLog(t string, e *log.Entry) error {
switch t {
case "progress":
// XXX replace this with something more fancy like https://github.com/tj/go-progress
fmt.Fprintf(h.Writer, "%.1f%% [%s]: %s", e.Fields.Get("percentage").(float64)*100, e.Fields.Get("key"), e.Message)
fmt.Fprintln(h.Writer)
return nil
default:
return h.DefaultLog(e)
}
}
// DefaultLog is the default way of printing out logs
func (h *Handler) DefaultLog(e *log.Entry) error {
color := Colors[e.Level] color := Colors[e.Level]
level := Strings[e.Level] level := Strings[e.Level]
names := e.Fields.Names() names := e.Fields.Names()
h.mu.Lock()
defer h.mu.Unlock()
color.Fprintf(h.Writer, "%s %-25s", bold.Sprintf("%*s", h.Padding+1, level), e.Message) color.Fprintf(h.Writer, "%s %-25s", bold.Sprintf("%*s", h.Padding+1, level), e.Message)
for _, name := range names { for _, name := range names {
@ -82,3 +92,16 @@ func (h *Handler) HandleLog(e *log.Entry) error {
return nil return nil
} }
// HandleLog implements log.Handler.
func (h *Handler) HandleLog(e *log.Entry) error {
h.mu.Lock()
defer h.mu.Unlock()
t, isTyped := e.Fields["type"].(string)
if isTyped {
return h.TypedLog(t, e)
}
return h.DefaultLog(e)
}

14
internal/output/output.go Normal file
View File

@ -0,0 +1,14 @@
package output
import (
"github.com/apex/log"
)
// Progress logs a progress type event
func Progress(key string, perc float64, msg string) {
log.WithFields(log.Fields{
"type": "progress",
"key": key,
"percentage": perc,
}).Info(msg)
}

View File

@ -1,32 +1,127 @@
package groups package groups
import ( import (
"encoding/json"
"github.com/apex/log"
"github.com/openobservatory/gooni/internal/database"
"github.com/openobservatory/gooni/nettests" "github.com/openobservatory/gooni/nettests"
"github.com/openobservatory/gooni/nettests/im"
"github.com/openobservatory/gooni/nettests/middlebox"
"github.com/openobservatory/gooni/nettests/performance" "github.com/openobservatory/gooni/nettests/performance"
"github.com/openobservatory/gooni/nettests/websites" "github.com/openobservatory/gooni/nettests/websites"
) )
// NettestGroup base structure
type NettestGroup struct {
Label string
Nettests []nettests.Nettest
Summary database.ResultSummaryFunc
}
// PerformanceSummary is the result summary for a performance test
type PerformanceSummary struct {
Upload int64
Download int64
Ping float64
Bitrate int64
}
// MiddleboxSummary is the summary for the middlebox tests
type MiddleboxSummary struct {
Detected bool
}
// IMSummary is the summary for the im tests
type IMSummary struct {
Detected bool
}
// NettestGroups that can be run by the user // NettestGroups that can be run by the user
var NettestGroups = map[string]nettests.NettestGroup{ var NettestGroups = map[string]NettestGroup{
"websites": nettests.NettestGroup{ "websites": NettestGroup{
Label: "Websites", Label: "Websites",
Nettests: []nettests.Nettest{ Nettests: []nettests.Nettest{
websites.WebConnectivity{}, websites.WebConnectivity{},
}, },
Summary: func(m database.SummaryMap) (string, error) {
return "{}", nil
}, },
"performance": nettests.NettestGroup{ },
"performance": NettestGroup{
Label: "Performance", Label: "Performance",
Nettests: []nettests.Nettest{ Nettests: []nettests.Nettest{
performance.Dash{}, performance.Dash{},
performance.NDT{}, performance.NDT{},
}, },
Summary: func(m database.SummaryMap) (string, error) {
var (
err error
ndtSummary performance.NDTSummary
dashSummary performance.DashSummary
summary PerformanceSummary
)
err = json.Unmarshal([]byte(m["Dash"]), &dashSummary)
if err != nil {
log.WithError(err).Error("failed to unmarshal Dash summary")
return "", err
}
err = json.Unmarshal([]byte(m["Ndt"]), &ndtSummary)
if err != nil {
log.WithError(err).Error("failed to unmarshal NDT summary")
return "", err
}
summary.Bitrate = dashSummary.Bitrate
summary.Download = ndtSummary.Download
summary.Upload = ndtSummary.Upload
summary.Ping = ndtSummary.AvgRTT
summaryBytes, err := json.Marshal(summary)
if err != nil {
return "", err
}
return string(summaryBytes), nil
}, },
"middleboxes": nettests.NettestGroup{ },
"middlebox": NettestGroup{
Label: "Middleboxes", Label: "Middleboxes",
Nettests: []nettests.Nettest{}, Nettests: []nettests.Nettest{
middlebox.HTTPInvalidRequestLine{},
middlebox.HTTPHeaderFieldManipulation{},
}, },
"im": nettests.NettestGroup{ Summary: func(m database.SummaryMap) (string, error) {
var (
err error
hhfmSummary middlebox.HTTPHeaderFieldManipulationSummary
hirlSummary middlebox.HTTPInvalidRequestLineSummary
summary MiddleboxSummary
)
err = json.Unmarshal([]byte(m["HttpHeaderFieldManipulation"]), &hhfmSummary)
if err != nil {
log.WithError(err).Error("failed to unmarshal hhfm summary")
return "", err
}
err = json.Unmarshal([]byte(m["HttpInvalidRequestLine"]), &hirlSummary)
if err != nil {
log.WithError(err).Error("failed to unmarshal hirl summary")
return "", err
}
summary.Detected = hirlSummary.Tampering == true || hhfmSummary.Tampering == true
summaryBytes, err := json.Marshal(summary)
if err != nil {
return "", err
}
return string(summaryBytes), nil
},
},
"im": NettestGroup{
Label: "Instant Messaging", Label: "Instant Messaging",
Nettests: []nettests.Nettest{}, Nettests: []nettests.Nettest{
im.FacebookMessenger{},
im.Telegram{},
im.WhatsApp{},
},
Summary: func(m database.SummaryMap) (string, error) {
return "{}", nil
},
}, },
} }

View File

@ -0,0 +1,36 @@
package im
import (
"github.com/measurement-kit/go-measurement-kit"
"github.com/openobservatory/gooni/nettests"
)
// FacebookMessenger test implementation
type FacebookMessenger struct {
}
// Run starts the test
func (h FacebookMessenger) Run(ctl *nettests.Controller) error {
mknt := mk.NewNettest("FacebookMessenger")
ctl.Init(mknt)
return mknt.Run()
}
// FacebookMessengerSummary for the test
type FacebookMessengerSummary struct {
DNSBlocking bool
TCPBlocking bool
}
// Summary generates a summary for a test run
func (h FacebookMessenger) Summary(tk map[string]interface{}) interface{} {
return FacebookMessengerSummary{
DNSBlocking: tk["facebook_dns_blocking"].(bool),
TCPBlocking: tk["facebook_tcp_blocking"].(bool),
}
}
// LogSummary writes the summary to the standard output
func (h FacebookMessenger) LogSummary(s string) error {
return nil
}

38
nettests/im/telegram.go Normal file
View File

@ -0,0 +1,38 @@
package im
import (
"github.com/measurement-kit/go-measurement-kit"
"github.com/openobservatory/gooni/nettests"
)
// Telegram test implementation
type Telegram struct {
}
// Run starts the test
func (h Telegram) Run(ctl *nettests.Controller) error {
mknt := mk.NewNettest("Telegram")
ctl.Init(mknt)
return mknt.Run()
}
// TelegramSummary for the test
type TelegramSummary struct {
HTTPBlocking bool
TCPBlocking bool
WebBlocking bool
}
// Summary generates a summary for a test run
func (h Telegram) Summary(tk map[string]interface{}) interface{} {
return TelegramSummary{
TCPBlocking: tk["telegram_tcp_blocking"].(bool) == true,
HTTPBlocking: tk["telegram_http_blocking"].(bool) == true,
WebBlocking: tk["telegram_web_status"].(string) == "blocked",
}
}
// LogSummary writes the summary to the standard output
func (h Telegram) LogSummary(s string) error {
return nil
}

40
nettests/im/whatsapp.go Normal file
View File

@ -0,0 +1,40 @@
package im
import (
"github.com/measurement-kit/go-measurement-kit"
"github.com/openobservatory/gooni/nettests"
)
// WhatsApp test implementation
type WhatsApp struct {
}
// Run starts the test
func (h WhatsApp) Run(ctl *nettests.Controller) error {
mknt := mk.NewNettest("Whatsapp")
ctl.Init(mknt)
return mknt.Run()
}
// WhatsAppSummary for the test
type WhatsAppSummary struct {
RegistrationServerBlocking bool
WebBlocking bool
EndpointsBlocking bool
}
// Summary generates a summary for a test run
func (h WhatsApp) Summary(tk map[string]interface{}) interface{} {
const blk = "blocked"
return WhatsAppSummary{
RegistrationServerBlocking: tk["registration_server_status"].(string) == blk,
WebBlocking: tk["whatsapp_web_status"].(string) == blk,
EndpointsBlocking: tk["whatsapp_endpoints_status"].(string) == blk,
}
}
// LogSummary writes the summary to the standard output
func (h WhatsApp) LogSummary(s string) error {
return nil
}

View File

@ -0,0 +1,43 @@
package middlebox
import (
"github.com/measurement-kit/go-measurement-kit"
"github.com/openobservatory/gooni/nettests"
)
// HTTPHeaderFieldManipulation test implementation
type HTTPHeaderFieldManipulation struct {
}
// Run starts the test
func (h HTTPHeaderFieldManipulation) Run(ctl *nettests.Controller) error {
mknt := mk.NewNettest("HttpHeaderFieldManipulation")
ctl.Init(mknt)
return mknt.Run()
}
// HTTPHeaderFieldManipulationSummary for the test
type HTTPHeaderFieldManipulationSummary struct {
Tampering bool
}
// Summary generates a summary for a test run
func (h HTTPHeaderFieldManipulation) Summary(tk map[string]interface{}) interface{} {
tampering := false
for _, v := range tk["tampering"].(map[string]interface{}) {
t, ok := v.(bool)
// Ignore non booleans in the tampering map
if ok && t == true {
tampering = true
}
}
return HTTPHeaderFieldManipulationSummary{
Tampering: tampering,
}
}
// LogSummary writes the summary to the standard output
func (h HTTPHeaderFieldManipulation) LogSummary(s string) error {
return nil
}

View File

@ -0,0 +1,36 @@
package middlebox
import (
"github.com/measurement-kit/go-measurement-kit"
"github.com/openobservatory/gooni/nettests"
)
// HTTPInvalidRequestLine test implementation
type HTTPInvalidRequestLine struct {
}
// Run starts the test
func (h HTTPInvalidRequestLine) Run(ctl *nettests.Controller) error {
mknt := mk.NewNettest("HttpInvalidRequestLine")
ctl.Init(mknt)
return mknt.Run()
}
// HTTPInvalidRequestLineSummary for the test
type HTTPInvalidRequestLineSummary struct {
Tampering bool
}
// Summary generates a summary for a test run
func (h HTTPInvalidRequestLine) Summary(tk map[string]interface{}) interface{} {
tampering := tk["tampering"].(bool)
return HTTPInvalidRequestLineSummary{
Tampering: tampering,
}
}
// LogSummary writes the summary to the standard output
func (h HTTPInvalidRequestLine) LogSummary(s string) error {
return nil
}

View File

@ -1,11 +1,16 @@
package nettests package nettests
import ( import (
"encoding/json"
"fmt"
"github.com/apex/log" "github.com/apex/log"
"github.com/measurement-kit/go-measurement-kit" "github.com/measurement-kit/go-measurement-kit"
ooni "github.com/openobservatory/gooni" ooni "github.com/openobservatory/gooni"
"github.com/openobservatory/gooni/internal/cli/version" "github.com/openobservatory/gooni/internal/cli/version"
"github.com/openobservatory/gooni/internal/colors"
"github.com/openobservatory/gooni/internal/database" "github.com/openobservatory/gooni/internal/database"
"github.com/openobservatory/gooni/internal/output"
) )
// Nettest interface. Every Nettest should implement this. // Nettest interface. Every Nettest should implement this.
@ -15,19 +20,13 @@ type Nettest interface {
LogSummary(string) error LogSummary(string) error
} }
// NettestGroup base structure
type NettestGroup struct {
Label string
Nettests []Nettest
Summary func(s string) string
}
// NewController creates a nettest controller // NewController creates a nettest controller
func NewController(ctx *ooni.Context, res *database.Result, msmtPath string) *Controller { func NewController(nt Nettest, ctx *ooni.Context, res *database.Result, msmtPath string) *Controller {
return &Controller{ return &Controller{
ctx, Ctx: ctx,
res, nt: nt,
msmtPath, res: res,
msmtPath: msmtPath,
} }
} }
@ -36,12 +35,25 @@ func NewController(ctx *ooni.Context, res *database.Result, msmtPath string) *Co
type Controller struct { type Controller struct {
Ctx *ooni.Context Ctx *ooni.Context
res *database.Result res *database.Result
msmtPath string nt Nettest
msmts map[int64]*database.Measurement
msmtPath string // XXX maybe we can drop this and just use a temporary file
} }
// Init should be called once to initialise the nettest // Init should be called once to initialise the nettest
func (c *Controller) Init(nt *mk.Nettest) error { func (c *Controller) Init(nt *mk.Nettest) error {
log.Debugf("Init: %v", nt) log.Debugf("Init: %v", nt)
c.msmts = make(map[int64]*database.Measurement)
msmtTemplate := database.Measurement{
ASN: "",
IP: "",
CountryCode: "",
ReportID: "",
Name: nt.Name,
ResultID: c.res.ID,
ReportFilePath: c.msmtPath,
}
log.Debugf("OutputPath: %s", c.msmtPath) log.Debugf("OutputPath: %s", c.msmtPath)
nt.Options = mk.NettestOptions{ nt.Options = mk.NettestOptions{
@ -60,8 +72,8 @@ func (c *Controller) Init(nt *mk.Nettest) error {
} }
nt.On("log", func(e mk.Event) { nt.On("log", func(e mk.Event) {
level := e.Value["verbosity"].(string) level := e.Value.LogLevel
msg := e.Value["message"].(string) msg := e.Value.Message
switch level { switch level {
case "ERROR": case "ERROR":
@ -84,36 +96,80 @@ func (c *Controller) Init(nt *mk.Nettest) error {
nt.On("status.report_created", func(e mk.Event) { nt.On("status.report_created", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf("%s", e.Key)
msmtTemplate.ReportID = e.Value.ReportID
}) })
nt.On("status.geoip_lookup", func(e mk.Event) { nt.On("status.geoip_lookup", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf(colors.Red(e.Key))
msmtTemplate.ASN = e.Value.ProbeASN
msmtTemplate.IP = e.Value.ProbeIP
msmtTemplate.CountryCode = e.Value.ProbeCC
})
nt.On("status.measurement_started", func(e mk.Event) {
log.Debugf(colors.Red(e.Key))
idx := e.Value.Idx
msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input)
if err != nil {
log.WithError(err).Error("Failed to create measurement")
return
}
c.msmts[idx] = msmt
}) })
nt.On("status.progress", func(e mk.Event) { nt.On("status.progress", func(e mk.Event) {
perc := e.Value["percentage"].(float64) log.Debugf(colors.Red(e.Key))
msg := e.Value["message"].(string) c.OnProgress(e.Value.Percentage, e.Value.Message)
c.OnProgress(perc, msg)
}) })
nt.On("status.update.*", func(e mk.Event) { nt.On("status.update.*", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf(colors.Red(e.Key))
}) })
nt.On("failure.measurement", func(e mk.Event) { nt.On("failure.measurement", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf(colors.Red(e.Key))
c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure)
}) })
nt.On("failure.report_submission", func(e mk.Event) { nt.On("failure.measurement_submission", func(e mk.Event) {
log.Debugf("%s", e.Key) log.Debugf(colors.Red(e.Key))
failure := e.Value.Failure
c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure)
})
nt.On("status.measurement_uploaded", func(e mk.Event) {
log.Debugf(colors.Red(e.Key))
if err := c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB); err != nil {
log.WithError(err).Error("failed to mark msmt as uploaded")
}
})
nt.On("status.measurement_done", func(e mk.Event) {
log.Debugf(colors.Red(e.Key))
if err := c.msmts[e.Value.Idx].Done(c.Ctx.DB); err != nil {
log.WithError(err).Error("failed to mark msmt as done")
}
}) })
nt.On("measurement", func(e mk.Event) { nt.On("measurement", func(e mk.Event) {
c.OnEntry(e.Value["json_str"].(string)) c.OnEntry(e.Value.Idx, e.Value.JSONStr)
}) })
nt.On("end", func(e mk.Event) { nt.On("status.end", func(e mk.Event) {
c.OnEntry(e.Value["json_str"].(string)) log.Debugf("status.end")
for idx, msmt := range c.msmts {
log.Debugf("adding msmt#%d to result", idx)
if err := msmt.AddToResult(c.Ctx.DB, c.res); err != nil {
log.WithError(err).Error("failed to add to result")
}
}
}) })
return nil return nil
@ -122,11 +178,29 @@ func (c *Controller) Init(nt *mk.Nettest) error {
// OnProgress should be called when a new progress event is available. // OnProgress should be called when a new progress event is available.
func (c *Controller) OnProgress(perc float64, msg string) { func (c *Controller) OnProgress(perc float64, msg string) {
log.Debugf("OnProgress: %f - %s", perc, msg) log.Debugf("OnProgress: %f - %s", perc, msg)
key := fmt.Sprintf("%T", c.nt)
output.Progress(key, perc, msg)
}
// Entry is an opaque measurement entry
type Entry struct {
TestKeys map[string]interface{} `json:"test_keys"`
} }
// OnEntry should be called every time there is a new entry // OnEntry should be called every time there is a new entry
func (c *Controller) OnEntry(jsonStr string) { func (c *Controller) OnEntry(idx int64, jsonStr string) {
log.Debugf("OnEntry: %s", jsonStr) log.Debugf("OnEntry")
var entry Entry
json.Unmarshal([]byte(jsonStr), &entry)
summary := c.nt.Summary(entry.TestKeys)
summaryBytes, err := json.Marshal(summary)
if err != nil {
log.WithError(err).Error("failed to serialize summary")
}
log.Debugf("Fetching: %s %v", idx, c.msmts[idx])
c.msmts[idx].WriteSummary(c.Ctx.DB, string(summaryBytes))
} }
// MKStart is the interface for the mk.Nettest Start() function // MKStart is the interface for the mk.Nettest Start() function

View File

@ -19,9 +19,9 @@ func (d Dash) Run(ctl *nettests.Controller) error {
// DashSummary for the test // DashSummary for the test
// TODO: process 'receiver_data' to provide an array of performance for a chart. // TODO: process 'receiver_data' to provide an array of performance for a chart.
type DashSummary struct { type DashSummary struct {
Latency float32 Latency float64
Bitrate int64 Bitrate int64
Delay float32 Delay float64
} }
// Summary generates a summary for a test run // Summary generates a summary for a test run
@ -29,9 +29,9 @@ func (d Dash) Summary(tk map[string]interface{}) interface{} {
simple := tk["simple"].(map[string]interface{}) simple := tk["simple"].(map[string]interface{})
return DashSummary{ return DashSummary{
Latency: simple["connect_latency"].(float32), Latency: simple["connect_latency"].(float64),
Bitrate: simple["median_bitrate"].(int64), Bitrate: int64(simple["median_bitrate"].(float64)),
Delay: simple["min_playout_delay"].(float32), Delay: simple["min_playout_delay"].(float64),
} }
} }

View File

@ -21,12 +21,12 @@ type NDTSummary struct {
Upload int64 Upload int64
Download int64 Download int64
Ping int64 Ping int64
MaxRTT int64 MaxRTT float64
AvgRTT int64 AvgRTT float64
MinRTT int64 MinRTT float64
MSS int64 MSS int64
OutOfOrder int64 OutOfOrder int64
PacketLoss float32 PacketLoss float64
Timeouts int64 Timeouts int64
} }
@ -36,16 +36,16 @@ func (n NDT) Summary(tk map[string]interface{}) interface{} {
advanced := tk["advanced"].(map[string]interface{}) advanced := tk["advanced"].(map[string]interface{})
return NDTSummary{ return NDTSummary{
Upload: simple["upload"].(int64), Upload: int64(simple["upload"].(float64)),
Download: simple["download"].(int64), Download: int64(simple["download"].(float64)),
Ping: simple["ping"].(int64), Ping: int64(simple["ping"].(float64)),
MaxRTT: advanced["max_rtt"].(int64), MaxRTT: advanced["max_rtt"].(float64),
AvgRTT: advanced["avg_rtt"].(int64), AvgRTT: advanced["avg_rtt"].(float64),
MinRTT: advanced["min_rtt"].(int64), MinRTT: advanced["min_rtt"].(float64),
MSS: advanced["mss"].(int64), MSS: int64(advanced["mss"].(float64)),
OutOfOrder: advanced["out_of_order"].(int64), OutOfOrder: int64(advanced["out_of_order"].(float64)),
PacketLoss: advanced["packet_loss"].(float32), PacketLoss: advanced["packet_loss"].(float64),
Timeouts: advanced["timeouts"].(int64), Timeouts: int64(advanced["timeouts"].(float64)),
} }
} }