Integrate further with ooni/probe-engine: episode two (#46)
* utils/geoip.go: use github.com/ooni/probe-engine
Let's start using the engine by rewriting utils/geoip.go to
be just a thin wrapper around the engine functionality.
* Ready for review
* Checkpoint: the im tests are converted
Still have some doubts with respect to the variables that
are passed to MK via probe-engine. Will double check.
* fix(i/c/r/run.go): write the correct logic
* nettests: one more comment and also fix a format string
* Tweak previous
* progress
* Fix doofus
* better comment
* XXX => actionable comment
* Add glue to simplify test keys management
Making the concept of measurement more abstract in the engine is
not feasible because, when submitting a measurement, we need to
modify it to update the report ID and the measurement ID. Therefore,
returning a serialized measurement is not a good idea. We will
keep using a model.Measurement in the engine.
Changing model.Measurement.TestKeys's type from a `interface{}`
pointing to a well defined data structure to `map[string]interface{}`
is a regression because means that we are moving from code that
has a clear and defined structure to code that is more complicated
to parse and validate. Since we're already suffering havily from
the lack of a good schema, I'm not going to make the situation
worst by worsening the engine. At least for ndt7 and psiphon, we
now have a good schema and I don't want to lose that.
However, the current code in this repository is expecting the
test keys to be a `map[string]interface{}`. This choice was
dictated by the fact that we receive a JSON from Measurement Kit
and by the fact that there's not a clear schema.
To solve this tension, in this commit I am going to write glue
adapter code that makes sure that the TestKeys of a Measurement
are converted to `map[string]interface{}`. This will be done
using a type cast where possible and JSON serialization and parsing
otherwise. In a perfect world, glue is not a good idea, but in a
real world it may actually be useful.
When all tests in the engine will have a clear Go data structure,
we'll then remove the glue and just cast to the proper data
structure from `interface{}` where required.
* nettests/performance: use probe-engine
* go.{mod,sum}: upgrade to latest probe-engine
* nettests/middlebox: use ooni/probe-engine
* Update to the latest probe-engine
* web_connectivity: rewrite to use probe-engine
* Cosmetic change suggested by @hellais
* nettests/nettests.go: remove unused code
* nettests/nettests.go: fix progress
* nettests/nettests.go: remove go-measurement-kit code
* We don't depend on go-measurement-kit anymore
* Improve non-verbose output where possible
See also: https://github.com/measurement-kit/measurement-kit/issues/1856
* Make web_connectivity output pleasant
* Update to the latest probe-engine
* nettests/nettests.go: honour sharing settings
* Update to the latest probe-engine
* Use log.WithFields for probe-engine
* Update go.mod go.sum
* Revert "Update go.mod go.sum"
This reverts commit 5ecd38d8236f4a4e9b77ddb8e8a0d1e3cdd4b818.
* Revert "Revert "Update go.mod go.sum""
This reverts commit 6114b31eca98826112032776bd0feff02d763ecd.
* Upgrade ooni/probe-engine
* Unset GOPATH before running go build commands
* Dockefile: fix linux build by using latest
* Update to the latest ooni/probe-engine
```
go get -u github.com/ooni/probe-engine
go mod tidy
```
* Repair build
This commit is contained in:
committed by
Arturo Filastò
parent
df629237be
commit
b9b555ba68
@@ -1,8 +1,8 @@
|
||||
package im
|
||||
|
||||
import (
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
"github.com/ooni/probe-cli/nettests"
|
||||
"github.com/ooni/probe-engine/experiment/fbmessenger"
|
||||
)
|
||||
|
||||
// FacebookMessenger test implementation
|
||||
@@ -11,9 +11,10 @@ type FacebookMessenger struct {
|
||||
|
||||
// Run starts the test
|
||||
func (h FacebookMessenger) Run(ctl *nettests.Controller) error {
|
||||
mknt := mk.NewNettest("FacebookMessenger")
|
||||
ctl.Init(mknt)
|
||||
return mknt.Run()
|
||||
experiment := fbmessenger.NewExperiment(ctl.Ctx.Session, fbmessenger.Config{
|
||||
LogLevel: "INFO",
|
||||
})
|
||||
return ctl.Run(experiment, []string{""})
|
||||
}
|
||||
|
||||
// FacebookMessengerTestKeys for the test
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package im
|
||||
|
||||
import (
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
"github.com/ooni/probe-cli/nettests"
|
||||
"github.com/ooni/probe-engine/experiment/telegram"
|
||||
)
|
||||
|
||||
// Telegram test implementation
|
||||
@@ -11,9 +11,10 @@ type Telegram struct {
|
||||
|
||||
// Run starts the test
|
||||
func (h Telegram) Run(ctl *nettests.Controller) error {
|
||||
mknt := mk.NewNettest("Telegram")
|
||||
ctl.Init(mknt)
|
||||
return mknt.Run()
|
||||
experiment := telegram.NewExperiment(ctl.Ctx.Session, telegram.Config{
|
||||
LogLevel: "INFO",
|
||||
})
|
||||
return ctl.Run(experiment, []string{""})
|
||||
}
|
||||
|
||||
// TelegramTestKeys for the test
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package im
|
||||
|
||||
import (
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
"github.com/ooni/probe-cli/nettests"
|
||||
"github.com/ooni/probe-engine/experiment/whatsapp"
|
||||
)
|
||||
|
||||
// WhatsApp test implementation
|
||||
@@ -11,9 +11,10 @@ type WhatsApp struct {
|
||||
|
||||
// Run starts the test
|
||||
func (h WhatsApp) Run(ctl *nettests.Controller) error {
|
||||
mknt := mk.NewNettest("Whatsapp")
|
||||
ctl.Init(mknt)
|
||||
return mknt.Run()
|
||||
experiment := whatsapp.NewExperiment(ctl.Ctx.Session, whatsapp.Config{
|
||||
LogLevel: "INFO",
|
||||
})
|
||||
return ctl.Run(experiment, []string{""})
|
||||
}
|
||||
|
||||
// WhatsAppTestKeys for the test
|
||||
|
||||
@@ -3,8 +3,8 @@ package middlebox
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
"github.com/ooni/probe-cli/nettests"
|
||||
"github.com/ooni/probe-engine/experiment/hhfm"
|
||||
)
|
||||
|
||||
// HTTPHeaderFieldManipulation test implementation
|
||||
@@ -13,9 +13,10 @@ type HTTPHeaderFieldManipulation struct {
|
||||
|
||||
// Run starts the test
|
||||
func (h HTTPHeaderFieldManipulation) Run(ctl *nettests.Controller) error {
|
||||
mknt := mk.NewNettest("HttpHeaderFieldManipulation")
|
||||
ctl.Init(mknt)
|
||||
return mknt.Run()
|
||||
experiment := hhfm.NewExperiment(ctl.Ctx.Session, hhfm.Config{
|
||||
LogLevel: "INFO",
|
||||
})
|
||||
return ctl.Run(experiment, []string{""})
|
||||
}
|
||||
|
||||
// HTTPHeaderFieldManipulationTestKeys for the test
|
||||
|
||||
@@ -3,8 +3,8 @@ package middlebox
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
"github.com/ooni/probe-cli/nettests"
|
||||
"github.com/ooni/probe-engine/experiment/hirl"
|
||||
)
|
||||
|
||||
// HTTPInvalidRequestLine test implementation
|
||||
@@ -13,9 +13,10 @@ type HTTPInvalidRequestLine struct {
|
||||
|
||||
// Run starts the test
|
||||
func (h HTTPInvalidRequestLine) Run(ctl *nettests.Controller) error {
|
||||
mknt := mk.NewNettest("HttpInvalidRequestLine")
|
||||
ctl.Init(mknt)
|
||||
return mknt.Run()
|
||||
experiment := hirl.NewExperiment(ctl.Ctx.Session, hirl.Config{
|
||||
LogLevel: "INFO",
|
||||
})
|
||||
return ctl.Run(experiment, []string{""})
|
||||
}
|
||||
|
||||
// HTTPInvalidRequestLineTestKeys for the test
|
||||
|
||||
+121
-218
@@ -1,22 +1,23 @@
|
||||
package nettests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/apex/log"
|
||||
"github.com/fatih/color"
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
ooni "github.com/ooni/probe-cli"
|
||||
"github.com/ooni/probe-cli/internal/crashreport"
|
||||
"github.com/ooni/probe-cli/internal/database"
|
||||
"github.com/ooni/probe-cli/internal/enginex"
|
||||
"github.com/ooni/probe-cli/internal/output"
|
||||
"github.com/ooni/probe-cli/utils"
|
||||
"github.com/ooni/probe-cli/utils/strcase"
|
||||
"github.com/ooni/probe-cli/version"
|
||||
"github.com/ooni/probe-engine/experiment"
|
||||
"github.com/ooni/probe-engine/experiment/handler"
|
||||
"github.com/ooni/probe-engine/model"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Nettest interface. Every Nettest should implement this.
|
||||
@@ -50,6 +51,12 @@ type Controller struct {
|
||||
msmts map[int64]*database.Measurement
|
||||
msmtPath string // XXX maybe we can drop this and just use a temporary file
|
||||
inputIdxMap map[int64]int64 // Used to map mk idx to database id
|
||||
|
||||
// numInputs is the total number of inputs
|
||||
numInputs int
|
||||
|
||||
// curInputIdx is the current input index
|
||||
curInputIdx int
|
||||
}
|
||||
|
||||
// SetInputIdxMap is used to set the mapping of index into input. This mapping
|
||||
@@ -67,252 +74,148 @@ func (c *Controller) SetNettestIndex(i, n int) {
|
||||
c.ntIndex = i
|
||||
}
|
||||
|
||||
// Init should be called once to initialise the nettest
|
||||
func (c *Controller) Init(nt *mk.Nettest) error {
|
||||
log.Debugf("Init: %v", nt)
|
||||
err := c.Ctx.MaybeLocationLookup()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Run runs the selected nettest using the related experiment
|
||||
// with the specified inputs.
|
||||
//
|
||||
// This function will continue to run in most cases but will
|
||||
// immediately halt if something's wrong with the file system.
|
||||
func (c *Controller) Run(exp *experiment.Experiment, inputs []string) error {
|
||||
ctx := context.Background()
|
||||
|
||||
// This will configure the controller as handler for the callbacks
|
||||
// called by ooni/probe-engine/experiment.Experiment.
|
||||
exp.Callbacks = handler.Callbacks(c)
|
||||
c.numInputs = len(inputs)
|
||||
|
||||
c.msmts = make(map[int64]*database.Measurement)
|
||||
|
||||
// These values are shared by every measurement
|
||||
reportID := sql.NullString{String: "", Valid: false}
|
||||
testName := strcase.ToSnake(nt.Name)
|
||||
var reportID sql.NullString
|
||||
resultID := c.res.ID
|
||||
reportFilePath := c.msmtPath
|
||||
geoIPCountryPath := c.Ctx.Session.CountryDatabasePath()
|
||||
geoIPASNPath := c.Ctx.Session.ASNDatabasePath()
|
||||
msmtPath := c.msmtPath
|
||||
|
||||
log.Debugf("OutputPath: %s", msmtPath)
|
||||
nt.Options = mk.NettestOptions{
|
||||
IncludeIP: c.Ctx.Config.Sharing.IncludeIP,
|
||||
IncludeASN: c.Ctx.Config.Sharing.IncludeASN,
|
||||
IncludeCountry: c.Ctx.Config.Sharing.IncludeCountry,
|
||||
LogLevel: "INFO",
|
||||
log.Debug(color.RedString("status.queued"))
|
||||
log.Debug(color.RedString("status.started"))
|
||||
log.Debugf("OutputPath: %s", c.msmtPath)
|
||||
|
||||
ProbeCC: c.Ctx.Session.ProbeCC(),
|
||||
ProbeASN: c.Ctx.Session.ProbeASNString(),
|
||||
ProbeIP: c.Ctx.Session.ProbeIP(),
|
||||
|
||||
DisableReportFile: false,
|
||||
DisableCollector: !c.Ctx.Config.Sharing.UploadResults,
|
||||
RandomizeInput: false, // It's important to disable input randomization to ensure the URLs are written in sync to the DB
|
||||
SoftwareName: "ooniprobe-desktop",
|
||||
SoftwareVersion: version.Version,
|
||||
CollectorBaseURL: c.Ctx.Config.Advanced.CollectorURL,
|
||||
BouncerBaseURL: c.Ctx.Config.Advanced.BouncerURL,
|
||||
|
||||
OutputPath: msmtPath,
|
||||
GeoIPCountryPath: geoIPCountryPath,
|
||||
GeoIPASNPath: geoIPASNPath,
|
||||
CaBundlePath: c.Ctx.Session.CABundlePath(),
|
||||
if c.Ctx.Config.Sharing.UploadResults {
|
||||
if err := exp.OpenReport(ctx); err != nil {
|
||||
log.Debugf(
|
||||
"%s: %s", color.RedString("failure.report_create"), err.Error(),
|
||||
)
|
||||
} else {
|
||||
defer exp.CloseReport(ctx)
|
||||
log.Debugf(color.RedString("status.report_create"))
|
||||
reportID = sql.NullString{String: exp.ReportID(), Valid: true}
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("CaBundlePath: %s", nt.Options.CaBundlePath)
|
||||
log.Debugf("GeoIPASNPath: %s", nt.Options.GeoIPASNPath)
|
||||
log.Debugf("GeoIPCountryPath: %s", nt.Options.GeoIPCountryPath)
|
||||
|
||||
nt.On("log", func(e mk.Event) {
|
||||
level := e.Value.LogLevel
|
||||
msg := e.Value.Message
|
||||
|
||||
switch level {
|
||||
case "ERROR":
|
||||
log.Errorf("%v: %s", color.RedString("mklog"), msg)
|
||||
case "INFO":
|
||||
log.Infof("%v: %s", color.BlueString("mklog"), msg)
|
||||
default:
|
||||
log.Debugf("%v: %s", color.WhiteString("mklog"), msg)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
nt.On("status.queued", func(e mk.Event) {
|
||||
log.Debugf("%s", e.Key)
|
||||
})
|
||||
|
||||
nt.On("status.started", func(e mk.Event) {
|
||||
log.Debugf("%s", e.Key)
|
||||
})
|
||||
|
||||
nt.On("status.report_create", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
|
||||
reportID = sql.NullString{String: e.Value.ReportID, Valid: true}
|
||||
})
|
||||
|
||||
nt.On("failure.report_create", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
log.Debugf("%v", e.Value)
|
||||
})
|
||||
|
||||
nt.On("status.geoip_lookup", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
})
|
||||
|
||||
nt.On("status.measurement_start", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
idx := e.Value.Idx
|
||||
urlID := sql.NullInt64{Int64: 0, Valid: false}
|
||||
for idx, input := range inputs {
|
||||
c.curInputIdx = idx // allow for precise progress
|
||||
idx64 := int64(idx)
|
||||
log.Debug(color.RedString("status.measurement_start"))
|
||||
var urlID sql.NullInt64
|
||||
if c.inputIdxMap != nil {
|
||||
urlID = sql.NullInt64{Int64: c.inputIdxMap[idx], Valid: true}
|
||||
urlID = sql.NullInt64{Int64: c.inputIdxMap[idx64], Valid: true}
|
||||
}
|
||||
msmt, err := database.CreateMeasurement(c.Ctx.DB, reportID, testName, resultID, reportFilePath, urlID)
|
||||
msmt, err := database.CreateMeasurement(
|
||||
c.Ctx.DB, reportID, exp.TestName, resultID, c.msmtPath, urlID,
|
||||
)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create measurement")
|
||||
return
|
||||
return errors.Wrap(err, "failed to create measurement")
|
||||
}
|
||||
c.msmts[idx] = msmt
|
||||
})
|
||||
c.msmts[idx64] = msmt
|
||||
|
||||
nt.On("status.progress", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
perc := e.Value.Percentage
|
||||
if c.ntCount > 0 {
|
||||
perc = float64(c.ntIndex)/float64(c.ntCount) + perc/float64(c.ntCount)
|
||||
}
|
||||
c.OnProgress(perc, e.Value.Message)
|
||||
})
|
||||
|
||||
nt.On("status.update.*", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
})
|
||||
|
||||
// XXX should these be made into permanent failures?
|
||||
nt.On("failure.asn_lookup", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
log.Debugf("%v", e.Value)
|
||||
})
|
||||
nt.On("failure.cc_lookup", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
log.Debugf("%v", e.Value)
|
||||
})
|
||||
nt.On("failure.ip_lookup", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
log.Debugf("%v", e.Value)
|
||||
})
|
||||
|
||||
nt.On("failure.resolver_lookup", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
log.Debugf("%v", e.Value)
|
||||
})
|
||||
|
||||
nt.On("failure.report_close", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
log.Debugf("%v", e.Value)
|
||||
})
|
||||
|
||||
nt.On("failure.startup", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
|
||||
c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure)
|
||||
})
|
||||
|
||||
nt.On("failure.measurement", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
|
||||
c.msmts[e.Value.Idx].Failed(c.Ctx.DB, e.Value.Failure)
|
||||
})
|
||||
|
||||
nt.On("failure.measurement_submission", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
|
||||
failure := e.Value.Failure
|
||||
c.msmts[e.Value.Idx].UploadFailed(c.Ctx.DB, failure)
|
||||
})
|
||||
|
||||
nt.On("status.measurement_submission", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
|
||||
// XXX maybe this should change once MK is aligned with the spec
|
||||
if c.Ctx.Config.Sharing.UploadResults == true {
|
||||
if err := c.msmts[e.Value.Idx].UploadSucceeded(c.Ctx.DB); err != nil {
|
||||
log.WithError(err).Error("failed to mark msmt as uploaded")
|
||||
measurement, err := exp.Measure(ctx, input)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug(color.RedString("failure.measurement"))
|
||||
if err := c.msmts[idx64].Failed(c.Ctx.DB, err.Error()); err != nil {
|
||||
return errors.Wrap(err, "failed to mark measurement as failed")
|
||||
}
|
||||
continue
|
||||
}
|
||||
})
|
||||
|
||||
nt.On("status.measurement_done", func(e mk.Event) {
|
||||
log.Debugf(color.RedString(e.Key))
|
||||
|
||||
if err := c.msmts[e.Value.Idx].Done(c.Ctx.DB); err != nil {
|
||||
log.WithError(err).Error("failed to mark msmt as done")
|
||||
// Make sure we share what the user wants us to share.
|
||||
if c.Ctx.Config.Sharing.IncludeIP == false {
|
||||
measurement.ProbeIP = model.DefaultProbeIP
|
||||
}
|
||||
if c.Ctx.Config.Sharing.IncludeASN == false {
|
||||
measurement.ProbeASN = fmt.Sprintf("AS%d", model.DefaultProbeASN)
|
||||
}
|
||||
if c.Ctx.Config.Sharing.IncludeCountry == false {
|
||||
measurement.ProbeCC = model.DefaultProbeCC
|
||||
}
|
||||
})
|
||||
|
||||
nt.On("measurement", func(e mk.Event) {
|
||||
log.Debugf("status.end")
|
||||
|
||||
crashreport.CapturePanicAndWait(func() {
|
||||
c.OnEntry(e.Value.Idx, e.Value.JSONStr)
|
||||
}, nil)
|
||||
})
|
||||
|
||||
nt.On("status.end", func(e mk.Event) {
|
||||
log.Debugf("status.end")
|
||||
|
||||
for idx, msmt := range c.msmts {
|
||||
log.Debugf("adding msmt#%d to result", idx)
|
||||
if err := msmt.AddToResult(c.Ctx.DB, c.res); err != nil {
|
||||
log.WithError(err).Error("failed to add to result")
|
||||
if c.Ctx.Config.Sharing.UploadResults {
|
||||
// Implementation note: SubmitMeasurement will fail here if we did fail
|
||||
// to open the report but we still want to continue. There will be a
|
||||
// bit of a spew in the logs, perhaps, but stopping seems less efficient.
|
||||
if err := exp.SubmitMeasurement(ctx, &measurement); err != nil {
|
||||
log.Debug(color.RedString("failure.measurement_submission"))
|
||||
if err := c.msmts[idx64].UploadFailed(c.Ctx.DB, err.Error()); err != nil {
|
||||
return errors.Wrap(err, "failed to mark upload as failed")
|
||||
}
|
||||
} else if err := c.msmts[idx64].UploadSucceeded(c.Ctx.DB); err != nil {
|
||||
return errors.Wrap(err, "failed to mark upload as succeeded")
|
||||
}
|
||||
}
|
||||
|
||||
if e.Value.Failure != "" {
|
||||
log.Errorf("Failure in status.end: %s", e.Value.Failure)
|
||||
if err := exp.SaveMeasurement(measurement, c.msmtPath); err != nil {
|
||||
return errors.Wrap(err, "failed to save measurement on disk")
|
||||
}
|
||||
if err := c.msmts[idx64].Done(c.Ctx.DB); err != nil {
|
||||
return errors.Wrap(err, "failed to mark measurement as done")
|
||||
}
|
||||
|
||||
c.res.DataUsageDown += e.Value.DownloadedKB
|
||||
c.res.DataUsageUp += e.Value.UploadedKB
|
||||
})
|
||||
// We're not sure whether it's enough to log the error or we should
|
||||
// instead also mark the measurement as failed. Strictly speaking this
|
||||
// is an inconsistency between the code that generate the measurement
|
||||
// and the code that process the measurement. We do have some data
|
||||
// but we're not gonna have a summary. To be reconsidered.
|
||||
genericTk, err := enginex.MakeGenericTestKeys(measurement)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("failed to cast the test keys")
|
||||
continue
|
||||
}
|
||||
tk, err := c.nt.GetTestKeys(genericTk)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("failed to obtain testKeys")
|
||||
continue
|
||||
}
|
||||
log.Debugf("Fetching: %d %v", idx, c.msmts[idx64])
|
||||
if err := database.AddTestKeys(c.Ctx.DB, c.msmts[idx64], tk); err != nil {
|
||||
return errors.Wrap(err, "failed to add test keys to summary")
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("Registered all the handlers")
|
||||
log.Debugf("status.end")
|
||||
for idx, msmt := range c.msmts {
|
||||
log.Debugf("adding msmt#%d to result", idx)
|
||||
if err := msmt.AddToResult(c.Ctx.DB, c.res); err != nil {
|
||||
return errors.Wrap(err, "failed to add to result")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnProgress should be called when a new progress event is available.
|
||||
func (c *Controller) OnProgress(perc float64, msg string) {
|
||||
log.Debugf("OnProgress: %f - %s", perc, msg)
|
||||
|
||||
if c.numInputs >= 1 {
|
||||
// make the percentage relative to the current input over all inputs
|
||||
floor := (float64(c.curInputIdx) / float64(c.numInputs))
|
||||
step := 1.0 / float64(c.numInputs)
|
||||
perc = floor + perc * step
|
||||
}
|
||||
if c.ntCount > 0 {
|
||||
// make the percentage relative to the current nettest over all nettests
|
||||
perc = float64(c.ntIndex)/float64(c.ntCount) + perc/float64(c.ntCount)
|
||||
}
|
||||
key := fmt.Sprintf("%T", c.nt)
|
||||
output.Progress(key, perc, msg)
|
||||
}
|
||||
|
||||
// Entry is an opaque measurement entry
|
||||
type Entry struct {
|
||||
TestKeys map[string]interface{} `json:"test_keys"`
|
||||
}
|
||||
|
||||
// OnEntry should be called every time there is a new entry
|
||||
func (c *Controller) OnEntry(idx int64, jsonStr string) {
|
||||
log.Debugf("OnEntry")
|
||||
|
||||
var entry Entry
|
||||
if err := json.Unmarshal([]byte(jsonStr), &entry); err != nil {
|
||||
log.WithError(err).Error("failed to parse onEntry")
|
||||
return
|
||||
}
|
||||
// XXX is it correct to just log the error instead of marking the whole
|
||||
// measurement as failed?
|
||||
tk, err := c.nt.GetTestKeys(entry.TestKeys)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("failed to obtain testKeys")
|
||||
}
|
||||
|
||||
log.Debugf("Fetching: %s %v", idx, c.msmts[idx])
|
||||
err = database.AddTestKeys(c.Ctx.DB, c.msmts[idx], tk)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("failed to add test keys to summary")
|
||||
}
|
||||
}
|
||||
|
||||
// MKStart is the interface for the mk.Nettest Start() function
|
||||
type MKStart func(name string) (chan bool, error)
|
||||
|
||||
// Start should be called to start the test
|
||||
func (c *Controller) Start(f MKStart) {
|
||||
log.Debugf("MKStart: %s", f)
|
||||
// OnDataUsage should be called when we have a data usage update.
|
||||
func (c *Controller) OnDataUsage(dloadKiB, uploadKiB float64) {
|
||||
c.res.DataUsageDown += dloadKiB
|
||||
c.res.DataUsageUp += uploadKiB
|
||||
}
|
||||
|
||||
@@ -3,8 +3,8 @@ package performance
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
"github.com/ooni/probe-cli/nettests"
|
||||
"github.com/ooni/probe-engine/experiment/dash"
|
||||
)
|
||||
|
||||
// Dash test implementation
|
||||
@@ -13,9 +13,10 @@ type Dash struct {
|
||||
|
||||
// Run starts the test
|
||||
func (d Dash) Run(ctl *nettests.Controller) error {
|
||||
dash := mk.NewNettest("Dash")
|
||||
ctl.Init(dash)
|
||||
return dash.Run()
|
||||
experiment := dash.NewExperiment(
|
||||
ctl.Ctx.Session, dash.Config{},
|
||||
)
|
||||
return ctl.Run(experiment, []string{""})
|
||||
}
|
||||
|
||||
// DashTestKeys for the test
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package performance
|
||||
|
||||
import (
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
"github.com/ooni/probe-cli/nettests"
|
||||
"github.com/ooni/probe-engine/experiment/ndt"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
@@ -12,9 +12,10 @@ type NDT struct {
|
||||
|
||||
// Run starts the test
|
||||
func (n NDT) Run(ctl *nettests.Controller) error {
|
||||
nt := mk.NewNettest("Ndt")
|
||||
ctl.Init(nt)
|
||||
return nt.Run()
|
||||
experiment := ndt.NewExperiment(
|
||||
ctl.Ctx.Session, ndt.Config{},
|
||||
)
|
||||
return ctl.Run(experiment, []string{""})
|
||||
}
|
||||
|
||||
// NDTTestKeys for the test
|
||||
|
||||
@@ -1,63 +1,32 @@
|
||||
package websites
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"context"
|
||||
|
||||
"github.com/apex/log"
|
||||
"github.com/measurement-kit/go-measurement-kit"
|
||||
"github.com/ooni/probe-cli/internal/database"
|
||||
"github.com/ooni/probe-cli/nettests"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/ooni/probe-engine/experiment/web_connectivity"
|
||||
"github.com/ooni/probe-engine/orchestra/testlists"
|
||||
)
|
||||
|
||||
// URLInfo contains the URL and the citizenlab category code for that URL
|
||||
type URLInfo struct {
|
||||
URL string `json:"url"`
|
||||
CountryCode string `json:"country_code"`
|
||||
CategoryCode string `json:"category_code"`
|
||||
}
|
||||
|
||||
// URLResponse is the orchestrate url response containing a list of URLs
|
||||
type URLResponse struct {
|
||||
Results []URLInfo `json:"results"`
|
||||
}
|
||||
|
||||
const orchestrateBaseURL = "https://events.proteus.test.ooni.io"
|
||||
|
||||
func lookupURLs(ctl *nettests.Controller) ([]string, map[int64]int64, error) {
|
||||
var (
|
||||
parsed = new(URLResponse)
|
||||
urls []string
|
||||
)
|
||||
var urls []string
|
||||
urlIDMap := make(map[int64]int64)
|
||||
log.Debug("Looking up URLs")
|
||||
// XXX pass in the configuration for category codes
|
||||
reqURL := fmt.Sprintf("%s/api/v1/urls?probe_cc=%s",
|
||||
orchestrateBaseURL,
|
||||
ctl.Ctx.Session.ProbeCC())
|
||||
|
||||
resp, err := http.Get(reqURL)
|
||||
testlist, err := testlists.NewClient(ctl.Ctx.Session).Do(
|
||||
context.Background(), ctl.Ctx.Session.ProbeCC(),
|
||||
)
|
||||
if err != nil {
|
||||
return urls, urlIDMap, errors.Wrap(err, "failed to perform request")
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return urls, urlIDMap, errors.Wrap(err, "failed to read response body")
|
||||
}
|
||||
err = json.Unmarshal([]byte(body), &parsed)
|
||||
if err != nil {
|
||||
return urls, urlIDMap, errors.Wrap(err, "failed to parse json")
|
||||
}
|
||||
|
||||
for idx, url := range parsed.Results {
|
||||
for idx, url := range testlist {
|
||||
log.Debugf("Going over URL %d", idx)
|
||||
urlID, err := database.CreateOrUpdateURL(ctl.Ctx.DB, url.URL, url.CategoryCode, url.CountryCode)
|
||||
urlID, err := database.CreateOrUpdateURL(
|
||||
ctl.Ctx.DB, url.URL, url.CategoryCode, url.CountryCode,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to add to the URL table")
|
||||
return nil, nil, err
|
||||
}
|
||||
log.Debugf("Mapped URL %s to idx %d and urlID %d", url.URL, idx, urlID)
|
||||
urlIDMap[int64(idx)] = urlID
|
||||
@@ -72,17 +41,16 @@ type WebConnectivity struct {
|
||||
|
||||
// Run starts the test
|
||||
func (n WebConnectivity) Run(ctl *nettests.Controller) error {
|
||||
nt := mk.NewNettest("WebConnectivity")
|
||||
ctl.Init(nt)
|
||||
|
||||
urls, urlIDMap, err := lookupURLs(ctl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctl.SetInputIdxMap(urlIDMap)
|
||||
nt.Options.Inputs = urls
|
||||
|
||||
return nt.Run()
|
||||
experiment := web_connectivity.NewExperiment(
|
||||
ctl.Ctx.Session,
|
||||
web_connectivity.Config{LogLevel: "INFO"},
|
||||
)
|
||||
return ctl.Run(experiment, urls)
|
||||
}
|
||||
|
||||
// WebConnectivityTestKeys for the test
|
||||
|
||||
Reference in New Issue
Block a user