From 74c139d0e4adffb33d5c42a56f0718f6198d4c68 Mon Sep 17 00:00:00 2001 From: Alex Shorsher Date: Wed, 30 Oct 2024 11:55:35 -0400 Subject: [PATCH 1/2] don't retry operations that have conflict error If an operation receives a conflict error from the blockchain connector, currently it will continutally retry submitting that operation but it will never succeed. Instead, we shouldn't retry if we know the connector has received the submission. Signed-off-by: Alex Shorsher --- internal/batch/batch_processor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index c4496f0ac..34076934d 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -633,6 +633,11 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { } } } + conflictErr, conflictTestOk := err.(operations.ConflictError) + if conflictTestOk && conflictErr.IsConflictError() { + // We know that the connector has received our batch, so we shouldn't need to retry + return true, nil + } } else { if core.IsPinned(payload.Batch.TX.Type) { payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent) From a1161233bc1c937f3df9b3c5f9e463b269e39c78 Mon Sep 17 00:00:00 2001 From: Alex Shorsher Date: Fri, 1 Nov 2024 15:23:34 -0400 Subject: [PATCH 2/2] add message state on conflict error Signed-off-by: Alex Shorsher --- internal/batch/batch_processor.go | 1 + internal/batch/batch_processor_test.go | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index 34076934d..a708c6d55 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -636,6 +636,7 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { conflictErr, conflictTestOk := err.(operations.ConflictError) if conflictTestOk && conflictErr.IsConflictError() { // We know that the connector has received our batch, so we shouldn't need to retry + payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent) return true, nil } } else { diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index fde042660..f5704b60e 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -75,6 +75,18 @@ func mockRunAsGroupPassthrough(mdi *databasemocks.Plugin) { } } +type testConflictError struct { + err error +} + +func (tce *testConflictError) Error() string { + return tce.err.Error() +} + +func (tce *testConflictError) IsConflictError() bool { + return true +} + func TestUnfilledBatch(t *testing.T) { log.SetLevel("debug") coreconfig.Reset() @@ -129,6 +141,17 @@ func TestUnfilledBatch(t *testing.T) { mim.AssertExpectations(t) } +func TestHandleDispatchConflictError(t *testing.T) { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { + conflictErr := testConflictError{err: fmt.Errorf("pop")} + return &conflictErr + }) + defer cancel() + bp.dispatchBatch(&DispatchPayload{}) + bp.cancelCtx() + <-bp.done +} + func TestBatchSizeOverflow(t *testing.T) { log.SetLevel("debug") coreconfig.Reset()