diff --git a/Gopkg.lock b/Gopkg.lock index ff5baa2..9a5b79e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -89,7 +89,7 @@ branch = "master" name = "github.com/measurement-kit/go-measurement-kit" packages = ["."] - revision = "ae6643546db7c99bbc6ecb37b86d04baec04d295" + revision = "6ae2401a8e498a90ccdd3edbda1841add079b70e" [[projects]] branch = "master" diff --git a/Readme.md b/Readme.md index cda215a..618a6b9 100644 --- a/Readme.md +++ b/Readme.md @@ -33,3 +33,22 @@ was built as ``` CGO_LDFLAGS="-L/path/to/measurement-kit/.libs/" CGO_CFLAGS="-I/path/to/measurement-kit/include" make build ``` + +## Todo + +* Add support for outputing structured logging messages, while tests are + running, to be consumed by the desktop app + +* Add support for the missing tests, namely: +- HTTP Invalid Request Line +- HTTP Header Field Manipulation +- Facebook Messenger +- Telegram +- WhatsApp +- WebConnectivity + +* Fix issue with the informed consent being bypassed on first run + +* Finish the config file implementation + +* Add support for listing results in the CLI diff --git a/data/migrations/1_create_msmt_results.sql b/data/migrations/1_create_msmt_results.sql index dbe2d39..6e65b0b 100644 --- a/data/migrations/1_create_msmt_results.sql +++ b/data/migrations/1_create_msmt_results.sql @@ -13,7 +13,7 @@ CREATE TABLE `results` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` VARCHAR(255), `start_time` DATETIME, - `end_time` DATETIME, + `runtime` REAL, `summary` JSON, `done` TINYINT(1), `data_usage_up` INTEGER, @@ -24,18 +24,19 @@ CREATE TABLE `measurements` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` VARCHAR(255), `start_time` DATETIME, - `end_time` DATETIME, + `runtime` REAL, `summary` JSON, `ip` VARCHAR(255), - `asn` INTEGER, + `asn` VARCHAR(16), `country` VARCHAR(2), `network_name` VARCHAR(255), `state` TEXT, `failure` VARCHAR(255), + `upload_failure` VARCHAR(255), + `uploaded` TINYINT(1), `report_file` VARCHAR(255), `report_id` VARCHAR(255), `input` VARCHAR(255), - `measurement_id` VARCHAR(255), `result_id` INTEGER REFERENCES `results` (`id`) ON DELETE SET NULL ON UPDATE CASCADE ); diff --git a/internal/bindata/bindata.go b/internal/bindata/bindata.go index e18f02c..45ee45f 100644 --- a/internal/bindata/bindata.go +++ b/internal/bindata/bindata.go @@ -130,19 +130,20 @@ func bindataDataDefaultconfigjson() (*asset, error) { } var _bindataDataMigrations1createmsmtresultssql = []byte( - "\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x93\x4f\x6f\xf2\x30\x0c\xc6\xef\xfd\x14\x3e\x82\xde\x97\xc3\x26\x71" + - "\xe2\x14\x5a\x6f\xeb\x56\x52\x94\xa6\xd3\x38\xb5\xd1\x1a\x50\x34\x9a\x56\x69\x22\xb4\x6f\x3f\x85\x7f\x2a\xac\x70" + - "\xde\xd5\xbf\xc7\x8e\xfd\xd8\x99\x4c\xe0\x5f\xad\x36\x46\x58\x09\x51\xb3\xd3\x41\x3f\x90\x59\x61\x65\x2d\xb5\x9d" + - "\xcb\x8d\xd2\x41\x10\xb1\x74\x09\x9c\xcc\x13\x84\xd2\xc8\xce\x6d\x6d\x57\xce\x2e\xa2\xb5\x14\x9d\x33\xfb\x1c\x8f" + - "\x86\xab\xa1\xae\x2e\x49\xde\xde\x7d\x36\x64\x48\x38\x5e\x3f\x0c\xa3\x00\x00\xa0\x54\x55\x09\x31\xe5\xf8\x8c\x0c" + - "\x96\x2c\x5e\x10\xb6\x82\x37\x5c\x01\xc9\x79\x1a\xd3\x90\xe1\x02\x29\xff\x7f\xd0\x6a\x51\xcb\x12\xde\x09\x0b\x5f" + - "\x08\x1b\x3d\x4e\xa7\xe3\x23\xe8\xac\x30\xb6\xb0\xca\xe3\x88\x70\xe4\xf1\x02\x8f\x48\xea\x6a\x18\x74\xae\xae\x85" + - "\xf9\x2e\xe1\x35\x4b\xe9\x31\x56\x35\x5a\x96\xc0\x63\xba\x8a\x29\x1f\x3d\x9c\xca\x57\xc2\x8a\xc2\x75\x62\x23\x0b" + - "\xd7\x9e\xdb\xfd\x0d\xab\x66\xa7\xcf\x38\x18\xcf\xae\x67\xbf\xb0\xf7\x4f\x1a\xa0\xda\xc1\xf2\xa2\xd3\xd7\x63\x7f" + - "\x36\x4e\x5b\x9f\x7d\x96\x9f\xc4\x5a\xda\x5d\x63\xbe\x8a\x7b\xcd\x5a\x6f\x33\x7e\x9c\xe6\x5a\x0b\xb5\x75\x66\x58" + - "\x6d\x64\xdb\x18\x5b\xac\xd5\xf6\x2e\xf7\x36\x0e\x50\xa5\x5b\x67\x07\x49\x6f\x19\xb7\x92\x0f\xb7\x5a\xf4\x37\xc4" + - "\xf0\x09\x19\xd2\x10\xb3\xfe\x29\xfb\x25\x8e\x21\xa5\x10\x61\x82\x1c\x21\x43\x0e\x34\x4f\x12\x1f\xca\x97\xde\x77" + - "\x08\x49\x16\x92\x08\xf7\x57\x71\xf3\x5b\xfd\x04\x00\x00\xff\xff\xc4\x16\x0a\x20\xcf\x03\x00\x00") + "\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x93\x31\xef\xda\x30\x10\xc5\xf7\x7c\x8a\x1b\x41\x2d\x03\x95\xe8\xc2" + + "\x64\x92\x6b\x9b\x36\x38\xc8\x71\xaa\x32\x25\x56\x63\x90\xd5\xc4\x89\x1c\x5b\xa8\xdf\xbe\x32\x24\x14\x68\xa0\xeb" + + "\x7f\x7d\xbf\xbb\x67\x9f\xdf\x79\xb1\x80\x77\x8d\x3a\x1a\x61\x25\x44\xed\x49\x07\xb7\x42\x66\x85\x95\x8d\xd4\x76" + + "\x23\x8f\x4a\x07\x41\xc4\xd2\x1d\x70\xb2\x49\x10\x4a\x23\x7b\x57\xdb\xbe\x5c\xdf\xa9\x8d\x14\xbd\x33\xe7\x1e\x8f" + + "\xa6\xdd\x50\x57\xf7\x24\xef\x5e\x1e\x1b\x32\x24\x1c\x1f\x0f\x86\x59\x00\x00\x50\xaa\xaa\x84\x98\x72\xfc\x8c\x0c" + + "\x76\x2c\xde\x12\xb6\x87\x6f\xb8\x07\x92\xf3\x34\xa6\x21\xc3\x2d\x52\xfe\xfe\x52\xab\x45\x23\x4b\xf8\x4e\x58\xf8" + + "\x85\xb0\xd9\x87\xd5\x6a\x3e\x80\xde\x0a\x63\x0b\xab\x3c\x8e\x08\x47\x1e\x6f\x71\x40\xc6\xe9\x8b\xce\x90\x24\x63" + + "\xb9\x6b\x1a\x61\x7e\x97\xf0\x35\x4b\xe9\xa0\x55\xad\x96\x25\xf0\x98\xee\x63\xca\x67\xcb\xd1\xb9\x12\x56\x14\xae" + + "\x17\x47\x59\xb8\xee\x7a\xd3\x7f\x61\xd5\x9e\xf4\x15\x07\xf3\xf5\xe3\xd8\x77\x2f\xfb\xd6\x66\x57\xdd\xa4\xb3\xe8" + + "\xf5\x5f\x7d\xf9\x71\x94\x7f\xb6\x4e\x5b\xef\x70\x6d\x19\x89\x96\xf6\xd4\x9a\x5f\xc5\xab\xbb\x5a\xff\xca\xf8\x63" + + "\x1c\xeb\x20\x54\xed\xcc\x74\xb5\xeb\xea\x56\x54\xc5\xff\x4b\x64\x35\x91\x9c\x91\x5d\x6b\x6c\x71\x50\xf5\x74\xeb" + + "\xc0\x7d\x06\x13\x54\xe9\xce\xd9\x27\x7d\x7e\x81\x8b\xdb\xec\x18\x7e\x42\x86\x34\xc4\xec\x76\xbf\x7d\xbc\x73\x48" + + "\x29\x44\x98\x20\x47\xc8\x90\x03\xcd\x93\xc4\x4b\xf9\xce\x47\x05\x21\xc9\x42\x12\xe1\x79\x5f\x9e\xfe\xb5\x3f\x01" + + "\x00\x00\xff\xff\x29\xbd\x69\x4f\xe4\x03\x00\x00") func bindataDataMigrations1createmsmtresultssqlBytes() ([]byte, error) { return bindataRead( diff --git a/internal/cli/run/run.go b/internal/cli/run/run.go index d65e019..c5368ba 100644 --- a/internal/cli/run/run.go +++ b/internal/cli/run/run.go @@ -1,6 +1,7 @@ package run import ( + "errors" "fmt" "path/filepath" "time" @@ -25,7 +26,11 @@ func init() { log.Errorf("%s", err) return err } - group := groups.NettestGroups[*nettestGroup] + group, ok := groups.NettestGroups[*nettestGroup] + if !ok { + log.Errorf("No test group named %s", *nettestGroup) + return errors.New("invalid test group name") + } log.Debugf("Running test group %s", group.Label) result, err := database.CreateResult(ctx.DB, database.Result{ @@ -40,22 +45,18 @@ func init() { for _, nt := range group.Nettests { log.Debugf("Running test %T", nt) msmtPath := filepath.Join(ctx.TempDir, - fmt.Sprintf("msmt-%s-%T.jsonl", nt, + fmt.Sprintf("msmt-%T-%s.jsonl", nt, time.Now().UTC().Format(time.RFC3339Nano))) - ctl := nettests.NewController(ctx, result, msmtPath) - if err := nt.Run(ctl); err != nil { + ctl := nettests.NewController(nt, ctx, result, msmtPath) + if err = nt.Run(ctl); err != nil { log.WithError(err).Errorf("Failed to run %s", group.Label) return err } - // XXX - // 1. Generate the summary - // 2. Link the measurement to the Result (this should probably happen in - // the nettest class) - // 3. Update the summary of the result and the other metadata in the db - // 4. Move the msmtPath into the final location ~/.ooni/msmts/ } - // result.Update(ctx.DB) + if err = result.Finished(ctx.DB, group.Summary); err != nil { + return err + } return nil }) } diff --git a/internal/database/models.go b/internal/database/models.go index c32bcca..d575df6 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -1,30 +1,58 @@ package database import ( + "fmt" + "os" + "path/filepath" "time" "github.com/apex/log" "github.com/jmoiron/sqlx" + ooni "github.com/openobservatory/gooni" "github.com/pkg/errors" ) +// ResultSummaryFunc is the function used to generate result summaries +type ResultSummaryFunc func(SummaryMap) (string, error) + +// SummaryMap contains a mapping from test name to serialized summary for it +type SummaryMap map[string]string + +// UpdateOne will run the specified update query and check that it only affected one row +func UpdateOne(db *sqlx.DB, query string, arg interface{}) error { + res, err := db.NamedExec(query, arg) + + if err != nil { + return errors.Wrap(err, "updating table") + } + count, err := res.RowsAffected() + if err != nil { + return errors.Wrap(err, "updating table") + } + if count != 1 { + return errors.New("inconsistent update count") + } + return nil +} + // Measurement model type Measurement struct { ID int64 `db:"id"` Name string `db:"name"` StartTime time.Time `db:"start_time"` - EndTime time.Time `db:"end_time"` + Runtime float64 `db:"runtime"` // Fractional number of seconds Summary string `db:"summary"` // XXX this should be JSON - ASN int64 `db:"asn"` + ASN string `db:"asn"` IP string `db:"ip"` CountryCode string `db:"country"` State string `db:"state"` Failure string `db:"failure"` + UploadFailure string `db:"upload_failure"` + Uploaded bool `db:"uploaded"` ReportFilePath string `db:"report_file"` ReportID string `db:"report_id"` Input string `db:"input"` - MeasurementID string `db:"measurement_id"` - ResultID string `db:"result_id"` + ResultID int64 `db:"result_id"` } // SetGeoIPInfo for the Measurement @@ -32,14 +60,108 @@ func (m *Measurement) SetGeoIPInfo() error { return nil } +// Failed writes the error string to the measurement +func (m *Measurement) Failed(db *sqlx.DB, failure string) error { + m.Failure = failure + + err := UpdateOne(db, `UPDATE measurements + SET failure = :failure, state = :state + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// Done marks the measurement as completed +func (m *Measurement) Done(db *sqlx.DB) error { + runtime := time.Now().UTC().Sub(m.StartTime) + m.Runtime = runtime.Seconds() + m.State = "done" + + err := UpdateOne(db, `UPDATE measurements + SET state = :state, runtime = :runtime + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// UploadFailed writes the error string for the upload failure to the measurement +func (m *Measurement) UploadFailed(db *sqlx.DB, failure string) error { + m.UploadFailure = failure + m.Uploaded = false + + err := UpdateOne(db, `UPDATE measurements + SET upload_failure = :upload_failure + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// UploadSucceeded writes the error string for the upload failure to the measurement +func (m *Measurement) UploadSucceeded(db *sqlx.DB) error { + m.Uploaded = true + + err := UpdateOne(db, `UPDATE measurements + SET uploaded = :uploaded + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// WriteSummary writes the summary to the measurement +func (m *Measurement) WriteSummary(db *sqlx.DB, summary string) error { + m.Summary = summary + + err := UpdateOne(db, `UPDATE measurements + SET summary = :summary + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + +// AddToResult adds a measurement to a result +func (m *Measurement) AddToResult(db *sqlx.DB, result *Result) error { + m.ResultID = result.ID + finalPath := filepath.Join(result.MeasurementDir, + filepath.Base(m.ReportFilePath)) + + err := os.Rename(m.ReportFilePath, finalPath) + if err != nil { + return errors.Wrap(err, "moving report file") + } + m.ReportFilePath = finalPath + + err = UpdateOne(db, `UPDATE measurements + SET result_id = :result_id, report_file = :report_file + WHERE id = :id`, m) + if err != nil { + return errors.Wrap(err, "updating measurement") + } + return nil +} + // CreateMeasurement writes the measurement to the database a returns a pointer // to the Measurement -func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { +func CreateMeasurement(db *sqlx.DB, m Measurement, i string) (*Measurement, error) { + // XXX Do we want to have this be part of something else? + m.StartTime = time.Now().UTC() + m.Input = i + m.State = "active" + res, err := db.NamedExec(`INSERT INTO measurements (name, start_time, - summary, asn, ip, country, + asn, ip, country, state, failure, report_file, - report_id, input, measurement_id, + report_id, input, result_id) VALUES (:name,:start_time, :asn,:ip,:country, @@ -60,52 +182,92 @@ func CreateMeasurement(db *sqlx.DB, m Measurement) (*Measurement, error) { // Result model type Result struct { - ID int64 `db:"id"` - Name string `db:"name"` - StartTime time.Time `db:"start_time"` - Runtime float64 `db:"runtime"` // Runtime is expressed in Microseconds - Summary string `db:"summary"` // XXX this should be JSON - Done bool `db:"done"` - DataUsageUp int64 `db:"data_usage_up"` - DataUsageDown int64 `db:"data_usage_down"` - - started time.Time + ID int64 `db:"id"` + Name string `db:"name"` + StartTime time.Time `db:"start_time"` + Runtime float64 `db:"runtime"` // Runtime is expressed in fractional seconds + Summary string `db:"summary"` // XXX this should be JSON + Done bool `db:"done"` + DataUsageUp int64 `db:"data_usage_up"` + DataUsageDown int64 `db:"data_usage_down"` + MeasurementDir string `db:"measurement_dir"` } -// Started marks the Result as having started -func (r *Result) Started(db *sqlx.DB) error { - r.started = time.Now() - return nil +// MakeSummaryMap return a mapping of test names to summaries for the given +// result +func MakeSummaryMap(db *sqlx.DB, r *Result) (SummaryMap, error) { + summaryMap := SummaryMap{} + + msmts := []Measurement{} + // XXX maybe we only want to select some of the columns + err := db.Select(&msmts, "SELECT name, summary FROM measurements WHERE result_id = $1", r.ID) + if err != nil { + return nil, errors.Wrap(err, "failed to get measurements") + } + for _, msmt := range msmts { + summaryMap[msmt.Name] = msmt.Summary + } + return summaryMap, nil } // Finished marks the result as done and sets the runtime -func (r *Result) Finished(db *sqlx.DB) error { +func (r *Result) Finished(db *sqlx.DB, makeSummary ResultSummaryFunc) error { if r.Done == true || r.Runtime != 0 { return errors.New("Result is already finished") } - r.Runtime = float64(time.Now().Sub(r.started)) / float64(time.Microsecond) + r.Runtime = time.Now().UTC().Sub(r.StartTime).Seconds() r.Done = true + // XXX add in here functionality to compute the summary + summaryMap, err := MakeSummaryMap(db, r) + if err != nil { + return err + } - res, err := db.NamedExec(`UPDATE results - SET done = true, runtime = :runtime + r.Summary, err = makeSummary(summaryMap) + if err != nil { + return err + } + + err = UpdateOne(db, `UPDATE results + SET done = :done, runtime = :runtime, summary = :summary WHERE id = :id`, r) if err != nil { - return errors.Wrap(err, "updating result") - } - count, err := res.RowsAffected() - if err != nil { - return errors.Wrap(err, "updating result") - } - if count != 1 { - return errors.New("inconsistent update count") + return errors.Wrap(err, "updating finished result") } return nil } +// MakeResultsPath creates and returns a directory for the result +func MakeResultsPath(r *Result) (string, error) { + home, err := ooni.GetOONIHome() + if err != nil { + return "", errors.Wrap(err, "default measurements path") + } + p := filepath.Join(home, "msmts", + fmt.Sprintf("%s-%s", r.Name, r.StartTime.Format(time.RFC3339Nano))) + + // If the path already exists, this is a problem. It should not clash, because + // we are using nanosecond precision for the starttime. + if _, e := os.Stat(p); e == nil { + return "", errors.New("results path already exists") + } + err = os.MkdirAll(p, 0700) + if err != nil { + return "", err + } + return p, nil +} + // CreateResult writes the Result to the database a returns a pointer // to the Result func CreateResult(db *sqlx.DB, r Result) (*Result, error) { log.Debugf("Creating result %v", r) + + p, err := MakeResultsPath(&r) + if err != nil { + return nil, err + } + r.MeasurementDir = p res, err := db.NamedExec(`INSERT INTO results (name, start_time) VALUES (:name,:start_time)`, diff --git a/internal/log/handlers/cli/cli.go b/internal/log/handlers/cli/cli.go index 10d7fba..7e137ba 100644 --- a/internal/log/handlers/cli/cli.go +++ b/internal/log/handlers/cli/cli.go @@ -60,15 +60,25 @@ func New(w io.Writer) *Handler { } } -// HandleLog implements log.Handler. -func (h *Handler) HandleLog(e *log.Entry) error { +// TypedLog is used for handling special "typed" logs to the CLI +func (h *Handler) TypedLog(t string, e *log.Entry) error { + switch t { + case "progress": + // XXX replace this with something more fancy like https://github.com/tj/go-progress + fmt.Fprintf(h.Writer, "%.1f%% [%s]: %s", e.Fields.Get("percentage").(float64)*100, e.Fields.Get("key"), e.Message) + fmt.Fprintln(h.Writer) + return nil + default: + return h.DefaultLog(e) + } +} + +// DefaultLog is the default way of printing out logs +func (h *Handler) DefaultLog(e *log.Entry) error { color := Colors[e.Level] level := Strings[e.Level] names := e.Fields.Names() - h.mu.Lock() - defer h.mu.Unlock() - color.Fprintf(h.Writer, "%s %-25s", bold.Sprintf("%*s", h.Padding+1, level), e.Message) for _, name := range names { @@ -82,3 +92,16 @@ func (h *Handler) HandleLog(e *log.Entry) error { return nil } + +// HandleLog implements log.Handler. +func (h *Handler) HandleLog(e *log.Entry) error { + h.mu.Lock() + defer h.mu.Unlock() + + t, isTyped := e.Fields["type"].(string) + if isTyped { + return h.TypedLog(t, e) + } + + return h.DefaultLog(e) +} diff --git a/internal/output/output.go b/internal/output/output.go new file mode 100644 index 0000000..bf76672 --- /dev/null +++ b/internal/output/output.go @@ -0,0 +1,14 @@ +package output + +import ( + "github.com/apex/log" +) + +// Progress logs a progress type event +func Progress(key string, perc float64, msg string) { + log.WithFields(log.Fields{ + "type": "progress", + "key": key, + "percentage": perc, + }).Info(msg) +} diff --git a/nettests/groups/groups.go b/nettests/groups/groups.go index a76f867..2eb0440 100644 --- a/nettests/groups/groups.go +++ b/nettests/groups/groups.go @@ -1,32 +1,127 @@ package groups import ( + "encoding/json" + + "github.com/apex/log" + "github.com/openobservatory/gooni/internal/database" "github.com/openobservatory/gooni/nettests" + "github.com/openobservatory/gooni/nettests/im" + "github.com/openobservatory/gooni/nettests/middlebox" "github.com/openobservatory/gooni/nettests/performance" "github.com/openobservatory/gooni/nettests/websites" ) +// NettestGroup base structure +type NettestGroup struct { + Label string + Nettests []nettests.Nettest + Summary database.ResultSummaryFunc +} + +// PerformanceSummary is the result summary for a performance test +type PerformanceSummary struct { + Upload int64 + Download int64 + Ping float64 + Bitrate int64 +} + +// MiddleboxSummary is the summary for the middlebox tests +type MiddleboxSummary struct { + Detected bool +} + +// IMSummary is the summary for the im tests +type IMSummary struct { + Detected bool +} + // NettestGroups that can be run by the user -var NettestGroups = map[string]nettests.NettestGroup{ - "websites": nettests.NettestGroup{ +var NettestGroups = map[string]NettestGroup{ + "websites": NettestGroup{ Label: "Websites", Nettests: []nettests.Nettest{ websites.WebConnectivity{}, }, + Summary: func(m database.SummaryMap) (string, error) { + return "{}", nil + }, }, - "performance": nettests.NettestGroup{ + "performance": NettestGroup{ Label: "Performance", Nettests: []nettests.Nettest{ performance.Dash{}, performance.NDT{}, }, + Summary: func(m database.SummaryMap) (string, error) { + var ( + err error + ndtSummary performance.NDTSummary + dashSummary performance.DashSummary + summary PerformanceSummary + ) + err = json.Unmarshal([]byte(m["Dash"]), &dashSummary) + if err != nil { + log.WithError(err).Error("failed to unmarshal Dash summary") + return "", err + } + err = json.Unmarshal([]byte(m["Ndt"]), &ndtSummary) + if err != nil { + log.WithError(err).Error("failed to unmarshal NDT summary") + return "", err + } + summary.Bitrate = dashSummary.Bitrate + summary.Download = ndtSummary.Download + summary.Upload = ndtSummary.Upload + summary.Ping = ndtSummary.AvgRTT + summaryBytes, err := json.Marshal(summary) + if err != nil { + return "", err + } + return string(summaryBytes), nil + }, }, - "middleboxes": nettests.NettestGroup{ - Label: "Middleboxes", - Nettests: []nettests.Nettest{}, + "middlebox": NettestGroup{ + Label: "Middleboxes", + Nettests: []nettests.Nettest{ + middlebox.HTTPInvalidRequestLine{}, + middlebox.HTTPHeaderFieldManipulation{}, + }, + Summary: func(m database.SummaryMap) (string, error) { + var ( + err error + hhfmSummary middlebox.HTTPHeaderFieldManipulationSummary + hirlSummary middlebox.HTTPInvalidRequestLineSummary + summary MiddleboxSummary + ) + err = json.Unmarshal([]byte(m["HttpHeaderFieldManipulation"]), &hhfmSummary) + if err != nil { + log.WithError(err).Error("failed to unmarshal hhfm summary") + return "", err + } + err = json.Unmarshal([]byte(m["HttpInvalidRequestLine"]), &hirlSummary) + if err != nil { + log.WithError(err).Error("failed to unmarshal hirl summary") + return "", err + } + summary.Detected = hirlSummary.Tampering == true || hhfmSummary.Tampering == true + summaryBytes, err := json.Marshal(summary) + if err != nil { + return "", err + } + return string(summaryBytes), nil + }, }, - "im": nettests.NettestGroup{ - Label: "Instant Messaging", - Nettests: []nettests.Nettest{}, + "im": NettestGroup{ + Label: "Instant Messaging", + Nettests: []nettests.Nettest{ + im.FacebookMessenger{}, + im.Telegram{}, + im.WhatsApp{}, + }, + Summary: func(m database.SummaryMap) (string, error) { + return "{}", nil + }, }, } diff --git a/nettests/im/facebook_messenger.go b/nettests/im/facebook_messenger.go new file mode 100644 index 0000000..bb7fcac --- /dev/null +++ b/nettests/im/facebook_messenger.go @@ -0,0 +1,36 @@ +package im + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// FacebookMessenger test implementation +type FacebookMessenger struct { +} + +// Run starts the test +func (h FacebookMessenger) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("FacebookMessenger") + ctl.Init(mknt) + return mknt.Run() +} + +// FacebookMessengerSummary for the test +type FacebookMessengerSummary struct { + DNSBlocking bool + TCPBlocking bool +} + +// Summary generates a summary for a test run +func (h FacebookMessenger) Summary(tk map[string]interface{}) interface{} { + return FacebookMessengerSummary{ + DNSBlocking: tk["facebook_dns_blocking"].(bool), + TCPBlocking: tk["facebook_tcp_blocking"].(bool), + } +} + +// LogSummary writes the summary to the standard output +func (h FacebookMessenger) LogSummary(s string) error { + return nil +} diff --git a/nettests/im/telegram.go b/nettests/im/telegram.go new file mode 100644 index 0000000..503d55c --- /dev/null +++ b/nettests/im/telegram.go @@ -0,0 +1,38 @@ +package im + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// Telegram test implementation +type Telegram struct { +} + +// Run starts the test +func (h Telegram) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("Telegram") + ctl.Init(mknt) + return mknt.Run() +} + +// TelegramSummary for the test +type TelegramSummary struct { + HTTPBlocking bool + TCPBlocking bool + WebBlocking bool +} + +// Summary generates a summary for a test run +func (h Telegram) Summary(tk map[string]interface{}) interface{} { + return TelegramSummary{ + TCPBlocking: tk["telegram_tcp_blocking"].(bool) == true, + HTTPBlocking: tk["telegram_http_blocking"].(bool) == true, + WebBlocking: tk["telegram_web_status"].(string) == "blocked", + } +} + +// LogSummary writes the summary to the standard output +func (h Telegram) LogSummary(s string) error { + return nil +} diff --git a/nettests/im/whatsapp.go b/nettests/im/whatsapp.go new file mode 100644 index 0000000..5dc26ba --- /dev/null +++ b/nettests/im/whatsapp.go @@ -0,0 +1,40 @@ +package im + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// WhatsApp test implementation +type WhatsApp struct { +} + +// Run starts the test +func (h WhatsApp) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("Whatsapp") + ctl.Init(mknt) + return mknt.Run() +} + +// WhatsAppSummary for the test +type WhatsAppSummary struct { + RegistrationServerBlocking bool + WebBlocking bool + EndpointsBlocking bool +} + +// Summary generates a summary for a test run +func (h WhatsApp) Summary(tk map[string]interface{}) interface{} { + const blk = "blocked" + + return WhatsAppSummary{ + RegistrationServerBlocking: tk["registration_server_status"].(string) == blk, + WebBlocking: tk["whatsapp_web_status"].(string) == blk, + EndpointsBlocking: tk["whatsapp_endpoints_status"].(string) == blk, + } +} + +// LogSummary writes the summary to the standard output +func (h WhatsApp) LogSummary(s string) error { + return nil +} diff --git a/nettests/middlebox/http_header_field_manipulation.go b/nettests/middlebox/http_header_field_manipulation.go new file mode 100644 index 0000000..c416985 --- /dev/null +++ b/nettests/middlebox/http_header_field_manipulation.go @@ -0,0 +1,43 @@ +package middlebox + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// HTTPHeaderFieldManipulation test implementation +type HTTPHeaderFieldManipulation struct { +} + +// Run starts the test +func (h HTTPHeaderFieldManipulation) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("HttpHeaderFieldManipulation") + ctl.Init(mknt) + return mknt.Run() +} + +// HTTPHeaderFieldManipulationSummary for the test +type HTTPHeaderFieldManipulationSummary struct { + Tampering bool +} + +// Summary generates a summary for a test run +func (h HTTPHeaderFieldManipulation) Summary(tk map[string]interface{}) interface{} { + tampering := false + for _, v := range tk["tampering"].(map[string]interface{}) { + t, ok := v.(bool) + // Ignore non booleans in the tampering map + if ok && t == true { + tampering = true + } + } + + return HTTPHeaderFieldManipulationSummary{ + Tampering: tampering, + } +} + +// LogSummary writes the summary to the standard output +func (h HTTPHeaderFieldManipulation) LogSummary(s string) error { + return nil +} diff --git a/nettests/middlebox/http_invalid_request_line.go b/nettests/middlebox/http_invalid_request_line.go new file mode 100644 index 0000000..8300fa1 --- /dev/null +++ b/nettests/middlebox/http_invalid_request_line.go @@ -0,0 +1,36 @@ +package middlebox + +import ( + "github.com/measurement-kit/go-measurement-kit" + "github.com/openobservatory/gooni/nettests" +) + +// HTTPInvalidRequestLine test implementation +type HTTPInvalidRequestLine struct { +} + +// Run starts the test +func (h HTTPInvalidRequestLine) Run(ctl *nettests.Controller) error { + mknt := mk.NewNettest("HttpInvalidRequestLine") + ctl.Init(mknt) + return mknt.Run() +} + +// HTTPInvalidRequestLineSummary for the test +type HTTPInvalidRequestLineSummary struct { + Tampering bool +} + +// Summary generates a summary for a test run +func (h HTTPInvalidRequestLine) Summary(tk map[string]interface{}) interface{} { + tampering := tk["tampering"].(bool) + + return HTTPInvalidRequestLineSummary{ + Tampering: tampering, + } +} + +// LogSummary writes the summary to the standard output +func (h HTTPInvalidRequestLine) LogSummary(s string) error { + return nil +} diff --git a/nettests/nettests.go b/nettests/nettests.go index 9ce9262..bc1cae1 100644 --- a/nettests/nettests.go +++ b/nettests/nettests.go @@ -1,11 +1,16 @@ package nettests import ( + "encoding/json" + "fmt" + "github.com/apex/log" "github.com/measurement-kit/go-measurement-kit" ooni "github.com/openobservatory/gooni" "github.com/openobservatory/gooni/internal/cli/version" + "github.com/openobservatory/gooni/internal/colors" "github.com/openobservatory/gooni/internal/database" + "github.com/openobservatory/gooni/internal/output" ) // Nettest interface. Every Nettest should implement this. @@ -15,19 +20,13 @@ type Nettest interface { LogSummary(string) error } -// NettestGroup base structure -type NettestGroup struct { - Label string - Nettests []Nettest - Summary func(s string) string -} - // NewController creates a nettest controller -func NewController(ctx *ooni.Context, res *database.Result, msmtPath string) *Controller { +func NewController(nt Nettest, ctx *ooni.Context, res *database.Result, msmtPath string) *Controller { return &Controller{ - ctx, - res, - msmtPath, + Ctx: ctx, + nt: nt, + res: res, + msmtPath: msmtPath, } } @@ -36,12 +35,25 @@ func NewController(ctx *ooni.Context, res *database.Result, msmtPath string) *Co type Controller struct { Ctx *ooni.Context res *database.Result - msmtPath string + nt Nettest + msmts map[int64]*database.Measurement + msmtPath string // XXX maybe we can drop this and just use a temporary file } // Init should be called once to initialise the nettest func (c *Controller) Init(nt *mk.Nettest) error { log.Debugf("Init: %v", nt) + c.msmts = make(map[int64]*database.Measurement) + + msmtTemplate := database.Measurement{ + ASN: "", + IP: "", + CountryCode: "", + ReportID: "", + Name: nt.Name, + ResultID: c.res.ID, + ReportFilePath: c.msmtPath, + } log.Debugf("OutputPath: %s", c.msmtPath) nt.Options = mk.NettestOptions{ @@ -60,8 +72,8 @@ func (c *Controller) Init(nt *mk.Nettest) error { } nt.On("log", func(e mk.Event) { - level := e.Value["verbosity"].(string) - msg := e.Value["message"].(string) + level := e.Value.LogLevel + msg := e.Value.Message switch level { case "ERROR": @@ -84,36 +96,80 @@ func (c *Controller) Init(nt *mk.Nettest) error { nt.On("status.report_created", func(e mk.Event) { log.Debugf("%s", e.Key) + + msmtTemplate.ReportID = e.Value.ReportID }) nt.On("status.geoip_lookup", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) + + msmtTemplate.ASN = e.Value.ProbeASN + msmtTemplate.IP = e.Value.ProbeIP + msmtTemplate.CountryCode = e.Value.ProbeCC + }) + + nt.On("status.measurement_started", func(e mk.Event) { + log.Debugf(colors.Red(e.Key)) + + idx := e.Value.Idx + msmt, err := database.CreateMeasurement(c.Ctx.DB, msmtTemplate, e.Value.Input) + if err != nil { + log.WithError(err).Error("Failed to create measurement") + return + } + c.msmts[idx] = msmt }) nt.On("status.progress", func(e mk.Event) { - perc := e.Value["percentage"].(float64) - msg := e.Value["message"].(string) - c.OnProgress(perc, msg) + log.Debugf(colors.Red(e.Key)) + c.OnProgress(e.Value.Percentage, e.Value.Message) }) nt.On("status.update.*", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) }) nt.On("failure.measurement", func(e mk.Event) { - log.Debugf("%s", e.Key) + log.Debugf(colors.Red(e.Key)) + + c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure) }) - nt.On("failure.report_submission", func(e mk.Event) { - log.Debugf("%s", e.Key) + nt.On("failure.measurement_submission", func(e mk.Event) { + log.Debugf(colors.Red(e.Key)) + + failure := e.Value.Failure + c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure) + }) + + nt.On("status.measurement_uploaded", func(e mk.Event) { + log.Debugf(colors.Red(e.Key)) + + if err := c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB); err != nil { + log.WithError(err).Error("failed to mark msmt as uploaded") + } + }) + + nt.On("status.measurement_done", func(e mk.Event) { + log.Debugf(colors.Red(e.Key)) + + if err := c.msmts[e.Value.Idx].Done(c.Ctx.DB); err != nil { + log.WithError(err).Error("failed to mark msmt as done") + } }) nt.On("measurement", func(e mk.Event) { - c.OnEntry(e.Value["json_str"].(string)) + c.OnEntry(e.Value.Idx, e.Value.JSONStr) }) - nt.On("end", func(e mk.Event) { - c.OnEntry(e.Value["json_str"].(string)) + nt.On("status.end", func(e mk.Event) { + log.Debugf("status.end") + for idx, msmt := range c.msmts { + log.Debugf("adding msmt#%d to result", idx) + if err := msmt.AddToResult(c.Ctx.DB, c.res); err != nil { + log.WithError(err).Error("failed to add to result") + } + } }) return nil @@ -122,11 +178,29 @@ func (c *Controller) Init(nt *mk.Nettest) error { // OnProgress should be called when a new progress event is available. func (c *Controller) OnProgress(perc float64, msg string) { log.Debugf("OnProgress: %f - %s", perc, msg) + + key := fmt.Sprintf("%T", c.nt) + output.Progress(key, perc, msg) +} + +// Entry is an opaque measurement entry +type Entry struct { + TestKeys map[string]interface{} `json:"test_keys"` } // OnEntry should be called every time there is a new entry -func (c *Controller) OnEntry(jsonStr string) { - log.Debugf("OnEntry: %s", jsonStr) +func (c *Controller) OnEntry(idx int64, jsonStr string) { + log.Debugf("OnEntry") + + var entry Entry + json.Unmarshal([]byte(jsonStr), &entry) + summary := c.nt.Summary(entry.TestKeys) + summaryBytes, err := json.Marshal(summary) + if err != nil { + log.WithError(err).Error("failed to serialize summary") + } + log.Debugf("Fetching: %s %v", idx, c.msmts[idx]) + c.msmts[idx].WriteSummary(c.Ctx.DB, string(summaryBytes)) } // MKStart is the interface for the mk.Nettest Start() function diff --git a/nettests/performance/dash.go b/nettests/performance/dash.go index d64efc8..b3e3a87 100644 --- a/nettests/performance/dash.go +++ b/nettests/performance/dash.go @@ -19,9 +19,9 @@ func (d Dash) Run(ctl *nettests.Controller) error { // DashSummary for the test // TODO: process 'receiver_data' to provide an array of performance for a chart. type DashSummary struct { - Latency float32 + Latency float64 Bitrate int64 - Delay float32 + Delay float64 } // Summary generates a summary for a test run @@ -29,9 +29,9 @@ func (d Dash) Summary(tk map[string]interface{}) interface{} { simple := tk["simple"].(map[string]interface{}) return DashSummary{ - Latency: simple["connect_latency"].(float32), - Bitrate: simple["median_bitrate"].(int64), - Delay: simple["min_playout_delay"].(float32), + Latency: simple["connect_latency"].(float64), + Bitrate: int64(simple["median_bitrate"].(float64)), + Delay: simple["min_playout_delay"].(float64), } } diff --git a/nettests/performance/ndt.go b/nettests/performance/ndt.go index c6425ed..4c8159d 100644 --- a/nettests/performance/ndt.go +++ b/nettests/performance/ndt.go @@ -21,12 +21,12 @@ type NDTSummary struct { Upload int64 Download int64 Ping int64 - MaxRTT int64 - AvgRTT int64 - MinRTT int64 + MaxRTT float64 + AvgRTT float64 + MinRTT float64 MSS int64 OutOfOrder int64 - PacketLoss float32 + PacketLoss float64 Timeouts int64 } @@ -36,16 +36,16 @@ func (n NDT) Summary(tk map[string]interface{}) interface{} { advanced := tk["advanced"].(map[string]interface{}) return NDTSummary{ - Upload: simple["upload"].(int64), - Download: simple["download"].(int64), - Ping: simple["ping"].(int64), - MaxRTT: advanced["max_rtt"].(int64), - AvgRTT: advanced["avg_rtt"].(int64), - MinRTT: advanced["min_rtt"].(int64), - MSS: advanced["mss"].(int64), - OutOfOrder: advanced["out_of_order"].(int64), - PacketLoss: advanced["packet_loss"].(float32), - Timeouts: advanced["timeouts"].(int64), + Upload: int64(simple["upload"].(float64)), + Download: int64(simple["download"].(float64)), + Ping: int64(simple["ping"].(float64)), + MaxRTT: advanced["max_rtt"].(float64), + AvgRTT: advanced["avg_rtt"].(float64), + MinRTT: advanced["min_rtt"].(float64), + MSS: int64(advanced["mss"].(float64)), + OutOfOrder: int64(advanced["out_of_order"].(float64)), + PacketLoss: advanced["packet_loss"].(float64), + Timeouts: int64(advanced["timeouts"].(float64)), } }