From f6d79d35a5420eff28dc05bc79c03ef78b6f0ef0 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 8 Jun 2022 13:50:40 +0200 Subject: [PATCH 1/2] Use timeouts in tests for async writes --- acceptance_testing.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/acceptance_testing.go b/acceptance_testing.go index e7520ce6..43e8aa4d 100644 --- a/acceptance_testing.go +++ b/acceptance_testing.go @@ -460,8 +460,8 @@ func (d ConfigurableAcceptanceTestDriver) writeAsync(ctx context.Context, dest D return err } - // TODO create timeout for wait to prevent deadlock for badly written connectors - waitForAck.Wait() + // wait for each of the records, for at most the specified write timeout + waitTimeout(&waitForAck, time.Duration(len(records))*d.WriteTimeout()) if ackErr != nil { return ackErr } @@ -938,8 +938,8 @@ func (a acceptanceTest) TestDestination_WriteAsync_Success(t *testing.T) { is.NoErr(err) // wait for acks to get called - // TODO timeout if it takes too long - ackWg.Wait() + // wait for each of the records, for at most the specified write timeout + waitTimeout(&ackWg, time.Duration(20)*a.driver.WriteTimeout()) got := a.driver.ReadFromDestination(t, want) a.isEqualRecords(is, want, got) @@ -1129,3 +1129,17 @@ func (a acceptanceTest) isEqualData(is *is.I, want, got Data) { is.Equal(want.Bytes(), got.Bytes()) // data did not match (want != got) } } + +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false // completed normally + case <-time.After(timeout): + return true // timed out + } +} From 1c1fbd2313b5bb4cfe56318acd10a9d6e9a84b83 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 9 Jun 2022 13:22:22 +0200 Subject: [PATCH 2/2] extract util functions --- acceptance_testing.go | 16 +------------ destination.go | 25 +++------------------ wait_utils.go | 52 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 37 deletions(-) create mode 100644 wait_utils.go diff --git a/acceptance_testing.go b/acceptance_testing.go index 43e8aa4d..5af09cc5 100644 --- a/acceptance_testing.go +++ b/acceptance_testing.go @@ -208,7 +208,7 @@ func (d ConfigurableAcceptanceTestDriver) Skip(t *testing.T) { for _, skipRegex := range skipRegexs { if skipRegex.MatchString(t.Name()) { - t.Skip(fmt.Sprintf("caller requested to skip tests that match the regex %q", skipRegex.String())) + t.Skipf("caller requested to skip tests that match the regex %q", skipRegex.String()) } } } @@ -1129,17 +1129,3 @@ func (a acceptanceTest) isEqualData(is *is.I, want, got Data) { is.Equal(want.Bytes(), got.Bytes()) // data did not match (want != got) } } - -func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { - c := make(chan struct{}) - go func() { - defer close(c) - wg.Wait() - }() - select { - case <-c: - return false // completed normally - case <-time.After(timeout): - return true // timed out - } -} diff --git a/destination.go b/destination.go index 36ced4d9..d1724b87 100644 --- a/destination.go +++ b/destination.go @@ -179,7 +179,7 @@ func (a *destinationPluginAdapter) Run(ctx context.Context, stream cpluginv1.Des if err == io.EOF { // stream is closed // wait for all acks to be sent back to Conduit - return a.waitForAcks(ctx) + return waitOrDone(ctx, &a.wgAckFuncs) } return fmt.Errorf("write stream error: %w", err) } @@ -235,25 +235,6 @@ func (a *destinationPluginAdapter) ackFunc(r Record, stream cpluginv1.Destinatio } } -func (a *destinationPluginAdapter) waitForAcks(ctx context.Context) error { - // wait for all acks to be sent back to Conduit - ackFuncsDone := make(chan struct{}) - go func() { - a.wgAckFuncs.Wait() - close(ackFuncsDone) - }() - return a.waitForClose(ctx, ackFuncsDone) -} - -func (a *destinationPluginAdapter) waitForClose(ctx context.Context, stop chan struct{}) error { - select { - case <-stop: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.DestinationStopRequest) (cpluginv1.DestinationStopResponse, error) { // last thing we do is cancel context in Open defer a.openCancel() @@ -263,7 +244,7 @@ func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.Desti defer cancel() // wait for all acks to be sent back to Conduit - waitErr := a.waitForAcks(waitCtx) + waitErr := waitOrDone(waitCtx, &a.wgAckFuncs) if waitErr != nil { // just log error and continue to flush at least the processed records Logger(ctx).Warn().Err(waitErr).Msg("failed to wait for all acks to be sent back to Conduit") @@ -286,7 +267,7 @@ func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.Desti // everything went as expected, let's cancel the context in Open and // wait for Run to stop gracefully a.openCancel() - err = a.waitForClose(ctx, a.runDone) + err = waitForClose(ctx, a.runDone) return cpluginv1.DestinationStopResponse{}, err } diff --git a/wait_utils.go b/wait_utils.go new file mode 100644 index 00000000..0d9a52fd --- /dev/null +++ b/wait_utils.go @@ -0,0 +1,52 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdk + +import ( + "context" + "sync" + "time" +) + +// waitTimeout returns true if the given WaitGroup's counter is zero +// before the given timeout is reached. Returns false otherwise. +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + withTimeout, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return waitOrDone(withTimeout, wg) == nil +} + +// waitTimeout returns nil if the given WaitGroup's counter is zero +// before the given context is done. Returns the context's Err() otherwise. +func waitOrDone(ctx context.Context, wg *sync.WaitGroup) error { + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + return waitForClose(ctx, done) +} + +// waitForClose waits until the given channel receives a struct or until the given context is done. +// If the channel receives a struct before the context is done, nil is returned. +// Returns context's Err() otherwise. +func waitForClose(ctx context.Context, stop chan struct{}) error { + select { + case <-stop: + return nil + case <-ctx.Done(): + return ctx.Err() + } +}