diff --git a/internal/httpclientx/getjson.go b/internal/httpclientx/getjson.go index 417ccfca2..346c52423 100644 --- a/internal/httpclientx/getjson.go +++ b/internal/httpclientx/getjson.go @@ -21,7 +21,7 @@ import ( // // This function either returns an error or a valid Output. func GetJSON[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) { - return NewOverlappedGetJSON[Output](config).Run(ctx, epnt) + return OverlappedIgnoreIndex(NewOverlappedGetJSON[Output](config).Run(ctx, epnt)) } func getJSON[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) { diff --git a/internal/httpclientx/getraw.go b/internal/httpclientx/getraw.go index e5b87ec5b..da155215a 100644 --- a/internal/httpclientx/getraw.go +++ b/internal/httpclientx/getraw.go @@ -21,7 +21,7 @@ import ( // // This function either returns an error or a valid Output. func GetRaw(ctx context.Context, epnt *Endpoint, config *Config) ([]byte, error) { - return NewOverlappedGetRaw(config).Run(ctx, epnt) + return OverlappedIgnoreIndex(NewOverlappedGetRaw(config).Run(ctx, epnt)) } func getRaw(ctx context.Context, epnt *Endpoint, config *Config) ([]byte, error) { diff --git a/internal/httpclientx/getxml.go b/internal/httpclientx/getxml.go index 54452c375..8163f3df3 100644 --- a/internal/httpclientx/getxml.go +++ b/internal/httpclientx/getxml.go @@ -21,7 +21,7 @@ import ( // // This function either returns an error or a valid Output. func GetXML[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) { - return NewOverlappedGetXML[Output](config).Run(ctx, epnt) + return OverlappedIgnoreIndex(NewOverlappedGetXML[Output](config).Run(ctx, epnt)) } func getXML[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) { diff --git a/internal/httpclientx/overlapped.go b/internal/httpclientx/overlapped.go index 76aacbe54..8577e119f 100644 --- a/internal/httpclientx/overlapped.go +++ b/internal/httpclientx/overlapped.go @@ -8,14 +8,16 @@ import ( "context" "errors" "time" - - "github.com/ooni/probe-cli/v3/internal/erroror" ) // OverlappedDefaultScheduleInterval is the default schedule interval. After this interval // has elapsed for a URL without seeing a success, we will schedule the next URL. const OverlappedDefaultScheduleInterval = 15 * time.Second +// OverlappedDefaultWatchdogTimeout is the timeout after which we assume all the API calls +// have gone rogue and forcibly interrupt all of them. +const OverlappedDefaultWatchdogTimeout = 5 * time.Minute + // Overlapped represents the possibility of overlapping HTTP calls for a set of // functionally equivalent URLs, such that we start a new call if the previous one // has failed to produce a result within the configured ScheduleInterval. @@ -24,7 +26,7 @@ const OverlappedDefaultScheduleInterval = 15 * time.Second // // Under very bad networking conditions, [*Overlapped] would cause a new network // call to start while the previous one is still in progress and very slowly downloading -// a response. A future implementation SHOULD probably account for this possibility. +// a response. A future implementation MIGHT want to account for this possibility. type Overlapped[Output any] struct { // RunFunc is the MANDATORY function that fetches the given [*Endpoint]. // @@ -42,12 +44,22 @@ type Overlapped[Output any] struct { // // If you set it manually, you MUST modify it before calling [*Overlapped.Run]. ScheduleInterval time.Duration + + // WatchdogTimeout is the MANDATORY timeout after which the code assumes + // that all API calls must be aborted and give up. + // + // This field is typically initialized by [NewOverlappedGetJSON], [NewOverlappedGetRaw], + // [NewOverlappedGetXML], or [NewOverlappedPostJSON] to be [OverlappedDefaultWatchdogTimeout]. + // + // If you set it manually, you MUST modify it before calling [*Overlapped.Run]. + WatchdogTimeout time.Duration } func newOverlappedWithFunc[Output any](fx func(context.Context, *Endpoint) (Output, error)) *Overlapped[Output] { return &Overlapped[Output]{ RunFunc: fx, ScheduleInterval: OverlappedDefaultScheduleInterval, + WatchdogTimeout: OverlappedDefaultWatchdogTimeout, } } @@ -83,81 +95,149 @@ func NewOverlappedPostJSON[Input, Output any](input Input, config *Config) *Over var ErrGenericOverlappedFailure = errors.New("overlapped: generic failure") // Run runs the overlapped operations, returning the result of the first operation -// that succeeds and otherwise returning an error describing what happened. +// that succeeds and its endpoint index, or the error that occurred. +func (ovx *Overlapped[Output]) Run(ctx context.Context, epnts ...*Endpoint) (Output, int, error) { + return OverlappedReduce[Output](ovx.Map(ctx, epnts...)) +} + +// OverlappedErrorOr combines error information, result information and the endpoint index. +type OverlappedErrorOr[Output any] struct { + // Err is the error or nil. + Err error + + // Index is the endpoint index. + Index int + + // Value is the result. + Value Output +} + +// Map applies the [*Overlapped.RunFunc] function to each epnts entry, thus producing +// a result for each entry. This function will cancel subsequent operations until there +// is a success: subsequent results will be [context.Canceled] errors. // -// # Limitations +// Note that you SHOULD use [*Overlapped.Run] unless you want to observe the result +// of each operation, which is mostly useful when running unit tests. // -// This implementation creates a new goroutine for each provided URL under the assumption that -// the overall number of URLs is small. A future revision would address this issue. -func (ovx *Overlapped[Output]) Run(ctx context.Context, epnts ...*Endpoint) (Output, error) { - // create cancellable context for early cancellation - ctx, cancel := context.WithCancel(ctx) +// Note that this function will return a zero length slice if epnts lenth is also zero. +func (ovx *Overlapped[Output]) Map(ctx context.Context, epnts ...*Endpoint) []*OverlappedErrorOr[Output] { + // create cancellable context for early cancellation and also apply the + // watchdog timeout so that eventually this code returns. + // + // we are going to cancel this context as soon as we have a successful response so + // that we do not waste network resources by performing other attempts. + ctx, cancel := context.WithTimeout(ctx, ovx.WatchdogTimeout) defer cancel() // construct channel for collecting the results - output := make(chan *erroror.Value[Output]) + // + // we're using this channel to communicate results back from goroutines running + // in the background and performing the real API call + output := make(chan *OverlappedErrorOr[Output]) - // schedule a measuring goroutine per URL. - for idx := 0; idx < len(epnts); idx++ { - go ovx.transact(ctx, idx, epnts[idx], output) - } + // create ticker for scheduling subsequent attempts + // + // the ticker is going to tick at every schedule interval to start another + // attempt, if the previous attempt has not produced a result in time + ticker := time.NewTicker(ovx.ScheduleInterval) + defer ticker.Stop() - // we expect to see exactly a response for each goroutine - var ( - firstOutput *Output - errorv []error - ) - for idx := 0; idx < len(epnts); idx++ { - // get a result from one of the goroutines - result := <-output - - // handle the error case - if result.Err != nil { - errorv = append(errorv, result.Err) - continue - } + // create index for the next endpoint to try + idx := 0 - // possibly record the first success - if firstOutput == nil { - firstOutput = &result.Value + // create vector for collecting results + // + // for simplicity, we're going to collect results from every goroutine + // including the ones cancelled by context after the previous success and + // then we're going to filter the results and produce a final result + results := []*OverlappedErrorOr[Output]{} + + // keep looping until we have results for each endpoints + for len(results) < len(epnts) { + + // if there are more endpoints to try, spawn a goroutine to try, + // and, otherwise, we can safely stop ticking + if idx < len(epnts) { + go ovx.transact(ctx, idx, epnts[idx], output) + idx++ + } else { + ticker.Stop() } - // make sure we interrupt all the other goroutines - cancel() + select { + // this event means that a child goroutine completed + // so we store the result; on success interrupt all the + // background goroutines and stop ticking + // + // note that we MUST continue reading until we have + // exactly `len(epnts)` results because the inner + // goroutine performs blocking writes on the channel + case res := <-output: + results = append(results, res) + if res.Err == nil { + ticker.Stop() + cancel() + } + + // this means the ticker ticked, so we should loop again and + // attempt another endpoint because it's time to do that + case <-ticker.C: + } } - // handle the case of success - if firstOutput != nil { - return *firstOutput, nil + // just send the results vector back to the caller + return results +} + +// OverlappedReduce takes the results of [*Overlapped.Map] and returns either an Output or an error. +// +// Note that you SHOULD use [*Overlapped.Run] unless you want to observe the result +// of each operation, which is mostly useful when running unit tests. +// +// The return value is (output, index, nil) on success and (zero, zero, error) on failure. +func OverlappedReduce[Output any](results []*OverlappedErrorOr[Output]) (Output, int, error) { + // postprocess the results to check for success and + // aggregate all the errors that occurred + errorv := []error{} + for _, res := range results { + if res.Err == nil { + return res.Value, res.Index, nil + } + errorv = append(errorv, res.Err) } // handle the case where there's no error + // + // this happens if the user provided no endpoints to measure if len(errorv) <= 0 { errorv = append(errorv, ErrGenericOverlappedFailure) } // return zero value and errors list - return *new(Output), errors.Join(errorv...) + // + // note that errors.Join returns nil if all the errors are nil or the + // list is nil, which is why we handle the corner case above + return *new(Output), 0, errors.Join(errorv...) } // transact performs an HTTP transaction with the given URL and writes results to the output channel. -func (ovx *Overlapped[Output]) transact(ctx context.Context, idx int, epnt *Endpoint, output chan<- *erroror.Value[Output]) { - // wait for our time to start - // - // add one nanosecond to make sure the delay is always positive - timer := time.NewTimer(time.Duration(idx)*ovx.ScheduleInterval + time.Nanosecond) - defer timer.Stop() - select { - case <-ctx.Done(): - output <- &erroror.Value[Output]{Err: ctx.Err()} - return - case <-timer.C: - // fallthrough - } - +func (ovx *Overlapped[Output]) transact( + ctx context.Context, idx int, epnt *Endpoint, output chan<- *OverlappedErrorOr[Output]) { // obtain the results value, err := ovx.RunFunc(ctx, epnt) // emit the results - output <- &erroror.Value[Output]{Err: err, Value: value} + // + // note that this unconditional channel write REQUIRES that we keep reading from + // the results channel in Run until we have a result per input endpoint + output <- &OverlappedErrorOr[Output]{ + Err: err, + Index: idx, + Value: value, + } +} + +// OverlappedIgnoreIndex is a filter that removes the index from [*Overlapped.Run] results. +func OverlappedIgnoreIndex[Output any](value Output, _ int, err error) (Output, error) { + return value, err } diff --git a/internal/httpclientx/overlapped_test.go b/internal/httpclientx/overlapped_test.go index 8e904dc84..e09d4c95d 100644 --- a/internal/httpclientx/overlapped_test.go +++ b/internal/httpclientx/overlapped_test.go @@ -11,6 +11,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/must" + "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/runtimex" "github.com/ooni/probe-cli/v3/internal/testingx" ) @@ -18,14 +20,26 @@ import ( // an [*Overlapped], we do not necessarily need to test that each top-level constructor // are WAI; rather, we should focus on the mechanics of multiple URLs. -func TestNewOverlappedPostJSONIsPerformingOverlappedCalls(t *testing.T) { +func TestNewOverlappedPostJSONFastRecoverFromEarlyErrors(t *testing.T) { + // // Scenario: // // - 0.th.ooni.org is SNI blocked // - 1.th.ooni.org is SNI blocked // - 2.th.ooni.org is SNI blocked // - 3.th.ooni.org WAIs + // + // We expect to get a response from 3.th.ooni.org. + // + // Because the first three THs fail fast but the schedule interval is the default (i.e., + // 15 seconds), we're testing whether the algorithm allows us to recover quickly from + // failure and check the other endpoints without waiting for too much time. + // + // Note: before changing the algorithm, this test ran for 45 seconds. Now it runs + // for 1s because a previous goroutine terminating with error causes the next + // goroutine to start and attempt to fetch the resource. + // zeroTh := testingx.MustNewHTTPServer(testingx.HTTPHandlerReset()) defer zeroTh.Close() @@ -60,12 +74,12 @@ func TestNewOverlappedPostJSONIsPerformingOverlappedCalls(t *testing.T) { UserAgent: model.HTTPHeaderUserAgent, }) - // make sure we set a low scheduling interval to make test faster - overlapped.ScheduleInterval = time.Second - // Now we issue the requests and check we're getting the correct response. + // + // We're splitting the algorithm into its Map step and its Reduce step because + // this allows us to clearly observe what happened. - apiResp, err := overlapped.Run( + results := overlapped.Map( context.Background(), NewEndpoint(zeroTh.URL), NewEndpoint(oneTh.URL), @@ -73,53 +87,84 @@ func TestNewOverlappedPostJSONIsPerformingOverlappedCalls(t *testing.T) { NewEndpoint(threeTh.URL), ) + runtimex.Assert(len(results) == 4, "unexpected number of results") + + // the first three attempts should have failed with connection reset + // while the fourth result should be successful + for _, entry := range results { + t.Log(entry.Index, string(must.MarshalJSON(entry))) + switch entry.Index { + case 0, 1, 2: + if err := entry.Err; !errors.Is(err, netxlite.ECONNRESET) { + t.Fatal("unexpected error", err) + } + case 3: + if err := entry.Err; err != nil { + t.Fatal("unexpected error", err) + } + if diff := cmp.Diff(expectedResponse, entry.Value); diff != "" { + t.Fatal(diff) + } + default: + t.Fatal("unexpected index", entry.Index) + } + } + + // Now run the reduce step of the algorithm and make sure we correctly + // return the first success and the nil error + + apiResp, idx, err := OverlappedReduce(results) + // we do not expect to see a failure because threeTh is WAI if err != nil { t.Fatal(err) } + if idx != 3 { + t.Fatal("unexpected success index", idx) + } + // compare response to expectation if diff := cmp.Diff(expectedResponse, apiResp); diff != "" { t.Fatal(diff) } } -func TestNewOverlappedPostJSONCancelsPendingCalls(t *testing.T) { +func TestNewOverlappedPostJSONFirstCallSucceeds(t *testing.T) { + // // Scenario: // - // - 0.th.ooni.org is WAI but slow + // - 0.th.ooni.org is WAI // - 1.th.ooni.org is WAI // - 2.th.ooni.org is WAI // - 3.th.ooni.org is WAI + // + // We expect to get a response from the first TH because it's the first goroutine + // that we schedule. Subsequent calls should be canceled. + // expectedResponse := &apiResponse{ Age: 41, Name: "sbs", } - slowwakeup := make(chan any) - zeroTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - <-slowwakeup w.Write(must.MarshalJSON(expectedResponse)) })) defer zeroTh.Close() oneTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - <-slowwakeup w.Write(must.MarshalJSON(expectedResponse)) })) defer oneTh.Close() twoTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - <-slowwakeup w.Write(must.MarshalJSON(expectedResponse)) })) defer twoTh.Close() threeTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - <-slowwakeup w.Write(must.MarshalJSON(expectedResponse)) })) defer threeTh.Close() @@ -143,21 +188,129 @@ func TestNewOverlappedPostJSONCancelsPendingCalls(t *testing.T) { // to fetch from their respective URLs. overlapped.ScheduleInterval = 15 * time.Second - // In the background we're going to emit slow wakeup signals at fixed intervals - // after an initial waiting interval, such that goroutines unblock in order + // Now we issue the requests and check we're getting the correct response. + // + // We're splitting the algorithm into its Map step and its Reduce step because + // this allows us to clearly observe what happened. - go func() { - time.Sleep(250 * time.Millisecond) - for idx := 0; idx < 4; idx++ { - slowwakeup <- true - time.Sleep(250 * time.Millisecond) + results := overlapped.Map( + context.Background(), + NewEndpoint(zeroTh.URL), + NewEndpoint(oneTh.URL), + NewEndpoint(twoTh.URL), + NewEndpoint(threeTh.URL), + ) + + runtimex.Assert(len(results) == 4, "unexpected number of results") + + // the first attempt should succeed and subsequent ones should + // have failed with the context.Canceled error + for _, entry := range results { + t.Log(entry.Index, string(must.MarshalJSON(entry))) + switch entry.Index { + case 1, 2, 3: + if err := entry.Err; !errors.Is(err, context.Canceled) { + t.Fatal("unexpected error", err) + } + case 0: + if err := entry.Err; err != nil { + t.Fatal("unexpected error", err) + } + if diff := cmp.Diff(expectedResponse, entry.Value); diff != "" { + t.Fatal(diff) + } + default: + t.Fatal("unexpected index", entry.Index) } - close(slowwakeup) - }() + } + + // Now run the reduce step of the algorithm and make sure we correctly + // return the first success and the nil error + + apiResp, idx, err := OverlappedReduce(results) + + // we do not expect to see a failure because all the THs are WAI + if err != nil { + t.Fatal(err) + } + + if idx != 0 { + t.Fatal("unexpected success index", idx) + } + + // compare response to expectation + if diff := cmp.Diff(expectedResponse, apiResp); diff != "" { + t.Fatal(diff) + } +} + +func TestNewOverlappedPostJSONHandlesAllTimeouts(t *testing.T) { + + // + // Scenario: + // + // - 0.th.ooni.org causes timeout + // - 1.th.ooni.org causes timeout + // - 2.th.ooni.org causes timeout + // - 3.th.ooni.org causes timeout + // + // We expect to loop for all endpoints and then discover that all of them + // failed. To make the test ~quick, we reduce the scheduling interval, and + // the watchdog timeout. + // + + blockforever := make(chan any) + + zeroTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blockforever + w.WriteHeader(http.StatusBadGateway) + })) + defer zeroTh.Close() + + oneTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blockforever + w.WriteHeader(http.StatusBadGateway) + })) + defer oneTh.Close() + + twoTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blockforever + w.WriteHeader(http.StatusBadGateway) + })) + defer twoTh.Close() + + threeTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blockforever + w.WriteHeader(http.StatusBadGateway) + })) + defer threeTh.Close() + + // Create client configuration. We don't care much about the + // JSON requests and reponses being aligned to reality. + + apiReq := &apiRequest{ + UserID: 117, + } + + overlapped := NewOverlappedPostJSON[*apiRequest, *apiResponse](apiReq, &Config{ + Authorization: "", // not relevant for this test + Client: http.DefaultClient, + Logger: log.Log, + UserAgent: model.HTTPHeaderUserAgent, + }) + + // make sure the schedule interval is low to make this test run faster. + overlapped.ScheduleInterval = 250 * time.Millisecond // Now we issue the requests and check we're getting the correct response. + // + // We're splitting the algorithm into its Map step and its Reduce step because + // this allows us to clearly observe what happened. + + // modify the watchdog timeout be much smaller than usual + overlapped.WatchdogTimeout = 2 * time.Second - apiResp, err := overlapped.Run( + results := overlapped.Map( context.Background(), NewEndpoint(zeroTh.URL), NewEndpoint(oneTh.URL), @@ -165,15 +318,166 @@ func TestNewOverlappedPostJSONCancelsPendingCalls(t *testing.T) { NewEndpoint(threeTh.URL), ) - // we do not expect to see a failure because threeTh is WAI + runtimex.Assert(len(results) == 4, "unexpected number of results") + + // all the attempts should have failed with context deadline exceeded + for _, entry := range results { + t.Log(entry.Index, string(must.MarshalJSON(entry))) + switch entry.Index { + case 0, 1, 2, 3: + if err := entry.Err; !errors.Is(err, context.DeadlineExceeded) { + t.Fatal("unexpected error", err) + } + default: + t.Fatal("unexpected index", entry.Index) + } + } + + // Now run the reduce step of the algorithm and make sure we correctly + // return the first success and the nil error + + apiResp, idx, err := OverlappedReduce(results) + + // we expect to see a failure because the watchdog timeout should have fired + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatal("unexpected error", err) + } + + if idx != 0 { + t.Fatal("unexpected index", idx) + } + + // we expect the api response to be nil + if apiResp != nil { + t.Fatal("expected nil resp") + } + + // now unblock the blocked goroutines + close(blockforever) +} + +func TestNewOverlappedPostJSONResetTimeoutSuccessCanceled(t *testing.T) { + + // + // Scenario: + // + // - 0.th.ooni.org resets the connection + // - 1.th.ooni.org causes timeout + // - 2.th.ooni.org is WAI + // - 3.th.ooni.org causes timeout + // + // We expect to see a success and to never attempt with 3.th.ooni.org. + // + + blockforever := make(chan any) + + zeroTh := testingx.MustNewHTTPServer(testingx.HTTPHandlerReset()) + defer zeroTh.Close() + + oneTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blockforever + w.WriteHeader(http.StatusBadGateway) + })) + defer oneTh.Close() + + expectedResponse := &apiResponse{ + Age: 41, + Name: "sbs", + } + + twoTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write(must.MarshalJSON(expectedResponse)) + })) + defer twoTh.Close() + + threeTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blockforever + w.WriteHeader(http.StatusBadGateway) + })) + defer threeTh.Close() + + // Create client configuration. We don't care much about the + // JSON requests and reponses being aligned to reality. + + apiReq := &apiRequest{ + UserID: 117, + } + + overlapped := NewOverlappedPostJSON[*apiRequest, *apiResponse](apiReq, &Config{ + Authorization: "", // not relevant for this test + Client: http.DefaultClient, + Logger: log.Log, + UserAgent: model.HTTPHeaderUserAgent, + }) + + // make sure the schedule interval is low to make this test run faster. + overlapped.ScheduleInterval = 250 * time.Millisecond + + // Now we issue the requests and check we're getting the correct response. + // + // We're splitting the algorithm into its Map step and its Reduce step because + // this allows us to clearly observe what happened. + // + // Note: we're running this test with the default watchdog timeout. + + results := overlapped.Map( + context.Background(), + NewEndpoint(zeroTh.URL), + NewEndpoint(oneTh.URL), + NewEndpoint(twoTh.URL), + NewEndpoint(threeTh.URL), + ) + + runtimex.Assert(len(results) == 4, "unexpected number of results") + + // attempt 0: should have seen connection reset + // attempt 1: should have seen the context canceled + // attempt 2: should be successful + // attempt 3: should have seen the context canceled + for _, entry := range results { + t.Log(entry.Index, string(must.MarshalJSON(entry))) + switch entry.Index { + case 0: + if err := entry.Err; !errors.Is(err, netxlite.ECONNRESET) { + t.Fatal("unexpected error", err) + } + case 1, 3: + if err := entry.Err; !errors.Is(err, context.Canceled) { + t.Fatal("unexpected error", err) + } + case 2: + if err := entry.Err; err != nil { + t.Fatal("unexpected error", err) + } + if diff := cmp.Diff(expectedResponse, entry.Value); diff != "" { + t.Fatal(diff) + } + default: + t.Fatal("unexpected index", entry.Index) + } + } + + // Now run the reduce step of the algorithm and make sure we correctly + // return the first success and the nil error + + apiResp, idx, err := OverlappedReduce(results) + + // we do not expect to see a failure because one of the THs is WAI if err != nil { t.Fatal(err) } + if idx != 2 { + t.Fatal("unexpected success index", idx) + } + // compare response to expectation if diff := cmp.Diff(expectedResponse, apiResp); diff != "" { t.Fatal(diff) } + + // now unblock the blocked goroutines + close(blockforever) } func TestNewOverlappedPostJSONWithNoURLs(t *testing.T) { @@ -195,15 +499,31 @@ func TestNewOverlappedPostJSONWithNoURLs(t *testing.T) { // Now we issue the requests without any URLs and make sure // the result we get is the generic overlapped error - apiResp, err := overlapped.Run(context.Background()) + apiResp, idx, err := overlapped.Run(context.Background() /* no URLs here! */) - // we do not expect to see a failure because threeTh is WAI + // we do expect to see the generic overlapped failure if !errors.Is(err, ErrGenericOverlappedFailure) { t.Fatal("unexpected error", err) } + if idx != 0 { + t.Fatal("unexpected index", idx) + } + // we expect a nil response if apiResp != nil { t.Fatal("expected nil API response") } } + +func TestNewOverlappedWithFuncDefaultsAreCorrect(t *testing.T) { + overlapped := newOverlappedWithFunc(func(ctx context.Context, e *Endpoint) (int, error) { + return 1, nil + }) + if overlapped.ScheduleInterval != 15*time.Second { + t.Fatal("unexpected ScheduleInterval") + } + if overlapped.WatchdogTimeout != 5*time.Minute { + t.Fatal("unexpected WatchdogTimeout") + } +} diff --git a/internal/httpclientx/postjson.go b/internal/httpclientx/postjson.go index 43b4c6317..1dd2efe67 100644 --- a/internal/httpclientx/postjson.go +++ b/internal/httpclientx/postjson.go @@ -25,7 +25,7 @@ import ( // // This function either returns an error or a valid Output. func PostJSON[Input, Output any](ctx context.Context, epnt *Endpoint, input Input, config *Config) (Output, error) { - return NewOverlappedPostJSON[Input, Output](input, config).Run(ctx, epnt) + return OverlappedIgnoreIndex(NewOverlappedPostJSON[Input, Output](input, config).Run(ctx, epnt)) } func postJSON[Input, Output any](ctx context.Context, epnt *Endpoint, input Input, config *Config) (Output, error) {