From fea7a21d7a86643fb41e6b2b3127fcce857af38a Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Mon, 1 Apr 2024 13:57:19 -0400 Subject: [PATCH 1/8] Allow cancelling a batch that is stuck in dispatch Batch manager status can already be queried with "/status/batchmanager". The response includes a flag to say if a processor is blocked, along with the ID of the flushing batch, and the last error message. The new API "/batches/{batchid}/cancel" can now be used to cancel a batch. This is currently only valid for batches with transaction type "contract_invoke_pin". It will mark all messages in the batch as "cancelled", and for private messages, will send a new batch of type "batch_pin" containing gap fill messages. These messages have no data, but will have a CID pointing to the original (failed) message, and will consume nonces to allow the topic to become unblocked. Signed-off-by: Andrew Richardson --- docs/reference/types/message.md | 2 +- docs/swagger/swagger.yaml | 91 +++++++++++++++ internal/apiserver/route_post_batch_cancel.go | 43 +++++++ internal/apiserver/routes.go | 1 + internal/batch/batch_manager.go | 41 ++++++- internal/batch/batch_manager_test.go | 2 +- internal/batch/batch_processor.go | 107 ++++++++++++++---- internal/broadcast/manager.go | 5 +- internal/broadcast/manager_test.go | 2 +- internal/coremsgs/en_api_translations.go | 1 + internal/coremsgs/en_error_messages.go | 3 + internal/coremsgs/en_struct_descriptions.go | 1 + internal/events/aggregator.go | 18 ++- internal/events/aggregator_test.go | 24 ++-- internal/privatemessaging/privatemessaging.go | 5 +- .../privatemessaging/privatemessaging_test.go | 2 +- mocks/batchmocks/manager.go | 18 +++ pkg/core/constants.go | 14 +-- pkg/core/message.go | 4 +- 19 files changed, 325 insertions(+), 59 deletions(-) create mode 100644 internal/apiserver/route_post_batch_cancel.go diff --git a/docs/reference/types/message.md b/docs/reference/types/message.md index fc1ea92c2..444616871 100644 --- a/docs/reference/types/message.md +++ b/docs/reference/types/message.md @@ -61,7 +61,7 @@ nav_order: 16 | `hash` | The hash of the message. Derived from the header, which includes the data hash | `Bytes32` | | `batch` | The UUID of the batch in which the message was pinned/transferred | [`UUID`](simpletypes#uuid) | | `txid` | The ID of the transaction used to order/deliver this message | [`UUID`](simpletypes#uuid) | -| `state` | The current state of the message | `FFEnum`:
`"staged"`
`"ready"`
`"sent"`
`"pending"`
`"confirmed"`
`"rejected"` | +| `state` | The current state of the message | `FFEnum`:
`"staged"`
`"ready"`
`"sent"`
`"pending"`
`"confirmed"`
`"rejected"`
`"cancelled"` | | `confirmed` | The timestamp of when the message was confirmed/rejected | [`FFTime`](simpletypes#fftime) | | `rejectReason` | If a message was rejected, provides details on the rejection reason | `string` | | `data` | The list of data elements attached to the message | [`DataRef[]`](#dataref) | diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 2551c422f..0564c667c 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -2449,6 +2449,36 @@ paths: description: "" tags: - Default Namespace + /batches/{batchid}/cancel: + post: + description: Cancel a batch that has failed to dispatch + operationId: postBatchCancel + parameters: + - description: The batch ID + in: path + name: batchid + required: true + schema: + type: string + - description: Server-side request timeout (milliseconds, or set a custom suffix + like 10s) + in: header + name: Request-Timeout + schema: + default: 2m0s + type: string + requestBody: + content: + application/json: {} + responses: + "204": + content: + application/json: {} + description: Success + default: + description: "" + tags: + - Default Namespace /blockchainevents: get: description: Gets a list of blockchain events @@ -7264,6 +7294,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -9679,6 +9710,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver @@ -9958,6 +9990,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -10598,6 +10631,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -10757,6 +10791,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -11087,6 +11122,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -11252,6 +11288,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -11663,6 +11700,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -14709,6 +14747,43 @@ paths: description: "" tags: - Non-Default Namespace + /namespaces/{ns}/batches/{batchid}/cancel: + post: + description: Cancel a batch that has failed to dispatch + operationId: postBatchCancelNamespace + parameters: + - description: The batch ID + in: path + name: batchid + required: true + schema: + type: string + - description: The namespace which scopes this request + in: path + name: ns + required: true + schema: + example: default + type: string + - description: Server-side request timeout (milliseconds, or set a custom suffix + like 10s) + in: header + name: Request-Timeout + schema: + default: 2m0s + type: string + requestBody: + content: + application/json: {} + responses: + "204": + content: + application/json: {} + description: Success + default: + description: "" + tags: + - Non-Default Namespace /namespaces/{ns}/blockchainevents: get: description: Gets a list of blockchain events @@ -19838,6 +19913,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -22372,6 +22448,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver @@ -22658,6 +22735,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -23370,6 +23448,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -23535,6 +23614,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -23872,6 +23952,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -24037,6 +24118,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -24455,6 +24537,7 @@ paths: - pending - confirmed - rejected + - cancelled type: string txid: description: The ID of the transaction used to order/deliver this @@ -27218,6 +27301,10 @@ paths: description: True if the batch flush is in a retry loop, due to errors being returned by the plugins type: boolean + cancelled: + description: True if the current batch flush has been + cancelled + type: boolean flushing: description: If a flush is in progress, this is the UUID of the batch being flushed @@ -36372,6 +36459,10 @@ paths: description: True if the batch flush is in a retry loop, due to errors being returned by the plugins type: boolean + cancelled: + description: True if the current batch flush has been + cancelled + type: boolean flushing: description: If a flush is in progress, this is the UUID of the batch being flushed diff --git a/internal/apiserver/route_post_batch_cancel.go b/internal/apiserver/route_post_batch_cancel.go new file mode 100644 index 000000000..5113155fc --- /dev/null +++ b/internal/apiserver/route_post_batch_cancel.go @@ -0,0 +1,43 @@ +// Copyright © 2024 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apiserver + +import ( + "net/http" + + "github.com/hyperledger/firefly-common/pkg/ffapi" + "github.com/hyperledger/firefly/internal/coremsgs" +) + +var postBatchCancel = &ffapi.Route{ + Name: "postBatchCancel", + Path: "batches/{batchid}/cancel", + Method: http.MethodPost, + PathParams: []*ffapi.PathParam{ + {Name: "batchid", Description: coremsgs.APIParamsBatchID}, + }, + QueryParams: nil, + Description: coremsgs.APIEndpointsPostBatchCancel, + JSONInputValue: nil, + JSONOutputValue: nil, + JSONOutputCodes: []int{http.StatusNoContent}, + Extensions: &coreExtensions{ + CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) { + return nil, cr.or.BatchManager().CancelBatch(cr.ctx, r.PP["batchid"]) + }, + }, +} diff --git a/internal/apiserver/routes.go b/internal/apiserver/routes.go index 28c960589..8fa5ab472 100644 --- a/internal/apiserver/routes.go +++ b/internal/apiserver/routes.go @@ -128,6 +128,7 @@ var routes = append( getVerifierByID, getVerifiers, patchUpdateIdentity, + postBatchCancel, postContractAPIInvoke, postContractAPIPublish, postContractAPIQuery, diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 18d556906..0054c4053 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -21,6 +21,7 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + "strings" "sync" "time" @@ -76,6 +77,7 @@ func NewBatchManager(ctx context.Context, ns string, di database.Plugin, dm data type Manager interface { RegisterDispatcher(name string, txType core.TransactionType, msgTypes []core.MessageType, handler DispatchHandler, batchOptions DispatcherOptions) LoadContexts(ctx context.Context, payload *DispatchPayload) error + CancelBatch(ctx context.Context, batchID string) error NewMessages() chan<- int64 Start() error Close() @@ -172,7 +174,7 @@ func (bm *batchManager) NewMessages() chan<- int64 { return bm.newMessages } -func (bm *batchManager) getProcessor(txType core.TransactionType, msgType core.MessageType, group *fftypes.Bytes32, author string) (*batchProcessor, error) { +func (bm *batchManager) getProcessor(txType core.TransactionType, msgType core.MessageType, group *fftypes.Bytes32, author string, create bool) (*batchProcessor, error) { bm.dispatcherMux.Lock() defer bm.dispatcherMux.Unlock() @@ -183,7 +185,7 @@ func (bm *batchManager) getProcessor(txType core.TransactionType, msgType core.M } name := bm.getProcessorKey(author, group) processor, ok := dispatcher.processors[name] - if !ok { + if !ok && create { processor = newBatchProcessor( bm, &batchProcessorConf{ @@ -320,7 +322,7 @@ func (bm *batchManager) messageSequencer() { // the database store. Meaning we cannot rely on the sequence having been set. msg.Sequence = entry.Sequence - processor, err := bm.getProcessor(msg.Header.TxType, msg.Header.Type, msg.Header.Group, msg.Header.SignerRef.Author) + processor, err := bm.getProcessor(msg.Header.TxType, msg.Header.Type, msg.Header.Group, msg.Header.SignerRef.Author, true) if err != nil { l.Errorf("Failed to dispatch message %s: %s", msg.Header.ID, err) continue @@ -559,6 +561,10 @@ func (bm *batchManager) loadContext(ctx context.Context, msg *core.Message) ([]* return nil, i18n.NewError(ctx, coremsgs.MsgPinsNotAssigned) } for _, pinStr := range msg.Pins { + if len(pinStr) > 64 { + pinParts := strings.SplitN(pinStr, ":", 2) + pinStr = pinParts[0] + } pin, err := fftypes.ParseBytes32(ctx, pinStr) if err != nil { return nil, err @@ -588,3 +594,30 @@ func (bm *batchManager) LoadContexts(ctx context.Context, payload *DispatchPaylo } return nil } + +func (bm *batchManager) CancelBatch(ctx context.Context, batchID string) error { + id, err := fftypes.ParseUUID(ctx, batchID) + if err != nil { + return err + } + bp, err := bm.database.GetBatchByID(ctx, bm.namespace, id) + if err != nil { + return err + } + batch, err := bm.data.HydrateBatch(ctx, bp) + if err != nil { + return err + } + if len(batch.Payload.Messages) == 0 { + return i18n.NewError(ctx, coremsgs.MsgErrorLoadingBatch) + } + msg := batch.Payload.Messages[0] + processor, err := bm.getProcessor(msg.Header.TxType, msg.Header.Type, msg.Header.Group, msg.Header.SignerRef.Author, false) + if err != nil { + return err + } + if processor == nil { + return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchState) + } + return processor.cancelFlush(ctx, id) +} diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index 6449617f7..bee84faa0 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -339,7 +339,7 @@ func TestGetInvalidBatchTypeMsg(t *testing.T) { txHelper, _ := txcommon.NewTransactionHelper(ctx, "ns1", mdi, mdm, cmi) bm, _ := NewBatchManager(context.Background(), "ns1", mdi, mdm, mim, txHelper) defer bm.Close() - _, err := bm.(*batchManager).getProcessor(core.BatchTypeBroadcast, "wrong", nil, "") + _, err := bm.(*batchManager).getProcessor(core.BatchTypeBroadcast, "wrong", nil, "", true) assert.Regexp(t, "FF10126", err) } diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index b435e5279..5ef438645 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,8 +24,10 @@ import ( "time" "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly-common/pkg/retry" + "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/operations" "github.com/hyperledger/firefly/internal/txcommon" @@ -53,6 +55,7 @@ type batchProcessorConf struct { type FlushStatus struct { LastFlushTime *fftypes.FFTime `ffstruct:"BatchFlushStatus" json:"lastFlushStartTime"` Flushing *fftypes.UUID `ffstruct:"BatchFlushStatus" json:"flushing,omitempty"` + Cancelled bool `ffstruct:"BatchFlushStatus" json:"cancelled"` Blocked bool `ffstruct:"BatchFlushStatus" json:"blocked"` LastFlushError string `ffstruct:"BatchFlushStatus" json:"lastFlushError,omitempty"` LastFlushErrorTime *fftypes.FFTime `ffstruct:"BatchFlushStatus" json:"lastFlushErrorTime,omitempty"` @@ -103,6 +106,7 @@ type DispatchPayload struct { Messages []*core.Message Data core.DataArray Pins []*fftypes.Bytes32 + State core.MessageState } const batchSizeEstimateBase = int64(512) @@ -216,7 +220,6 @@ func (bp *batchProcessor) startFlush(overflow bool) (id *fftypes.UUID, flushAsse bp.statusMux.Lock() defer bp.statusMux.Unlock() // Start the clock - bp.flushStatus.Blocked = false bp.flushStatus.LastFlushTime = fftypes.Now() // Split the current work if required for overflow overflowWork := make([]*batchWork, 0) @@ -250,6 +253,8 @@ func (bp *batchProcessor) updateFlushStats(payload *DispatchPayload, byteSize in duration := time.Since(*fs.LastFlushTime.Time()) fs.Flushing = nil + fs.Blocked = false + fs.Cancelled = false fs.TotalBatches++ @@ -277,6 +282,29 @@ func (bp *batchProcessor) captureFlushError(err error) { fs.LastFlushError = err.Error() } +func (bp *batchProcessor) cancelFlush(ctx context.Context, id *fftypes.UUID) error { + bp.statusMux.Lock() + defer bp.statusMux.Unlock() + fs := &bp.flushStatus + + if bp.conf.txType != core.TransactionTypeContractInvokePin { + return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchType) + } + if !fs.Blocked || !id.Equals(fs.Flushing) { + return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchState) + } + fs.Cancelled = true + return nil +} + +func (bp *batchProcessor) isCancelled() bool { + bp.statusMux.Lock() + defer bp.statusMux.Unlock() + fs := &bp.flushStatus + + return fs.Cancelled +} + func (bp *batchProcessor) startQuiesce() { // We are ready to quiesce, but we can't safely close our input channel. // We just do a non-blocking pass (queue length is 1) to the manager to @@ -376,7 +404,7 @@ func (bp *batchProcessor) flush(overflow bool) error { } log.L(bp.ctx).Debugf("Dispatched batch %s", id) - // Finalization phase: Writes back the changes to the DB, so that these messages will not be + // Finalization phase: Writes back the changes to the DB, so that these messages // are all tagged as part of this batch, and won't be included in any future batches. err = bp.markPayloadDispatched(state) if err != nil { @@ -432,6 +460,11 @@ func (bp *batchProcessor) calculateContexts(ctx context.Context, payload *Dispat if isPrivate && len(msg.Pins) > 0 { // We have already allocated pins to this message, we cannot re-allocate. log.L(ctx).Debugf("Message %s already has %d pins allocated", msg.Header.ID, len(msg.Pins)) + pins, err := bp.bm.loadContext(ctx, msg) + if err != nil { + return err + } + payload.Pins = append(payload.Pins, pins...) continue } var pins fftypes.FFStringArray @@ -475,7 +508,7 @@ func (bp *batchProcessor) flushNonceState(ctx context.Context, state *dispatchSt return nil } -func (bp *batchProcessor) updateMessagePins(ctx context.Context, batchID *fftypes.UUID, messages []*core.Message, state *dispatchState) error { +func (bp *batchProcessor) updateMessagePins(ctx context.Context, messages []*core.Message, state *dispatchState) error { // It's important we update the message pins at this phase, as we have "spent" a nonce // on this topic from the database. So this message has grabbed a slot in our queue. // If we fail the dispatch, and redo the batch sealing process, we must not allocate @@ -515,7 +548,7 @@ func (bp *batchProcessor) sealBatch(payload *DispatchPayload) (err error) { if err = bp.flushNonceState(ctx, state); err != nil { return err } - if err = bp.updateMessagePins(ctx, payload.Batch.ID, payload.Messages, state); err != nil { + if err = bp.updateMessagePins(ctx, payload.Messages, state); err != nil { return err } } @@ -568,11 +601,53 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { // Call the dispatcher to do the heavy lifting - will only exit if we're closed return operations.RunWithOperationContext(bp.ctx, func(ctx context.Context) error { return bp.retry.Do(ctx, "batch dispatch", func(attempt int) (retry bool, err error) { - return true, bp.conf.dispatch(ctx, payload) + err = bp.conf.dispatch(ctx, payload) + if err != nil { + if bp.isCancelled() { + log.L(ctx).Warnf("Batch %s was cancelled - replacing with gap fill", payload.Batch.ID) + for _, msg := range payload.Messages { + if err := bp.writeGapFill(ctx, msg); err != nil { + return true, err + } + } + err = nil + payload.State = core.MessageStateCancelled + } + } else { + if core.IsPinned(bp.conf.txType) { + payload.State = core.MessageStateSent + } else { + payload.State = core.MessageStateConfirmed + } + } + return true, err }) }) } +func (bp *batchProcessor) writeGapFill(ctx context.Context, msg *core.Message) error { + // Gap fill is only needed for private messages + if msg.Header.Type != core.MessageTypePrivate { + return nil + } + gapFill := &core.MessageInOut{ + Message: core.Message{ + Header: msg.Header, + LocalNamespace: bp.bm.namespace, + State: core.MessageStateReady, + Pins: msg.Pins, // reuse any assigned pins to fill the nonce gap + }, + } + gapFill.Header.ID = fftypes.NewUUID() + gapFill.Header.CID = msg.Header.ID + gapFill.Header.Tag = core.SystemTagGapFill + gapFill.Header.TxType = core.TransactionTypeBatchPin + if err := gapFill.Seal(ctx); err != nil { + return err + } + return bp.data.WriteNewMessage(ctx, &data.NewMessage{Message: gapFill}) +} + func (bp *batchProcessor) markPayloadDispatched(payload *DispatchPayload) error { return bp.retry.Do(bp.ctx, "mark dispatched messages", func(attempt int) (retry bool, err error) { return true, bp.database.RunAsGroup(bp.ctx, func(ctx context.Context) (err error) { @@ -583,10 +658,7 @@ func (bp *batchProcessor) markPayloadDispatched(payload *DispatchPayload) error msgIDs[i] = msg.Header.ID msg.BatchID = payload.Batch.ID msg.TransactionID = payload.Batch.TX.ID - if core.IsPinned(bp.conf.txType) { - msg.State = core.MessageStateSent - } else { - msg.State = core.MessageStateConfirmed + if payload.State == core.MessageStateConfirmed { msg.Confirmed = confirmTime } bp.data.UpdateMessageIfCached(ctx, msg) @@ -600,21 +672,16 @@ func (bp *batchProcessor) markPayloadDispatched(payload *DispatchPayload) error ) allMsgsUpdate := database.MessageQueryFactory.NewUpdate(ctx). Set("batch", payload.Batch.ID). - Set("txid", payload.Batch.TX.ID) - if core.IsPinned(bp.conf.txType) { - // Sent state, waiting for confirm - allMsgsUpdate.Set("state", core.MessageStateSent) - } else { - // Immediate confirmation if no batch pinning - allMsgsUpdate. - Set("state", core.MessageStateConfirmed). - Set("confirmed", confirmTime) + Set("txid", payload.Batch.TX.ID). + Set("state", payload.State) + if payload.State == core.MessageStateConfirmed { + allMsgsUpdate.Set("confirmed", confirmTime) } if err = bp.database.UpdateMessages(ctx, bp.bm.namespace, filter, allMsgsUpdate); err != nil { return err } - if !core.IsPinned(bp.conf.txType) { + if payload.State == core.MessageStateConfirmed { for _, msg := range payload.Messages { // Emit a confirmation event locally immediately for _, topic := range msg.Header.Topics { diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 52b93ece5..43639ed36 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -42,6 +42,7 @@ import ( ) const broadcastDispatcherName = "pinned_broadcast" +const broadcastCustomDispatcherName = "pinned_broadcast_custom" type Manager interface { core.Named @@ -114,7 +115,7 @@ func NewBroadcastManager(ctx context.Context, ns *core.Namespace, di database.Pl core.MessageTypeDeprecatedApprovalBroadcast, }, bm.dispatchBatch, bo) - ba.RegisterDispatcher(broadcastDispatcherName, + ba.RegisterDispatcher(broadcastCustomDispatcherName, core.TransactionTypeContractInvokePin, []core.MessageType{ core.MessageTypeBroadcast, diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index a57347d11..730c0397b 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -74,7 +74,7 @@ func newTestBroadcastCommon(t *testing.T, metricsEnabled bool) (*broadcastManage }, mock.Anything, mock.Anything).Return() mba.On("RegisterDispatcher", - broadcastDispatcherName, + broadcastCustomDispatcherName, core.TransactionTypeContractInvokePin, []core.MessageType{ core.MessageTypeBroadcast, diff --git a/internal/coremsgs/en_api_translations.go b/internal/coremsgs/en_api_translations.go index 39ed59e83..9e743f271 100644 --- a/internal/coremsgs/en_api_translations.go +++ b/internal/coremsgs/en_api_translations.go @@ -152,6 +152,7 @@ var ( APIEndpointsGetVerifierByHash = ffm("api.endpoints.getVerifierByHash", "Gets a verifier by its hash") APIEndpointsGetVerifiers = ffm("api.endpoints.getVerifiers", "Gets a list of verifiers") APIEndpointsPatchUpdateIdentity = ffm("api.endpoints.patchUpdateIdentity", "Updates an identity") + APIEndpointsPostBatchCancel = ffm("api.endpoints.postBatchCancel", "Cancel a batch that has failed to dispatch") APIEndpointsPostContractDeploy = ffm("api.endpoints.postContractDeploy", "Deploy a new smart contract") APIEndpointsPostContractAPIInvoke = ffm("api.endpoints.postContractAPIInvoke", "Invokes a method on a smart contract API. Performs a blockchain transaction.") APIEndpointsPostContractAPIPublish = ffm("api.endpoints.postContractAPIPublish", "Publish a contract API to all other members of the multiparty network") diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index a01606f95..2b6467d60 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -304,4 +304,7 @@ var ( MsgMaxSubscriptionEventScanLimitBreached = ffe("FF10463", "Event scan limit breached with start sequence ID %d and end sequence ID %d. Please restrict your query to a narrower range", 400) MsgSequenceIDDidNotParseToInt = ffe("FF10464", "Could not parse provided %s to an integer sequence ID", 400) MsgInternalServerError = ffe("FF10465", "Internal server error: %s", 500) + MsgCannotCancelBatchType = ffe("FF10466", "Cannot cancel batch of this type", 400) + MsgErrorLoadingBatch = ffe("FF10467", "Error loading batch messages") + MsgCannotCancelBatchState = ffe("FF10468", "Batch is not currently dispatching", 400) ) diff --git a/internal/coremsgs/en_struct_descriptions.go b/internal/coremsgs/en_struct_descriptions.go index 5e065c574..4356b1455 100644 --- a/internal/coremsgs/en_struct_descriptions.go +++ b/internal/coremsgs/en_struct_descriptions.go @@ -472,6 +472,7 @@ var ( BatchFlushStatusLastFlushTime = ffm("BatchFlushStatus.lastFlushStartTime", "The last time a flush was performed") BatchFlushStatusFlushing = ffm("BatchFlushStatus.flushing", "If a flush is in progress, this is the UUID of the batch being flushed") BatchFlushStatusBlocked = ffm("BatchFlushStatus.blocked", "True if the batch flush is in a retry loop, due to errors being returned by the plugins") + BatchFlushStatusCancelled = ffm("BatchFlushStatus.cancelled", "True if the current batch flush has been cancelled") BatchFlushStatusLastFlushError = ffm("BatchFlushStatus.lastFlushError", "The last error received by this batch processor while flushing") BatchFlushStatusLastFlushErrorTime = ffm("BatchFlushStatus.lastFlushErrorTime", "The time of the last flush") BatchFlushStatusAverageBatchBytes = ffm("BatchFlushStatus.averageBatchBytes", "The average byte size of each batch") diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 967dd315b..82ab3936e 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -525,7 +525,7 @@ func (ag *aggregator) processMessage(ctx context.Context, manifest *core.BatchMa if action == core.ActionConfirm { l.Debugf("Attempt dispatch msg=%s broadcastContexts=%v privatePins=%v", msg.Header.ID, unmaskedContexts, msg.Pins) - action, correlator, err = ag.readyForDispatch(ctx, msg, data, manifest.TX.ID, state, pin) + action, correlator, err = ag.readyForDispatch(ctx, msg, data, manifest.TX.ID, state) } } @@ -551,6 +551,15 @@ func (ag *aggregator) processMessage(ctx context.Context, manifest *core.BatchMa np.IncrementNextPin(ctx, ag.namespace) } state.markMessageDispatched(manifest.ID, msg, msgBaseIndex, newState) + + // For gap fill messages, mark the original message cancelled + // This is only applicable if the original message was already received + // (only for private messages where batch content was delivered via data exchange) + if msg.Header.Tag == core.SystemTagGapFill { + state.markMessageDispatched(manifest.ID, &core.Message{ + Header: core.MessageHeader{ID: msg.Header.CID}, + }, 0, core.MessageStateCancelled) + } return nil } @@ -566,7 +575,7 @@ func needsTokenApproval(msg *core.Message) bool { msg.Header.Type == core.MessageTypeDeprecatedApprovalPrivate } -func (ag *aggregator) readyForDispatch(ctx context.Context, msg *core.Message, data core.DataArray, tx *fftypes.UUID, state *batchState, pin *core.Pin) (action core.MessageAction, correlator *fftypes.UUID, err error) { +func (ag *aggregator) readyForDispatch(ctx context.Context, msg *core.Message, data core.DataArray, tx *fftypes.UUID, state *batchState) (action core.MessageAction, correlator *fftypes.UUID, err error) { // Verify we have all the blobs for the data if resolved, err := ag.resolveBlobs(ctx, data); err != nil { return core.ActionRetry, nil, err @@ -638,6 +647,9 @@ func (ag *aggregator) readyForDispatch(ctx context.Context, msg *core.Message, d } else if valid { action = core.ActionConfirm } + + default: + action = core.ActionConfirm } return action, correlator, err diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 2fd95bc79..69afb581e 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -1515,7 +1515,7 @@ func TestReadyForDispatchFailValidateData(t *testing.T) { Data: core.DataRefs{ {ID: fftypes.NewUUID()}, }, - }, core.DataArray{}, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + }, core.DataArray{}, nil, &batchState{}) assert.EqualError(t, err, "pop") } @@ -1537,7 +1537,7 @@ func TestReadyForDispatchMissingBlobs(t *testing.T) { Hash: blobHash, Public: "public-ref", }}, - }, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + }, nil, &batchState{}) assert.NoError(t, err) assert.Equal(t, core.ActionWait, action) @@ -1560,7 +1560,7 @@ func TestReadyForDispatchBlobsError(t *testing.T) { Hash: blobHash, Public: "public-ref", }}, - }, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + }, nil, &batchState{}) assert.EqualError(t, err, "pop") assert.Equal(t, core.ActionRetry, action) @@ -1584,7 +1584,7 @@ func TestReadyForDispatchMissingTransfers(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}) assert.NoError(t, err) assert.Equal(t, core.ActionWait, action) @@ -1606,7 +1606,7 @@ func TestReadyForDispatchGetTransfersFail(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}) assert.EqualError(t, err, "pop") assert.Equal(t, core.ActionRetry, action) @@ -1634,7 +1634,7 @@ func TestReadyForDispatchTransferMismatch(t *testing.T) { ag.mdi.On("GetTokenTransfers", ag.ctx, "ns1", mock.Anything).Return(transfers, nil, nil) - action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}) assert.NoError(t, err) assert.Equal(t, core.ActionWait, action) @@ -1656,7 +1656,7 @@ func TestReadyForDispatchGetApprovalsFail(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}) assert.EqualError(t, err, "pop") assert.Equal(t, core.ActionRetry, action) @@ -1678,7 +1678,7 @@ func TestReadyForDispatchGetApprovalsMissing(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}) assert.NoError(t, err) assert.Equal(t, core.ActionWait, action) @@ -1706,7 +1706,7 @@ func TestReadyForDispatchApprovalMismatch(t *testing.T) { ag.mdi.On("GetTokenApprovals", ag.ctx, "ns1", mock.Anything).Return(approvals, nil, nil) - action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + action, _, err := ag.readyForDispatch(ag.ctx, msg, core.DataArray{}, nil, &batchState{}) assert.NoError(t, err) assert.Equal(t, core.ActionWait, action) @@ -1876,7 +1876,7 @@ func TestDefinitionBroadcastActionRetry(t *testing.T) { ag.mdh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(definitions.HandlerResult{Action: core.ActionRetry}, fmt.Errorf("pop")) - _, _, err := ag.readyForDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + _, _, err := ag.readyForDispatch(ag.ctx, msg1, nil, nil, &batchState{}) assert.EqualError(t, err, "pop") } @@ -1982,7 +1982,7 @@ func TestDefinitionBroadcastActionWait(t *testing.T) { ag.mdh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(definitions.HandlerResult{Action: core.ActionWait}, nil) - _, _, err := ag.readyForDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &core.Pin{Signer: "0x12345"}) + _, _, err := ag.readyForDispatch(ag.ctx, msg1, nil, nil, &batchState{}) assert.NoError(t, err) } @@ -2029,7 +2029,7 @@ func TestReadyForDispatchGroupInit(t *testing.T) { Type: core.MessageTypeGroupInit, SignerRef: core.SignerRef{Key: "0x12345", Author: org1.DID}, }, - }, nil, nil, bs, &core.Pin{Signer: "0x12345"}) + }, nil, nil, bs) assert.NoError(t, err) assert.Equal(t, core.ActionConfirm, action) diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index fef50c04d..a0fe84189 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -42,6 +42,7 @@ import ( ) const pinnedPrivateDispatcherName = "pinned_private" +const pinnedPrivateCustomDispatcherName = "pinned_private_custom" const unpinnedPrivateDispatcherName = "unpinned_private" type Manager interface { @@ -146,7 +147,7 @@ func NewPrivateMessaging(ctx context.Context, ns *core.Namespace, di database.Pl }, pm.dispatchPinnedBatch, bo) - ba.RegisterDispatcher(pinnedPrivateDispatcherName, + ba.RegisterDispatcher(pinnedPrivateCustomDispatcherName, core.TransactionTypeContractInvokePin, []core.MessageType{ core.MessageTypePrivate, diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 731ef83cf..60149aa9f 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -75,7 +75,7 @@ func newTestPrivateMessagingCommon(t *testing.T, metricsEnabled bool) (*privateM }, mock.Anything, mock.Anything).Return() mba.On("RegisterDispatcher", - pinnedPrivateDispatcherName, + pinnedPrivateCustomDispatcherName, core.TransactionTypeContractInvokePin, []core.MessageType{ core.MessageTypePrivate, diff --git a/mocks/batchmocks/manager.go b/mocks/batchmocks/manager.go index 090367320..d4a0e2d82 100644 --- a/mocks/batchmocks/manager.go +++ b/mocks/batchmocks/manager.go @@ -17,6 +17,24 @@ type Manager struct { mock.Mock } +// CancelBatch provides a mock function with given fields: ctx, batchID +func (_m *Manager) CancelBatch(ctx context.Context, batchID string) error { + ret := _m.Called(ctx, batchID) + + if len(ret) == 0 { + panic("no return value specified for CancelBatch") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, batchID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Close provides a mock function with given fields: func (_m *Manager) Close() { _m.Called() diff --git a/pkg/core/constants.go b/pkg/core/constants.go index 2c73d82bf..d1dc13d67 100644 --- a/pkg/core/constants.go +++ b/pkg/core/constants.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -30,35 +30,27 @@ const ( ) const ( - // SystemTagDefineDatatype is the tag for messages that broadcast data definitions SystemTagDefineDatatype = "ff_define_datatype" - // DeprecatedSystemTagDefineOrganization is the tag for messages that broadcast organization definitions DeprecatedSystemTagDefineOrganization = "ff_define_organization" - // DeprecatedSystemTagDefineNode is the tag for messages that broadcast node definitions DeprecatedSystemTagDefineNode = "ff_define_node" - // SystemTagDefineGroup is the tag for messages that send the definition of a group, to all parties in that group SystemTagDefineGroup = "ff_define_group" - // SystemTagDefinePool is the tag for messages that broadcast data definitions SystemTagDefinePool = "ff_define_pool" - // SystemTagDefineFFI is the tag for messages that broadcast contract FFIs SystemTagDefineFFI = "ff_define_ffi" - // SystemTagDefineContractAPI is the tag for messages that broadcast contract APIs SystemTagDefineContractAPI = "ff_define_contract_api" - // SystemTagIdentityClaim is the tag for messages that broadcast an identity claim SystemTagIdentityClaim = "ff_identity_claim" - //nolint:gosec // SystemTagIdentityVerification is the tag for messages that broadcast an identity verification SystemTagIdentityVerification = "ff_identity_verification" - // SystemTagIdentityUpdate is the tag for messages that broadcast an identity update SystemTagIdentityUpdate = "ff_identity_update" + // SystemTagGapFill is the tag for messages that provide a nonce gap fill for a message that failed to send + SystemTagGapFill = "ff_gap_fill" ) diff --git a/pkg/core/message.go b/pkg/core/message.go index 269c4d51f..d878163e3 100644 --- a/pkg/core/message.go +++ b/pkg/core/message.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -68,6 +68,8 @@ var ( MessageStateConfirmed = fftypes.FFEnumValue("messagestate", "confirmed") // MessageStateRejected is a message that has completed confirmation, but has been rejected by FireFly MessageStateRejected = fftypes.FFEnumValue("messagestate", "rejected") + // MessageStateCancelled is a message that was cancelled without being sent + MessageStateCancelled = fftypes.FFEnumValue("messagestate", "cancelled") ) // MessageHeader contains all fields that contribute to the hash From 137e657e66c52bd183114d1359586af3a0744eca Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 2 Apr 2024 00:26:44 -0400 Subject: [PATCH 2/8] Add unit tests for batch cancel Signed-off-by: Andrew Richardson --- .../apiserver/route_post_batch_cancel_test.go | 47 ++++ internal/batch/batch_manager_test.go | 216 +++++++++++++++ internal/batch/batch_processor.go | 15 +- internal/batch/batch_processor_test.go | 262 +++++++++++++++++- 4 files changed, 528 insertions(+), 12 deletions(-) create mode 100644 internal/apiserver/route_post_batch_cancel_test.go diff --git a/internal/apiserver/route_post_batch_cancel_test.go b/internal/apiserver/route_post_batch_cancel_test.go new file mode 100644 index 000000000..494dc41f0 --- /dev/null +++ b/internal/apiserver/route_post_batch_cancel_test.go @@ -0,0 +1,47 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apiserver + +import ( + "bytes" + "encoding/json" + "net/http/httptest" + "testing" + + "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly/mocks/batchmocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestPostBatchCancel(t *testing.T) { + o, r := newTestAPIServer() + o.On("Authorize", mock.Anything, mock.Anything).Return(nil) + mbm := &batchmocks.Manager{} + o.On("BatchManager").Return(mbm) + input := fftypes.JSONObject{} + var buf bytes.Buffer + json.NewEncoder(&buf).Encode(&input) + req := httptest.NewRequest("POST", "/api/v1/namespaces/ns1/batches/batch1/cancel", &buf) + req.Header.Set("Content-Type", "application/json; charset=utf-8") + res := httptest.NewRecorder() + + mbm.On("CancelBatch", mock.Anything, "batch1").Return(nil) + r.ServeHTTP(res, req) + + assert.Equal(t, 204, res.Result().StatusCode) +} diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index bee84faa0..5b1f86d33 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -734,3 +734,219 @@ func TestLoadContextsPrivateBadPin(t *testing.T) { assert.Regexp(t, "FF00107", err) } + +func TestCancelBatchBadID(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + err := bm.CancelBatch(context.Background(), "bad-id") + assert.Regexp(t, "FF00138", err) +} + +func TestCancelBatchFailLoad(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + batchID := fftypes.NewUUID() + + mdi := bm.database.(*databasemocks.Plugin) + mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(nil, fmt.Errorf("pop")) + + err := bm.CancelBatch(context.Background(), batchID.String()) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestCancelBatchFailHydrate(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + batchID := fftypes.NewUUID() + bp := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: batchID, + }, + } + + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(bp, nil) + mdm.On("HydrateBatch", context.Background(), bp).Return(nil, fmt.Errorf("pop")) + + err := bm.CancelBatch(context.Background(), batchID.String()) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestCancelBatchNoPayload(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + batchID := fftypes.NewUUID() + bp := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: batchID, + }, + } + batch := &core.Batch{ + BatchHeader: bp.BatchHeader, + Payload: core.BatchPayload{}, + } + + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(bp, nil) + mdm.On("HydrateBatch", context.Background(), bp).Return(batch, nil) + + err := bm.CancelBatch(context.Background(), batchID.String()) + assert.Regexp(t, "FF10467", err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestCancelBatchUnregisteredProcessor(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + group := fftypes.NewRandB32() + + batchID := fftypes.NewUUID() + bp := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: batchID, + }, + } + msgid := fftypes.NewUUID() + msg := &core.Message{ + Header: core.MessageHeader{ + ID: msgid, + Type: core.MessageTypePrivate, + TxType: core.TransactionTypeBatchPin, + Group: group, + SignerRef: core.SignerRef{ + Author: "did:firefly:org/abcd", + }, + }, + } + batch := &core.Batch{ + BatchHeader: bp.BatchHeader, + Payload: core.BatchPayload{ + Messages: []*core.Message{msg}, + }, + } + + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(bp, nil) + mdm.On("HydrateBatch", context.Background(), bp).Return(batch, nil) + + err := bm.CancelBatch(context.Background(), batchID.String()) + assert.Regexp(t, "FF10126", err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestCancelBatchInactiveProcessor(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypePrivate}, + func(c context.Context, state *DispatchPayload) error { + return nil + }, + DispatcherOptions{BatchType: core.BatchTypePrivate}, + ) + group := fftypes.NewRandB32() + + batchID := fftypes.NewUUID() + bp := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: batchID, + }, + } + msgid := fftypes.NewUUID() + msg := &core.Message{ + Header: core.MessageHeader{ + ID: msgid, + Type: core.MessageTypePrivate, + TxType: core.TransactionTypeBatchPin, + Group: group, + SignerRef: core.SignerRef{ + Author: "did:firefly:org/abcd", + }, + }, + } + batch := &core.Batch{ + BatchHeader: bp.BatchHeader, + Payload: core.BatchPayload{ + Messages: []*core.Message{msg}, + }, + } + + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(bp, nil) + mdm.On("HydrateBatch", context.Background(), bp).Return(batch, nil) + + err := bm.CancelBatch(context.Background(), batchID.String()) + assert.Regexp(t, "FF10468", err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestCancelBatchInvalidType(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypePrivate}, + func(c context.Context, state *DispatchPayload) error { + return nil + }, + DispatcherOptions{BatchType: core.BatchTypePrivate}, + ) + group := fftypes.NewRandB32() + _, err := bm.getProcessor(core.TransactionTypeBatchPin, core.MessageTypePrivate, group, "did:firefly:org/abcd", true) + assert.NoError(t, err) + + batchID := fftypes.NewUUID() + bp := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: batchID, + }, + } + msgid := fftypes.NewUUID() + msg := &core.Message{ + Header: core.MessageHeader{ + ID: msgid, + Type: core.MessageTypePrivate, + TxType: core.TransactionTypeBatchPin, + Group: group, + SignerRef: core.SignerRef{ + Author: "did:firefly:org/abcd", + }, + }, + } + batch := &core.Batch{ + BatchHeader: bp.BatchHeader, + Payload: core.BatchPayload{ + Messages: []*core.Message{msg}, + }, + } + + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(bp, nil) + mdm.On("HydrateBatch", context.Background(), bp).Return(batch, nil) + + err = bm.CancelBatch(context.Background(), batchID.String()) + assert.Regexp(t, "FF10466", err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index 5ef438645..41bf61e74 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -290,7 +290,7 @@ func (bp *batchProcessor) cancelFlush(ctx context.Context, id *fftypes.UUID) err if bp.conf.txType != core.TransactionTypeContractInvokePin { return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchType) } - if !fs.Blocked || !id.Equals(fs.Flushing) { + if !id.Equals(fs.Flushing) { return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchState) } fs.Cancelled = true @@ -604,13 +604,13 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { err = bp.conf.dispatch(ctx, payload) if err != nil { if bp.isCancelled() { + err = nil log.L(ctx).Warnf("Batch %s was cancelled - replacing with gap fill", payload.Batch.ID) for _, msg := range payload.Messages { - if err := bp.writeGapFill(ctx, msg); err != nil { - return true, err + if err == nil { + err = bp.writeGapFill(ctx, msg) } } - err = nil payload.State = core.MessageStateCancelled } } else { @@ -642,10 +642,11 @@ func (bp *batchProcessor) writeGapFill(ctx context.Context, msg *core.Message) e gapFill.Header.CID = msg.Header.ID gapFill.Header.Tag = core.SystemTagGapFill gapFill.Header.TxType = core.TransactionTypeBatchPin - if err := gapFill.Seal(ctx); err != nil { - return err + err := gapFill.Seal(ctx) + if err == nil { + err = bp.data.WriteNewMessage(ctx, &data.NewMessage{Message: gapFill}) } - return bp.data.WriteNewMessage(ctx, &data.NewMessage{Message: gapFill}) + return err } func (bp *batchProcessor) markPayloadDispatched(payload *DispatchPayload) error { diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index 39426c4ef..70d502f90 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -22,11 +22,13 @@ import ( "testing" "time" + "github.com/hyperledger/firefly-common/pkg/ffapi" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly-common/pkg/retry" "github.com/hyperledger/firefly/internal/cache" "github.com/hyperledger/firefly/internal/coreconfig" + "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/cachemocks" "github.com/hyperledger/firefly/mocks/databasemocks" @@ -85,7 +87,7 @@ func TestUnfilledBatch(t *testing.T) { mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) - mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) mth := bp.txHelper.(*txcommonmocks.Helper) mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeBatchPin, core.IdempotencyKey("")).Return(fftypes.NewUUID(), nil) @@ -134,7 +136,7 @@ func TestBatchSizeOverflow(t *testing.T) { bp.conf.BatchMaxBytes = batchSizeEstimateBase + (&core.Message{}).EstimateSize(false) + 100 mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) - mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) mth := bp.txHelper.(*txcommonmocks.Helper) mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeBatchPin, core.IdempotencyKey("")).Return(fftypes.NewUUID(), nil) @@ -512,7 +514,7 @@ func TestMarkMessageDispatchedUnpinnedOK(t *testing.T) { mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) - mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) @@ -568,7 +570,7 @@ func TestMaskContextsRetryAfterPinsAssigned(t *testing.T) { mdi.On("UpdateNonce", mock.Anything, mock.MatchedBy(func(dbNonce *core.Nonce) bool { return dbNonce.Nonce == 12347 // twice incremented })).Return(nil).Once() - mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) mdm := bp.data.(*datamocks.Manager) mdm.On("UpdateMessageIfCached", mock.Anything, mock.Anything).Return() @@ -671,7 +673,7 @@ func TestSealBatchTXAlreadyAssigned(t *testing.T) { mdi.On("GetNonce", mock.Anything, mock.Anything).Return(nil, nil) mdi.On("InsertNonce", mock.Anything, mock.Anything).Return(nil) mdi.On("UpdateMessage", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) - mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) mim := bp.bm.identity.(*identitymanagermocks.Manager) mim.On("GetLocalNode", mock.Anything).Return(&core.Identity{}, nil) @@ -706,6 +708,68 @@ func TestSealBatchTXAlreadyAssigned(t *testing.T) { mdm.AssertExpectations(t) } +func TestCalculateContextsLoadPins(t *testing.T) { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { + return nil + }) + cancel() + + pin1 := fftypes.NewRandB32() + pin2 := fftypes.NewRandB32() + + msg := &core.Message{ + Header: core.MessageHeader{ + ID: fftypes.NewUUID(), + Type: core.MessageTypePrivate, + Group: fftypes.NewRandB32(), + Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeContractInvokePin, + }, + Pins: fftypes.FFStringArray{ + pin1.String(), // old pin (hash only) + pin2.String() + ":00000001", // new pin (hash + nonce) + }, + } + + payload := &DispatchPayload{ + Messages: []*core.Message{msg}, + } + state := &dispatchState{} + + err := bp.calculateContexts(bp.ctx, payload, state) + assert.NoError(t, err) + // Payload is updated with pins loaded from message + assert.Equal(t, []*fftypes.Bytes32{pin1, pin2}, payload.Pins) + // State is not updated (because pins were already persisted) + assert.Nil(t, state.msgPins) +} + +func TestCalculateContextsLoadPinsFail(t *testing.T) { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { + return nil + }) + cancel() + + msg := &core.Message{ + Header: core.MessageHeader{ + ID: fftypes.NewUUID(), + Type: core.MessageTypePrivate, + Group: fftypes.NewRandB32(), + Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeContractInvokePin, + }, + Pins: fftypes.FFStringArray{"bad-pin"}, + } + + payload := &DispatchPayload{ + Messages: []*core.Message{msg}, + } + state := &dispatchState{} + + err := bp.calculateContexts(bp.ctx, payload, state) + assert.Regexp(t, "FF00107", err) +} + func TestBigBatchEstimate(t *testing.T) { log.SetLevel("debug") coreconfig.Reset() @@ -794,3 +858,191 @@ func TestBigBatchEstimate(t *testing.T) { assert.Greater(t, sizeEstimate, int64(len(bd))) } + +func TestCancelBatchPrivate(t *testing.T) { + first := true + var bp *batchProcessor + dispatched := make(chan *DispatchPayload) + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { + if first { + first = false + err := bp.cancelFlush(c, state.Batch.ID) + assert.NoError(t, err) + return fmt.Errorf("pop") + } + dispatched <- state + return nil + }) + defer cancel() + bp.conf.txType = core.TransactionTypeContractInvokePin + + msg1 := fftypes.NewUUID() // cancelled + msg2 := fftypes.NewUUID() // dispatched + + mockRunAsGroupPassthrough(mdi) + mdi.On("GetNonce", mock.Anything, mock.Anything).Return(&core.Nonce{ + Nonce: 12345, + }, nil) + mdi.On("UpdateNonce", mock.Anything, mock.MatchedBy(func(dbNonce *core.Nonce) bool { + return dbNonce.Nonce == 12346 + })).Return(nil) + mdi.On("UpdateMessage", mock.Anything, "ns1", msg1, mock.Anything).Return(nil).Once() + mdi.On("UpdateMessage", mock.Anything, "ns1", msg2, mock.Anything).Return(nil).Once() + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil).Twice() + mdi.On("UpdateMessages", mock.Anything, "ns1", mock.MatchedBy(func(filter ffapi.AndFilter) bool { + info, err := filter.Finalize() + assert.NoError(t, err) + assert.Len(t, info.Children, 2) + return info.Children[0].String() == "id IN ['"+msg1.String()+"']" && info.Children[1].String() == "state == 'ready'" + }), mock.MatchedBy(func(update ffapi.Update) bool { + info, err := update.Finalize() + assert.NoError(t, err) + assert.Len(t, info.SetOperations, 3) + assert.Equal(t, "batch", info.SetOperations[0].Field) + assert.Equal(t, "txid", info.SetOperations[1].Field) + assert.Equal(t, "state", info.SetOperations[2].Field) + val, _ := info.SetOperations[2].Value.Value() + return val == "cancelled" + })).Return(nil).Once() + mdi.On("UpdateMessages", mock.Anything, "ns1", mock.MatchedBy(func(filter ffapi.AndFilter) bool { + info, err := filter.Finalize() + assert.NoError(t, err) + assert.Len(t, info.Children, 2) + return info.Children[0].String() == "id IN ['"+msg2.String()+"']" && info.Children[1].String() == "state == 'ready'" + }), mock.MatchedBy(func(update ffapi.Update) bool { + info, err := update.Finalize() + assert.NoError(t, err) + assert.Len(t, info.SetOperations, 3) + assert.Equal(t, "batch", info.SetOperations[0].Field) + assert.Equal(t, "txid", info.SetOperations[1].Field) + assert.Equal(t, "state", info.SetOperations[2].Field) + val, _ := info.SetOperations[2].Value.Value() + return val == "sent" + })).Return(nil).Maybe() // race condition - may or may not finish this second message + + mim := bp.bm.identity.(*identitymanagermocks.Manager) + mim.On("GetLocalNode", mock.Anything).Return(&core.Identity{}, nil) + + mth := bp.txHelper.(*txcommonmocks.Helper) + mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeContractInvokePin, core.IdempotencyKey("")).Return(fftypes.NewUUID(), nil) + + mdm := bp.data.(*datamocks.Manager) + mdm.On("UpdateMessageIfCached", mock.Anything, mock.Anything).Return() + mdm.On("WriteNewMessage", mock.Anything, mock.MatchedBy(func(msg *data.NewMessage) bool { + return msg.Message.Header.CID.Equals(msg1) && msg.Message.Header.Tag == core.SystemTagGapFill + })).Return(nil) + + go func() { + bp.newWork <- &batchWork{msg: &core.Message{Header: core.MessageHeader{ + ID: msg1, + Type: core.MessageTypePrivate, + Group: fftypes.NewRandB32(), + Topics: fftypes.FFStringArray{"topic1"}, + }}} + bp.newWork <- &batchWork{msg: &core.Message{Header: core.MessageHeader{ + ID: msg2, + Type: core.MessageTypePrivate, + Group: fftypes.NewRandB32(), + Topics: fftypes.FFStringArray{"topic1"}, + }}} + }() + <-dispatched + + mdi.AssertExpectations(t) + mim.AssertExpectations(t) + mth.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestCancelBatchBroadcast(t *testing.T) { + first := true + var bp *batchProcessor + dispatched := make(chan *DispatchPayload) + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { + if first { + first = false + err := bp.cancelFlush(c, state.Batch.ID) + assert.NoError(t, err) + return fmt.Errorf("pop") + } + dispatched <- state + return nil + }) + defer cancel() + bp.conf.txType = core.TransactionTypeContractInvokePin + + msg1 := fftypes.NewUUID() // cancelled + msg2 := fftypes.NewUUID() // dispatched + + mockRunAsGroupPassthrough(mdi) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil).Twice() + mdi.On("UpdateMessages", mock.Anything, "ns1", mock.MatchedBy(func(filter ffapi.AndFilter) bool { + info, err := filter.Finalize() + assert.NoError(t, err) + assert.Len(t, info.Children, 2) + return info.Children[0].String() == "id IN ['"+msg1.String()+"']" && info.Children[1].String() == "state == 'ready'" + }), mock.MatchedBy(func(update ffapi.Update) bool { + info, err := update.Finalize() + assert.NoError(t, err) + assert.Len(t, info.SetOperations, 3) + assert.Equal(t, "batch", info.SetOperations[0].Field) + assert.Equal(t, "txid", info.SetOperations[1].Field) + assert.Equal(t, "state", info.SetOperations[2].Field) + val, _ := info.SetOperations[2].Value.Value() + return val == "cancelled" + })).Return(nil).Once() + mdi.On("UpdateMessages", mock.Anything, "ns1", mock.MatchedBy(func(filter ffapi.AndFilter) bool { + info, err := filter.Finalize() + assert.NoError(t, err) + assert.Len(t, info.Children, 2) + return info.Children[0].String() == "id IN ['"+msg2.String()+"']" && info.Children[1].String() == "state == 'ready'" + }), mock.MatchedBy(func(update ffapi.Update) bool { + info, err := update.Finalize() + assert.NoError(t, err) + assert.Len(t, info.SetOperations, 3) + assert.Equal(t, "batch", info.SetOperations[0].Field) + assert.Equal(t, "txid", info.SetOperations[1].Field) + assert.Equal(t, "state", info.SetOperations[2].Field) + val, _ := info.SetOperations[2].Value.Value() + return val == "sent" + })).Return(nil).Maybe() // race condition - may or may not finish this second message + + mim := bp.bm.identity.(*identitymanagermocks.Manager) + mim.On("GetLocalNode", mock.Anything).Return(&core.Identity{}, nil) + + mth := bp.txHelper.(*txcommonmocks.Helper) + mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeContractInvokePin, core.IdempotencyKey("")).Return(fftypes.NewUUID(), nil) + + mdm := bp.data.(*datamocks.Manager) + mdm.On("UpdateMessageIfCached", mock.Anything, mock.Anything).Return() + + go func() { + bp.newWork <- &batchWork{msg: &core.Message{Header: core.MessageHeader{ + ID: msg1, + Type: core.MessageTypeBroadcast, + Topics: fftypes.FFStringArray{"topic1"}, + }}} + bp.newWork <- &batchWork{msg: &core.Message{Header: core.MessageHeader{ + ID: msg2, + Type: core.MessageTypeBroadcast, + Topics: fftypes.FFStringArray{"topic1"}, + }}} + }() + <-dispatched + + mdi.AssertExpectations(t) + mim.AssertExpectations(t) + mth.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestCancelBatchNotFlushing(t *testing.T) { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { + return nil + }) + defer cancel() + bp.conf.txType = core.TransactionTypeContractInvokePin + + err := bp.cancelFlush(context.Background(), fftypes.NewUUID()) + assert.Regexp(t, "FF10468", err) +} From 618a265d72e8c473300b577e7f5de824eca6f4b3 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 5 Apr 2024 14:38:42 -0400 Subject: [PATCH 3/8] Add test coverage for gap fill logic in aggregator Signed-off-by: Andrew Richardson --- internal/events/aggregator_test.go | 57 ++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 69afb581e..012dc0c93 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -1193,6 +1193,63 @@ func TestProcessMsgFailPinUpdate(t *testing.T) { } +func TestProcessMsgGapFill(t *testing.T) { + ag := newTestAggregator() + defer ag.cleanup(t) + bs := newBatchState(&ag.aggregator) + pin := fftypes.NewRandB32() + org1 := newTestOrg("org1") + + groupID := fftypes.NewRandB32() + msg := &core.Message{ + Header: core.MessageHeader{ + ID: fftypes.NewUUID(), + Group: groupID, + Topics: fftypes.FFStringArray{"topic1"}, + Namespace: "ns1", + SignerRef: core.SignerRef{ + Author: org1.DID, + Key: "0x12345", + }, + CID: fftypes.NewUUID(), + Tag: core.SystemTagGapFill, + }, + Pins: fftypes.FFStringArray{pin.String()}, + } + + ag.mim.On("FindIdentityForVerifier", ag.ctx, []core.IdentityType{core.IdentityTypeOrg, core.IdentityTypeCustom}, &core.VerifierRef{ + Type: core.VerifierTypeEthAddress, + Value: "0x12345", + }).Return(org1, nil) + ag.mdi.On("GetNextPinsForContext", ag.ctx, "ns1", mock.Anything).Return([]*core.NextPin{ + {Context: fftypes.NewRandB32(), Hash: pin, Identity: org1.DID}, + }, nil) + ag.mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything, data.CRORequirePins).Return(msg, nil, true, nil) + ag.mdm.On("UpdateMessageStateIfCached", ag.ctx, msg.Header.ID, core.MessageStateConfirmed, mock.Anything, "").Once().Return() + ag.mdm.On("UpdateMessageStateIfCached", ag.ctx, msg.Header.CID, core.MessageStateCancelled, mock.Anything, "").Once().Return() + + ag.mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) + ag.mdi.On("UpdateNextPin", ag.ctx, "ns1", mock.Anything, mock.Anything).Return(nil) + ag.mdi.On("UpdatePins", ag.ctx, "ns1", mock.Anything, mock.Anything).Return(nil) + + ag.mdi.On("UpdateMessages", ag.ctx, "ns1", mock.Anything, mock.Anything).Twice().Return(nil) + + err := ag.processMessage(ag.ctx, &core.BatchManifest{ + ID: fftypes.NewUUID(), + }, &core.Pin{Masked: true, Sequence: 12345, Signer: "0x12345"}, 10, &core.MessageManifestEntry{ + MessageRef: core.MessageRef{ + ID: msg.Header.ID, + Hash: msg.Hash, + }, + Topics: len(msg.Header.Topics), + }, &core.BatchPersisted{}, bs) + assert.NoError(t, err) + + err = bs.RunFinalize(ag.ctx) + assert.NoError(t, err) + +} + func TestCheckMaskedContextReadyMismatchedAuthor(t *testing.T) { ag := newTestAggregator() defer ag.cleanup(t) From b3768f7196d5bf24fc5f6b78079a8b46d5bf0eb7 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 5 Apr 2024 14:39:22 -0400 Subject: [PATCH 4/8] Add test coverage for existing gaps in blockchain event handling Signed-off-by: Andrew Richardson --- internal/events/blockchain_event.go | 6 +-- internal/events/blockchain_event_test.go | 6 ++- internal/events/event_poller_test.go | 54 ++++++++++++------------ 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/internal/events/blockchain_event.go b/internal/events/blockchain_event.go index e4541bbf0..8708c9cd3 100644 --- a/internal/events/blockchain_event.go +++ b/internal/events/blockchain_event.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -57,7 +57,7 @@ func buildBlockchainEvent(ns string, subID *fftypes.UUID, event *blockchain.Even } func (em *eventManager) getChainListenerByProtocolIDCached(ctx context.Context, protocolID string, bc *eventBatchContext) (*core.ContractListener, error) { - // Event a negative result is cached in te scope of the event batch (so we don't spam the DB hundreds of times in one tight loop to get not-found) + // Even a negative result is cached in the scope of the event batch (so we don't spam the DB hundreds of times in one tight loop to get not-found) if l, batchResult := bc.contractListenerResults[protocolID]; batchResult { return l, nil } @@ -105,7 +105,7 @@ func (em *eventManager) getTopicForChainListener(listener *core.ContractListener return core.SystemBatchPinTopic } var topic string - if listener != nil && listener.Topic != "" { + if listener.Topic != "" { topic = listener.Topic } else { topic = listener.ID.String() diff --git a/internal/events/blockchain_event_test.go b/internal/events/blockchain_event_test.go index 987c2d8a5..67c293a5c 100644 --- a/internal/events/blockchain_event_test.go +++ b/internal/events/blockchain_event_test.go @@ -100,13 +100,17 @@ func TestContractEventUnknownSubscription(t *testing.T) { }, } - em.mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(nil, nil) + em.mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Once().Return(nil, nil) err := em.BlockchainEventBatch([]*blockchain.EventToDispatch{ { Type: blockchain.EventTypeForListener, ForListener: ev, }, + { + Type: blockchain.EventTypeForListener, + ForListener: ev, // second event should read from cache (even though listener is not found) + }, }) assert.NoError(t, err) diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go index de080a5a2..38c6947c3 100644 --- a/internal/events/event_poller_test.go +++ b/internal/events/event_poller_test.go @@ -32,7 +32,7 @@ import ( "github.com/stretchr/testify/mock" ) -func newTestEventPoller(t *testing.T, mdi *databasemocks.Plugin, neh newEventsHandler, rewinder func() (bool, int64)) (ep *eventPoller, cancel func()) { +func newTestEventPoller(mdi *databasemocks.Plugin, neh newEventsHandler, rewinder func() (bool, int64)) (ep *eventPoller, cancel func()) { ctx, cancel := context.WithCancel(context.Background()) ep = newEventPoller(ctx, mdi, newEventNotifier(ctx, "ut"), &eventPollerConf{ eventBatchSize: 10, @@ -65,7 +65,7 @@ func newTestEventPoller(t *testing.T, mdi *databasemocks.Plugin, neh newEventsHa func TestStartStopEventPoller(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) mdi.On("GetOffset", mock.Anything, core.OffsetTypeSubscription, "test").Return(&core.Offset{ Type: core.OffsetTypeAggregator, Name: aggregatorOffsetName, @@ -82,7 +82,7 @@ func TestStartStopEventPoller(t *testing.T) { func TestRestoreOffsetNewestOK(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) defer cancel() mdi.On("GetOffset", mock.Anything, core.OffsetTypeSubscription, "test").Return(nil, nil).Once() mdi.On("GetOffset", mock.Anything, core.OffsetTypeSubscription, "test").Return(&core.Offset{Current: 12345}, nil, nil).Once() @@ -98,7 +98,7 @@ func TestRestoreOffsetNewestOK(t *testing.T) { func TestRestoreOffsetNewestNoEvents(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) defer cancel() mdi.On("GetOffset", mock.Anything, core.OffsetTypeSubscription, "test").Return(nil, nil).Once() mdi.On("GetOffset", mock.Anything, core.OffsetTypeSubscription, "test").Return(&core.Offset{Current: -1}, nil).Once() @@ -114,7 +114,7 @@ func TestRestoreOffsetNewestNoEvents(t *testing.T) { func TestRestoreOffsetNewestFail(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) defer cancel() mdi.On("GetOffset", mock.Anything, core.OffsetTypeSubscription, "test").Return(nil, nil) mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) @@ -126,7 +126,7 @@ func TestRestoreOffsetNewestFail(t *testing.T) { func TestRestoreOffsetOldest(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) firstEvent := core.SubOptsFirstEventOldest ep.conf.firstEvent = &firstEvent defer cancel() @@ -143,7 +143,7 @@ func TestRestoreOffsetOldest(t *testing.T) { func TestRestoreOffsetSpecific(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) firstEvent := core.SubOptsFirstEvent("123456") ep.conf.firstEvent = &firstEvent defer cancel() @@ -160,7 +160,7 @@ func TestRestoreOffsetSpecific(t *testing.T) { func TestRestoreOffsetFailRead(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) cancel() // to avoid infinite retry mdi.On("GetOffset", mock.Anything, core.OffsetTypeSubscription, "test").Return(nil, fmt.Errorf("pop")) ep.start() @@ -169,7 +169,7 @@ func TestRestoreOffsetFailRead(t *testing.T) { func TestRestoreOffsetFailWrite(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) firstEvent := core.SubOptsFirstEventOldest ep.conf.firstEvent = &firstEvent defer cancel() @@ -182,7 +182,7 @@ func TestRestoreOffsetFailWrite(t *testing.T) { func TestRestoreOffsetEphemeral(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) firstEvent := core.SubOptsFirstEventOldest ep.conf.firstEvent = &firstEvent ep.conf.ephemeral = true @@ -194,7 +194,7 @@ func TestRestoreOffsetEphemeral(t *testing.T) { func TestReadPageExit(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) cancel() mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) ep.eventLoop() @@ -204,7 +204,7 @@ func TestReadPageExit(t *testing.T) { func TestReadPageSingleCommitEvent(t *testing.T) { mdi := &databasemocks.Plugin{} processEventCalled := make(chan core.LocallySequenced, 1) - ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + ep, cancel := newTestEventPoller(mdi, func(events []core.LocallySequenced) (bool, error) { processEventCalled <- events[0] return false, nil }, nil) @@ -222,7 +222,7 @@ func TestReadPageSingleCommitEvent(t *testing.T) { func TestReadPageBatchTimeoutNotFull(t *testing.T) { mdi := &databasemocks.Plugin{} processEventCalled := make(chan []core.LocallySequenced, 1) - ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + ep, cancel := newTestEventPoller(mdi, func(events []core.LocallySequenced) (bool, error) { processEventCalled <- events return false, nil }, nil) @@ -248,7 +248,7 @@ func TestReadPageBatchTimeoutNotFull(t *testing.T) { func TestReadPageBatchFull(t *testing.T) { mdi := &databasemocks.Plugin{} processEventCalled := make(chan []core.LocallySequenced, 1) - ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + ep, cancel := newTestEventPoller(mdi, func(events []core.LocallySequenced) (bool, error) { processEventCalled <- events return false, nil }, nil) @@ -273,7 +273,7 @@ func TestReadPageBatchFull(t *testing.T) { func TestReadPageRewind(t *testing.T) { mdi := &databasemocks.Plugin{} processEventCalled := make(chan core.LocallySequenced, 1) - ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + ep, cancel := newTestEventPoller(mdi, func(events []core.LocallySequenced) (bool, error) { processEventCalled <- events[0] return false, nil }, func() (bool, int64) { @@ -300,7 +300,7 @@ func TestReadPageRewind(t *testing.T) { func TestReadPageProcessEventsRetryExit(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { return false, fmt.Errorf("pop") }, nil) + ep, cancel := newTestEventPoller(mdi, func(events []core.LocallySequenced) (bool, error) { return false, fmt.Errorf("pop") }, nil) cancel() ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1}, nil, nil).Once() @@ -311,7 +311,7 @@ func TestReadPageProcessEventsRetryExit(t *testing.T) { func TestProcessEventsFail(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + ep, cancel := newTestEventPoller(mdi, func(events []core.LocallySequenced) (bool, error) { return false, fmt.Errorf("pop") }, nil) defer cancel() @@ -324,7 +324,7 @@ func TestProcessEventsFail(t *testing.T) { func TestWaitForShoulderTapOrExitCloseBatch(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) cancel() ep.conf.eventBatchTimeout = 1 * time.Minute ep.conf.eventBatchSize = 50 @@ -333,7 +333,7 @@ func TestWaitForShoulderTapOrExitCloseBatch(t *testing.T) { func TestWaitForShoulderTapImmediateRepollOnFullBatch(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) cancel() ep.conf.eventBatchTimeout = 1 * time.Minute ep.conf.eventBatchSize = 1 @@ -342,7 +342,7 @@ func TestWaitForShoulderTapImmediateRepollOnFullBatch(t *testing.T) { func TestWaitForShoulderTapOrExitClosePoll(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) cancel() ep.conf.eventBatchTimeout = 1 * time.Minute ep.conf.eventBatchSize = 2 @@ -351,7 +351,7 @@ func TestWaitForShoulderTapOrExitClosePoll(t *testing.T) { func TestWaitForShoulderTapOrPollTimeoutBatchAndPoll(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) defer cancel() ep.conf.eventBatchTimeout = 1 * time.Microsecond ep.conf.eventPollTimeout = 1 * time.Microsecond @@ -361,7 +361,7 @@ func TestWaitForShoulderTapOrPollTimeoutBatchAndPoll(t *testing.T) { func TestWaitForShoulderTapOrPollTimeoutTap(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) defer cancel() ep.conf.eventPollTimeout = 10 * time.Second ep.shoulderTap() @@ -370,7 +370,7 @@ func TestWaitForShoulderTapOrPollTimeoutTap(t *testing.T) { func TestDoubleTap(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) defer cancel() ep.shoulderTap() ep.shoulderTap() // this should not block @@ -378,7 +378,7 @@ func TestDoubleTap(t *testing.T) { func TestWaitForBatchTimeoutClosedContext(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) ep.conf.eventBatchTimeout = 1 * time.Minute cancel() ep.waitForBatchTimeout() @@ -386,7 +386,7 @@ func TestWaitForBatchTimeoutClosedContext(t *testing.T) { func TestDoubleConfirm(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) defer cancel() ep.commitOffset(12345) ep.commitOffset(12346) // this should not block @@ -395,7 +395,7 @@ func TestDoubleConfirm(t *testing.T) { func TestOffsetCommitLoopOk(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) cancel() mdi.On("UpdateOffset", mock.Anything, ep.offsetID, mock.Anything).Return(nil) @@ -410,7 +410,7 @@ func TestOffsetCommitLoopOk(t *testing.T) { func TestOffsetCommitLoopFail(t *testing.T) { mdi := &databasemocks.Plugin{} - ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep, cancel := newTestEventPoller(mdi, nil, nil) cancel() mdi.On("UpdateOffset", mock.Anything, ep.offsetID, mock.Anything).Return(fmt.Errorf("pop")) From 17a8380479a43175ecbb610fe7a9162515edcc61 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 5 Apr 2024 14:42:41 -0400 Subject: [PATCH 5/8] Clean up some error messages Signed-off-by: Andrew Richardson --- internal/batch/batch_manager.go | 2 +- internal/batch/batch_processor.go | 4 ++-- internal/coremsgs/en_error_messages.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 0054c4053..c86efea6d 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -617,7 +617,7 @@ func (bm *batchManager) CancelBatch(ctx context.Context, batchID string) error { return err } if processor == nil { - return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchState) + return i18n.NewError(ctx, coremsgs.MsgBatchNotDispatching, batchID, nil) } return processor.cancelFlush(ctx, id) } diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index 41bf61e74..cb7457876 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -288,10 +288,10 @@ func (bp *batchProcessor) cancelFlush(ctx context.Context, id *fftypes.UUID) err fs := &bp.flushStatus if bp.conf.txType != core.TransactionTypeContractInvokePin { - return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchType) + return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchType, bp.conf.txType) } if !id.Equals(fs.Flushing) { - return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchState) + return i18n.NewError(ctx, coremsgs.MsgBatchNotDispatching, id, fs.Flushing) } fs.Cancelled = true return nil diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index 2b6467d60..24a005a80 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -304,7 +304,7 @@ var ( MsgMaxSubscriptionEventScanLimitBreached = ffe("FF10463", "Event scan limit breached with start sequence ID %d and end sequence ID %d. Please restrict your query to a narrower range", 400) MsgSequenceIDDidNotParseToInt = ffe("FF10464", "Could not parse provided %s to an integer sequence ID", 400) MsgInternalServerError = ffe("FF10465", "Internal server error: %s", 500) - MsgCannotCancelBatchType = ffe("FF10466", "Cannot cancel batch of this type", 400) + MsgCannotCancelBatchType = ffe("FF10466", "Cannot cancel batch of type: %s", 400) MsgErrorLoadingBatch = ffe("FF10467", "Error loading batch messages") - MsgCannotCancelBatchState = ffe("FF10468", "Batch is not currently dispatching", 400) + MsgBatchNotDispatching = ffe("FF10468", "Batch %s is not currently dispatching - current: %s", 400) ) From 1a5358a9ee57ca277402ef8005871ad41c36ceac Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Mon, 8 Apr 2024 16:25:26 -0400 Subject: [PATCH 6/8] Both "pinned" transaction types should share a batch processor "batch_pin" and "contract_invoke_pin" pull from the same pool of nonces for private messages, and thus need to share a single dispatch thread (per message type). This ensures that the order in which nonces are assigned is the order in which nonces are actually used (which is critical for ordering). "unpinned" private messages can continue to be a separate dispatcher, as they don't require nonces. Signed-off-by: Andrew Richardson --- internal/batch/batch_manager.go | 22 ++++-- internal/batch/batch_manager_test.go | 71 +++++++++++++++---- internal/batch/batch_processor.go | 26 +++---- internal/batch/batch_processor_test.go | 68 +++++++++++++----- internal/broadcast/manager.go | 9 +-- internal/broadcast/manager_test.go | 9 +-- internal/events/aggregator.go | 2 +- internal/privatemessaging/privatemessaging.go | 12 +--- .../privatemessaging/privatemessaging_test.go | 11 +-- mocks/batchmocks/manager.go | 6 +- 10 files changed, 148 insertions(+), 88 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index c86efea6d..6ff7511b2 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -75,7 +75,7 @@ func NewBatchManager(ctx context.Context, ns string, di database.Plugin, dm data } type Manager interface { - RegisterDispatcher(name string, txType core.TransactionType, msgTypes []core.MessageType, handler DispatchHandler, batchOptions DispatcherOptions) + RegisterDispatcher(name string, pinned bool, msgTypes []core.MessageType, handler DispatchHandler, batchOptions DispatcherOptions) LoadContexts(ctx context.Context, payload *DispatchPayload) error CancelBatch(ctx context.Context, batchID string) error NewMessages() chan<- int64 @@ -143,11 +143,15 @@ func (bm *batchManager) getProcessorKey(author string, groupID *fftypes.Bytes32) return fmt.Sprintf("%s|%v", author, groupID) } -func (bm *batchManager) getDispatcherKey(txType core.TransactionType, msgType core.MessageType) string { - return fmt.Sprintf("tx:%s/%s", txType, msgType) +func (bm *batchManager) getDispatcherKey(pinned bool, msgType core.MessageType) string { + txType := "pinned" + if !pinned { + txType = "unpinned" + } + return fmt.Sprintf("%s|%s", txType, msgType) } -func (bm *batchManager) RegisterDispatcher(name string, txType core.TransactionType, msgTypes []core.MessageType, handler DispatchHandler, options DispatcherOptions) { +func (bm *batchManager) RegisterDispatcher(name string, pinned bool, msgTypes []core.MessageType, handler DispatchHandler, options DispatcherOptions) { bm.dispatcherMux.Lock() defer bm.dispatcherMux.Unlock() @@ -159,7 +163,7 @@ func (bm *batchManager) RegisterDispatcher(name string, txType core.TransactionT } bm.allDispatchers = append(bm.allDispatchers, dispatcher) for _, msgType := range msgTypes { - bm.dispatcherMap[bm.getDispatcherKey(txType, msgType)] = dispatcher + bm.dispatcherMap[bm.getDispatcherKey(pinned, msgType)] = dispatcher } } @@ -178,7 +182,8 @@ func (bm *batchManager) getProcessor(txType core.TransactionType, msgType core.M bm.dispatcherMux.Lock() defer bm.dispatcherMux.Unlock() - dispatcherKey := bm.getDispatcherKey(txType, msgType) + pinned := core.IsPinned(txType) + dispatcherKey := bm.getDispatcherKey(pinned, msgType) dispatcher, ok := bm.dispatcherMap[dispatcherKey] if !ok { return nil, i18n.NewError(bm.ctx, coremsgs.MsgUnregisteredBatchType, dispatcherKey) @@ -191,7 +196,7 @@ func (bm *batchManager) getProcessor(txType core.TransactionType, msgType core.M &batchProcessorConf{ DispatcherOptions: dispatcher.options, name: name, - txType: txType, + pinned: pinned, dispatcherName: dispatcher.name, author: author, group: group, @@ -604,6 +609,9 @@ func (bm *batchManager) CancelBatch(ctx context.Context, batchID string) error { if err != nil { return err } + if bp.TX.Type != core.TransactionTypeContractInvokePin { + return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchType, bp.TX.Type) + } batch, err := bm.data.HydrateBatch(ctx, bp) if err != nil { return err diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index 5b1f86d33..b651fca74 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -102,7 +102,7 @@ func TestE2EDispatchBroadcast(t *testing.T) { bm := bmi.(*batchManager) bm.readOffset = 1000 - bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypeBroadcast}, handler, DispatcherOptions{ + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypeBroadcast}, handler, DispatcherOptions{ BatchMaxSize: 2, BatchTimeout: 0, DisposeTimeout: 10 * time.Millisecond, @@ -224,7 +224,7 @@ func TestE2EDispatchPrivateUnpinned(t *testing.T) { bmi, _ := NewBatchManager(ctx, "ns1", mdi, mdm, mim, txHelper) bm := bmi.(*batchManager) - bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypePrivate}, handler, DispatcherOptions{ + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypePrivate}, handler, DispatcherOptions{ BatchMaxSize: 2, BatchTimeout: 0, DisposeTimeout: 120 * time.Second, @@ -370,7 +370,7 @@ func TestMessageSequencerMissingMessageData(t *testing.T) { cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(ctx, 100, 5*time.Minute), nil) txHelper, _ := txcommon.NewTransactionHelper(ctx, "ns1", mdi, mdm, cmi) bm, _ := NewBatchManager(context.Background(), "ns1", mdi, mdm, mim, txHelper) - bm.RegisterDispatcher("utdispatcher", core.TransactionTypeNone, []core.MessageType{core.MessageTypeBroadcast}, + bm.RegisterDispatcher("utdispatcher", false, []core.MessageType{core.MessageTypeBroadcast}, func(c context.Context, state *DispatchPayload) error { return nil }, @@ -417,7 +417,7 @@ func TestMessageSequencerUpdateMessagesFail(t *testing.T) { mim.On("GetLocalNode", mock.Anything).Return(&core.Identity{}, nil) ctx, cancelCtx := context.WithCancel(context.Background()) bm, _ := NewBatchManager(ctx, "ns1", mdi, mdm, mim, txHelper) - bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypeBroadcast}, + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypeBroadcast}, func(c context.Context, state *DispatchPayload) error { return nil }, @@ -475,7 +475,7 @@ func TestMessageSequencerDispatchFail(t *testing.T) { mim.On("GetLocalNode", mock.Anything).Return(&core.Identity{}, nil) ctx, cancelCtx := context.WithCancel(context.Background()) bm, _ := NewBatchManager(ctx, "ns1", mdi, mdm, mim, txHelper) - bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypeBroadcast}, + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypeBroadcast}, func(c context.Context, state *DispatchPayload) error { cancelCtx() return fmt.Errorf("fizzle") @@ -517,7 +517,7 @@ func TestMessageSequencerUpdateBatchFail(t *testing.T) { txHelper, _ := txcommon.NewTransactionHelper(ctx, "ns1", mdi, mdm, cmi) mim.On("GetLocalNode", mock.Anything).Return(&core.Identity{}, nil) bm, _ := NewBatchManager(ctx, "ns1", mdi, mdm, mim, txHelper) - bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypeBroadcast}, + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypeBroadcast}, func(c context.Context, state *DispatchPayload) error { return nil }, @@ -767,6 +767,9 @@ func TestCancelBatchFailHydrate(t *testing.T) { BatchHeader: core.BatchHeader{ ID: batchID, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeContractInvokePin, + }, } mdi := bm.database.(*databasemocks.Plugin) @@ -790,6 +793,9 @@ func TestCancelBatchNoPayload(t *testing.T) { BatchHeader: core.BatchHeader{ ID: batchID, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeContractInvokePin, + }, } batch := &core.Batch{ BatchHeader: bp.BatchHeader, @@ -819,13 +825,16 @@ func TestCancelBatchUnregisteredProcessor(t *testing.T) { BatchHeader: core.BatchHeader{ ID: batchID, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeContractInvokePin, + }, } msgid := fftypes.NewUUID() msg := &core.Message{ Header: core.MessageHeader{ ID: msgid, Type: core.MessageTypePrivate, - TxType: core.TransactionTypeBatchPin, + TxType: core.TransactionTypeContractInvokePin, Group: group, SignerRef: core.SignerRef{ Author: "did:firefly:org/abcd", @@ -855,7 +864,7 @@ func TestCancelBatchInactiveProcessor(t *testing.T) { bm, cancel := newTestBatchManager(t) defer cancel() - bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypePrivate}, + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypePrivate}, func(c context.Context, state *DispatchPayload) error { return nil }, @@ -868,13 +877,16 @@ func TestCancelBatchInactiveProcessor(t *testing.T) { BatchHeader: core.BatchHeader{ ID: batchID, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeContractInvokePin, + }, } msgid := fftypes.NewUUID() msg := &core.Message{ Header: core.MessageHeader{ ID: msgid, Type: core.MessageTypePrivate, - TxType: core.TransactionTypeBatchPin, + TxType: core.TransactionTypeContractInvokePin, Group: group, SignerRef: core.SignerRef{ Author: "did:firefly:org/abcd", @@ -904,14 +916,44 @@ func TestCancelBatchInvalidType(t *testing.T) { bm, cancel := newTestBatchManager(t) defer cancel() - bm.RegisterDispatcher("utdispatcher", core.TransactionTypeBatchPin, []core.MessageType{core.MessageTypePrivate}, + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypePrivate}, + func(c context.Context, state *DispatchPayload) error { + return nil + }, + DispatcherOptions{BatchType: core.BatchTypePrivate}, + ) + + batchID := fftypes.NewUUID() + bp := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: batchID, + }, + TX: core.TransactionRef{ + Type: core.TransactionTypeBatchPin, + }, + } + + mdi := bm.database.(*databasemocks.Plugin) + mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(bp, nil) + + err := bm.CancelBatch(context.Background(), batchID.String()) + assert.Regexp(t, "FF10466", err) + + mdi.AssertExpectations(t) +} + +func TestCancelBatch(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypePrivate}, func(c context.Context, state *DispatchPayload) error { return nil }, DispatcherOptions{BatchType: core.BatchTypePrivate}, ) group := fftypes.NewRandB32() - _, err := bm.getProcessor(core.TransactionTypeBatchPin, core.MessageTypePrivate, group, "did:firefly:org/abcd", true) + _, err := bm.getProcessor(core.TransactionTypeContractInvokePin, core.MessageTypePrivate, group, "did:firefly:org/abcd", true) assert.NoError(t, err) batchID := fftypes.NewUUID() @@ -919,13 +961,16 @@ func TestCancelBatchInvalidType(t *testing.T) { BatchHeader: core.BatchHeader{ ID: batchID, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeContractInvokePin, + }, } msgid := fftypes.NewUUID() msg := &core.Message{ Header: core.MessageHeader{ ID: msgid, Type: core.MessageTypePrivate, - TxType: core.TransactionTypeBatchPin, + TxType: core.TransactionTypeContractInvokePin, Group: group, SignerRef: core.SignerRef{ Author: "did:firefly:org/abcd", @@ -945,7 +990,7 @@ func TestCancelBatchInvalidType(t *testing.T) { mdm.On("HydrateBatch", context.Background(), bp).Return(batch, nil) err = bm.CancelBatch(context.Background(), batchID.String()) - assert.Regexp(t, "FF10466", err) + assert.Regexp(t, "FF10468", err) mdi.AssertExpectations(t) mdm.AssertExpectations(t) diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index cb7457876..cc13741bd 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -44,7 +44,7 @@ type batchProcessorConf struct { DispatcherOptions name string dispatcherName string - txType core.TransactionType + pinned bool author string group *fftypes.Bytes32 dispatch DispatchHandler @@ -181,12 +181,13 @@ func (bp *batchProcessor) addWork(newWork *batchWork) (full, overflow bool) { // Check for conditions that prevent this piece of work from going into the current batch // (i.e. the new work is specifically assigned a separate transaction or signing key) - batchOfOne := bp.conf.txType == core.TransactionTypeContractInvokePin + batchOfOne := newWork.msg.Header.TxType == core.TransactionTypeContractInvokePin if batchOfOne { full = true overflow = len(bp.assemblyQueue) > 0 } else if len(bp.assemblyQueue) > 0 { - full = newWork.msg.Header.Key != bp.assemblyQueue[0].msg.Header.Key + full = newWork.msg.Header.TxType != bp.assemblyQueue[0].msg.Header.TxType || + newWork.msg.Header.Key != bp.assemblyQueue[0].msg.Header.Key overflow = true } @@ -286,10 +287,6 @@ func (bp *batchProcessor) cancelFlush(ctx context.Context, id *fftypes.UUID) err bp.statusMux.Lock() defer bp.statusMux.Unlock() fs := &bp.flushStatus - - if bp.conf.txType != core.TransactionTypeContractInvokePin { - return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchType, bp.conf.txType) - } if !id.Equals(fs.Flushing) { return i18n.NewError(ctx, coremsgs.MsgBatchNotDispatching, id, fs.Flushing) } @@ -434,6 +431,9 @@ func (bp *batchProcessor) initPayload(id *fftypes.UUID, flushWork []*batchWork) Group: bp.conf.group, Created: fftypes.Now(), }, + TX: core.TransactionRef{ + Type: flushWork[0].msg.Header.TxType, + }, }, } localNode, err := bp.bm.identity.GetLocalNode(bp.ctx) @@ -530,6 +530,7 @@ func (bp *batchProcessor) updateMessagePins(ctx context.Context, messages []*cor func (bp *batchProcessor) sealBatch(payload *DispatchPayload) (err error) { var state *dispatchState + txType := payload.Batch.TX.Type err = bp.retry.Do(bp.ctx, "batch persist", func(attempt int) (retry bool, err error) { return true, bp.database.RunAsGroup(bp.ctx, func(ctx context.Context) (err error) { @@ -541,7 +542,7 @@ func (bp *batchProcessor) sealBatch(payload *DispatchPayload) (err error) { } // Assign nonces and update nonces/messages in the database - if core.IsPinned(bp.conf.txType) { + if core.IsPinned(txType) { if err = bp.calculateContexts(ctx, payload, state); err != nil { return err } @@ -553,14 +554,13 @@ func (bp *batchProcessor) sealBatch(payload *DispatchPayload) (err error) { } } - payload.Batch.TX.Type = bp.conf.txType - batchOfOne := bp.conf.txType == core.TransactionTypeContractInvokePin + batchOfOne := txType == core.TransactionTypeContractInvokePin if batchOfOne && payload.Messages[0].TransactionID != nil { // For a batch-of-one with a pre-assigned transaction ID, propagate it to the batch payload.Batch.TX.ID = payload.Messages[0].TransactionID } else { // For all others, generate a new transaction - payload.Batch.TX.ID, err = bp.txHelper.SubmitNewTransaction(ctx, bp.conf.txType, "" /* no idempotency key */) + payload.Batch.TX.ID, err = bp.txHelper.SubmitNewTransaction(ctx, txType, "" /* no idempotency key */) if err != nil { return err } @@ -586,7 +586,7 @@ func (bp *batchProcessor) sealBatch(payload *DispatchPayload) (err error) { // Once the DB transaction is done, we need to update the messages with the pins. // We do this at this point, so the logic is re-entrant in a way that avoids re-allocating Pins to messages, but // means the retry loops above using the batch state function correctly. - if state != nil && core.IsPinned(bp.conf.txType) { + if state != nil && core.IsPinned(txType) { for _, msg := range payload.Messages { if pins, ok := state.msgPins[*msg.Header.ID]; ok { msg.Pins = pins @@ -614,7 +614,7 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { payload.State = core.MessageStateCancelled } } else { - if core.IsPinned(bp.conf.txType) { + if core.IsPinned(payload.Batch.TX.Type) { payload.State = core.MessageStateSent } else { payload.State = core.MessageStateConfirmed diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index 70d502f90..b75063dc0 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -49,7 +49,7 @@ func newTestBatchProcessor(t *testing.T, dispatch DispatchHandler) (func(), *dat cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(ctx, 100, 5*time.Minute), nil) txHelper, _ := txcommon.NewTransactionHelper(ctx, "ns1", mdi, mdm, cmi) bp := newBatchProcessor(bm, &batchProcessorConf{ - txType: core.TransactionTypeBatchPin, + pinned: true, author: "did:firefly:org/abcd", dispatch: dispatch, DispatcherOptions: DispatcherOptions{ @@ -103,7 +103,12 @@ func TestUnfilledBatch(t *testing.T) { for i := 0; i < 5; i++ { msgid := fftypes.NewUUID() bp.newWork <- &batchWork{ - msg: &core.Message{Header: core.MessageHeader{ID: msgid}, Sequence: int64(1000 + i)}, + msg: &core.Message{ + Header: core.MessageHeader{ + ID: msgid, + TxType: core.TransactionTypeBatchPin, + }, + Sequence: int64(1000 + i)}, } } }() @@ -152,7 +157,12 @@ func TestBatchSizeOverflow(t *testing.T) { go func() { for i := 0; i < 2; i++ { bp.newWork <- &batchWork{ - msg: &core.Message{Header: core.MessageHeader{ID: msgIDs[i]}, Sequence: int64(1000 + i)}, + msg: &core.Message{ + Header: core.MessageHeader{ + ID: msgIDs[i], + TxType: core.TransactionTypeBatchPin, + }, + Sequence: int64(1000 + i)}, } } }() @@ -212,7 +222,12 @@ func TestCloseToUnblockUpsertBatch(t *testing.T) { msgid := fftypes.NewUUID() go func() { bp.newWork <- &batchWork{ - msg: &core.Message{Header: core.MessageHeader{ID: msgid}, Sequence: int64(1000)}, + msg: &core.Message{ + Header: core.MessageHeader{ + ID: msgid, + TxType: core.TransactionTypeBatchPin, + }, + Sequence: int64(1000)}, } }() @@ -248,6 +263,9 @@ func TestInsertNewNonceFail(t *testing.T) { BatchHeader: core.BatchHeader{ Group: gid, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeBatchPin, + }, }, Messages: []*core.Message{ {Header: core.MessageHeader{ @@ -288,6 +306,9 @@ func TestUpdateExistingNonceFail(t *testing.T) { BatchHeader: core.BatchHeader{ Group: gid, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeBatchPin, + }, }, Messages: []*core.Message{ {Header: core.MessageHeader{ @@ -323,6 +344,9 @@ func TestGetNonceFail(t *testing.T) { BatchHeader: core.BatchHeader{ Group: gid, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeBatchPin, + }, }, Messages: []*core.Message{ {Header: core.MessageHeader{ @@ -359,6 +383,9 @@ func TestGetNonceMigrationFail(t *testing.T) { BatchHeader: core.BatchHeader{ Group: gid, }, + TX: core.TransactionRef{ + Type: core.TransactionTypeBatchPin, + }, }, Messages: []*core.Message{ {Header: core.MessageHeader{ @@ -403,25 +430,25 @@ func TestAddWorkBatchOfOne(t *testing.T) { return nil }) defer cancel() - bp.conf.txType = core.TransactionTypeContractInvokePin + header := core.MessageHeader{TxType: core.TransactionTypeContractInvokePin} full, overflow := bp.addWork(&batchWork{ - msg: &core.Message{Sequence: 200}, + msg: &core.Message{Sequence: 200, Header: header}, }) assert.True(t, full) assert.False(t, overflow) assert.Equal(t, []*batchWork{ - {msg: &core.Message{Sequence: 200}}, + {msg: &core.Message{Sequence: 200, Header: header}}, }, bp.assemblyQueue) full, overflow = bp.addWork(&batchWork{ - msg: &core.Message{Sequence: 201}, + msg: &core.Message{Sequence: 201, Header: header}, }) assert.True(t, full) assert.True(t, overflow) assert.Equal(t, []*batchWork{ - {msg: &core.Message{Sequence: 200}}, - {msg: &core.Message{Sequence: 201}}, + {msg: &core.Message{Sequence: 200, Header: header}}, + {msg: &core.Message{Sequence: 201, Header: header}}, }, bp.assemblyQueue) } @@ -510,7 +537,6 @@ func TestMarkMessageDispatchedUnpinnedOK(t *testing.T) { return nil }) defer cancel() - bp.conf.txType = core.TransactionTypeUnpinned mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) @@ -532,7 +558,14 @@ func TestMarkMessageDispatchedUnpinnedOK(t *testing.T) { for i := 0; i < 5; i++ { msgid := fftypes.NewUUID() bp.newWork <- &batchWork{ - msg: &core.Message{Header: core.MessageHeader{ID: msgid, Topics: fftypes.FFStringArray{"topic1"}}, Sequence: int64(1000 + i)}, + msg: &core.Message{ + Header: core.MessageHeader{ + ID: msgid, + Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeUnpinned, + }, + Sequence: int64(1000 + i), + }, } } }() @@ -588,6 +621,7 @@ func TestMaskContextsRetryAfterPinsAssigned(t *testing.T) { Type: core.MessageTypePrivate, Group: groupID, Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeBatchPin, }, } msg2 := &core.Message{ @@ -596,6 +630,7 @@ func TestMaskContextsRetryAfterPinsAssigned(t *testing.T) { Type: core.MessageTypePrivate, Group: groupID, Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeBatchPin, }, } @@ -645,6 +680,7 @@ func TestMaskContextsUpdateMessageFail(t *testing.T) { Type: core.MessageTypePrivate, Group: fftypes.NewRandB32(), Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeBatchPin, }, } @@ -693,7 +729,6 @@ func TestSealBatchTXAlreadyAssigned(t *testing.T) { TransactionID: txID, } - bp.conf.txType = core.TransactionTypeContractInvokePin state := bp.initPayload(fftypes.NewUUID(), []*batchWork{{msg: msg}}) err := bp.sealBatch(state) assert.NoError(t, err) @@ -874,7 +909,6 @@ func TestCancelBatchPrivate(t *testing.T) { return nil }) defer cancel() - bp.conf.txType = core.TransactionTypeContractInvokePin msg1 := fftypes.NewUUID() // cancelled msg2 := fftypes.NewUUID() // dispatched @@ -938,12 +972,14 @@ func TestCancelBatchPrivate(t *testing.T) { Type: core.MessageTypePrivate, Group: fftypes.NewRandB32(), Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeContractInvokePin, }}} bp.newWork <- &batchWork{msg: &core.Message{Header: core.MessageHeader{ ID: msg2, Type: core.MessageTypePrivate, Group: fftypes.NewRandB32(), Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeContractInvokePin, }}} }() <-dispatched @@ -969,7 +1005,6 @@ func TestCancelBatchBroadcast(t *testing.T) { return nil }) defer cancel() - bp.conf.txType = core.TransactionTypeContractInvokePin msg1 := fftypes.NewUUID() // cancelled msg2 := fftypes.NewUUID() // dispatched @@ -1021,11 +1056,13 @@ func TestCancelBatchBroadcast(t *testing.T) { ID: msg1, Type: core.MessageTypeBroadcast, Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeContractInvokePin, }}} bp.newWork <- &batchWork{msg: &core.Message{Header: core.MessageHeader{ ID: msg2, Type: core.MessageTypeBroadcast, Topics: fftypes.FFStringArray{"topic1"}, + TxType: core.TransactionTypeContractInvokePin, }}} }() <-dispatched @@ -1041,7 +1078,6 @@ func TestCancelBatchNotFlushing(t *testing.T) { return nil }) defer cancel() - bp.conf.txType = core.TransactionTypeContractInvokePin err := bp.cancelFlush(context.Background(), fftypes.NewUUID()) assert.Regexp(t, "FF10468", err) diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 43639ed36..415585bbb 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -42,7 +42,6 @@ import ( ) const broadcastDispatcherName = "pinned_broadcast" -const broadcastCustomDispatcherName = "pinned_broadcast_custom" type Manager interface { core.Named @@ -107,19 +106,13 @@ func NewBroadcastManager(ctx context.Context, ns *core.Namespace, di database.Pl } ba.RegisterDispatcher(broadcastDispatcherName, - core.TransactionTypeBatchPin, + true, []core.MessageType{ core.MessageTypeBroadcast, core.MessageTypeDefinition, core.MessageTypeDeprecatedTransferBroadcast, core.MessageTypeDeprecatedApprovalBroadcast, }, bm.dispatchBatch, bo) - - ba.RegisterDispatcher(broadcastCustomDispatcherName, - core.TransactionTypeContractInvokePin, - []core.MessageType{ - core.MessageTypeBroadcast, - }, bm.dispatchBatch, bo) } om.RegisterHandler(ctx, bm, []core.OpType{ diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index 730c0397b..db95cd3f1 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -65,7 +65,7 @@ func newTestBroadcastCommon(t *testing.T, metricsEnabled bool) (*broadcastManage mba.On("RegisterDispatcher", broadcastDispatcherName, - core.TransactionTypeBatchPin, + true, []core.MessageType{ core.MessageTypeBroadcast, core.MessageTypeDefinition, @@ -73,13 +73,6 @@ func newTestBroadcastCommon(t *testing.T, metricsEnabled bool) (*broadcastManage core.MessageTypeDeprecatedApprovalBroadcast, }, mock.Anything, mock.Anything).Return() - mba.On("RegisterDispatcher", - broadcastCustomDispatcherName, - core.TransactionTypeContractInvokePin, - []core.MessageType{ - core.MessageTypeBroadcast, - }, mock.Anything, mock.Anything).Return() - mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything) rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 82ab3936e..70078f178 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -190,7 +190,7 @@ func (ag *aggregator) rewindOffchainBatches() (bool, int64) { return false, 0 } - // Retry idefinitely for database errors (until the context closes) + // Retry indefinitely for database errors (until the context closes) var rewindBatch *fftypes.UUID var offset int64 _ = ag.retry.Do(ag.ctx, "check for off-chain batch deliveries", func(attempt int) (retry bool, err error) { diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index a0fe84189..d38d16390 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -42,7 +42,6 @@ import ( ) const pinnedPrivateDispatcherName = "pinned_private" -const pinnedPrivateCustomDispatcherName = "pinned_private_custom" const unpinnedPrivateDispatcherName = "unpinned_private" type Manager interface { @@ -138,7 +137,7 @@ func NewPrivateMessaging(ctx context.Context, ns *core.Namespace, di database.Pl } ba.RegisterDispatcher(pinnedPrivateDispatcherName, - core.TransactionTypeBatchPin, + true, []core.MessageType{ core.MessageTypeGroupInit, core.MessageTypePrivate, @@ -147,15 +146,8 @@ func NewPrivateMessaging(ctx context.Context, ns *core.Namespace, di database.Pl }, pm.dispatchPinnedBatch, bo) - ba.RegisterDispatcher(pinnedPrivateCustomDispatcherName, - core.TransactionTypeContractInvokePin, - []core.MessageType{ - core.MessageTypePrivate, - }, - pm.dispatchPinnedBatch, bo) - ba.RegisterDispatcher(unpinnedPrivateDispatcherName, - core.TransactionTypeUnpinned, + false, []core.MessageType{ core.MessageTypePrivate, }, diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 60149aa9f..dab7daccd 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -66,7 +66,7 @@ func newTestPrivateMessagingCommon(t *testing.T, metricsEnabled bool) (*privateM mba.On("RegisterDispatcher", pinnedPrivateDispatcherName, - core.TransactionTypeBatchPin, + true, []core.MessageType{ core.MessageTypeGroupInit, core.MessageTypePrivate, @@ -74,16 +74,9 @@ func newTestPrivateMessagingCommon(t *testing.T, metricsEnabled bool) (*privateM core.MessageTypeDeprecatedApprovalPrivate, }, mock.Anything, mock.Anything).Return() - mba.On("RegisterDispatcher", - pinnedPrivateCustomDispatcherName, - core.TransactionTypeContractInvokePin, - []core.MessageType{ - core.MessageTypePrivate, - }, mock.Anything, mock.Anything).Return() - mba.On("RegisterDispatcher", unpinnedPrivateDispatcherName, - core.TransactionTypeUnpinned, + false, []core.MessageType{ core.MessageTypePrivate, }, mock.Anything, mock.Anything).Return() diff --git a/mocks/batchmocks/manager.go b/mocks/batchmocks/manager.go index d4a0e2d82..9054cb661 100644 --- a/mocks/batchmocks/manager.go +++ b/mocks/batchmocks/manager.go @@ -78,9 +78,9 @@ func (_m *Manager) NewMessages() chan<- int64 { return r0 } -// RegisterDispatcher provides a mock function with given fields: name, txType, msgTypes, handler, batchOptions -func (_m *Manager) RegisterDispatcher(name string, txType fftypes.FFEnum, msgTypes []fftypes.FFEnum, handler batch.DispatchHandler, batchOptions batch.DispatcherOptions) { - _m.Called(name, txType, msgTypes, handler, batchOptions) +// RegisterDispatcher provides a mock function with given fields: name, pinned, msgTypes, handler, batchOptions +func (_m *Manager) RegisterDispatcher(name string, pinned bool, msgTypes []fftypes.FFEnum, handler batch.DispatchHandler, batchOptions batch.DispatcherOptions) { + _m.Called(name, pinned, msgTypes, handler, batchOptions) } // Start provides a mock function with given fields: From a07d2621bc0aae8828724f23233f91ffdbc458c0 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 9 Apr 2024 12:26:40 -0400 Subject: [PATCH 7/8] Dispatch gap fill messages inline To preserve the correct ordering of nonces, gap fill batches cannot be queued on the normal assembly loop (which might already have other messages queued). The special batch must be created, sealed, and dispatched immediately instead of the cancelled batch. Messages in both batches must be updated accordingly to move them to "cancelled" or "sent". Signed-off-by: Andrew Richardson --- internal/batch/batch_manager.go | 3 + internal/batch/batch_manager_test.go | 22 +++ internal/batch/batch_processor.go | 179 ++++++++++++++++--------- internal/batch/batch_processor_test.go | 61 ++++++++- 4 files changed, 196 insertions(+), 69 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 6ff7511b2..95dd845a4 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -609,6 +609,9 @@ func (bm *batchManager) CancelBatch(ctx context.Context, batchID string) error { if err != nil { return err } + if bp == nil { + return i18n.NewError(ctx, coremsgs.Msg404NotFound) + } if bp.TX.Type != core.TransactionTypeContractInvokePin { return i18n.NewError(ctx, coremsgs.MsgCannotCancelBatchType, bp.TX.Type) } diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index b651fca74..00987c9dd 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -942,6 +942,28 @@ func TestCancelBatchInvalidType(t *testing.T) { mdi.AssertExpectations(t) } +func TestCancelBatchNotFound(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + + bm.RegisterDispatcher("utdispatcher", true, []core.MessageType{core.MessageTypePrivate}, + func(c context.Context, state *DispatchPayload) error { + return nil + }, + DispatcherOptions{BatchType: core.BatchTypePrivate}, + ) + + batchID := fftypes.NewUUID() + + mdi := bm.database.(*databasemocks.Plugin) + mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(nil, nil) + + err := bm.CancelBatch(context.Background(), batchID.String()) + assert.Regexp(t, "FF10109", err) + + mdi.AssertExpectations(t) +} + func TestCancelBatch(t *testing.T) { bm, cancel := newTestBatchManager(t) defer cancel() diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index cc13741bd..c4496f0ac 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -101,12 +101,31 @@ type dispatchState struct { noncesAssigned map[fftypes.Bytes32]*nonceState } +type MessageUpdate struct { + messages []*core.Message + fromState core.MessageState + toState core.MessageState +} + type DispatchPayload struct { - Batch core.BatchPersisted - Messages []*core.Message - Data core.DataArray - Pins []*fftypes.Bytes32 - State core.MessageState + Batch core.BatchPersisted + Messages []*core.Message + Data core.DataArray + Pins []*fftypes.Bytes32 + MessageUpdates map[string]*MessageUpdate +} + +func (dp *DispatchPayload) addMessageUpdate(messages []*core.Message, fromState core.MessageState, toState core.MessageState) { + key := string(fromState + ":" + toState) + if dp.MessageUpdates == nil { + dp.MessageUpdates = make(map[string]*MessageUpdate) + } + entry, found := dp.MessageUpdates[key] + if !found { + entry = &MessageUpdate{fromState: fromState, toState: toState} + dp.MessageUpdates[key] = entry + } + entry.messages = append(entry.messages, messages...) } const batchSizeEstimateBase = int64(512) @@ -604,20 +623,21 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { err = bp.conf.dispatch(ctx, payload) if err != nil { if bp.isCancelled() { - err = nil - log.L(ctx).Warnf("Batch %s was cancelled - replacing with gap fill", payload.Batch.ID) - for _, msg := range payload.Messages { - if err == nil { - err = bp.writeGapFill(ctx, msg) + var gapFillPayload *DispatchPayload + gapFillPayload, err = bp.prepareGapFill(ctx, payload) + if err == nil { + payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateCancelled) + if gapFillPayload != nil { + payload.addMessageUpdate(gapFillPayload.Messages, core.MessageStateStaged, core.MessageStateSent) + err = bp.dispatchBatch(gapFillPayload) } } - payload.State = core.MessageStateCancelled } } else { if core.IsPinned(payload.Batch.TX.Type) { - payload.State = core.MessageStateSent + payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent) } else { - payload.State = core.MessageStateConfirmed + payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateConfirmed) } } return true, err @@ -625,72 +645,101 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { }) } -func (bp *batchProcessor) writeGapFill(ctx context.Context, msg *core.Message) error { - // Gap fill is only needed for private messages - if msg.Header.Type != core.MessageTypePrivate { - return nil +func (bp *batchProcessor) prepareGapFill(ctx context.Context, payload *DispatchPayload) (*DispatchPayload, error) { + // Gap fill is only needed for private custom pinned messages + if payload.Batch.Type != core.MessageTypePrivate || payload.Batch.TX.Type != core.TransactionTypeContractInvokePin { + return nil, nil } - gapFill := &core.MessageInOut{ - Message: core.Message{ - Header: msg.Header, - LocalNamespace: bp.bm.namespace, - State: core.MessageStateReady, - Pins: msg.Pins, // reuse any assigned pins to fill the nonce gap + log.L(ctx).Warnf("Batch %s was cancelled - replacing with gap fill", payload.Batch.ID) + + gapFills := make([]*core.Message, len(payload.Messages)) + for i, msg := range payload.Messages { + gapFill := &core.MessageInOut{ + Message: core.Message{ + Header: msg.Header, + LocalNamespace: bp.bm.namespace, + State: core.MessageStateStaged, // "staged" not "ready" to avoid the normal message dispatch loop + Pins: msg.Pins, // reuse any assigned pins to fill the nonce gap + }, + } + gapFill.Header.ID = fftypes.NewUUID() + gapFill.Header.CID = msg.Header.ID + gapFill.Header.Tag = core.SystemTagGapFill + gapFill.Header.TxType = core.TransactionTypeBatchPin + err := gapFill.Seal(ctx) + if err == nil { + err = bp.data.WriteNewMessage(ctx, &data.NewMessage{Message: gapFill}) + } + if err != nil { + return nil, err + } + gapFills[i] = &gapFill.Message + } + + gapFillPayload := &DispatchPayload{ + Batch: core.BatchPersisted{ + BatchHeader: payload.Batch.BatchHeader, + TX: core.TransactionRef{ + Type: core.TransactionTypeBatchPin, + }, }, + Messages: gapFills, } - gapFill.Header.ID = fftypes.NewUUID() - gapFill.Header.CID = msg.Header.ID - gapFill.Header.Tag = core.SystemTagGapFill - gapFill.Header.TxType = core.TransactionTypeBatchPin - err := gapFill.Seal(ctx) + gapFillPayload.Batch.ID = fftypes.NewUUID() + log.L(ctx).Infof("Prepared gap fill batch %s", gapFillPayload.Batch.ID) + + err := bp.sealBatch(gapFillPayload) if err == nil { - err = bp.data.WriteNewMessage(ctx, &data.NewMessage{Message: gapFill}) + log.L(ctx).Infof("Sealed gap fill batch %s", gapFillPayload.Batch.ID) } - return err + + return gapFillPayload, err } func (bp *batchProcessor) markPayloadDispatched(payload *DispatchPayload) error { return bp.retry.Do(bp.ctx, "mark dispatched messages", func(attempt int) (retry bool, err error) { return true, bp.database.RunAsGroup(bp.ctx, func(ctx context.Context) (err error) { - // Update the message state in the cache - msgIDs := make([]driver.Value, len(payload.Messages)) confirmTime := fftypes.Now() - for i, msg := range payload.Messages { - msgIDs[i] = msg.Header.ID - msg.BatchID = payload.Batch.ID - msg.TransactionID = payload.Batch.TX.ID - if payload.State == core.MessageStateConfirmed { - msg.Confirmed = confirmTime + for _, state := range payload.MessageUpdates { + // Update the message state in the cache + msgIDs := make([]driver.Value, len(state.messages)) + for i, msg := range state.messages { + msgIDs[i] = msg.Header.ID + msg.BatchID = payload.Batch.ID + msg.TransactionID = payload.Batch.TX.ID + if state.toState == core.MessageStateConfirmed { + msg.Confirmed = confirmTime + } + bp.data.UpdateMessageIfCached(ctx, msg) } - bp.data.UpdateMessageIfCached(ctx, msg) - } - // Update the message state in the database - fb := database.MessageQueryFactory.NewFilter(ctx) - filter := fb.And( - fb.In("id", msgIDs), - fb.Eq("state", core.MessageStateReady), // In the outside chance the next state transition happens first (which supersedes this) - ) - allMsgsUpdate := database.MessageQueryFactory.NewUpdate(ctx). - Set("batch", payload.Batch.ID). - Set("txid", payload.Batch.TX.ID). - Set("state", payload.State) - if payload.State == core.MessageStateConfirmed { - allMsgsUpdate.Set("confirmed", confirmTime) - } - if err = bp.database.UpdateMessages(ctx, bp.bm.namespace, filter, allMsgsUpdate); err != nil { - return err - } + // Update the message state in the database + fb := database.MessageQueryFactory.NewFilter(ctx) + filter := fb.And( + fb.In("id", msgIDs), + fb.Eq("state", state.fromState), // In the outside chance the next state transition happens first (which supersedes this) + ) + allMsgsUpdate := database.MessageQueryFactory.NewUpdate(ctx). + Set("batch", payload.Batch.ID). + Set("txid", payload.Batch.TX.ID). + Set("state", state.toState) + if state.toState == core.MessageStateConfirmed { + allMsgsUpdate.Set("confirmed", confirmTime) + } + if err = bp.database.UpdateMessages(ctx, bp.bm.namespace, filter, allMsgsUpdate); err != nil { + return err + } - if payload.State == core.MessageStateConfirmed { - for _, msg := range payload.Messages { - // Emit a confirmation event locally immediately - for _, topic := range msg.Header.Topics { - // One event per topic - event := core.NewEvent(core.EventTypeMessageConfirmed, payload.Batch.Namespace, msg.Header.ID, payload.Batch.TX.ID, topic) - event.Correlator = msg.Header.CID - if err := bp.database.InsertEvent(ctx, event); err != nil { - return err + if state.toState == core.MessageStateConfirmed { + for _, msg := range state.messages { + // Emit a confirmation event locally immediately + for _, topic := range msg.Header.Topics { + // One event per topic + event := core.NewEvent(core.EventTypeMessageConfirmed, payload.Batch.Namespace, msg.Header.ID, payload.Batch.TX.ID, topic) + event.Correlator = msg.Header.CID + if err := bp.database.InsertEvent(ctx, event); err != nil { + return err + } } } } diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index b75063dc0..fde042660 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -53,6 +53,7 @@ func newTestBatchProcessor(t *testing.T, dispatch DispatchHandler) (func(), *dat author: "did:firefly:org/abcd", dispatch: dispatch, DispatcherOptions: DispatcherOptions{ + BatchType: core.BatchTypePrivate, BatchMaxSize: 10, BatchMaxBytes: 1024 * 1024, BatchTimeout: 100 * time.Millisecond, @@ -900,6 +901,7 @@ func TestCancelBatchPrivate(t *testing.T) { dispatched := make(chan *DispatchPayload) cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { if first { + // Request cancel of the first batch, then return an error to trigger the cancellation logic first = false err := bp.cancelFlush(c, state.Batch.ID) assert.NoError(t, err) @@ -921,8 +923,8 @@ func TestCancelBatchPrivate(t *testing.T) { return dbNonce.Nonce == 12346 })).Return(nil) mdi.On("UpdateMessage", mock.Anything, "ns1", msg1, mock.Anything).Return(nil).Once() - mdi.On("UpdateMessage", mock.Anything, "ns1", msg2, mock.Anything).Return(nil).Once() - mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil).Twice() + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil).Times(3) + mdi.On("UpdateMessages", mock.Anything, "ns1", mock.MatchedBy(func(filter ffapi.AndFilter) bool { info, err := filter.Finalize() assert.NoError(t, err) @@ -938,11 +940,12 @@ func TestCancelBatchPrivate(t *testing.T) { val, _ := info.SetOperations[2].Value.Value() return val == "cancelled" })).Return(nil).Once() + mdi.On("UpdateMessages", mock.Anything, "ns1", mock.MatchedBy(func(filter ffapi.AndFilter) bool { info, err := filter.Finalize() assert.NoError(t, err) assert.Len(t, info.Children, 2) - return info.Children[0].String() == "id IN ['"+msg2.String()+"']" && info.Children[1].String() == "state == 'ready'" + return info.Children[1].String() == "state == 'staged'" }), mock.MatchedBy(func(update ffapi.Update) bool { info, err := update.Finalize() assert.NoError(t, err) @@ -952,13 +955,23 @@ func TestCancelBatchPrivate(t *testing.T) { assert.Equal(t, "state", info.SetOperations[2].Field) val, _ := info.SetOperations[2].Value.Value() return val == "sent" - })).Return(nil).Maybe() // race condition - may or may not finish this second message + })).Return(nil).Once() + + // Race condition - may or may not finish this second message + mdi.On("UpdateMessage", mock.Anything, "ns1", msg2, mock.Anything).Return(nil).Maybe() + mdi.On("UpdateMessages", mock.Anything, "ns1", mock.MatchedBy(func(filter ffapi.AndFilter) bool { + info, err := filter.Finalize() + assert.NoError(t, err) + assert.Len(t, info.Children, 2) + return info.Children[0].String() == "id IN ['"+msg2.String()+"']" && info.Children[1].String() == "state == 'ready'" + }), mock.Anything).Return(nil).Maybe() mim := bp.bm.identity.(*identitymanagermocks.Manager) mim.On("GetLocalNode", mock.Anything).Return(&core.Identity{}, nil) mth := bp.txHelper.(*txcommonmocks.Helper) mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeContractInvokePin, core.IdempotencyKey("")).Return(fftypes.NewUUID(), nil) + mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeBatchPin, core.IdempotencyKey("")).Return(fftypes.NewUUID(), nil) mdm := bp.data.(*datamocks.Manager) mdm.On("UpdateMessageIfCached", mock.Anything, mock.Anything).Return() @@ -983,6 +996,7 @@ func TestCancelBatchPrivate(t *testing.T) { }}} }() <-dispatched + <-dispatched mdi.AssertExpectations(t) mim.AssertExpectations(t) @@ -1006,6 +1020,8 @@ func TestCancelBatchBroadcast(t *testing.T) { }) defer cancel() + bp.conf.DispatcherOptions.BatchType = core.BatchTypeBroadcast + msg1 := fftypes.NewUUID() // cancelled msg2 := fftypes.NewUUID() // dispatched @@ -1082,3 +1098,40 @@ func TestCancelBatchNotFlushing(t *testing.T) { err := bp.cancelFlush(context.Background(), fftypes.NewUUID()) assert.Regexp(t, "FF10468", err) } + +func TestGapFillMessageFail(t *testing.T) { + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { + return nil + }) + defer cancel() + + msg1 := &core.Message{ + Header: core.MessageHeader{ + ID: fftypes.NewUUID(), + }, + } + payload := &DispatchPayload{ + Batch: core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + Type: core.BatchTypePrivate, + }, + TX: core.TransactionRef{ + Type: core.TransactionTypeContractInvokePin, + }, + }, + Messages: []*core.Message{msg1}, + } + + mockRunAsGroupPassthrough(mdi) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) + + mdm := bp.data.(*datamocks.Manager) + mdm.On("WriteNewMessage", mock.Anything, mock.MatchedBy(func(msg *data.NewMessage) bool { + return msg.Message.Header.CID.Equals(msg1.Header.ID) && msg.Message.Header.Tag == core.SystemTagGapFill + })).Return(fmt.Errorf("pop")) + + _, err := bp.prepareGapFill(context.Background(), payload) + assert.EqualError(t, err, "pop") + + mdm.AssertExpectations(t) +} From 3cddcc98422d68c31db7260b51efc86264cb2290 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 9 Apr 2024 12:58:53 -0400 Subject: [PATCH 8/8] Always clear "group" on broadcast messages Signed-off-by: Andrew Richardson --- internal/broadcast/message.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index 5c2209cf9..225a41db6 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -87,6 +87,7 @@ func (s *broadcastSender) SendAndWait(ctx context.Context) error { func (s *broadcastSender) setDefaults() { msg := s.msg.Message msg.Header.ID = fftypes.NewUUID() + msg.Header.Group = nil msg.Header.Namespace = s.mgr.namespace.NetworkName msg.LocalNamespace = s.mgr.namespace.Name msg.State = core.MessageStateReady