feat(webstepsx): websteps using measurex (#530)

This diff adds the prototype websteps implementation that used
to live at https://github.com/ooni/probe-cli/pull/506.

The code is reasonably good already and it's pointing to a roaming
test helper that I've properly configured.

You can run websteps with:

```
./miniooni -n websteps
```

This will go over the test list for your country.

At this stage the mechanics of the experiment are set, but we
still need to have a conversation on the following topics:

1. whether we're okay with reusing the data format used by other
OONI experiments, or we would like to use a more compact data
format (which may either be a more compact JSON or we can choose
to always submit compressed measurements for websteps);

2. the extent to which we would like to keep the measurement as
a collection of "the experiment saw this" and "the test helper
saw that" and let the pipeline choose an overall score: this is
clearly an option, but there is also the opposite option to
build a summary of the measurement on the probe.

Compared to the previous prototype of websteps, the main
architectural change we have here is that we are following
the point of view of the probe and the test helper is
much more dumb. Basically, the probe will choose which
redirection to follow and ask the test helper every time
it discovers a new URL to measure it w/o redirections.

Reference issue: https://github.com/ooni/probe/issues/1733
This commit is contained in:
Simone Basso 2021-09-30 02:06:27 +02:00 committed by GitHub
parent d45e58c14f
commit ba9151d4fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 650 additions and 10 deletions

View File

@ -11,7 +11,9 @@ import (
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/internal/cmd/oohelper/internal"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/webstepsx"
"github.com/ooni/probe-cli/v3/internal/engine/netx"
"github.com/ooni/probe-cli/v3/internal/measurex"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
@ -20,8 +22,9 @@ var (
debug = flag.Bool("debug", false, "Toggle debug mode")
httpClient *http.Client
resolver netx.Resolver
server = flag.String("server", "https://wcth.ooni.io/", "URL of the test helper")
server = flag.String("server", "", "URL of the test helper")
target = flag.String("target", "", "Target URL for the test helper")
fwebsteps = flag.Bool("websteps", false, "Use the websteps TH")
)
func newhttpclient() *http.Client {
@ -43,18 +46,49 @@ func init() {
}
func main() {
defer cancel()
logmap := map[bool]log.Level{
true: log.DebugLevel,
false: log.InfoLevel,
}
flag.Parse()
log.SetLevel(logmap[*debug])
clnt := internal.OOClient{HTTPClient: httpClient, Resolver: resolver}
config := internal.OOConfig{TargetURL: *target, ServerURL: *server}
defer cancel()
cresp, err := clnt.Do(ctx, config)
runtimex.PanicOnError(err, "client.Do failed")
apimap := map[bool]func() interface{}{
false: wcth,
true: webstepsth,
}
cresp := apimap[*fwebsteps]()
data, err := json.MarshalIndent(cresp, "", " ")
runtimex.PanicOnError(err, "json.MarshalIndent failed")
fmt.Printf("%s\n", string(data))
}
// webstepsth calls the websteps test helper for the URL given by the
// -target flag and returns its response. When the -server flag is empty
// we use the default websteps TH endpoint. The error returned by the
// client is handed to runtimex.PanicOnError.
func webstepsth() interface{} {
	const defaultServerURL = "https://1.th.ooni.org/api/v1/websteps"
	thURL := *server
	if thURL == "" {
		thURL = defaultServerURL
	}
	resolvers := []*measurex.ResolverInfo{{
		Network: "udp",
		Address: "8.8.4.4:53",
	}}
	client := &webstepsx.THClient{
		DNServers:  resolvers,
		HTTPClient: httpClient,
		ServerURL:  thURL,
	}
	response, err := client.Run(ctx, *target)
	runtimex.PanicOnError(err, "client.Run failed")
	return response
}
// wcth calls the test helper through internal.OOClient for the URL given
// by the -target flag and returns its response. When the -server flag is
// empty we use https://wcth.ooni.io/. The error returned by the client is
// handed to runtimex.PanicOnError.
func wcth() interface{} {
	const defaultServerURL = "https://wcth.ooni.io/"
	thURL := *server
	if thURL == "" {
		thURL = defaultServerURL
	}
	client := internal.OOClient{HTTPClient: httpClient, Resolver: resolver}
	response, err := client.Do(ctx, internal.OOConfig{
		TargetURL: *target,
		ServerURL: thURL,
	})
	runtimex.PanicOnError(err, "client.Do failed")
	return response
}

View File

@ -11,6 +11,7 @@ import (
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/internal/cmd/oohelperd/internal/webconnectivity"
"github.com/ooni/probe-cli/v3/internal/cmd/oohelperd/internal/websteps"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/webstepsx"
"github.com/ooni/probe-cli/v3/internal/engine/netx"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
@ -58,6 +59,7 @@ func main() {
func testableMain() {
mux := http.NewServeMux()
mux.Handle("/api/unstable/websteps", &websteps.Handler{Config: &websteps.Config{}})
mux.Handle("/api/v1/websteps", &webstepsx.THHandler{})
mux.Handle("/", webconnectivity.Handler{
Client: httpx,
Dialer: dialer,

View File

@ -23,7 +23,7 @@ import (
"github.com/ooni/probe-cli/v3/internal/engine/experiment/torsf"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/urlgetter"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/webconnectivity"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/websteps"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/webstepsx"
"github.com/ooni/probe-cli/v3/internal/engine/experiment/whatsapp"
)
@ -330,11 +330,11 @@ var experimentsByName = map[string]func(*Session) *ExperimentBuilder{
"websteps": func(session *Session) *ExperimentBuilder {
return &ExperimentBuilder{
build: func(config interface{}) *Experiment {
return NewExperiment(session, websteps.NewExperimentMeasurer(
*config.(*websteps.Config),
return NewExperiment(session, webstepsx.NewExperimentMeasurer(
*config.(*webstepsx.Config),
))
},
config: &websteps.Config{},
config: &webstepsx.Config{},
inputPolicy: InputOrQueryBackend,
}
},

View File

@ -0,0 +1,6 @@
// Package webstepsx contains a websteps implementation
// based on the internal/measurex package.
//
// This implementation does not follow any existing spec;
// rather, we are modeling the spec on this implementation.
package webstepsx

View File

@ -0,0 +1,217 @@
package webstepsx
//
// Measurer
//
// This file contains the client implementation.
//
import (
"context"
"errors"
"net/http"
"net/url"
"time"
"github.com/ooni/probe-cli/v3/internal/engine/model"
"github.com/ooni/probe-cli/v3/internal/engine/netx/archival"
"github.com/ooni/probe-cli/v3/internal/measurex"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// testName is the experiment name reported by Measurer.ExperimentName.
const testName = "websteps"

// testVersion is the experiment version reported by Measurer.ExperimentVersion.
const testVersion = "0.0.2"
// Config contains the experiment config. The struct currently
// has no fields, so there is nothing to configure.
type Config struct{}
// TestKeys contains the experiment's test keys. It consists
// solely of an embedded measurex.URLMeasurement.
type TestKeys struct {
// URLMeasurement is the embedded measurement; runAsync sets it to
// each measurement it receives from measurex.
*measurex.URLMeasurement
}
// Measurer performs the measurement. Use NewExperimentMeasurer
// to construct an instance from a Config.
type Measurer struct {
// Config is the experiment configuration passed to NewExperimentMeasurer.
Config Config
}
// Compile-time checks ensuring that *Measurer implements both the
// synchronous and the asynchronous experiment measurer interfaces.
var (
_ model.ExperimentMeasurer = &Measurer{}
_ model.ExperimentMeasurerAsync = &Measurer{}
)
// NewExperimentMeasurer returns a *Measurer using the given config,
// exposed through the model.ExperimentMeasurer interface.
func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
	measurer := new(Measurer)
	measurer.Config = config
	return measurer
}
// ExperimentName implements ExperimentMeasurer.ExperimentName. It
// returns the testName constant.
func (mx *Measurer) ExperimentName() string {
return testName
}
// ExperimentVersion implements ExperimentMeasurer.ExperimentVersion. It
// returns the testVersion constant.
func (mx *Measurer) ExperimentVersion() string {
return testVersion
}
// Sentinel errors exposed by this package. Callers may compare
// returned errors against them (e.g., using errors.Is).
var (
// ErrNoAvailableTestHelpers is emitted when there are no available test helpers.
ErrNoAvailableTestHelpers = errors.New("no available helpers")
// ErrNoInput indicates that no input was provided.
ErrNoInput = errors.New("no input provided")
// ErrInputIsNotAnURL indicates that the input is not an URL.
ErrInputIsNotAnURL = errors.New("input is not an URL")
// ErrUnsupportedInput indicates that the input URL scheme is unsupported.
ErrUnsupportedInput = errors.New("unsupported input scheme")
)
// RunAsync implements ExperimentMeasurerAsync.RunAsync.
//
// Arguments:
//
// - ctx is the context passed down to the background measurement;
//
// - sess is the session used to find the test helper and, later, to
// obtain the HTTP client and the logger;
//
// - input is the URL to measure, which must use the http or https scheme;
//
// - callbacks is currently unused.
//
// On success, the returned channel yields one entry per measurement
// and is closed when the background goroutine terminates. On failure,
// we return a nil channel and one of ErrNoInput, ErrInputIsNotAnURL,
// ErrUnsupportedInput, or ErrNoAvailableTestHelpers.
func (mx *Measurer) RunAsync(
	ctx context.Context, sess model.ExperimentSession, input string,
	callbacks model.ExperimentCallbacks) (<-chan *model.ExperimentAsyncTestKeys, error) {
	// 1. Parse and verify URL
	if input == "" {
		// url.Parse accepts the empty string, so without this check an
		// empty input would be misreported as an unsupported scheme.
		return nil, ErrNoInput
	}
	URL, err := url.Parse(input)
	if err != nil {
		return nil, ErrInputIsNotAnURL
	}
	if URL.Scheme != "http" && URL.Scheme != "https" {
		return nil, ErrUnsupportedInput
	}
	// 2. Find the testhelper
	testhelpers, _ := sess.GetTestHelpersByName("web-connectivity")
	var testhelper *model.Service
	for idx := range testhelpers {
		if testhelpers[idx].Type == "https" {
			// Take an explicit copy: we must not alias the range variable
			// and we must not modify the session's own list when we
			// override the address below.
			th := testhelpers[idx]
			testhelper = &th
			break
		}
	}
	if testhelper == nil {
		return nil, ErrNoAvailableTestHelpers
	}
	testhelper.Address = "https://1.th.ooni.org/api/v1/websteps" // TODO(bassosimone): remove!
	out := make(chan *model.ExperimentAsyncTestKeys)
	go mx.runAsync(ctx, sess, input, testhelper, out)
	return out, nil
}
// measurerResolvers lists the resolvers that runAsync configures into
// the measurex.Measurer: one entry with network "system" and two
// entries with network "udp" (8.8.4.4:53 and 1.1.1.1:53).
var measurerResolvers = []*measurex.ResolverInfo{{
Network: "system",
Address: "",
}, {
Network: "udp",
Address: "8.8.4.4:53",
}, {
Network: "udp",
Address: "1.1.1.1:53",
}}
// runAsync is the body of the goroutine started by RunAsync. It measures
// URL (following redirections) using measurex, with the test helper at
// th.Address plugged in as the MeasureURLHelper, and forwards every
// resulting measurement to out. The out channel is closed on return.
func (mx *Measurer) runAsync(ctx context.Context, sess model.ExperimentSession,
	URL string, th *model.Service, out chan<- *model.ExperimentAsyncTestKeys) {
	defer close(out)
	thHelper := &measurerMeasureURLHelper{
		Clnt:   sess.DefaultHTTPClient(),
		Logger: sess.Logger(),
		THURL:  th.Address,
	}
	engine := &measurex.Measurer{
		Begin:            time.Now(),
		HTTPClient:       sess.DefaultHTTPClient(),
		MeasureURLHelper: thHelper,
		Logger:           sess.Logger(),
		Resolvers:        measurerResolvers,
		TLSHandshaker:    netxlite.NewTLSHandshakerStdlib(sess.Logger()),
	}
	jar := measurex.NewCookieJar()
	headers := measurex.NewHTTPRequestHeaderForMeasuring()
	measurements := engine.MeasureURLAndFollowRedirections(ctx, URL, headers, jar)
	for {
		m, more := <-measurements
		if !more {
			return
		}
		// Each entry gets its own extensions map.
		extensions := map[string]int64{
			archival.ExtHTTP.Name:         archival.ExtHTTP.V,
			archival.ExtDNS.Name:          archival.ExtDNS.V,
			archival.ExtNetevents.Name:    archival.ExtNetevents.V,
			archival.ExtTCPConnect.Name:   archival.ExtTCPConnect.V,
			archival.ExtTLSHandshake.Name: archival.ExtTLSHandshake.V,
		}
		entry := &model.ExperimentAsyncTestKeys{
			MeasurementRuntime: m.TotalRuntime.Seconds(),
			TestKeys:           &TestKeys{URLMeasurement: m},
			Extensions:         extensions,
		}
		out <- entry
	}
}
// measurerMeasureURLHelper injects the TH into the normal
// URL measurement flow implemented by measurex. runAsync
// assigns an instance to measurex.Measurer.MeasureURLHelper.
type measurerMeasureURLHelper struct {
// Clnt is the MANDATORY client to use
Clnt measurex.HTTPClient
// Logger is the MANDATORY Logger to use
Logger model.Logger
// THURL is the MANDATORY TH URL.
THURL string
}
// LookupExtraHTTPEndpoints calls the TH, passing it the target URL, the
// request headers, and the endpoints we already know (curEndpoints). The
// call is bounded by a 30 seconds timeout.
//
// On success, we return one HTTPEndpoint for each endpoint listed in the
// TH response, the TH response itself, and a nil error. On failure, we
// return no endpoints, whatever response Call returned, and the error.
func (mth *measurerMeasureURLHelper) LookupExtraHTTPEndpoints(
	ctx context.Context, URL *url.URL, headers http.Header,
	curEndpoints ...*measurex.HTTPEndpoint) (
	[]*measurex.HTTPEndpoint, interface{}, error) {
	target := URL.String()
	call := &THClientCall{
		Endpoints:  measurex.HTTPEndpointsToEndpoints(curEndpoints),
		HTTPClient: mth.Clnt,
		Header:     headers,
		THURL:      mth.THURL,
		TargetURL:  target,
	}
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	opLogger := measurex.NewOperationLogger(mth.Logger, "THClientCall %s", target)
	thResp, err := call.Call(ctx)
	opLogger.Stop(err)
	if err != nil {
		return nil, thResp, err
	}
	host := URL.Hostname()
	var extra []*measurex.HTTPEndpoint
	for _, e := range thResp.Endpoints {
		endpoint := &measurex.HTTPEndpoint{
			Domain:  host,
			Network: e.Network,
			Address: e.Address,
			SNI:     host,
			ALPN:    measurex.ALPNForHTTPEndpoint(e.Network),
			URL:     URL,
			Header:  headers,
		}
		extra = append(extra, endpoint)
	}
	return extra, thResp, nil
}
// Run implements ExperimentMeasurer.Run. The synchronous API is not
// implemented by this experiment, so this method always returns an
// error; use RunAsync instead.
func (mx *Measurer) Run(ctx context.Context, sess model.ExperimentSession,
	measurement *model.Measurement, callbacks model.ExperimentCallbacks) error {
	err := errors.New("sync run is not implemented")
	return err
}
// SummaryKeys contains summary keys for this experiment.
//
// Note that this structure is part of the ABI contract with probe-cli
// therefore we should be careful when changing it.
type SummaryKeys struct {
// Accessible is serialized using the "accessible" JSON key.
Accessible bool `json:"accessible"`
// Blocking is serialized using the "blocking" JSON key.
Blocking string `json:"blocking"`
// IsAnomaly is excluded from the JSON serialization.
IsAnomaly bool `json:"-"`
}
// GetSummaryKeys implements model.ExperimentMeasurer.GetSummaryKeys. The
// measurement argument is not inspected: we always return a zero-valued
// SummaryKeys and a nil error.
func (mx *Measurer) GetSummaryKeys(measurement *model.Measurement) (interface{}, error) {
	return SummaryKeys{}, nil
}

View File

@ -0,0 +1,381 @@
package webstepsx
//
// TH (Test Helper)
//
// This file contains an implementation of the
// (proposed) websteps test helper spec.
//
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"github.com/apex/log"
"github.com/ooni/probe-cli/v3/internal/measurex"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
"github.com/ooni/probe-cli/v3/internal/version"
)
//
// Messages exchanged by the TH client and server
//
// THClientRequest is the request received by the test helper.
//
// The fields have no JSON tags, hence encoding/json uses the
// Go field names as the JSON keys.
type THClientRequest struct {
// Endpoints is a list of endpoints to measure.
Endpoints []*measurex.Endpoint
// URL is the URL we want to measure.
URL string
// HTTPRequestHeaders contains the request headers.
HTTPRequestHeaders http.Header
}
// THServerResponse is the response from the test helper.
type THServerResponse struct {
// DNS contains all the DNS related measurements. It is
// serialized using the "dns" JSON key.
DNS []*measurex.DNSMeasurement `json:"dns"`
// Endpoints contains a measurement for each endpoint
// that was discovered by the probe or the TH. It is
// serialized using the "endpoints" JSON key.
Endpoints []*measurex.HTTPEndpointMeasurement `json:"endpoints"`
}
// thMaxAcceptableBodySize is the maximum body size (1 MiB) that the TH
// client and the TH server code are willing to read.
const thMaxAcceptableBodySize = 1024 * 1024
//
// TH client implementation
//
// THClient is the high-level API to invoke the TH. This API
// should be used by command line clients.
type THClient struct {
// DNServers is the MANDATORY list of DNS-over-UDP
// servers to use to discover endpoints locally.
DNServers []*measurex.ResolverInfo
// HTTPClient is the MANDATORY HTTP client to
// use for contacting the TH.
HTTPClient measurex.HTTPClient
// ServerURL is the MANDATORY URL of the TH HTTP endpoint.
ServerURL string
}
// Run calls the TH and returns the response or an error.
//
// Arguments:
//
// - ctx is the context with timeout/deadline/cancellation
//
// - URL is the URL the TH server should measure for us
//
// Algorithm:
//
// - parse the URL and fail immediately if it does not parse
//
// - use DNServers to resolve the URL's host and derive from the
// results the endpoints to send to the TH
//
// - call the TH using the HTTPClient and the ServerURL
//
// - return response or error.
func (c *THClient) Run(ctx context.Context, URL string) (*THServerResponse, error) {
	target, err := url.Parse(URL)
	if err != nil {
		return nil, err
	}
	measurer := measurex.NewMeasurerWithDefaultSettings()
	var lookups []*measurex.DNSMeasurement
	results := measurer.LookupURLHostParallel(ctx, target, c.DNServers...)
	for result := range results {
		lookups = append(lookups, result)
	}
	epnts, err := measurex.AllEndpointsForURL(target, lookups...)
	if err != nil {
		return nil, err
	}
	call := &THClientCall{
		Endpoints:  epnts,
		HTTPClient: c.HTTPClient,
		Header:     measurex.NewHTTPRequestHeaderForMeasuring(),
		THURL:      c.ServerURL,
		TargetURL:  URL,
	}
	return call.Call(ctx)
}
// THClientCall allows to perform a single TH client call. Make sure
// you fill all the fields marked as MANDATORY before use. Invoke the
// Call method to actually perform the call.
type THClientCall struct {
// Endpoints contains the MANDATORY endpoints we discovered.
Endpoints []*measurex.Endpoint
// HTTPClient is the MANDATORY HTTP client to
// use for contacting the TH.
HTTPClient measurex.HTTPClient
// Header contains the MANDATORY request headers.
Header http.Header
// THURL is the MANDATORY test helper URL.
THURL string
// TargetURL is the MANDATORY URL to measure.
TargetURL string
}
// Call performs the specified TH call and returns either a response or an error.
//
// We serialize a THClientRequest built from the struct fields as JSON, POST
// it to THURL with an ooniprobe-cli User-Agent, and then parse the response.
func (c *THClientCall) Call(ctx context.Context) (*THServerResponse, error) {
	payload, err := json.Marshal(&THClientRequest{
		Endpoints:          c.Endpoints,
		URL:                c.TargetURL,
		HTTPRequestHeaders: c.Header,
	})
	if err != nil {
		return nil, err
	}
	httpReq, err := http.NewRequestWithContext(
		ctx, http.MethodPost, c.THURL, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	userAgent := fmt.Sprintf("ooniprobe-cli/%s", version.Version)
	httpReq.Header.Set("User-Agent", userAgent)
	return c.httpClientDo(httpReq)
}
// errTHRequestFailed is the error returned by httpClientDo when
// the status code of the TH response is not 200 Ok.
var errTHRequestFailed = errors.New("th: request failed")
// httpClientDo sends req using c.HTTPClient and parses the response.
//
// We return errTHRequestFailed if the status code is not 200. Otherwise
// we read at most thMaxAcceptableBodySize bytes of body, honouring the
// request context, and unmarshal them as a THServerResponse.
func (c *THClientCall) httpClientDo(req *http.Request) (*THServerResponse, error) {
	httpResp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer httpResp.Body.Close()
	if httpResp.StatusCode != http.StatusOK { // THHandler returns either 400 or 200
		return nil, errTHRequestFailed
	}
	limited := io.LimitReader(httpResp.Body, thMaxAcceptableBodySize)
	raw, err := netxlite.ReadAllContext(req.Context(), limited)
	if err != nil {
		return nil, err
	}
	parsed := &THServerResponse{}
	if err := json.Unmarshal(raw, parsed); err != nil {
		return nil, err
	}
	return parsed, nil
}
//
// TH server implementation
//
// THHandler implements the test helper API.
//
// This handler exposes a unique HTTP endpoint that you need to
// mount to the desired path when creating the server.
//
// The canonical mount point for the HTTP endpoint is /api/v1/websteps.
//
// Accepted methods and request body:
//
// - we only accept POST;
//
// - we expect a THClientRequest as the body.
//
// Status code and response body:
//
// - on success, status is 200 and THServerResponse is the body;
//
// - on failure, status is 400 and there is no body.
//
// The struct has no fields, so &THHandler{} is ready to use.
//
type THHandler struct{}
// ServeHTTP implements http.Handler.ServeHTTP.
//
// We always set the Server header. We reply with 400 when the method is
// not POST, when we cannot read or parse the request body (of which we
// read at most thMaxAcceptableBodySize bytes), and when the measurement
// fails. Otherwise, we reply with the JSON-serialized THServerResponse.
func (h *THHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	w.Header().Add("Server", fmt.Sprintf("oohelperd/%s", version.Version))
	badRequest := func() {
		w.WriteHeader(http.StatusBadRequest)
	}
	if req.Method != http.MethodPost {
		badRequest()
		return
	}
	limited := io.LimitReader(req.Body, thMaxAcceptableBodySize)
	rawRequest, err := netxlite.ReadAllContext(req.Context(), limited)
	if err != nil {
		badRequest()
		return
	}
	clientRequest := &THClientRequest{}
	if err := json.Unmarshal(rawRequest, clientRequest); err != nil {
		badRequest()
		return
	}
	serverResponse, err := h.singleStep(req.Context(), clientRequest)
	if err != nil {
		badRequest()
		return
	}
	// We assume that the following call cannot fail because it's a
	// clearly serializable data structure.
	rawResponse, err := json.Marshal(serverResponse)
	runtimex.PanicOnError(err, "json.Marshal failed")
	w.Header().Add("Content-Type", "application/json")
	w.Write(rawResponse)
}
// singleStep performs a single "step" measurement.
//
// In websteps terminology (which we invented) each redirection is a
// step: for each step one discovers endpoints using the DNS and then
// measures all of them. Since the TH does not follow redirections, it
// always performs exactly one step per request.
//
// What this function does:
//
// 1. create a measurex measurer with default settings;
//
// 2. register the endpoints supplied by the probe as the extra
// endpoints to measure (via thMeasureURLHelper);
//
// 3. use thResolver, a DoH resolver, as the only resolver;
//
// 4. measure the request URL using the request headers and a new
// cookie jar, failing if measurex returns an error;
//
// 5. return the DNS measurements and a simplified version of the
// endpoint measurements.
//
// The return value is either a THServerResponse or an error.
func (h *THHandler) singleStep(
	ctx context.Context, req *THClientRequest) (*THServerResponse, error) {
	measurer := measurex.NewMeasurerWithDefaultSettings()
	measurer.MeasureURLHelper = &thMeasureURLHelper{epnts: req.Endpoints}
	measurer.Resolvers = []*measurex.ResolverInfo{{
		Network:         measurex.ResolverForeign,
		ForeignResolver: thResolver,
	}}
	um, err := measurer.MeasureURL(
		ctx, req.URL, req.HTTPRequestHeaders, measurex.NewCookieJar())
	if err != nil {
		return nil, err
	}
	resp := &THServerResponse{
		DNS:       um.DNS,
		Endpoints: h.simplifyEndpoints(um.Endpoints),
	}
	return resp, nil
}
// simplifyEndpoints returns a new list containing, for each input
// endpoint measurement, a copy that only keeps the URL, the network, the
// address, and a simplified version of the underlying measurement. The
// result is nil when the input is empty.
func (h *THHandler) simplifyEndpoints(
	in []*measurex.HTTPEndpointMeasurement) []*measurex.HTTPEndpointMeasurement {
	var simplified []*measurex.HTTPEndpointMeasurement
	for idx := range in {
		orig := in[idx]
		simplified = append(simplified, &measurex.HTTPEndpointMeasurement{
			URL:         orig.URL,
			Network:     orig.Network,
			Address:     orig.Address,
			Measurement: h.simplifyMeasurement(orig.Measurement),
		})
	}
	return simplified
}
// simplifyMeasurement returns a new measurement that shares the connect
// and DNS lookup events with the input, and contains simplified copies of
// the TLS handshake, QUIC handshake, and HTTP round trip events. Any
// other field of the input is not copied.
func (h *THHandler) simplifyMeasurement(in *measurex.Measurement) *measurex.Measurement {
	return &measurex.Measurement{
		Connect:        in.Connect,
		TLSHandshake:   h.simplifyHandshake(in.TLSHandshake),
		QUICHandshake:  h.simplifyHandshake(in.QUICHandshake),
		LookupHost:     in.LookupHost,
		LookupHTTPSSvc: in.LookupHTTPSSvc,
		HTTPRoundTrip:  h.simplifyHTTPRoundTrip(in.HTTPRoundTrip),
	}
}
// simplifyHandshake returns a copy of the given handshake events where
// the peer certificates and the started/finished times are left to
// their zero value. The result is nil when the input is empty.
func (h *THHandler) simplifyHandshake(
	in []*measurex.TLSHandshakeEvent) []*measurex.TLSHandshakeEvent {
	var simplified []*measurex.TLSHandshakeEvent
	for idx := range in {
		orig := in[idx]
		// PeerCerts, Started, and Finished are intentionally not copied.
		simplified = append(simplified, &measurex.TLSHandshakeEvent{
			CipherSuite:     orig.CipherSuite,
			Failure:         orig.Failure,
			NegotiatedProto: orig.NegotiatedProto,
			TLSVersion:      orig.TLSVersion,
			RemoteAddr:      orig.RemoteAddr,
			SNI:             orig.SNI,
			ALPN:            orig.ALPN,
			SkipVerify:      orig.SkipVerify,
			Oddity:          orig.Oddity,
			Network:         orig.Network,
		})
	}
	return simplified
}
// simplifyHTTPRoundTrip returns a copy of the given round trip events
// where the started/finished times are left to their zero value and the
// response is replaced by its simplified version. The result is nil
// when the input is empty.
func (h *THHandler) simplifyHTTPRoundTrip(
	in []*measurex.HTTPRoundTripEvent) []*measurex.HTTPRoundTripEvent {
	var simplified []*measurex.HTTPRoundTripEvent
	for idx := range in {
		orig := in[idx]
		// Started and Finished are intentionally not copied.
		simplified = append(simplified, &measurex.HTTPRoundTripEvent{
			Failure:  orig.Failure,
			Request:  orig.Request,
			Response: h.simplifyHTTPResponse(orig.Response),
			Oddity:   orig.Oddity,
		})
	}
	return simplified
}
// simplifyHTTPResponse returns a copy of the given response that omits
// the body while keeping the status code, the headers, and the body
// metadata. A nil input yields a nil output.
func (h *THHandler) simplifyHTTPResponse(
	in *measurex.HTTPResponse) *measurex.HTTPResponse {
	if in == nil {
		return nil
	}
	// Body is intentionally not copied.
	return &measurex.HTTPResponse{
		Code:            in.Code,
		Headers:         in.Headers,
		BodyIsTruncated: in.BodyIsTruncated,
		BodyLength:      in.BodyLength,
		BodyIsUTF8:      in.BodyIsUTF8,
	}
}
// thMeasureURLHelper is the MeasureURLHelper used by the TH. It makes
// the endpoints provided by the probe available to measurex.
type thMeasureURLHelper struct {
// epnts contains the endpoints sent by the probe; singleStep
// fills this field using the request's Endpoints.
epnts []*measurex.Endpoint
}
// LookupExtraHTTPEndpoints converts each endpoint provided by the probe
// into an HTTPEndpoint for the given URL. The serverEpnts argument is
// not used. We always return a nil TH measurement and a nil error; the
// returned list is nil when the probe provided no endpoints.
func (thh *thMeasureURLHelper) LookupExtraHTTPEndpoints(
	ctx context.Context, URL *url.URL, headers http.Header,
	serverEpnts ...*measurex.HTTPEndpoint) (
	epnts []*measurex.HTTPEndpoint, thMeasurement interface{}, err error) {
	for idx := range thh.epnts {
		probeEpnt := thh.epnts[idx]
		converted := &measurex.HTTPEndpoint{
			Domain:  URL.Hostname(),
			Network: probeEpnt.Network,
			Address: probeEpnt.Address,
			SNI:     URL.Hostname(),
			ALPN:    measurex.ALPNForHTTPEndpoint(probeEpnt.Network),
			URL:     URL,
			Header:  headers, // but overridden later anyway
		}
		epnts = append(epnts, converted)
	}
	return epnts, nil, nil
}
// thResolverURL is the DNS resolver URL used by the TH. We use an
// encrypted resolver to reduce the risk that there is DNS-over-UDP
// censorship in the place where we deploy the TH. This URL is
// passed to netxlite.NewDNSOverHTTPS when creating thResolver.
const thResolverURL = "https://dns.google/dns-query"
// thResolver is the DNS resolver used by the TH. It is a serial
// resolver on top of DNS-over-HTTPS, using http.DefaultClient to
// send queries to thResolverURL. singleStep configures it as the
// measurer's foreign resolver.
//
// Here we're using github.com/apex/log as the logger, which
// is fine because this is backend only code.
var thResolver = netxlite.WrapResolver(log.Log, netxlite.NewSerialResolver(
netxlite.NewDNSOverHTTPS(http.DefaultClient, thResolverURL),
))