Skip to content

Commit

Permalink
httpclientx: implement fast recovery and return endpoint index (#1586)
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone committed May 3, 2024
1 parent 9dbe15c commit 929d46a
Show file tree
Hide file tree
Showing 6 changed files with 483 additions and 83 deletions.
2 changes: 1 addition & 1 deletion internal/httpclientx/getjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// This function either returns an error or a valid Output.
func GetJSON[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) {
return NewOverlappedGetJSON[Output](config).Run(ctx, epnt)
return OverlappedIgnoreIndex(NewOverlappedGetJSON[Output](config).Run(ctx, epnt))
}

func getJSON[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/httpclientx/getraw.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// This function either returns an error or a valid Output.
func GetRaw(ctx context.Context, epnt *Endpoint, config *Config) ([]byte, error) {
return NewOverlappedGetRaw(config).Run(ctx, epnt)
return OverlappedIgnoreIndex(NewOverlappedGetRaw(config).Run(ctx, epnt))
}

func getRaw(ctx context.Context, epnt *Endpoint, config *Config) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/httpclientx/getxml.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// This function either returns an error or a valid Output.
func GetXML[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) {
return NewOverlappedGetXML[Output](config).Run(ctx, epnt)
return OverlappedIgnoreIndex(NewOverlappedGetXML[Output](config).Run(ctx, epnt))
}

func getXML[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) {
Expand Down
186 changes: 133 additions & 53 deletions internal/httpclientx/overlapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"context"
"errors"
"time"

"github.com/ooni/probe-cli/v3/internal/erroror"
)

// OverlappedDefaultScheduleInterval is the default schedule interval. After this interval
// has elapsed for a URL without seeing a success, we will schedule the next URL.
const OverlappedDefaultScheduleInterval = 15 * time.Second

// OverlappedDefaultWatchdogTimeout is the timeout after which we assume all the API calls
// have gone rogue and forcibly interrupt all of them.
const OverlappedDefaultWatchdogTimeout = 5 * time.Minute

// Overlapped represents the possibility of overlapping HTTP calls for a set of
// functionally equivalent URLs, such that we start a new call if the previous one
// has failed to produce a result within the configured ScheduleInterval.
Expand All @@ -24,7 +26,7 @@ const OverlappedDefaultScheduleInterval = 15 * time.Second
//
// Under very bad networking conditions, [*Overlapped] would cause a new network
// call to start while the previous one is still in progress and very slowly downloading
// a response. A future implementation SHOULD probably account for this possibility.
// a response. A future implementation MIGHT want to account for this possibility.
type Overlapped[Output any] struct {
// RunFunc is the MANDATORY function that fetches the given [*Endpoint].
//
Expand All @@ -42,12 +44,22 @@ type Overlapped[Output any] struct {
//
// If you set it manually, you MUST modify it before calling [*Overlapped.Run].
ScheduleInterval time.Duration

// WatchdogTimeout is the MANDATORY timeout after which the code assumes
// that all API calls must be aborted and gives up.
//
// This field is typically initialized by [NewOverlappedGetJSON], [NewOverlappedGetRaw],
// [NewOverlappedGetXML], or [NewOverlappedPostJSON] to be [OverlappedDefaultWatchdogTimeout].
//
// If you set it manually, you MUST modify it before calling [*Overlapped.Run].
WatchdogTimeout time.Duration
}

// newOverlappedWithFunc returns a new [*Overlapped] that uses fx as its RunFunc
// and is otherwise configured with the package defaults, namely
// [OverlappedDefaultScheduleInterval] and [OverlappedDefaultWatchdogTimeout].
func newOverlappedWithFunc[Output any](fx func(context.Context, *Endpoint) (Output, error)) *Overlapped[Output] {
	// start from the zero value and then fill in each field explicitly
	ovx := &Overlapped[Output]{}
	ovx.RunFunc = fx
	ovx.ScheduleInterval = OverlappedDefaultScheduleInterval
	ovx.WatchdogTimeout = OverlappedDefaultWatchdogTimeout
	return ovx
}

Expand Down Expand Up @@ -83,81 +95,149 @@ func NewOverlappedPostJSON[Input, Output any](input Input, config *Config) *Over
var ErrGenericOverlappedFailure = errors.New("overlapped: generic failure")

// Run runs the overlapped operations, returning the result of the first operation
// that succeeds and otherwise returning an error describing what happened.
// that succeeds and its endpoint index, or the error that occurred.
func (ovx *Overlapped[Output]) Run(ctx context.Context, epnts ...*Endpoint) (Output, int, error) {
return OverlappedReduce[Output](ovx.Map(ctx, epnts...))
}

// OverlappedErrorOr is the per-endpoint outcome of an overlapped operation: it
// bundles the error (if any), the index of the endpoint that was used, and the
// value produced by the operation.
type OverlappedErrorOr[Output any] struct {
	// Err is nil on success and the error that occurred otherwise.
	Err error

	// Index is the index of the endpoint this outcome refers to.
	Index int

	// Value is the result of the operation.
	Value Output
}

// Map applies the [*Overlapped.RunFunc] function to each epnts entry, thus producing
// a result for each entry. As soon as one operation succeeds, this function cancels
// all the other operations: the results of the cancelled operations will be [context.Canceled] errors.
//
// # Limitations
// Note that you SHOULD use [*Overlapped.Run] unless you want to observe the result
// of each operation, which is mostly useful when running unit tests.
//
// This implementation creates a new goroutine for each provided URL under the assumption that
// the overall number of URLs is small. A future revision would address this issue.
func (ovx *Overlapped[Output]) Run(ctx context.Context, epnts ...*Endpoint) (Output, error) {
// create cancellable context for early cancellation
ctx, cancel := context.WithCancel(ctx)
// Note that this function will return a zero-length slice if the length of epnts is also zero.
func (ovx *Overlapped[Output]) Map(ctx context.Context, epnts ...*Endpoint) []*OverlappedErrorOr[Output] {
// create cancellable context for early cancellation and also apply the
// watchdog timeout so that eventually this code returns.
//
// we are going to cancel this context as soon as we have a successful response so
// that we do not waste network resources by performing other attempts.
ctx, cancel := context.WithTimeout(ctx, ovx.WatchdogTimeout)
defer cancel()

// construct channel for collecting the results
output := make(chan *erroror.Value[Output])
//
// we're using this channel to communicate results back from goroutines running
// in the background and performing the real API call
output := make(chan *OverlappedErrorOr[Output])

// schedule a measuring goroutine per URL.
for idx := 0; idx < len(epnts); idx++ {
go ovx.transact(ctx, idx, epnts[idx], output)
}
// create ticker for scheduling subsequent attempts
//
// the ticker is going to tick at every schedule interval to start another
// attempt, if the previous attempt has not produced a result in time
ticker := time.NewTicker(ovx.ScheduleInterval)
defer ticker.Stop()

// we expect to see exactly a response for each goroutine
var (
firstOutput *Output
errorv []error
)
for idx := 0; idx < len(epnts); idx++ {
// get a result from one of the goroutines
result := <-output

// handle the error case
if result.Err != nil {
errorv = append(errorv, result.Err)
continue
}
// create index for the next endpoint to try
idx := 0

// possibly record the first success
if firstOutput == nil {
firstOutput = &result.Value
// create vector for collecting results
//
// for simplicity, we're going to collect results from every goroutine
// including the ones cancelled by context after the previous success and
// then we're going to filter the results and produce a final result
results := []*OverlappedErrorOr[Output]{}

// keep looping until we have a result for each endpoint
for len(results) < len(epnts) {

// if there are more endpoints to try, spawn a goroutine to try,
// and, otherwise, we can safely stop ticking
if idx < len(epnts) {
go ovx.transact(ctx, idx, epnts[idx], output)
idx++
} else {
ticker.Stop()
}

// make sure we interrupt all the other goroutines
cancel()
select {
// this event means that a child goroutine completed
// so we store the result; on success interrupt all the
// background goroutines and stop ticking
//
// note that we MUST continue reading until we have
// exactly `len(epnts)` results because the inner
// goroutine performs blocking writes on the channel
case res := <-output:
results = append(results, res)
if res.Err == nil {
ticker.Stop()
cancel()
}

// this means the ticker ticked, so we should loop again and
// attempt another endpoint because it's time to do that
case <-ticker.C:
}
}

// handle the case of success
if firstOutput != nil {
return *firstOutput, nil
// just send the results vector back to the caller
return results
}

// OverlappedReduce takes the results of [*Overlapped.Map] and returns either an Output or an error.
//
// Note that you SHOULD use [*Overlapped.Run] unless you want to observe the result
// of each operation, which is mostly useful when running unit tests.
//
// The return value is (output, index, nil) on success and (zero, zero, error) on failure.
func OverlappedReduce[Output any](results []*OverlappedErrorOr[Output]) (Output, int, error) {
// postprocess the results to check for success and
// aggregate all the errors that occurred
errorv := []error{}
for _, res := range results {
if res.Err == nil {
return res.Value, res.Index, nil
}
errorv = append(errorv, res.Err)
}

// handle the case where there's no error
//
// this happens if the user provided no endpoints to measure
if len(errorv) <= 0 {
errorv = append(errorv, ErrGenericOverlappedFailure)
}

// return zero value and errors list
return *new(Output), errors.Join(errorv...)
//
// note that errors.Join returns nil if all the errors are nil or the
// list is nil, which is why we handle the corner case above
return *new(Output), 0, errors.Join(errorv...)
}

// transact performs an HTTP transaction with the given URL and writes results to the output channel.
func (ovx *Overlapped[Output]) transact(ctx context.Context, idx int, epnt *Endpoint, output chan<- *erroror.Value[Output]) {
// wait for our time to start
//
// add one nanosecond to make sure the delay is always positive
timer := time.NewTimer(time.Duration(idx)*ovx.ScheduleInterval + time.Nanosecond)
defer timer.Stop()
select {
case <-ctx.Done():
output <- &erroror.Value[Output]{Err: ctx.Err()}
return
case <-timer.C:
// fallthrough
}

func (ovx *Overlapped[Output]) transact(
ctx context.Context, idx int, epnt *Endpoint, output chan<- *OverlappedErrorOr[Output]) {
// obtain the results
value, err := ovx.RunFunc(ctx, epnt)

// emit the results
output <- &erroror.Value[Output]{Err: err, Value: value}
//
// note that this unconditional channel write REQUIRES that we keep reading from
// the results channel in Map until we have a result per input endpoint
output <- &OverlappedErrorOr[Output]{
Err: err,
Index: idx,
Value: value,
}
}

// OverlappedIgnoreIndex adapts the (value, index, error) triple returned by
// [*Overlapped.Run] into a plain (value, error) pair by discarding the index.
//
// The value and the error are passed through unchanged.
func OverlappedIgnoreIndex[Output any](value Output, _ int, err error) (Output, error) {
	// the second argument (the endpoint index) is deliberately dropped
	return value, err
}
Loading

0 comments on commit 929d46a

Please sign in to comment.