cleanup(jafar): do not depend on netx and urlgetter (#792)

There's no point in doing that. Also, once this change is merged, it becomes easier to cleanup/simplify netx.

See https://github.com/ooni/probe/issues/2121
This commit is contained in:
Simone Basso
2022-06-02 22:25:37 +02:00
committed by GitHub
parent 76b65893a1
commit 15da0f5344
19 changed files with 345 additions and 275 deletions
+18 -4
View File
@@ -153,14 +153,28 @@ func (p *DNSServer) nxdomain(query *dns.Msg) *dns.Msg {
}
func (p *DNSServer) localHost(query *dns.Msg) *dns.Msg {
return p.compose(query, net.IPv6loopback, net.IPv4(127, 0, 0, 1))
return dnsComposeResponse(query, net.IPv6loopback, net.IPv4(127, 0, 0, 1))
}
func (p *DNSServer) empty(query *dns.Msg) *dns.Msg {
return p.compose(query)
return dnsComposeResponse(query)
}
func (p *DNSServer) compose(query *dns.Msg, ips ...net.IP) *dns.Msg {
func dnsComposeQuery(domain string, qtype uint16) *dns.Msg {
question := dns.Question{
Name: dns.Fqdn(domain),
Qtype: qtype,
Qclass: dns.ClassINET,
}
query := &dns.Msg{}
query.RecursionDesired = true
query.Question = make([]dns.Question, 1)
query.Question[0] = question
query.Id = dns.Id()
return query
}
func dnsComposeResponse(query *dns.Msg, ips ...net.IP) *dns.Msg {
runtimex.PanicIfTrue(len(query.Question) != 1, "expecting a single question")
question := query.Question[0]
reply := new(dns.Msg)
@@ -205,5 +219,5 @@ func (p *DNSServer) cache(name string, query *dns.Msg) *dns.Msg {
if len(ipAddrs) <= 0 {
return p.nxdomain(query)
}
return p.compose(query, ipAddrs...)
return dnsComposeResponse(query, ipAddrs...)
}
+111 -48
View File
@@ -1,24 +1,23 @@
package filtering
import (
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"github.com/google/martian/v3/mitm"
"github.com/miekg/dns"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
// TODO(bassosimone): remove HTTPActionPass since we want integration tests
// to only run locally to make them much more predictable.
// HTTPAction is an HTTP filtering action that this proxy should take.
// HTTPAction is an HTTP filtering action that this server should take.
type HTTPAction string
const (
// HTTPActionPass passes the traffic to the destination.
HTTPActionPass = HTTPAction("pass")
// HTTPActionReset resets the connection.
HTTPActionReset = HTTPAction("reset")
@@ -30,25 +29,91 @@ const (
// HTTPAction451 causes the proxy to return a 451 error.
HTTPAction451 = HTTPAction("451")
// HTTPActionDoH causes the proxy to return a sensible reply
// with static IP addresses if the request is DoH.
HTTPActionDoH = HTTPAction("doh")
)
// HTTPProxy is a proxy that routes traffic depending on the
// host header and may implement filtering policies.
type HTTPProxy struct {
// OnIncomingHost is the MANDATORY hook called whenever we have
// successfully received an HTTP request.
OnIncomingHost func(host string) HTTPAction
// HTTPServer is a server that implements filtering policies.
type HTTPServer struct {
// action is the action to implement.
action HTTPAction
// cert is the fake CA certificate.
cert *x509.Certificate
// config is the config to generate certificates on the fly.
config *mitm.Config
// privkey is the private key that signed the cert.
privkey *rsa.PrivateKey
// server is the underlying server.
server *http.Server
// url contains the server URL
url *url.URL
}
// Start starts the proxy.
func (p *HTTPProxy) Start(address string) (net.Listener, error) {
listener, err := net.Listen("tcp", address)
if err != nil {
return nil, err
// NewHTTPServerCleartext creates a new HTTPServer using cleartext HTTP.
func NewHTTPServerCleartext(action HTTPAction) *HTTPServer {
return newHTTPOrHTTPSServer(action, false)
}
// NewHTTPServerTLS creates a new HTTP server using HTTPS.
func NewHTTPServerTLS(action HTTPAction) *HTTPServer {
return newHTTPOrHTTPSServer(action, true)
}
// Close closes the server ASAP.
func (p *HTTPServer) Close() error {
return p.server.Close()
}
// URL returns the server's URL
func (p *HTTPServer) URL() *url.URL {
return p.url
}
// TLSConfig returns a suitable base TLS config for the client.
func (p *HTTPServer) TLSConfig() *tls.Config {
config := &tls.Config{}
if p.cert != nil {
o := x509.NewCertPool()
o.AddCert(p.cert)
config.RootCAs = o
}
server := &http.Server{Handler: p}
go server.Serve(listener)
return listener, nil
return config
}
// newHTTPOrHTTPSServer is an internal factory for creating a new instance.
func newHTTPOrHTTPSServer(action HTTPAction, enableTLS bool) *HTTPServer {
listener, err := net.Listen("tcp", "127.0.0.1:0")
runtimex.PanicOnError(err, "net.Listen failed")
srv := &HTTPServer{
action: action,
cert: nil,
config: nil,
privkey: nil,
server: nil,
url: &url.URL{
Scheme: "",
Host: listener.Addr().String(),
},
}
srv.server = &http.Server{Handler: srv}
switch enableTLS {
case false:
srv.url.Scheme = "http"
go srv.server.Serve(listener)
case true:
srv.url.Scheme = "https"
srv.cert, srv.privkey, srv.config = tlsConfigMITM()
srv.server.TLSConfig = srv.config.TLS()
go srv.server.ServeTLS(listener, "", "") // using server.TLSConfig
}
return srv
}
// HTTPBlockPage451 is the block page returned along with status 451
@@ -60,34 +125,22 @@ var HTTPBlockpage451 = []byte(`<html><head>
</body></html>
`)
const httpProxyProduct = "jafar/0.1.0"
// ServeHTTP serves HTTP requests
func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Implementation note: use Via header to detect in a loose way
// requests originated by us and directed to us.
if r.Header.Get("Via") == httpProxyProduct || r.Host == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
p.handle(w, r)
}
func (p *HTTPProxy) handle(w http.ResponseWriter, r *http.Request) {
switch policy := p.OnIncomingHost(r.Host); policy {
case HTTPActionPass:
p.proxy(w, r)
func (p *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch p.action {
case HTTPActionReset, HTTPActionTimeout, HTTPActionEOF:
p.hijack(w, r, policy)
p.hijack(w, r, p.action)
case HTTPAction451:
w.WriteHeader(http.StatusUnavailableForLegalReasons)
w.Write(HTTPBlockpage451)
case HTTPActionDoH:
httpServeDNSOverHTTPS(w, r)
default:
w.WriteHeader(http.StatusInternalServerError)
}
}
func (p *HTTPProxy) hijack(w http.ResponseWriter, r *http.Request, policy HTTPAction) {
func (p *HTTPServer) hijack(w http.ResponseWriter, r *http.Request, policy HTTPAction) {
// Note:
//
// 1. we assume we can hihack the connection
@@ -109,12 +162,22 @@ func (p *HTTPProxy) hijack(w http.ResponseWriter, r *http.Request, policy HTTPAc
}
}
func (p *HTTPProxy) proxy(w http.ResponseWriter, r *http.Request) {
r.Header.Add("Via", httpProxyProduct) // see ServeHTTP
proxy := httputil.NewSingleHostReverseProxy(&url.URL{
Host: r.Host,
Scheme: "http",
})
proxy.Transport = http.DefaultTransport
proxy.ServeHTTP(w, r)
func httpPanicToInternalServerError(w http.ResponseWriter) {
if r := recover(); r != nil {
w.WriteHeader(500)
}
}
func httpServeDNSOverHTTPS(w http.ResponseWriter, r *http.Request) {
defer httpPanicToInternalServerError(w)
rawQuery, err := io.ReadAll(r.Body)
runtimex.PanicOnError(err, "io.ReadAll failed")
query := &dns.Msg{}
err = query.Unpack(rawQuery)
runtimex.PanicOnError(err, "query.Unpack failed")
runtimex.PanicIfTrue(query.Response, "is a response")
response := dnsComposeResponse(query, net.IPv4(8, 8, 8, 8), net.IPv4(8, 8, 4, 4))
rawResponse, err := response.Pack()
runtimex.PanicOnError(err, "response.Pack failed")
w.Write(rawResponse)
}
+93 -101
View File
@@ -1,155 +1,134 @@
package filtering
import (
"bytes"
"context"
"crypto/tls"
"errors"
"net"
"io"
"net/http"
"net/url"
"strings"
"testing"
"time"
"github.com/apex/log"
"github.com/miekg/dns"
"github.com/ooni/probe-cli/v3/internal/model/mocks"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
func TestHTTPProxy(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
newproxy := func(action HTTPAction) (net.Listener, error) {
p := &HTTPProxy{
OnIncomingHost: func(host string) HTTPAction {
return action
},
}
return p.Start("127.0.0.1:0")
}
func TestHTTPServer(t *testing.T) {
httpGET := func(ctx context.Context, addr net.Addr, host string) (*http.Response, error) {
txp := netxlite.NewHTTPTransportStdlib(log.Log)
clnt := &http.Client{Transport: txp}
URL := &url.URL{
Scheme: "http",
Host: addr.String(),
Path: "/",
httpGET := func(ctx context.Context, method string, URL *url.URL, host string,
config *tls.Config, requestBody []byte) (*http.Response, error) {
txp := &http.Transport{
TLSClientConfig: config,
}
req, err := http.NewRequestWithContext(ctx, "GET", URL.String(), nil)
if config != nil {
config.ServerName = host
}
clnt := &http.Client{Transport: txp}
req, err := http.NewRequestWithContext(
ctx, method, URL.String(), bytes.NewReader(requestBody))
runtimex.PanicOnError(err, "http.NewRequest failed")
req.Host = host
return clnt.Do(req)
}
t.Run("HTTPActionPass", func(t *testing.T) {
ctx := context.Background()
listener, err := newproxy(HTTPActionPass)
if err != nil {
t.Fatal(err)
}
resp, err := httpGET(ctx, listener.Addr(), "nexa.polito.it")
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != 200 {
t.Fatal("unexpected status code", resp.StatusCode)
}
resp.Body.Close()
listener.Close()
})
t.Run("HTTPActionPass with self connect", func(t *testing.T) {
ctx := context.Background()
listener, err := newproxy(HTTPActionPass)
if err != nil {
t.Fatal(err)
}
resp, err := httpGET(ctx, listener.Addr(), listener.Addr().String())
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != 400 {
t.Fatal("unexpected status code", resp.StatusCode)
}
resp.Body.Close()
listener.Close()
})
t.Run("HTTPActionReset", func(t *testing.T) {
ctx := context.Background()
listener, err := newproxy(HTTPActionReset)
if err != nil {
t.Fatal(err)
}
resp, err := httpGET(ctx, listener.Addr(), "nexa.polito.it")
if err == nil || !strings.HasSuffix(err.Error(), netxlite.FailureConnectionReset) {
srvr := NewHTTPServerCleartext(HTTPActionReset)
resp, err := httpGET(ctx, "GET", srvr.URL(), "nexa.polito.it", srvr.TLSConfig(), nil)
if netxlite.NewTopLevelGenericErrWrapper(err).Error() != netxlite.FailureConnectionReset {
t.Fatal("unexpected err", err)
}
if resp != nil {
t.Fatal("expected nil resp")
}
listener.Close()
srvr.Close()
})
t.Run("HTTPActionTimeout", func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
listener, err := newproxy(HTTPActionTimeout)
if err != nil {
t.Fatal(err)
}
resp, err := httpGET(ctx, listener.Addr(), "nexa.polito.it")
srvr := NewHTTPServerCleartext(HTTPActionTimeout)
resp, err := httpGET(ctx, "GET", srvr.URL(), "nexa.polito.it", srvr.TLSConfig(), nil)
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatal("unexpected err", err)
}
if resp != nil {
t.Fatal("expected nil resp")
}
listener.Close()
srvr.Close()
})
t.Run("HTTPActionEOF", func(t *testing.T) {
ctx := context.Background()
listener, err := newproxy(HTTPActionEOF)
if err != nil {
t.Fatal(err)
}
resp, err := httpGET(ctx, listener.Addr(), "nexa.polito.it")
if err == nil || !strings.HasSuffix(err.Error(), netxlite.FailureEOFError) {
srvr := NewHTTPServerCleartext(HTTPActionEOF)
resp, err := httpGET(ctx, "GET", srvr.URL(), "nexa.polito.it", srvr.TLSConfig(), nil)
if !errors.Is(err, io.EOF) {
t.Fatal("unexpected err", err)
}
if resp != nil {
t.Fatal("expected nil resp")
}
listener.Close()
srvr.Close()
})
t.Run("HTTPAction451", func(t *testing.T) {
ctx := context.Background()
listener, err := newproxy(HTTPAction451)
if err != nil {
t.Fatal(err)
}
resp, err := httpGET(ctx, listener.Addr(), "nexa.polito.it")
srvr := NewHTTPServerCleartext(HTTPAction451)
resp, err := httpGET(ctx, "GET", srvr.URL(), "nexa.polito.it", srvr.TLSConfig(), nil)
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != 451 {
t.Fatal("unexpected status code", resp.StatusCode)
}
data, err := netxlite.ReadAllContext(ctx, resp.Body)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(HTTPBlockpage451, data) {
t.Fatal("unexpected data")
}
resp.Body.Close()
listener.Close()
srvr.Close()
})
t.Run("HTTPActionDoH", func(t *testing.T) {
ctx := context.Background()
srvr := NewHTTPServerTLS(HTTPActionDoH)
query := dnsComposeQuery("nexa.polito.it", dns.TypeA)
rawQuery, err := query.Pack()
if err != nil {
t.Fatal(err)
}
resp, err := httpGET(ctx, "POST", srvr.URL(), "dns.google", srvr.TLSConfig(), rawQuery)
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != 200 {
t.Fatal("unexpected status code", resp.StatusCode)
}
data, err := netxlite.ReadAllContext(ctx, resp.Body)
if err != nil {
t.Fatal(err)
}
response := &dns.Msg{}
if err := response.Unpack(data); err != nil {
t.Fatal(err)
}
// It suffices to see it's a DNS response
resp.Body.Close()
srvr.Close()
})
t.Run("unknown action", func(t *testing.T) {
ctx := context.Background()
listener, err := newproxy("")
if err != nil {
t.Fatal(err)
}
resp, err := httpGET(ctx, listener.Addr(), "nexa.polito.it")
srvr := NewHTTPServerCleartext("")
resp, err := httpGET(ctx, "GET", srvr.URL(), "nexa.polito.it", srvr.TLSConfig(), nil)
if err != nil {
t.Fatal(err)
}
@@ -157,17 +136,30 @@ func TestHTTPProxy(t *testing.T) {
t.Fatal("unexpected status code", resp.StatusCode)
}
resp.Body.Close()
listener.Close()
})
t.Run("Start fails on an invalid address", func(t *testing.T) {
p := &HTTPProxy{}
listener, err := p.Start("127.0.0.1")
if err == nil || !strings.HasSuffix(err.Error(), "missing port in address") {
t.Fatal("unexpected err", err)
}
if listener != nil {
t.Fatal("expected nil listener")
}
srvr.Close()
})
}
type httpResponseWriter struct {
http.ResponseWriter
code int
}
func (w *httpResponseWriter) WriteHeader(statusCode int) {
w.code = statusCode
}
func TestHTTPServeDNSOverHTTPSPanic(t *testing.T) {
w := &httpResponseWriter{}
req := &http.Request{
Body: io.NopCloser(&mocks.Reader{
MockRead: func(b []byte) (int, error) {
return 0, io.ErrUnexpectedEOF
},
}),
}
httpServeDNSOverHTTPS(w, req)
if w.code != 500 {
t.Fatal("did not intercept the panic")
}
}
+19 -11
View File
@@ -66,14 +66,19 @@ type TLSServer struct {
privkey *rsa.PrivateKey
}
// NewTLSServer creates and starts a new TLSServer that executes
// the given action during the TLS handshake.
func NewTLSServer(action TLSAction) *TLSServer {
done := make(chan bool)
func tlsConfigMITM() (*x509.Certificate, *rsa.PrivateKey, *mitm.Config) {
cert, privkey, err := mitm.NewAuthority("jafar", "OONI", 24*time.Hour)
runtimex.PanicOnError(err, "mitm.NewAuthority failed")
config, err := mitm.NewConfig(cert, privkey)
runtimex.PanicOnError(err, "mitm.NewConfig failed")
return cert, privkey, config
}
// NewTLSServer creates and starts a new TLSServer that executes
// the given action during the TLS handshake.
func NewTLSServer(action TLSAction) *TLSServer {
done := make(chan bool)
cert, privkey, config := tlsConfigMITM()
listener, err := net.Listen("tcp", "127.0.0.1:0")
runtimex.PanicOnError(err, "net.Listen failed")
ctx, cancel := context.WithCancel(context.Background())
@@ -139,13 +144,8 @@ func (p *TLSServer) handle(ctx context.Context, tcpConn net.Conn) {
GetCertificate: func(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
switch p.action {
case TLSActionTimeout:
select {
case <-time.After(300 * time.Second):
return nil, errors.New("timing out the connection")
case <-ctx.Done():
p.reset(tcpConn)
return nil, ctx.Err()
}
err := p.timeout(ctx, tcpConn)
return nil, err
case TLSActionAlertInternalError:
p.alert(tcpConn, tlsAlertInternalError)
return nil, errors.New("already sent alert")
@@ -170,6 +170,14 @@ func (p *TLSServer) handle(ctx context.Context, tcpConn net.Conn) {
tlsConn.Close()
}
func (p *TLSServer) timeout(ctx context.Context, tcpConn net.Conn) error {
ctx, cancel := context.WithTimeout(ctx, 300*time.Second)
defer cancel()
<-ctx.Done()
p.reset(tcpConn)
return ctx.Err()
}
func (p *TLSServer) reset(conn net.Conn) {
if tc, good := conn.(*net.TCPConn); good {
tc.SetLinger(0)
+2 -2
View File
@@ -116,7 +116,7 @@ func TestTLSServer(t *testing.T) {
t.Fatal(err)
}
defer conn.Close()
data, err := io.ReadAll(conn)
data, err := netxlite.ReadAllContext(context.Background(), conn)
if err != nil {
t.Fatal(err)
}
@@ -134,7 +134,7 @@ func TestTLSServer(t *testing.T) {
t.Fatal(err)
}
defer conn.Close()
data, err := io.ReadAll(conn)
data, err := netxlite.ReadAllContext(context.Background(), conn)
if err != nil {
t.Fatal(err)
}
+9
View File
@@ -105,6 +105,15 @@ func (txp *httpTransportConnectionsCloser) CloseIdleConnections() {
txp.TLSDialer.CloseIdleConnections()
}
// NewHTTPTransportWithResolver creates a new HTTP transport using
// the stdlib for everything but the given resolver.
func NewHTTPTransportWithResolver(logger model.DebugLogger, reso model.Resolver) model.HTTPTransport {
dialer := NewDialerWithResolver(logger, reso)
thx := NewTLSHandshakerStdlib(logger)
tlsDialer := NewTLSDialer(dialer, thx)
return NewHTTPTransport(logger, dialer, tlsDialer)
}
// NewHTTPTransport combines NewOOHTTPBaseTransport and WrapHTTPTransport.
//
// This factory and NewHTTPTransportStdlib are the recommended
+21
View File
@@ -16,6 +16,27 @@ import (
"github.com/ooni/probe-cli/v3/internal/model/mocks"
)
func TestNewHTTPTransportWithResolver(t *testing.T) {
expected := errors.New("mocked error")
reso := &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, expected
},
}
txp := NewHTTPTransportWithResolver(model.DiscardLogger, reso)
req, err := http.NewRequest("GET", "http://x.org", nil)
if err != nil {
t.Fatal(err)
}
resp, err := txp.RoundTrip(req)
if !errors.Is(err, expected) {
t.Fatal("unexpected err")
}
if resp != nil {
t.Fatal("expected nil resp")
}
}
func TestHTTPTransportErrWrapper(t *testing.T) {
t.Run("RoundTrip", func(t *testing.T) {
t.Run("with failure", func(t *testing.T) {
+10
View File
@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"strings"
"time"
@@ -30,6 +31,15 @@ func NewResolverStdlib(logger model.DebugLogger, wrappers ...model.DNSTransportW
return WrapResolver(logger, newResolverSystem(wrappers...))
}
// NewParallelDNSOverHTTPSResolver creates a new DNS over HTTPS resolver
// that uses the standard library for all operations. This function constructs
// all the building blocks and calls WrapResolver on the returned resolver.
func NewParallelDNSOverHTTPSResolver(logger model.DebugLogger, URL string) model.Resolver {
client := &http.Client{Transport: NewHTTPTransportStdlib(logger)}
txp := WrapDNSTransport(NewUnwrappedDNSOverHTTPSTransport(client, URL))
return WrapResolver(logger, NewUnwrappedParallelResolver(txp))
}
func newResolverSystem(wrappers ...model.DNSTransportWrapper) *resolverSystem {
return &resolverSystem{
t: WrapDNSTransport(&dnsOverGetaddrinfoTransport{}, wrappers...),
+17
View File
@@ -65,6 +65,23 @@ func TestNewParallelResolverUDP(t *testing.T) {
}
}
func TestNewParallelDNSOverHTTPSResolver(t *testing.T) {
resolver := NewParallelDNSOverHTTPSResolver(log.Log, "https://1.1.1.1/dns-query")
idna := resolver.(*resolverIDNA)
logger := idna.Resolver.(*resolverLogger)
if logger.Logger != log.Log {
t.Fatal("invalid logger")
}
shortCircuit := logger.Resolver.(*resolverShortCircuitIPAddr)
errWrapper := shortCircuit.Resolver.(*resolverErrWrapper)
para := errWrapper.Resolver.(*ParallelResolver)
txp := para.Transport().(*dnsTransportErrWrapper)
dnsTxp := txp.DNSTransport.(*DNSOverHTTPSTransport)
if dnsTxp.Address() != "https://1.1.1.1/dns-query" {
t.Fatal("invalid address")
}
}
func TestResolverSystem(t *testing.T) {
t.Run("Network", func(t *testing.T) {
expected := "antani"