fix(oohelperd): use throw-away HTTPClient, Dialer, Resolver (#833)
This diff modifies the implementation of oohelperd in the master branch to always use throw-away HTTPClient, Dialer, and Resolver. The rationale of this change is to ensure we're not hitting limits of the HTTPClient regarding the max number of connections per host. This issue is described at https://github.com/ooni/probe/issues/2182. While there, it feels more correct to use throw-away Dialer and Resolver. We have a different patch for the release/3.15 branch because of netx-related refactorings: https://github.com/ooni/probe-cli/pull/832.
This commit is contained in:
parent
59410edba9
commit
a4d17085f5
|
@ -19,16 +19,18 @@ type CtrlDNSResult = webconnectivity.ControlDNSResult
|
||||||
|
|
||||||
// DNSConfig configures the DNS check.
|
// DNSConfig configures the DNS check.
|
||||||
type DNSConfig struct {
|
type DNSConfig struct {
|
||||||
Domain string
|
Domain string
|
||||||
Out chan CtrlDNSResult
|
NewResolver func() model.Resolver
|
||||||
Resolver model.Resolver
|
Out chan CtrlDNSResult
|
||||||
Wg *sync.WaitGroup
|
Wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// DNSDo performs the DNS check.
|
// DNSDo performs the DNS check.
|
||||||
func DNSDo(ctx context.Context, config *DNSConfig) {
|
func DNSDo(ctx context.Context, config *DNSConfig) {
|
||||||
defer config.Wg.Done()
|
defer config.Wg.Done()
|
||||||
addrs, err := config.Resolver.LookupHost(ctx, config.Domain)
|
reso := config.NewResolver()
|
||||||
|
defer reso.CloseIdleConnections()
|
||||||
|
addrs, err := reso.LookupHost(ctx, config.Domain)
|
||||||
if addrs == nil {
|
if addrs == nil {
|
||||||
addrs = []string{} // fix: the old test helper did that
|
addrs = []string{} // fix: the old test helper did that
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/ooni/probe-cli/v3/internal/engine/experiment/webconnectivity"
|
"github.com/ooni/probe-cli/v3/internal/engine/experiment/webconnectivity"
|
||||||
|
"github.com/ooni/probe-cli/v3/internal/model"
|
||||||
"github.com/ooni/probe-cli/v3/internal/model/mocks"
|
"github.com/ooni/probe-cli/v3/internal/model/mocks"
|
||||||
"github.com/ooni/probe-cli/v3/internal/netxlite"
|
"github.com/ooni/probe-cli/v3/internal/netxlite"
|
||||||
)
|
)
|
||||||
|
@ -68,13 +69,18 @@ func TestDNSDo(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
config := &DNSConfig{
|
config := &DNSConfig{
|
||||||
Domain: "antani.ooni.org",
|
Domain: "antani.ooni.org",
|
||||||
Out: make(chan webconnectivity.ControlDNSResult, 1),
|
NewResolver: func() model.Resolver {
|
||||||
Resolver: &mocks.Resolver{
|
return &mocks.Resolver{
|
||||||
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
|
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
|
||||||
return nil, netxlite.ErrOODNSNoSuchHost
|
return nil, netxlite.ErrOODNSNoSuchHost
|
||||||
},
|
},
|
||||||
|
MockCloseIdleConnections: func() {
|
||||||
|
// nothing
|
||||||
|
},
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Wg: &sync.WaitGroup{},
|
Out: make(chan webconnectivity.ControlDNSResult, 1),
|
||||||
|
Wg: &sync.WaitGroup{},
|
||||||
}
|
}
|
||||||
config.Wg.Add(1)
|
config.Wg.Add(1)
|
||||||
DNSDo(ctx, config)
|
DNSDo(ctx, config)
|
||||||
|
|
|
@ -19,9 +19,9 @@ type CtrlHTTPResponse = webconnectivity.ControlHTTPRequestResult
|
||||||
|
|
||||||
// HTTPConfig configures the HTTP check.
|
// HTTPConfig configures the HTTP check.
|
||||||
type HTTPConfig struct {
|
type HTTPConfig struct {
|
||||||
Client model.HTTPClient
|
|
||||||
Headers map[string][]string
|
Headers map[string][]string
|
||||||
MaxAcceptableBody int64
|
MaxAcceptableBody int64
|
||||||
|
NewClient func() model.HTTPClient
|
||||||
Out chan CtrlHTTPResponse
|
Out chan CtrlHTTPResponse
|
||||||
URL string
|
URL string
|
||||||
Wg *sync.WaitGroup
|
Wg *sync.WaitGroup
|
||||||
|
@ -50,7 +50,9 @@ func HTTPDo(ctx context.Context, config *HTTPConfig) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
resp, err := config.Client.Do(req)
|
clnt := config.NewClient()
|
||||||
|
defer clnt.CloseIdleConnections()
|
||||||
|
resp, err := clnt.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
config.Out <- CtrlHTTPResponse{ // fix: emit -1 like old test helper does
|
config.Out <- CtrlHTTPResponse{ // fix: emit -1 like old test helper does
|
||||||
BodyLength: -1,
|
BodyLength: -1,
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"github.com/ooni/probe-cli/v3/internal/model"
|
||||||
"github.com/ooni/probe-cli/v3/internal/netxlite"
|
"github.com/ooni/probe-cli/v3/internal/netxlite"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,12 +18,14 @@ func TestHTTPDoWithInvalidURL(t *testing.T) {
|
||||||
httpch := make(chan CtrlHTTPResponse, 1)
|
httpch := make(chan CtrlHTTPResponse, 1)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go HTTPDo(ctx, &HTTPConfig{
|
go HTTPDo(ctx, &HTTPConfig{
|
||||||
Client: http.DefaultClient,
|
|
||||||
Headers: nil,
|
Headers: nil,
|
||||||
MaxAcceptableBody: 1 << 24,
|
MaxAcceptableBody: 1 << 24,
|
||||||
Out: httpch,
|
NewClient: func() model.HTTPClient {
|
||||||
URL: "http://[::1]aaaa",
|
return http.DefaultClient
|
||||||
Wg: wg,
|
},
|
||||||
|
Out: httpch,
|
||||||
|
URL: "http://[::1]aaaa",
|
||||||
|
Wg: wg,
|
||||||
})
|
})
|
||||||
// wait for measurement steps to complete
|
// wait for measurement steps to complete
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
@ -39,16 +42,18 @@ func TestHTTPDoWithHTTPTransportFailure(t *testing.T) {
|
||||||
httpch := make(chan CtrlHTTPResponse, 1)
|
httpch := make(chan CtrlHTTPResponse, 1)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go HTTPDo(ctx, &HTTPConfig{
|
go HTTPDo(ctx, &HTTPConfig{
|
||||||
Client: &http.Client{
|
|
||||||
Transport: FakeTransport{
|
|
||||||
Err: expected,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Headers: nil,
|
Headers: nil,
|
||||||
MaxAcceptableBody: 1 << 24,
|
MaxAcceptableBody: 1 << 24,
|
||||||
Out: httpch,
|
NewClient: func() model.HTTPClient {
|
||||||
URL: "http://www.x.org",
|
return &http.Client{
|
||||||
Wg: wg,
|
Transport: FakeTransport{
|
||||||
|
Err: expected,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Out: httpch,
|
||||||
|
URL: "http://www.x.org",
|
||||||
|
Wg: wg,
|
||||||
})
|
})
|
||||||
// wait for measurement steps to complete
|
// wait for measurement steps to complete
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
|
@ -20,10 +20,10 @@ type (
|
||||||
|
|
||||||
// MeasureConfig contains configuration for Measure.
|
// MeasureConfig contains configuration for Measure.
|
||||||
type MeasureConfig struct {
|
type MeasureConfig struct {
|
||||||
Client model.HTTPClient
|
|
||||||
Dialer model.Dialer
|
|
||||||
MaxAcceptableBody int64
|
MaxAcceptableBody int64
|
||||||
Resolver model.Resolver
|
NewClient func() model.HTTPClient
|
||||||
|
NewDialer func() model.Dialer
|
||||||
|
NewResolver func() model.Resolver
|
||||||
}
|
}
|
||||||
|
|
||||||
// Measure performs the measurement described by the request and
|
// Measure performs the measurement described by the request and
|
||||||
|
@ -40,10 +40,10 @@ func Measure(ctx context.Context, config MeasureConfig, creq *CtrlRequest) (*Ctr
|
||||||
if net.ParseIP(URL.Hostname()) == nil {
|
if net.ParseIP(URL.Hostname()) == nil {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go DNSDo(ctx, &DNSConfig{
|
go DNSDo(ctx, &DNSConfig{
|
||||||
Domain: URL.Hostname(),
|
Domain: URL.Hostname(),
|
||||||
Out: dnsch,
|
NewResolver: config.NewResolver,
|
||||||
Resolver: config.Resolver,
|
Out: dnsch,
|
||||||
Wg: wg,
|
Wg: wg,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// tcpconnect: start
|
// tcpconnect: start
|
||||||
|
@ -51,19 +51,19 @@ func Measure(ctx context.Context, config MeasureConfig, creq *CtrlRequest) (*Ctr
|
||||||
for _, endpoint := range creq.TCPConnect {
|
for _, endpoint := range creq.TCPConnect {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go TCPDo(ctx, &TCPConfig{
|
go TCPDo(ctx, &TCPConfig{
|
||||||
Dialer: config.Dialer,
|
Endpoint: endpoint,
|
||||||
Endpoint: endpoint,
|
NewDialer: config.NewDialer,
|
||||||
Out: tcpconnch,
|
Out: tcpconnch,
|
||||||
Wg: wg,
|
Wg: wg,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// http: start
|
// http: start
|
||||||
httpch := make(chan CtrlHTTPResponse, 1)
|
httpch := make(chan CtrlHTTPResponse, 1)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go HTTPDo(ctx, &HTTPConfig{
|
go HTTPDo(ctx, &HTTPConfig{
|
||||||
Client: config.Client,
|
|
||||||
Headers: creq.HTTPRequestHeaders,
|
Headers: creq.HTTPRequestHeaders,
|
||||||
MaxAcceptableBody: config.MaxAcceptableBody,
|
MaxAcceptableBody: config.MaxAcceptableBody,
|
||||||
|
NewClient: config.NewClient,
|
||||||
Out: httpch,
|
Out: httpch,
|
||||||
URL: creq.HTTPRequest,
|
URL: creq.HTTPRequest,
|
||||||
Wg: wg,
|
Wg: wg,
|
||||||
|
|
|
@ -20,16 +20,18 @@ type TCPResultPair struct {
|
||||||
|
|
||||||
// TCPConfig configures the TCP connect check.
|
// TCPConfig configures the TCP connect check.
|
||||||
type TCPConfig struct {
|
type TCPConfig struct {
|
||||||
Dialer model.Dialer
|
Endpoint string
|
||||||
Endpoint string
|
NewDialer func() model.Dialer
|
||||||
Out chan TCPResultPair
|
Out chan TCPResultPair
|
||||||
Wg *sync.WaitGroup
|
Wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// TCPDo performs the TCP check.
|
// TCPDo performs the TCP check.
|
||||||
func TCPDo(ctx context.Context, config *TCPConfig) {
|
func TCPDo(ctx context.Context, config *TCPConfig) {
|
||||||
defer config.Wg.Done()
|
defer config.Wg.Done()
|
||||||
conn, err := config.Dialer.DialContext(ctx, "tcp", config.Endpoint)
|
dialer := config.NewDialer()
|
||||||
|
defer dialer.CloseIdleConnections()
|
||||||
|
conn, err := dialer.DialContext(ctx, "tcp", config.Endpoint)
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,10 +14,10 @@ import (
|
||||||
|
|
||||||
// Handler implements the Web Connectivity test helper HTTP API.
|
// Handler implements the Web Connectivity test helper HTTP API.
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
Client model.HTTPClient
|
|
||||||
Dialer model.Dialer
|
|
||||||
MaxAcceptableBody int64
|
MaxAcceptableBody int64
|
||||||
Resolver model.Resolver
|
NewClient func() model.HTTPClient
|
||||||
|
NewDialer func() model.Dialer
|
||||||
|
NewResolver func() model.Resolver
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
func (h Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||||
|
|
|
@ -51,10 +51,16 @@ const requestWithoutDomainName = `{
|
||||||
|
|
||||||
func TestWorkingAsIntended(t *testing.T) {
|
func TestWorkingAsIntended(t *testing.T) {
|
||||||
handler := Handler{
|
handler := Handler{
|
||||||
Client: http.DefaultClient,
|
|
||||||
Dialer: netxlite.NewDialerWithStdlibResolver(model.DiscardLogger),
|
|
||||||
MaxAcceptableBody: 1 << 24,
|
MaxAcceptableBody: 1 << 24,
|
||||||
Resolver: netxlite.NewUnwrappedStdlibResolver(),
|
NewClient: func() model.HTTPClient {
|
||||||
|
return http.DefaultClient
|
||||||
|
},
|
||||||
|
NewDialer: func() model.Dialer {
|
||||||
|
return netxlite.NewDialerWithStdlibResolver(model.DiscardLogger)
|
||||||
|
},
|
||||||
|
NewResolver: func() model.Resolver {
|
||||||
|
return netxlite.NewUnwrappedStdlibResolver()
|
||||||
|
},
|
||||||
}
|
}
|
||||||
srv := httptest.NewServer(handler)
|
srv := httptest.NewServer(handler)
|
||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
|
|
|
@ -17,22 +17,22 @@ import (
|
||||||
const maxAcceptableBody = 1 << 24
|
const maxAcceptableBody = 1 << 24
|
||||||
|
|
||||||
var (
|
var (
|
||||||
dialer model.Dialer
|
endpoint = flag.String("endpoint", ":8080", "Endpoint where to listen")
|
||||||
endpoint = flag.String("endpoint", ":8080", "Endpoint where to listen")
|
srvcancel context.CancelFunc
|
||||||
httpClient model.HTTPClient
|
srvctx context.Context
|
||||||
resolver model.Resolver
|
srvwg = new(sync.WaitGroup)
|
||||||
srvcancel context.CancelFunc
|
|
||||||
srvctx context.Context
|
|
||||||
srvwg = new(sync.WaitGroup)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
srvctx, srvcancel = context.WithCancel(context.Background())
|
srvctx, srvcancel = context.WithCancel(context.Background())
|
||||||
|
}
|
||||||
|
|
||||||
|
func newresolver() model.Resolver {
|
||||||
// Implementation note: pin to a specific resolver so we don't depend upon the
|
// Implementation note: pin to a specific resolver so we don't depend upon the
|
||||||
// default resolver configured by the box. Also, use an encrypted transport thus
|
// default resolver configured by the box. Also, use an encrypted transport thus
|
||||||
// we're less vulnerable to any policy implemented by the box's provider.
|
// we're less vulnerable to any policy implemented by the box's provider.
|
||||||
resolver = netxlite.NewParallelDNSOverHTTPSResolver(log.Log, "https://8.8.8.8/dns-query")
|
resolver := netxlite.NewParallelDNSOverHTTPSResolver(log.Log, "https://8.8.8.8/dns-query")
|
||||||
httpClient = netxlite.NewHTTPClientWithResolver(log.Log, resolver)
|
return resolver
|
||||||
}
|
}
|
||||||
|
|
||||||
func shutdown(srv *http.Server) {
|
func shutdown(srv *http.Server) {
|
||||||
|
@ -55,10 +55,14 @@ func main() {
|
||||||
func testableMain() {
|
func testableMain() {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.Handle("/", webconnectivity.Handler{
|
mux.Handle("/", webconnectivity.Handler{
|
||||||
Client: httpClient,
|
|
||||||
Dialer: dialer,
|
|
||||||
MaxAcceptableBody: maxAcceptableBody,
|
MaxAcceptableBody: maxAcceptableBody,
|
||||||
Resolver: resolver,
|
NewClient: func() model.HTTPClient {
|
||||||
|
return netxlite.NewHTTPClientWithResolver(log.Log, newresolver())
|
||||||
|
},
|
||||||
|
NewDialer: func() model.Dialer {
|
||||||
|
return netxlite.NewDialerWithResolver(log.Log, newresolver())
|
||||||
|
},
|
||||||
|
NewResolver: newresolver,
|
||||||
})
|
})
|
||||||
srv := &http.Server{Addr: *endpoint, Handler: mux}
|
srv := &http.Server{Addr: *endpoint, Handler: mux}
|
||||||
srvwg.Add(1)
|
srvwg.Add(1)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user