diff --git a/.vscode/settings.json b/.vscode/settings.json index 209a8ae7..e9d9543a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,8 +9,10 @@ "ffcmocks", "fftypes", "hashicorp", + "idempotency", "Infof", "kvstore", + "receiptsmocks", "smconf", "stretchr", "Tracef", diff --git a/Makefile b/Makefile index 6c93950f..ef6fafae 100644 --- a/Makefile +++ b/Makefile @@ -61,11 +61,12 @@ mocks-$(strip $(1))-$(strip $(2)): ${MOCKERY} sarama ${MOCKERY} --case underscore --dir $(1) --name $(2) --outpkg $(3) --output mocks/$(strip $(3)) endef -$(eval $(call makemock, internal/contractregistry, ContractStore, contractregistrymocks)) -$(eval $(call makemock, internal/contractregistry, RemoteRegistry, contractregistrymocks)) -$(eval $(call makemock, internal/eth, RPCClient, ethmocks)) -$(eval $(call makemock, internal/ffcapiconnector, FFCServer, ffcapiconnectormocks)) -$(eval $(call makemock, $$(SARAMA_PATH), Client, saramamocks)) -$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroup, saramamocks)) -$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroupSession, saramamocks)) -$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroupClaim, saramamocks)) +$(eval $(call makemock, internal/contractregistry, ContractStore, contractregistrymocks)) +$(eval $(call makemock, internal/contractregistry, RemoteRegistry, contractregistrymocks)) +$(eval $(call makemock, internal/eth, RPCClient, ethmocks)) +$(eval $(call makemock, internal/ffcapiconnector, FFCServer, ffcapiconnectormocks)) +$(eval $(call makemock, internal/receipts, ReceiptStorePersistence, receiptsmocks)) +$(eval $(call makemock, $$(SARAMA_PATH), Client, saramamocks)) +$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroup, saramamocks)) +$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroupSession, saramamocks)) +$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroupClaim, saramamocks)) diff --git a/cmd/ethconnect.go b/cmd/ethconnect.go index 5b4cfd4e..94ca4a51 100644 --- a/cmd/ethconnect.go +++ b/cmd/ethconnect.go @@ -26,6 +26,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/errors" "github.com/hyperledger/firefly-ethconnect/internal/kafka" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/rest" "github.com/hyperledger/firefly-ethconnect/internal/utils" "github.com/icza/dyno" @@ -56,19 +57,14 @@ func initLogging(debugLevel int) { switch debugLevel { case 0: log.SetLevel(log.ErrorLevel) - break case 1: log.SetLevel(log.InfoLevel) - break case 2: log.SetLevel(log.DebugLevel) - break case 3: log.SetLevel(log.TraceLevel) - break default: log.SetLevel(log.DebugLevel) - break } log.Debugf("Log level set to %d", debugLevel) } @@ -168,20 +164,7 @@ func startServer() (err error) { anyRoutineFinished := make(chan bool) var dontPrintYaml = false - for name, conf := range serverConfig.KafkaBridges { - kafkaBridge := kafka.NewKafkaBridge(&dontPrintYaml) - kafkaBridge.SetConf(conf) - if err := kafkaBridge.ValidateConf(); err != nil { - return err - } - go func(name string, anyRoutineFinished chan bool) { - log.Infof("Starting Kafka->Ethereum bridge '%s'", name) - if err := kafkaBridge.Start(); err != nil { - log.Errorf("Kafka->Ethereum bridge failed: %s", err) - } - anyRoutineFinished <- true - }(name, anyRoutineFinished) - } + // Merge in legacy named 'webbhooks' configs if serverConfig.RESTGateways == nil { serverConfig.RESTGateways = make(map[string]*rest.RESTGatewayConf) @@ -189,12 +172,49 @@ func startServer() (err error) { for name, conf := range serverConfig.Webhooks { serverConfig.RESTGateways[name] = conf } + var idempotencyCheckReceiptStore receipts.ReceiptStorePersistence + restGateways := make(map[string]*rest.RESTGateway) for name, conf := range serverConfig.RESTGateways { restGateway := rest.NewRESTGateway(&dontPrintYaml) + restGateways[name] = restGateway restGateway.SetConf(conf) if err := restGateway.ValidateConf(); err != nil { return err } + // This is a slightly awkward cross-component call, to account for the most popular pattern of usage: + // - Run in server mode + // - Single REST API Gateway + // - Single Kafka bridge co-located in the same process + // In this scenario, we can pass the receipt store to the Kafka bridge for it to do + // additional idempotency checks that prevent res-submission of transactions. + if idempotencyCheckReceiptStore == nil { + idempotencyCheckReceiptStore, err = restGateway.InitReceiptStore() + if err != nil { + return err + } + } + + } + + // Start the kafka bridges, passing in the receipt store if we have one + for name, conf := range serverConfig.KafkaBridges { + kafkaBridge := kafka.NewKafkaBridge(&dontPrintYaml) + kafkaBridge.SetConf(conf) + if err := kafkaBridge.ValidateConf(); err != nil { + return err + } + go func(name string, anyRoutineFinished chan bool) { + log.Infof("Starting Kafka->Ethereum bridge '%s'", name) + if err := kafkaBridge.Start(idempotencyCheckReceiptStore); err != nil { + log.Errorf("Kafka->Ethereum bridge failed: %s", err) + } + anyRoutineFinished <- true + }(name, anyRoutineFinished) + } + + // Start the rest gateways + for name, rgw := range restGateways { + restGateway := rgw go func(name string, anyRoutineFinished chan bool) { log.Infof("Starting REST gateway '%s'", name) if err := restGateway.Start(); err != nil { diff --git a/internal/contractgateway/rest2eth.go b/internal/contractgateway/rest2eth.go index 3bab04f6..4e7dac2b 100644 --- a/internal/contractgateway/rest2eth.go +++ b/internal/contractgateway/rest2eth.go @@ -582,13 +582,14 @@ func (r *rest2eth) deployContract(res http.ResponseWriter, req *http.Request, fr } } else { ack := !getFlyParamBool("noack", req) // turn on ack's by default - immediateReceipt := strings.EqualFold(getFlyParam("acktype", req), "receipt") + deployMsg.AckType = strings.ToLower(getFlyParam("acktype", req)) + immediateReceipt := deployMsg.AckType == "receipt" // Async messages are dispatched as generic map payloads. // We are confident in the re-serialization here as we've deserialized from JSON then built our own structure msgBytes, _ := json.Marshal(deployMsg) var mapMsg map[string]interface{} - json.Unmarshal(msgBytes, &mapMsg) + _ = json.Unmarshal(msgBytes, &mapMsg) if asyncResponse, status, err := r.asyncDispatcher.DispatchMsgAsync(req.Context(), mapMsg, ack, immediateReceipt); err != nil { r.restErrReply(res, req, err, status) } else { @@ -630,7 +631,8 @@ func (r *rest2eth) sendTransaction(res http.ResponseWriter, req *http.Request, f } } else { ack := !getFlyParamBool("noack", req) // turn on ack's by default - immediateReceipt := strings.EqualFold(getFlyParam("acktype", req), "receipt") + msg.AckType = strings.ToLower(getFlyParam("acktype", req)) + immediateReceipt := msg.AckType == "receipt" // Async messages are dispatched as generic map payloads. // We are confident in the re-serialization here as we've deserialized from JSON then built our own structure diff --git a/internal/contractgateway/syncdispatcher_test.go b/internal/contractgateway/syncdispatcher_test.go index 2afc6a59..bb17fe13 100644 --- a/internal/contractgateway/syncdispatcher_test.go +++ b/internal/contractgateway/syncdispatcher_test.go @@ -21,6 +21,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/eth" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/tx" "github.com/stretchr/testify/assert" ) @@ -58,6 +59,8 @@ func (p *mockProcessor) OnMessage(c tx.TxnContext) { } } func (p *mockProcessor) Init(eth.RPCClient) {} +func (p *mockProcessor) SetReceiptStoreForIdempotencyCheck(receiptStore receipts.ReceiptStorePersistence) { +} type mockReplyProcessor struct { err error diff --git a/internal/errors/errors.go b/internal/errors/errors.go index ba26c7de..bde538fd 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -513,6 +513,10 @@ var ( // ReceiptStoreKeyNotUnique non-unique request ID ReceiptStoreKeyNotUnique = e(100219, "Request ID is not unique") + // ReceiptErrorIdempotencyCheck failed to query receipt during idempotency check + ReceiptErrorIdempotencyCheck = e(100220, "Failed querying the receipt store, performing duplicate message check on ackmode=receipt for id %s: %s") + // ResubmissionPreventedCheckTransactionHash redelivery was prevented by the processor + ResubmissionPreventedCheckTransactionHash = e(100221, "Resubmission of this transaction was prevented by the REST API Gateway. Check the status of the transaction by the transaction hash") ) type EthconnectError interface { diff --git a/internal/kafka/kafkabridge.go b/internal/kafka/kafkabridge.go index 4169648d..88078774 100644 --- a/internal/kafka/kafkabridge.go +++ b/internal/kafka/kafkabridge.go @@ -27,6 +27,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/errors" "github.com/hyperledger/firefly-ethconnect/internal/eth" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/tx" "github.com/hyperledger/firefly-ethconnect/internal/utils" log "github.com/sirupsen/logrus" @@ -87,7 +88,7 @@ func (k *KafkaBridge) CobraInit() (cmd *cobra.Command) { Short: "Kafka->Ethereum (JSON/RPC) Bridge", RunE: func(cmd *cobra.Command, args []string) (err error) { log.Infof("Starting Kafka bridge") - err = k.Start() + err = k.Start(nil /* only available in full server mode with co-located REST API Gateway */) return }, PreRunE: func(cmd *cobra.Command, args []string) (err error) { @@ -145,7 +146,7 @@ func (k *KafkaBridge) addInflightMsg(msg *sarama.ConsumerMessage, producer Kafka // is very important that the consumer of the wrapped context object calls Reply pCtx = &ctx k.inFlight[ctx.reqOffset] = pCtx - log.Infof("Message now in-flight: %s", pCtx) + log.Debugf("Message now in-flight: %s", pCtx) // Attempt to process the headers from the original message, // which could fail. In which case we still have a msgContext inflight // that needs Reply (and offset commit). So our caller must @@ -200,19 +201,19 @@ func (k *KafkaBridge) setInFlightComplete(ctx *msgContext, consumer KafkaConsume // Build an offset sorted list of the inflight ctx.complete = true - var completeInParition []*msgContext + var completeInPartition []*msgContext for _, inflight := range k.inFlight { if inflight.saramaMsg.Partition == ctx.saramaMsg.Partition { - completeInParition = append(completeInParition, inflight) + completeInPartition = append(completeInPartition, inflight) } } - sort.Sort(ctxByOffset(completeInParition)) + sort.Sort(ctxByOffset(completeInPartition)) // Go forwards until the first that isn't complete var readyToAck []*msgContext - for i := 0; i < len(completeInParition); i++ { - if completeInParition[i].complete { - readyToAck = append(readyToAck, completeInParition[i]) + for i := 0; i < len(completeInPartition); i++ { + if completeInPartition[i].complete { + readyToAck = append(readyToAck, completeInPartition[i]) } else { break } @@ -221,7 +222,7 @@ func (k *KafkaBridge) setInFlightComplete(ctx *msgContext, consumer KafkaConsume canMark := len(readyToAck) > 0 log.Debugf("Ready=%d:%d CanMark=%t Infight=%d InflightSamePartition=%d ReadyToAck=%d", ctx.saramaMsg.Partition, ctx.saramaMsg.Offset, canMark, - len(k.inFlight), len(completeInParition), len(readyToAck)) + len(k.inFlight), len(completeInPartition), len(readyToAck)) if canMark { // Remove all the ready-to-acks from the in-flight list for i := 0; i < len(readyToAck); i++ { @@ -345,7 +346,7 @@ func (k *KafkaBridge) ConsumerMessagesLoop(consumer KafkaConsumer, producer Kafk log.Debugf("Kafka consumer loop started") for msg := range consumer.Messages() { k.inFlightCond.L.Lock() - log.Infof("Kafka consumer received message: Partition=%d Offset=%d", msg.Partition, msg.Offset) + log.Debugf("Kafka consumer received message: Partition=%d Offset=%d", msg.Partition, msg.Offset) // We cannot build up an infinite number of messages in memory for len(k.inFlight) >= k.conf.MaxInFlight { @@ -361,6 +362,8 @@ func (k *KafkaBridge) ConsumerMessagesLoop(consumer KafkaConsumer, producer Kafk // This was a dup } else if err == nil { // Dispatch for processing if we parsed the message successfully + headers := msgCtx.Headers() + log.Infof("Kafka consumer dispatching message '%s' Type=%s ID=%s", msgCtx.reqOffset, headers.MsgType, headers.ID) k.processor.OnMessage(msgCtx) } else { // Dispatch a generic 'bad data' reply @@ -399,7 +402,8 @@ func (k *KafkaBridge) ProducerSuccessLoop(consumer KafkaConsumer, producer Kafka k.inFlightCond.L.Lock() reqOffset := msg.Metadata.(string) if ctx, ok := k.inFlight[reqOffset]; ok { - log.Infof("Reply sent: %s", ctx) + headers := ctx.Headers() + log.Infof("Kafka consumer replying to message '%s' Type=%s ID=%s", ctx.reqOffset, headers.MsgType, headers.ID) // While still holding the lock, add this to the completed list _ = k.setInFlightComplete(ctx, consumer) // We've reduced the in-flight count - wake any waiting consumer go func @@ -423,7 +427,9 @@ func (k *KafkaBridge) connect() (err error) { } // Start kicks off the bridge -func (k *KafkaBridge) Start() (err error) { +func (k *KafkaBridge) Start(receiptStore receipts.ReceiptStorePersistence) (err error) { + + k.processor.SetReceiptStoreForIdempotencyCheck(receiptStore) if *k.printYAML { b, err := utils.MarshalToYAML(&k.conf) diff --git a/internal/kafka/kafkabridge_test.go b/internal/kafka/kafkabridge_test.go index c0c16a93..c7dd7670 100644 --- a/internal/kafka/kafkabridge_test.go +++ b/internal/kafka/kafkabridge_test.go @@ -29,6 +29,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/errors" "github.com/hyperledger/firefly-ethconnect/internal/eth" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/tx" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -93,6 +94,10 @@ func (p *testKafkaMsgProcessor) OnMessage(msg tx.TxnContext) { p.messages <- msg return } + +func (p *testKafkaMsgProcessor) SetReceiptStoreForIdempotencyCheck(receiptStore receipts.ReceiptStorePersistence) { +} + func TestNewKafkaBridge(t *testing.T) { assert := assert.New(t) diff --git a/internal/messages/messages.go b/internal/messages/messages.go index 793f5f90..607f7300 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -36,6 +36,8 @@ const ( MsgTypeTransactionSuccess = "TransactionSuccess" // MsgTypeTransactionFailure - a transaction receipt where status is 0 MsgTypeTransactionFailure = "TransactionFailure" + // MsgTypeTransactionRedeliveryPrevented - idempotency check caught a redelivery of the message + MsgTypeTransactionRedeliveryPrevented = "TransactionRedeliveryPrevented" // RecordHeaderAccessToken - record header name for passing JWT token over messaging RecordHeaderAccessToken = "fly-accesstoken" ) @@ -194,6 +196,14 @@ type TransactionReceipt struct { RegisterAs string `json:"registerAs,omitempty"` } +// TransactionRedeliveryNotification is sent on redelivery of a message, when the ackmode=receipt +// idempotency check is enabled. The REST API Gateway (or other consumer), should avoid overwriting +// any received receipt when it gets this. +type TransactionRedeliveryNotification struct { + ReplyCommon + TransactionHash string `json:"transactionHash"` +} + // TransactionInfo is the detailed transaction info returned by eth_getTransactionByXXXXX // For the big numbers, we pass a simple string as well as a full // ethereum hex encoding version diff --git a/internal/rest/leveldbreceipt_test.go b/internal/receipts/leveldbreceipt_test.go similarity index 96% rename from internal/rest/leveldbreceipt_test.go rename to internal/receipts/leveldbreceipt_test.go index edc2b981..68c4b193 100644 --- a/internal/rest/leveldbreceipt_test.go +++ b/internal/receipts/leveldbreceipt_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rest +package receipts import ( "encoding/json" @@ -86,7 +86,7 @@ func TestNewLevelDBReceiptsCreateOK(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: tmpdir, } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() assert.Equal(conf, r.conf) assert.Nil(err) @@ -98,7 +98,7 @@ func TestLevelDBReceiptCreateErr(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "dummyfile"), } - _, err := newLevelDBReceipts(conf) + _, err := NewLevelDBReceipts(conf) assert.Regexp("Unable to open LevelDB: .*", err) } @@ -108,7 +108,7 @@ func TestLevelDBReceiptsAddReceiptOK(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: tmpdir, } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() receipt := make(map[string]interface{}) @@ -138,7 +138,7 @@ func TestLevelDBReceiptsAddReceiptIdempotencyCheck(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: tmpdir, } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() receipt := make(map[string]interface{}) @@ -157,7 +157,7 @@ func TestLevelDBReceiptsAddReceiptOverwrite(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: tmpdir, } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() receipt := make(map[string]interface{}) @@ -184,7 +184,7 @@ func TestLevelDBReceiptsAddReceiptFailed(t *testing.T) { } t1 := time.Unix(1000000, 0) entropy := ulid.Monotonic(rand.New(rand.NewSource(t1.UnixNano())), 0) - r := &levelDBReceipts{ + r := &LevelDBReceipts{ conf: &LevelDBReceiptStoreConf{}, store: kvstoreMock, idEntropy: entropy, @@ -201,7 +201,7 @@ func TestLevelDBReceiptsGetReceiptsOK(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test1"), } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() id1 := utils.UUIDv4() @@ -236,7 +236,7 @@ func TestLevelDBReceiptsGetReceiptsWithStartEnd(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test1"), } - r, _ := newLevelDBReceipts(conf) + r, _ := NewLevelDBReceipts(conf) defer r.store.Close() id1 := utils.UUIDv4() @@ -314,7 +314,7 @@ func TestLevelDBReceiptsFilterByIDs(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test2"), } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() now := time.Now() @@ -357,7 +357,7 @@ func TestLevelDBReceiptsFilterByIDsAndFromTo(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test3"), } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() now := time.Now() @@ -399,7 +399,7 @@ func TestLevelDBReceiptsFilterFromTo(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test4"), } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() now := time.Now() @@ -453,7 +453,7 @@ func TestLevelDBReceiptsFilterNotFound(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test6"), } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() now := time.Now() @@ -509,7 +509,7 @@ func TestLevelDBReceiptsGetReceiptOK(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test7"), } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() receipt1 := make(map[string]interface{}) receipt1["_id"] = "r1" @@ -530,7 +530,7 @@ func TestLevelDBReceiptsGetReceiptsUnmarshalFailIgnoreReceipt(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test8"), } - r, err := newLevelDBReceipts(conf) + r, err := NewLevelDBReceipts(conf) defer r.store.Close() receipt1 := make(map[string]interface{}) receipt1["_id"] = "r1" @@ -551,7 +551,7 @@ func TestLevelDBReceiptsGetReceiptNotFound(t *testing.T) { conf := &LevelDBReceiptStoreConf{ Path: path.Join(tmpdir, "test6"), } - r, _ := newLevelDBReceipts(conf) + r, _ := NewLevelDBReceipts(conf) defer r.store.Close() result, err := r.GetReceipt("receipt1") @@ -565,7 +565,7 @@ func TestLevelDBReceiptsGetReceiptErrorID(t *testing.T) { kvstoreMock := &mockKVStore{ err: fmt.Errorf("pop"), } - r := &levelDBReceipts{ + r := &LevelDBReceipts{ conf: &LevelDBReceiptStoreConf{}, store: kvstoreMock, } @@ -582,7 +582,7 @@ func TestLevelDBReceiptsGetReceiptErrorGeneratedID(t *testing.T) { getFailIdx: 1, getVal: []byte("generated-id"), } - r := &levelDBReceipts{ + r := &LevelDBReceipts{ conf: &LevelDBReceiptStoreConf{}, store: kvstoreMock, } @@ -597,7 +597,7 @@ func TestLevelDBReceiptsGetReceiptBadDataID(t *testing.T) { kvstoreMock := &mockKVStore{ getVal: []byte("!json"), } - r := &levelDBReceipts{ + r := &LevelDBReceipts{ conf: &LevelDBReceiptStoreConf{}, store: kvstoreMock, } @@ -612,7 +612,7 @@ func TestGetReceiptsByLookupKeyLimit(t *testing.T) { kvstoreMock := &mockKVStore{ getVal: []byte("{}"), } - r := &levelDBReceipts{ + r := &LevelDBReceipts{ conf: &LevelDBReceiptStoreConf{}, store: kvstoreMock, } @@ -627,7 +627,7 @@ func TestGetReceiptsByLookupKeyGetFail(t *testing.T) { kvstoreMock := &mockKVStore{ err: fmt.Errorf("pop"), } - r := &levelDBReceipts{ + r := &LevelDBReceipts{ conf: &LevelDBReceiptStoreConf{}, store: kvstoreMock, } @@ -642,7 +642,7 @@ func TestGetReceiptsByLookupUnmarshalFail(t *testing.T) { kvstoreMock := &mockKVStore{ getVal: []byte("!json"), } - r := &levelDBReceipts{ + r := &LevelDBReceipts{ conf: &LevelDBReceiptStoreConf{}, store: kvstoreMock, } diff --git a/internal/rest/leveldbreceipts.go b/internal/receipts/leveldbreceipts.go similarity index 93% rename from internal/rest/leveldbreceipts.go rename to internal/receipts/leveldbreceipts.go index f4b5aa68..046ccd64 100644 --- a/internal/rest/leveldbreceipts.go +++ b/internal/receipts/leveldbreceipts.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rest +package receipts import ( "encoding/json" @@ -30,7 +30,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -type levelDBReceipts struct { +type LevelDBReceipts struct { conf *LevelDBReceiptStoreConf store kvstore.KVStore entropyLock sync.Mutex @@ -38,7 +38,7 @@ type levelDBReceipts struct { defaultLimit int } -func newLevelDBReceipts(conf *LevelDBReceiptStoreConf) (*levelDBReceipts, error) { +func NewLevelDBReceipts(conf *LevelDBReceiptStoreConf) (*LevelDBReceipts, error) { store, err := kvstore.NewLDBKeyValueStore(conf.Path) if err != nil { return nil, errors.Errorf(errors.ReceiptStoreLevelDBConnect, err) @@ -46,7 +46,7 @@ func newLevelDBReceipts(conf *LevelDBReceiptStoreConf) (*levelDBReceipts, error) t := time.Unix(1000000, 0) entropy := ulid.Monotonic(rand.New(rand.NewSource(t.UnixNano())), 0) - return &levelDBReceipts{ + return &LevelDBReceipts{ conf: conf, store: store, idEntropy: entropy, @@ -56,7 +56,7 @@ func newLevelDBReceipts(conf *LevelDBReceiptStoreConf) (*levelDBReceipts, error) // AddReceipt processes an individual reply message, and contains all errors // To account for any transitory failures writing to mongoDB, it retries adding receipt with a backoff -func (l *levelDBReceipts) AddReceipt(requestID string, receipt *map[string]interface{}, overwrite bool) (err error) { +func (l *LevelDBReceipts) AddReceipt(requestID string, receipt *map[string]interface{}, overwrite bool) (err error) { // insert an entry with a composite key to track the insertion order l.entropyLock.Lock() newID := ulid.MustNew(ulid.Timestamp(time.Now()), l.idEntropy) @@ -110,7 +110,7 @@ func (l *levelDBReceipts) AddReceipt(requestID string, receipt *map[string]inter } // GetReceipts Returns recent receipts with skip, limit and other query parameters -func (l *levelDBReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochMS int64, from, to, start string) (*[]map[string]interface{}, error) { +func (l *LevelDBReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochMS int64, from, to, start string) (*[]map[string]interface{}, error) { // the application of the parameters are implemented to match mongo queries: // - find the starting point: // - if "start" is present, use it @@ -172,7 +172,7 @@ func (l *levelDBReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochM return &results, nil } -func (l *levelDBReceipts) getReceiptsNoFilter(itr kvstore.KVIterator, skip, limit int, start string) []map[string]interface{} { +func (l *LevelDBReceipts) getReceiptsNoFilter(itr kvstore.KVIterator, skip, limit int, start string) []map[string]interface{} { results := []map[string]interface{}{} index := 0 var valid bool @@ -209,7 +209,7 @@ func (l *levelDBReceipts) getReceiptsNoFilter(itr kvstore.KVIterator, skip, limi } // getReply handles a HTTP request for an individual reply -func (l *levelDBReceipts) GetReceipt(requestID string) (*map[string]interface{}, error) { +func (l *LevelDBReceipts) GetReceipt(requestID string) (*map[string]interface{}, error) { val, err := l.store.Get(requestID) if err != nil { if err == kvstore.ErrorNotFound { @@ -235,7 +235,7 @@ func (l *levelDBReceipts) GetReceipt(requestID string) (*map[string]interface{}, return &result, nil } -func (l *levelDBReceipts) findEndPoint(sinceEpochMS int64) string { +func (l *LevelDBReceipts) findEndPoint(sinceEpochMS int64) string { searchKey := fmt.Sprintf("receivedAt:%d:", sinceEpochMS) itr := l.store.NewIterator() defer itr.Release() @@ -253,7 +253,7 @@ func (l *levelDBReceipts) findEndPoint(sinceEpochMS int64) string { return "" } -func (l *levelDBReceipts) getLookupKeysByIDs(ids []string, start, end string) []string { +func (l *LevelDBReceipts) getLookupKeysByIDs(ids []string, start, end string) []string { result := []string{} for _, id := range ids { val, err := l.store.Get(id) @@ -271,7 +271,7 @@ func (l *levelDBReceipts) getLookupKeysByIDs(ids []string, start, end string) [] return result } -func (l *levelDBReceipts) getLookupKeysByFromAndTo(from, to string, start, end string, limit int) []string { +func (l *LevelDBReceipts) getLookupKeysByFromAndTo(from, to string, start, end string, limit int) []string { var fromKeys []string itr := l.store.NewIterator() @@ -303,7 +303,7 @@ func (l *levelDBReceipts) getLookupKeysByFromAndTo(from, to string, start, end s return result } -func (l *levelDBReceipts) getLookupKeysByPrefix(itr kvstore.KVIterator, prefix string, limit int) []string { +func (l *LevelDBReceipts) getLookupKeysByPrefix(itr kvstore.KVIterator, prefix string, limit int) []string { lookupKeys := []string{} count := 0 found := itr.Seek(prefix) @@ -333,7 +333,7 @@ func (l *levelDBReceipts) getLookupKeysByPrefix(itr kvstore.KVIterator, prefix s return lookupKeys } -func (l *levelDBReceipts) getReceiptsByLookupKey(lookupKeys []string, limit int) *[]map[string]interface{} { +func (l *LevelDBReceipts) getReceiptsByLookupKey(lookupKeys []string, limit int) *[]map[string]interface{} { length := len(lookupKeys) if limit > 0 && limit < length { length = limit diff --git a/internal/rest/memreceipts.go b/internal/receipts/memreceipts.go similarity index 84% rename from internal/rest/memreceipts.go rename to internal/receipts/memreceipts.go index 4350d48e..9a3e16c7 100644 --- a/internal/rest/memreceipts.go +++ b/internal/receipts/memreceipts.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rest +package receipts import ( "container/list" @@ -22,15 +22,15 @@ import ( log "github.com/sirupsen/logrus" ) -type memoryReceipts struct { +type MemoryReceipts struct { conf *ReceiptStoreConf receipts *list.List byID map[string]*map[string]interface{} mux sync.Mutex } -func newMemoryReceipts(conf *ReceiptStoreConf) *memoryReceipts { - r := &memoryReceipts{ +func NewMemoryReceipts(conf *ReceiptStoreConf) *MemoryReceipts { + r := &MemoryReceipts{ conf: conf, receipts: list.New(), byID: make(map[string]*map[string]interface{}), @@ -39,7 +39,11 @@ func newMemoryReceipts(conf *ReceiptStoreConf) *memoryReceipts { return r } -func (m *memoryReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochMS int64, from, to, start string) (*[]map[string]interface{}, error) { +func (m *MemoryReceipts) Receipts() *list.List { + return m.receipts +} + +func (m *MemoryReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochMS int64, from, to, start string) (*[]map[string]interface{}, error) { m.mux.Lock() defer m.mux.Unlock() @@ -59,7 +63,7 @@ func (m *memoryReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochMS return &results, nil } -func (m *memoryReceipts) GetReceipt(requestID string) (*map[string]interface{}, error) { +func (m *MemoryReceipts) GetReceipt(requestID string) (*map[string]interface{}, error) { m.mux.Lock() defer m.mux.Unlock() @@ -70,7 +74,7 @@ func (m *memoryReceipts) GetReceipt(requestID string) (*map[string]interface{}, return nil, nil } -func (m *memoryReceipts) AddReceipt(requestID string, receipt *map[string]interface{}, overwrite bool) error { +func (m *MemoryReceipts) AddReceipt(requestID string, receipt *map[string]interface{}, overwrite bool) error { m.mux.Lock() defer m.mux.Unlock() diff --git a/internal/rest/memreceipts_test.go b/internal/receipts/memreceipts_test.go similarity index 95% rename from internal/rest/memreceipts_test.go rename to internal/receipts/memreceipts_test.go index 5d8fc4c3..7bac12cd 100644 --- a/internal/rest/memreceipts_test.go +++ b/internal/receipts/memreceipts_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rest +package receipts import ( "fmt" @@ -27,7 +27,7 @@ func TestMemReceiptsWrapping(t *testing.T) { conf := &ReceiptStoreConf{ MaxDocs: 50, } - r := newMemoryReceipts(conf) + r := NewMemoryReceipts(conf) for i := 0; i < 100; i++ { receipt := make(map[string]interface{}) @@ -52,7 +52,7 @@ func TestMemReceiptsNoIDFilterImpl(t *testing.T) { conf := &ReceiptStoreConf{ MaxDocs: 50, } - r := newMemoryReceipts(conf) + r := NewMemoryReceipts(conf) _, err := r.GetReceipts(0, 0, []string{"test"}, 0, "t", "t", "") assert.Regexp("Memory receipts do not support filtering", err) diff --git a/internal/rest/mongoreceipts.go b/internal/receipts/mongoreceipts.go similarity index 89% rename from internal/rest/mongoreceipts.go rename to internal/receipts/mongoreceipts.go index 0638525a..d79df605 100644 --- a/internal/rest/mongoreceipts.go +++ b/internal/receipts/mongoreceipts.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rest +package receipts import ( "time" @@ -25,23 +25,22 @@ import ( const ( mongoConnectTimeout = 10 * 1000 - backoffFactor = 1.1 ) -type mongoReceipts struct { +type MongoReceipts struct { conf *MongoDBReceiptStoreConf mgo MongoDatabase collection MongoCollection } -func newMongoReceipts(conf *MongoDBReceiptStoreConf) *mongoReceipts { - return &mongoReceipts{ +func NewMongoReceipts(conf *MongoDBReceiptStoreConf) *MongoReceipts { + return &MongoReceipts{ conf: conf, mgo: &mgoWrapper{}, } } -func (m *mongoReceipts) connect() (err error) { +func (m *MongoReceipts) Connect() (err error) { if m.conf.ConnectTimeoutMS <= 0 { m.conf.ConnectTimeoutMS = mongoConnectTimeout } @@ -76,7 +75,7 @@ func (m *mongoReceipts) connect() (err error) { // AddReceipt processes an individual reply message, and contains all errors // To account for any transitory failures writing to mongoDB, it retries adding receipt with a backoff -func (m *mongoReceipts) AddReceipt(requestID string, receipt *map[string]interface{}, overwrite bool) (err error) { +func (m *MongoReceipts) AddReceipt(requestID string, receipt *map[string]interface{}, overwrite bool) (err error) { if overwrite { return m.collection.Upsert(bson.M{"_id": requestID}, *receipt) } else { @@ -85,7 +84,7 @@ func (m *mongoReceipts) AddReceipt(requestID string, receipt *map[string]interfa } // GetReceipts Returns recent receipts with skip & limit -func (m *mongoReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochMS int64, from, to, start string) (*[]map[string]interface{}, error) { +func (m *MongoReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochMS int64, from, to, start string) (*[]map[string]interface{}, error) { filter := bson.M{} if len(ids) > 0 { filter["_id"] = bson.M{ @@ -121,7 +120,7 @@ func (m *mongoReceipts) GetReceipts(skip, limit int, ids []string, sinceEpochMS } // getReply handles a HTTP request for an individual reply -func (m *mongoReceipts) GetReceipt(requestID string) (*map[string]interface{}, error) { +func (m *MongoReceipts) GetReceipt(requestID string) (*map[string]interface{}, error) { query := m.collection.Find(bson.M{"_id": requestID}) result := make(map[string]interface{}) if err := query.One(&result); err == mgo.ErrNotFound { diff --git a/internal/rest/mongoreceipts_test.go b/internal/receipts/mongoreceipts_test.go similarity index 93% rename from internal/rest/mongoreceipts_test.go rename to internal/receipts/mongoreceipts_test.go index 85fee875..2e1ad429 100644 --- a/internal/rest/mongoreceipts_test.go +++ b/internal/receipts/mongoreceipts_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rest +package receipts import ( "fmt" @@ -118,7 +118,7 @@ func (m *mockQuery) One(result interface{}) error { func TestNewMongoReceipts(t *testing.T) { assert := assert.New(t) conf := &MongoDBReceiptStoreConf{} - r := newMongoReceipts(conf) + r := NewMongoReceipts(conf) assert.Equal(conf, r.conf) } @@ -126,7 +126,7 @@ func TestMongoReceiptsConnectOK(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{connErr: nil} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{ ReceiptStoreConf: ReceiptStoreConf{MaxDocs: 123}, URL: "testurl", @@ -136,7 +136,7 @@ func TestMongoReceiptsConnectOK(t *testing.T) { mgo: mgoMock, } - err := r.connect() + err := r.Connect() assert.NoError(err) assert.Equal("testurl", mgoMock.url) assert.Equal("testdb", mgoMock.databaseName) @@ -149,12 +149,12 @@ func TestMongoReceiptsConnectConnErr(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{connErr: fmt.Errorf("pop")} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } - err := r.connect() + err := r.Connect() assert.Regexp("Unable to connect to MongoDB: pop", err) } @@ -163,12 +163,12 @@ func TestMongoReceiptsConnectCollErr(t *testing.T) { mgoMock := &mockMongo{} mgoMock.collection.collErr = fmt.Errorf("pop") - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } - err := r.connect() + err := r.Connect() assert.NoError(err) } @@ -177,12 +177,12 @@ func TestMongoReceiptsConnectIdxErr(t *testing.T) { mgoMock := &mockMongo{} mgoMock.collection.ensureIndexErr = fmt.Errorf("pop") - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } - err := r.connect() + err := r.Connect() assert.Regexp("Unable to create index: pop", err) } @@ -190,12 +190,12 @@ func TestMongoReceiptsInsertReceiptOK(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } - r.connect() + r.Connect() receipt := make(map[string]interface{}) err := r.AddReceipt("key", &receipt, false) assert.NoError(err) @@ -205,12 +205,12 @@ func TestMongoReceiptsUpsertReceiptOK(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } - r.connect() + r.Connect() receipt := make(map[string]interface{}) err := r.AddReceipt("key", &receipt, true) assert.NoError(err) @@ -221,12 +221,12 @@ func TestMongoReceiptsAddReceiptFailed(t *testing.T) { mgoMock := &mockMongo{} mgoMock.collection.insertErr = fmt.Errorf("pop") - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } - r.connect() + r.Connect() receipt := make(map[string]interface{}) err := r.AddReceipt("key", &receipt, false) assert.Regexp("pop", err) @@ -236,7 +236,7 @@ func TestMongoReceiptsGetReceiptsOK(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } @@ -251,7 +251,7 @@ func TestMongoReceiptsGetReceiptsOK(t *testing.T) { *resArray = append(*resArray, res2) } - r.connect() + r.Connect() results, err := r.GetReceipts(5, 2, nil, 0, "", "", "") assert.NoError(err) assert.Equal(5, mgoMock.collection.mockQuery.skip) @@ -264,7 +264,7 @@ func TestMongoReceiptsFilter(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } @@ -279,7 +279,7 @@ func TestMongoReceiptsFilter(t *testing.T) { *resArray = append(*resArray, res2) } - r.connect() + r.Connect() now := time.Now() results, err := r.GetReceipts(0, 0, []string{"key1", "key2"}, now.UnixNano()/int64(time.Millisecond), "addr1", "addr2", "") assert.NoError(err) @@ -298,14 +298,14 @@ func TestMongoReceiptsGetReceiptsNotFound(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } mgoMock.collection.mockQuery.allErr = mgo.ErrNotFound - r.connect() + r.Connect() results, err := r.GetReceipts(5, 2, nil, 0, "", "", "") assert.NoError(err) assert.Len(*results, 0) @@ -315,14 +315,14 @@ func TestMongoReceiptsGetReceiptsError(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } mgoMock.collection.mockQuery.allErr = fmt.Errorf("pop") - r.connect() + r.Connect() _, err := r.GetReceipts(5, 2, nil, 0, "", "", "") assert.Regexp("pop", err) } @@ -331,7 +331,7 @@ func TestMongoReceiptsGetReceiptOK(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } @@ -344,7 +344,7 @@ func TestMongoReceiptsGetReceiptOK(t *testing.T) { *resMap = res1 } - r.connect() + r.Connect() result, err := r.GetReceipt("receipt1") assert.NoError(err) assert.Equal("receipt1", (*result)["_id"]) @@ -355,14 +355,14 @@ func TestMongoReceiptsGetReceiptNotFound(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } mgoMock.collection.mockQuery.oneErr = mgo.ErrNotFound - r.connect() + r.Connect() result, err := r.GetReceipt("receipt1") assert.NoError(err) assert.Nil(result) @@ -372,14 +372,14 @@ func TestMongoReceiptsGetReceiptError(t *testing.T) { assert := assert.New(t) mgoMock := &mockMongo{} - r := &mongoReceipts{ + r := &MongoReceipts{ conf: &MongoDBReceiptStoreConf{}, mgo: mgoMock, } mgoMock.collection.mockQuery.oneErr = fmt.Errorf("pop") - r.connect() + r.Connect() _, err := r.GetReceipt("receipt1") assert.Regexp("pop", err) } diff --git a/internal/rest/mongwrapper.go b/internal/receipts/mongwrapper.go similarity index 99% rename from internal/rest/mongwrapper.go rename to internal/receipts/mongwrapper.go index 7a62ee9e..e17fdce1 100644 --- a/internal/rest/mongwrapper.go +++ b/internal/receipts/mongwrapper.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rest +package receipts // github.com/globalsign/mgo seems to be the most popular mgo fork. It seems // a key feature we need is missing from the official Golang library. diff --git a/internal/receipts/receipts.go b/internal/receipts/receipts.go new file mode 100644 index 00000000..0e163fa2 --- /dev/null +++ b/internal/receipts/receipts.go @@ -0,0 +1,45 @@ +// Copyright 2022 Kaleido + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receipts + +// ReceiptStorePersistence interface implemented by persistence layers +type ReceiptStorePersistence interface { + GetReceipts(skip, limit int, ids []string, sinceEpochMS int64, from, to, start string) (*[]map[string]interface{}, error) + GetReceipt(requestID string) (*map[string]interface{}, error) + AddReceipt(requestID string, receipt *map[string]interface{}, overwriteAndRetry bool) error +} + +// ReceiptStoreConf is the common configuration for all receipt stores +type ReceiptStoreConf struct { + MaxDocs int `json:"maxDocs"` + QueryLimit int `json:"queryLimit"` + RetryInitialDelayMS int `json:"retryInitialDelay"` + RetryTimeoutMS int `json:"retryTimeout"` +} + +// MongoDBReceiptStoreConf is the configuration for a MongoDB receipt store +type MongoDBReceiptStoreConf struct { + ReceiptStoreConf + URL string `json:"url"` + Database string `json:"database"` + Collection string `json:"collection"` + ConnectTimeoutMS int `json:"connectTimeout"` +} + +// LevelDBReceiptStoreConf is the configuration for a LevelDB receipt store +type LevelDBReceiptStoreConf struct { + ReceiptStoreConf + Path string `json:"path"` +} diff --git a/internal/rest/receiptstore.go b/internal/rest/receiptstore.go index ff6b1df2..4b728e65 100644 --- a/internal/rest/receiptstore.go +++ b/internal/rest/receiptstore.go @@ -26,6 +26,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/contractgateway" "github.com/hyperledger/firefly-ethconnect/internal/errors" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/utils" "github.com/julienschmidt/httprouter" log "github.com/sirupsen/logrus" @@ -36,26 +37,20 @@ const ( defaultRetryTimeout = 120 * 1000 defaultRetryInitialDelay = 500 defaultMaxDocs = 250 + backoffFactor = 1.1 ) var uuidCharsVerifier, _ = regexp.Compile("^[0-9a-zA-Z-]+$") -// ReceiptStorePersistence interface implemented by persistence layers -type ReceiptStorePersistence interface { - GetReceipts(skip, limit int, ids []string, sinceEpochMS int64, from, to, start string) (*[]map[string]interface{}, error) - GetReceipt(requestID string) (*map[string]interface{}, error) - AddReceipt(requestID string, receipt *map[string]interface{}, overwriteAndRetry bool) error -} - type receiptStore struct { - conf *ReceiptStoreConf - persistence ReceiptStorePersistence + conf *receipts.ReceiptStoreConf + persistence receipts.ReceiptStorePersistence smartContractGW contractgateway.SmartContractGateway reservedIDs map[string]bool reservationMux sync.Mutex } -func newReceiptStore(conf *ReceiptStoreConf, persistence ReceiptStorePersistence, smartContractGW contractgateway.SmartContractGateway) *receiptStore { +func newReceiptStore(conf *receipts.ReceiptStoreConf, persistence receipts.ReceiptStorePersistence, smartContractGW contractgateway.SmartContractGateway) *receiptStore { if conf.RetryTimeoutMS <= 0 { conf.RetryTimeoutMS = defaultRetryTimeout } @@ -145,9 +140,29 @@ func (r *receiptStore) processReply(msgBytes []byte) { msgType := utils.GetMapString(headers, "type") contractAddr := utils.GetMapString(parsedMsg, "contractAddress") result := "" - if msgType == messages.MsgTypeError { + switch msgType { + case messages.MsgTypeError: result = utils.GetMapString(parsedMsg, "errorMessage") - } else { + case messages.MsgTypeTransactionRedeliveryPrevented: + // If we receive this, then we need to make sure either: + // a) We have a good receipt in our DB already + // b) We swap the status into an error - as the application might have to check the transaction status themselves from the TX Hash + result = utils.GetMapString(parsedMsg, "transactionHash") + existingReceipt, err := r.persistence.GetReceipt(requestID) + if err == nil && existingReceipt != nil { + existingHeaders := r.extractHeaders(*existingReceipt) + msgType := utils.GetMapString(existingHeaders, "type") + if msgType == messages.MsgTypeTransactionFailure || msgType == messages.MsgTypeTransactionSuccess { + // We already have a valid receipt - do not overwrite it + log.Warnf("Ignoring redelivery reply message. requestId='%s' reqOffset='%s' type='%s': %s", requestID, reqOffset, msgType, result) + return + } + } + // We need to switch to an error to let them know we cannot provide the receipt + idempotencyErr := errors.Errorf(errors.ResubmissionPreventedCheckTransactionHash) + parsedMsg["errorCode"] = idempotencyErr.Code() + parsedMsg["errorMessage"] = idempotencyErr.ErrorNoCode() + default: result = utils.GetMapString(parsedMsg, "transactionHash") } log.Infof("Received reply message. requestId='%s' reqOffset='%s' type='%s': %s", requestID, reqOffset, msgType, result) diff --git a/internal/rest/receiptstore_test.go b/internal/rest/receiptstore_test.go index 4dd2c1df..682fe825 100644 --- a/internal/rest/receiptstore_test.go +++ b/internal/rest/receiptstore_test.go @@ -28,6 +28,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/auth/authtest" "github.com/hyperledger/firefly-ethconnect/internal/ethbind" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/utils" "github.com/julienschmidt/httprouter" ) @@ -54,7 +55,7 @@ func (m *mockReceiptErrs) AddReceipt(requestID string, receipt *map[string]inter } func newReceiptsErrTestServer(err error) (*receiptStore, *httptest.Server) { - r := newReceiptStore(&ReceiptStoreConf{ + r := newReceiptStore(&receipts.ReceiptStoreConf{ RetryTimeoutMS: 1, RetryInitialDelayMS: 1, }, &mockReceiptErrs{ @@ -67,20 +68,20 @@ func newReceiptsErrTestServer(err error) (*receiptStore, *httptest.Server) { return r, httptest.NewServer(router) } -func newReceiptsTestStore(replyCallback func(message interface{})) (*receiptStore, *memoryReceipts) { +func newReceiptsTestStore(replyCallback func(message interface{})) (*receiptStore, *receipts.MemoryReceipts) { gw := &mockContractGW{ replyCallback: replyCallback, } - conf := &ReceiptStoreConf{ + conf := &receipts.ReceiptStoreConf{ MaxDocs: 50, QueryLimit: 50, } - p := newMemoryReceipts(conf) + p := receipts.NewMemoryReceipts(conf) r := newReceiptStore(conf, p, gw) return r, p } -func newReceiptsTestServer() (*receiptStore, *memoryReceipts, *httptest.Server) { +func newReceiptsTestServer() (*receiptStore, *receipts.MemoryReceipts, *httptest.Server) { r, p := newReceiptsTestStore(nil) router := &httprouter.Router{} r.addRoutes(router) @@ -103,8 +104,8 @@ func TestReplyProcessorWithValidReply(t *testing.T) { r.processReply(replyMsgBytes) - assert.Equal(1, p.receipts.Len()) - front := *p.receipts.Front().Value.(*map[string]interface{}) + assert.Equal(1, p.Receipts().Len()) + front := *p.Receipts().Front().Value.(*map[string]interface{}) assert.Equal(replyMsg.Headers.ReqID, front["_id"]) } @@ -128,8 +129,8 @@ func TestReplyProcessorWithContractGWSuccess(t *testing.T) { r.processReply(replyMsgBytes) - assert.Equal(1, p.receipts.Len()) - front := *p.receipts.Front().Value.(*map[string]interface{}) + assert.Equal(1, p.Receipts().Len()) + front := *p.Receipts().Front().Value.(*map[string]interface{}) assert.Equal(replyMsg.Headers.ReqID, front["_id"]) } @@ -155,8 +156,8 @@ func TestReplyProcessorWithContractGWFailure(t *testing.T) { r.processReply(replyMsgBytes) - assert.Equal(1, p.receipts.Len()) - front := *p.receipts.Front().Value.(*map[string]interface{}) + assert.Equal(1, p.Receipts().Len()) + front := *p.Receipts().Front().Value.(*map[string]interface{}) assert.Equal(replyMsg.Headers.ReqID, front["_id"]) } @@ -183,7 +184,7 @@ func TestReplyProcessorWithInvalidReplySwallowsErr(t *testing.T) { } func TestReplyProcessorWithPeristenceErrorPanics(t *testing.T) { - r := newReceiptStore(&ReceiptStoreConf{ + r := newReceiptStore(&receipts.ReceiptStoreConf{ RetryTimeoutMS: 1, RetryInitialDelayMS: 1, }, &mockReceiptErrs{ @@ -220,8 +221,8 @@ func TestReplyProcessorWithErrorReply(t *testing.T) { r.processReply(replyMsgBytes) - assert.Equal(1, p.receipts.Len()) - front := *p.receipts.Front().Value.(*map[string]interface{}) + assert.Equal(1, p.Receipts().Len()) + front := *p.Receipts().Front().Value.(*map[string]interface{}) assert.Equal(replyMsg.Headers.ReqID, front["_id"]) assert.Equal(replyMsg.ErrorMessage, front["errorMessage"]) assert.Equal(replyMsg.OriginalMessage, front["requestPayload"]) @@ -236,7 +237,7 @@ func TestReplyProcessorMissingHeaders(t *testing.T) { msgBytes, _ := json.Marshal(&emptyMsg) r.processReply(msgBytes) - assert.Equal(0, p.receipts.Len()) + assert.Equal(0, p.Receipts().Len()) } func TestReplyProcessorMissingRequestId(t *testing.T) { @@ -249,7 +250,7 @@ func TestReplyProcessorMissingRequestId(t *testing.T) { r.processReply(replyMsgBytes) - assert.Equal(0, p.receipts.Len()) + assert.Equal(0, p.Receipts().Len()) } func TestReplyProcessorInsertError(t *testing.T) { @@ -263,7 +264,7 @@ func TestReplyProcessorInsertError(t *testing.T) { r.processReply(replyMsgBytes) - assert.Equal(1, p.receipts.Len()) + assert.Equal(1, p.Receipts().Len()) } func testGETObject(ts *httptest.Server, path string) (int, map[string]interface{}, error) { @@ -566,6 +567,63 @@ func TestSendReplyBroadcast(t *testing.T) { r.processReply(replyMsgBytes) } +func TestSendReplyRedeliveryStore(t *testing.T) { + assert := assert.New(t) + r, _ := newReceiptsTestStore(func(message interface{}) { + assert.NotNil(message) + }) + + replyMsg := &messages.TransactionReceipt{} + replyMsg.Headers.MsgType = messages.MsgTypeTransactionRedeliveryPrevented + replyMsg.Headers.ID = utils.UUIDv4() + replyMsg.Headers.ReqID = utils.UUIDv4() + replyMsg.Headers.ReqOffset = "topic:1:2" + txHash := ethbind.API.HexToHash("0x02587104e9879911bea3d5bf6ccd7e1a6cb9a03145b8a1141804cebd6aa67c5c") + replyMsg.TransactionHash = &txHash + replyMsgBytes, _ := json.Marshal(&replyMsg) + + r.processReply(replyMsgBytes) + + rec, err := r.persistence.GetReceipt(replyMsg.Headers.ReqID) + assert.NoError(err) + assert.NotNil(rec) + assert.Equal(messages.MsgTypeTransactionRedeliveryPrevented, (*rec)["headers"].(map[string]interface{})["type"].(string)) +} + +func TestSendReplyRedeliverySkip(t *testing.T) { + assert := assert.New(t) + r, _ := newReceiptsTestStore(func(message interface{}) { + assert.NotNil(message) + }) + + reqID := utils.UUIDv4() + replyMsg := &messages.TransactionReceipt{} + replyMsg.Headers.MsgType = messages.MsgTypeTransactionSuccess + replyMsg.Headers.ID = utils.UUIDv4() + replyMsg.Headers.ReqID = reqID + replyMsg.Headers.ReqOffset = "topic:1:2" + txHash := ethbind.API.HexToHash("0x02587104e9879911bea3d5bf6ccd7e1a6cb9a03145b8a1141804cebd6aa67c5c") + replyMsg.TransactionHash = &txHash + replyMsgBytes, _ := json.Marshal(&replyMsg) + + r.processReply(replyMsgBytes) + + replyMsg = &messages.TransactionReceipt{} + replyMsg.Headers.MsgType = messages.MsgTypeTransactionRedeliveryPrevented + replyMsg.Headers.ID = utils.UUIDv4() + replyMsg.Headers.ReqID = reqID + replyMsg.Headers.ReqOffset = "topic:1:3" + replyMsg.TransactionHash = &txHash + replyMsgBytes, _ = json.Marshal(&replyMsg) + + r.processReply(replyMsgBytes) + + rec, err := r.persistence.GetReceipt(replyMsg.Headers.ReqID) + assert.NoError(err) + assert.NotNil(rec) + assert.Equal(messages.MsgTypeTransactionSuccess, (*rec)["headers"].(map[string]interface{})["type"].(string)) +} + func TestReserveID(t *testing.T) { assert := assert.New(t) r, _ := newReceiptsTestStore(func(message interface{}) { diff --git a/internal/rest/restgateway.go b/internal/rest/restgateway.go index a7af37b6..65d87a0c 100644 --- a/internal/rest/restgateway.go +++ b/internal/rest/restgateway.go @@ -32,6 +32,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/eth" "github.com/hyperledger/firefly-ethconnect/internal/kafka" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/tx" "github.com/hyperledger/firefly-ethconnect/internal/utils" "github.com/hyperledger/firefly-ethconnect/internal/ws" @@ -47,35 +48,12 @@ const ( MaxHeaderSize = 16 * 1024 ) -// ReceiptStoreConf is the common configuration for all receipt stores -type ReceiptStoreConf struct { - MaxDocs int `json:"maxDocs"` - QueryLimit int `json:"queryLimit"` - RetryInitialDelayMS int `json:"retryInitialDelay"` - RetryTimeoutMS int `json:"retryTimeout"` -} - -// MongoDBReceiptStoreConf is the configuration for a MongoDB receipt store -type MongoDBReceiptStoreConf struct { - ReceiptStoreConf - URL string `json:"url"` - Database string `json:"database"` - Collection string `json:"collection"` - ConnectTimeoutMS int `json:"connectTimeout"` -} - -// LevelDBReceiptStoreConf is the configuration for a LevelDB receipt store -type LevelDBReceiptStoreConf struct { - ReceiptStoreConf - Path string `json:"path"` -} - // RESTGatewayConf defines the YAML config structure for a webhooks bridge instance type RESTGatewayConf struct { Kafka kafka.KafkaCommonConf `json:"kafka"` - MongoDB MongoDBReceiptStoreConf `json:"mongodb"` - LevelDB LevelDBReceiptStoreConf `json:"leveldb"` - MemStore ReceiptStoreConf `json:"memstore"` + MongoDB receipts.MongoDBReceiptStoreConf `json:"mongodb"` + LevelDB receipts.LevelDBReceiptStoreConf `json:"leveldb"` + MemStore receipts.ReceiptStoreConf `json:"memstore"` OpenAPI contractgateway.SmartContractGatewayConf `json:"openapi"` HTTP struct { LocalAddr string `json:"localAddr"` @@ -237,9 +215,43 @@ func (g *RESTGateway) newAccessTokenContextHandler(parent http.Handler) http.Han }) } +// ReceiptStorePersistence allows other components to access the receipt store persistence for idempotency checks, when co-located in the same address space +func (g *RESTGateway) InitReceiptStore() (receipts.ReceiptStorePersistence, error) { + var receiptStoreConf *receipts.ReceiptStoreConf + var receiptStorePersistence receipts.ReceiptStorePersistence + if g.conf.MongoDB.URL != "" { + receiptStoreConf = &g.conf.MongoDB.ReceiptStoreConf + mongoStore := receipts.NewMongoReceipts(&g.conf.MongoDB) + receiptStorePersistence = mongoStore + if err := mongoStore.Connect(); err != nil { + return nil, err + } + } else if g.conf.LevelDB.Path != "" { + receiptStoreConf = &g.conf.LevelDB.ReceiptStoreConf + leveldbStore, err := receipts.NewLevelDBReceipts(&g.conf.LevelDB) + if err != nil { + return nil, err + } + receiptStorePersistence = leveldbStore + } else { + receiptStoreConf = &g.conf.MemStore + memStore := receipts.NewMemoryReceipts(&g.conf.MemStore) + receiptStorePersistence = memStore + } + g.receipts = newReceiptStore(receiptStoreConf, receiptStorePersistence, g.smartContractGW) + return g.receipts.persistence, nil +} + // Start kicks off the HTTP listener and router func (g *RESTGateway) Start() (err error) { + // Ensure the receipt store is initialized + if g.receipts == nil { + if _, err := g.InitReceiptStore(); err != nil { + return err + } + } + if *g.printYAML { b, err := utils.MarshalToYAML(&g.conf) print("# YAML Configuration snippet for REST Gateway\n" + string(b)) @@ -274,31 +286,7 @@ func (g *RESTGateway) Start() (err error) { g.smartContractGW.AddRoutes(router) } - var receiptStoreConf *ReceiptStoreConf - var receiptStorePersistence ReceiptStorePersistence - if g.conf.MongoDB.URL != "" { - receiptStoreConf = &g.conf.MongoDB.ReceiptStoreConf - mongoStore := newMongoReceipts(&g.conf.MongoDB) - receiptStorePersistence = mongoStore - if err = mongoStore.connect(); err != nil { - return - } - } else if g.conf.LevelDB.Path != "" { - receiptStoreConf = &g.conf.LevelDB.ReceiptStoreConf - leveldbStore, errResult := newLevelDBReceipts(&g.conf.LevelDB) - if errResult != nil { - err = errResult - return - } - receiptStorePersistence = leveldbStore - } else { - receiptStoreConf = &g.conf.MemStore - memStore := newMemoryReceipts(&g.conf.MemStore) - receiptStorePersistence = memStore - } - router.GET("/status", g.statusHandler) - g.receipts = newReceiptStore(receiptStoreConf, receiptStorePersistence, g.smartContractGW) g.receipts.addRoutes(router) if len(g.conf.Kafka.Brokers) > 0 { wk := newWebhooksKafka(&g.conf.Kafka, g.receipts) diff --git a/internal/rest/webhooks_test.go b/internal/rest/webhooks_test.go index 1a9fa09a..fb9aa0a4 100644 --- a/internal/rest/webhooks_test.go +++ b/internal/rest/webhooks_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/mocks/ethmocks" "github.com/hyperledger/firefly-ethconnect/mocks/ffcapiconnectormocks" "github.com/julienschmidt/httprouter" @@ -127,8 +128,8 @@ func TestWebhookHandlerContractGWImmeidateReceiptSuccess(t *testing.T) { } deployMsgBytes, _ := json.Marshal(&deployMsg) req, _ := http.NewRequest("POST", "/any", bytes.NewReader(deployMsgBytes)) - r := newMemoryReceipts(&ReceiptStoreConf{}) - rs := newReceiptStore(&ReceiptStoreConf{}, r, nil) + r := receipts.NewMemoryReceipts(&receipts.ReceiptStoreConf{}) + rs := newReceiptStore(&receipts.ReceiptStoreConf{}, r, nil) w := &webhooks{ smartContractGW: &mockContractGW{}, handler: &mockHandler{}, diff --git a/internal/rest/webhooksdirect_test.go b/internal/rest/webhooksdirect_test.go index dbe03cea..ed1d52d5 100644 --- a/internal/rest/webhooksdirect_test.go +++ b/internal/rest/webhooksdirect_test.go @@ -26,6 +26,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/eth" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/tx" "github.com/julienschmidt/httprouter" @@ -41,10 +42,12 @@ func (p *mockProcessor) OnMessage(ctx tx.TxnContext) { p.capturedCtx = ctx.(*msgContext) } func (p *mockProcessor) Init(eth.RPCClient) {} +func (p *mockProcessor) SetReceiptStoreForIdempotencyCheck(receiptStore receipts.ReceiptStorePersistence) { +} -func newTestWebhooksDirect(maxMsgs int) (*webhooksDirect, *memoryReceipts, *mockProcessor) { - rsc := &ReceiptStoreConf{} - r := newMemoryReceipts(rsc) +func newTestWebhooksDirect(maxMsgs int) (*webhooksDirect, *receipts.MemoryReceipts, *mockProcessor) { + rsc := &receipts.ReceiptStoreConf{} + r := receipts.NewMemoryReceipts(rsc) rs := newReceiptStore(rsc, r, nil) conf := &WebhooksDirectConf{ MaxInFlight: maxMsgs, @@ -55,7 +58,7 @@ func newTestWebhooksDirect(maxMsgs int) (*webhooksDirect, *memoryReceipts, *mock return wd, r, p } -func newTestWebhooksDirectServer(maxMsgs int) (*webhooksDirect, *httptest.Server, *memoryReceipts, *mockProcessor) { +func newTestWebhooksDirectServer(maxMsgs int) (*webhooksDirect, *httptest.Server, *receipts.MemoryReceipts, *mockProcessor) { wd, r, p := newTestWebhooksDirect(maxMsgs) router := &httprouter.Router{} wh := newWebhooks(wd, wd.receipts, nil, nil, eth.EthCommonConf{}) diff --git a/internal/rest/webhookskafka_test.go b/internal/rest/webhookskafka_test.go index 68a638b2..d887a4cb 100644 --- a/internal/rest/webhookskafka_test.go +++ b/internal/rest/webhookskafka_test.go @@ -36,6 +36,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/eth" "github.com/hyperledger/firefly-ethconnect/internal/kafka" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/julienschmidt/httprouter" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -104,8 +105,8 @@ func newTestKafkaComon() *testKafkaCommon { } func newTestWebhooks() (*webhooks, *webhooksKafka, *testKafkaCommon, *httptest.Server) { - p := &memoryReceipts{} - r := newReceiptStore(&ReceiptStoreConf{}, p, nil) + p := &receipts.MemoryReceipts{} + r := newReceiptStore(&receipts.ReceiptStoreConf{}, p, nil) k := newTestKafkaComon() wk := newWebhooksKafkaBase(r) wk.kafka = k diff --git a/internal/tx/txnprocessor.go b/internal/tx/txnprocessor.go index c95a94be..614763d1 100644 --- a/internal/tx/txnprocessor.go +++ b/internal/tx/txnprocessor.go @@ -28,6 +28,7 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/errors" "github.com/hyperledger/firefly-ethconnect/internal/eth" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/internal/receipts" "github.com/hyperledger/firefly-ethconnect/internal/utils" ethbinding "github.com/kaleido-io/ethbinding/pkg" log "github.com/sirupsen/logrus" @@ -47,6 +48,7 @@ type TxnProcessor interface { OnMessage(TxnContext) Init(eth.RPCClient) ResolveAddress(from string) (resolvedFrom string, err error) + SetReceiptStoreForIdempotencyCheck(receiptStore receipts.ReceiptStorePersistence) } var highestID = 1000000 @@ -67,6 +69,7 @@ type inflightTxn struct { signer eth.TXSigner gapFillSucceeded bool gapFillTxHash string + idempotencyCheck bool } func (i *inflightTxn) nonceNumber() json.Number { @@ -117,6 +120,7 @@ type txnProcessor struct { concurrencySlots chan bool concurrency int64 gasEstimationFactor float64 + receiptStore receipts.ReceiptStorePersistence sendRetryForce bool sendRetryDelayMin time.Duration @@ -171,6 +175,16 @@ func (p *txnProcessor) Init(rpc eth.RPCClient) { } } +// SetReceiptStoreForIdempotencyCheck is for the common case, that we are running the REST API Gateway +// component, and the Kafka Bridge component in the same address space. +// When set, this allows us to re-do the idempotency check that can be used on the REST API Gateway +// (see fly-acktype=receipt / kld-acktype=receipt) when we are passed messages by Kafka. +// Then if due to a Kafka reconnect/failover of the consumer group, we have had a redelivery, we can +// avoid resubmission. +func (p *txnProcessor) SetReceiptStoreForIdempotencyCheck(receiptStore receipts.ReceiptStorePersistence) { + p.receiptStore = receiptStore +} + // CobraInitTxnProcessor sets the standard command-line parameters for the txnprocessor func CobraInitTxnProcessor(cmd *cobra.Command, txconf *TxnProcessorConf) { cmd.Flags().IntVarP(&txconf.MaxTXWaitTime, "tx-timeout", "x", utils.DefInt("ETH_TX_TIMEOUT", 0), "Maximum wait time for an individual transaction (seconds)") @@ -235,8 +249,61 @@ func (p *txnProcessor) resolveSigner(from string) (signer eth.TXSigner, err erro return } +// idempotencyCheck called by addInflightWrapper within the inflight lock, in the case the +// extra ackType=receipt idempotency check is enabled, and possible due to co-location with +// the REST API Gateway. +func (p *txnProcessor) idempotencyCheck(inflight *inflightTxn, inflightForAddr *inflightTxnState) (bool, error) { + inflight.idempotencyCheck = true + + // First check the in-memory list + for _, alreadyInflight := range inflightForAddr.txnsInFlight { + if alreadyInflight.msgID == inflight.msgID { + log.Warnf("Kafka redelivery of message already inflight: %s", inflight.msgID) + // We don't send a new reply here - special nil return + return false, nil + } + } + // Then check LevelDB - we should find the entry + r, err := p.receiptStore.GetReceipt(inflight.msgID) + if err != nil { + err = errors.Errorf(errors.ReceiptErrorIdempotencyCheck, inflight.msgID, err) + return false, err + } + if r == nil { + log.Warnf("Did not find acktype=receipt record in receipt store during dispatch: %s", inflight.msgID) + } else if txHash, txHashSet := (*r)["transactionHash"]; txHashSet { + if hashString, ok := txHash.(string); ok && hashString != "" { + log.Warnf("Kafka redelivery of message already dispatched: %s", inflight.msgID) + // We need to call back to let the Kafka layer know this should be cancelled. + var notification messages.TransactionRedeliveryNotification + notification.Headers.MsgType = messages.MsgTypeTransactionRedeliveryPrevented + notification.TransactionHash = hashString + go inflight.txnContext.Reply(¬ification) + return false, nil + } + } + return true, nil +} + +// idempotencyUpdateSubmitted writes the raw reply before removing the in-flight transaction entry. +// This doesn't stop it being sent to Kafka and then re-written by the REST API Gateway when it receives it, +// and emits that update on the webhook. +func (p *txnProcessor) idempotencyUpdateSubmitted(inflight *inflightTxn) { + r, err := p.receiptStore.GetReceipt(inflight.msgID) + if r != nil && err == nil { + // We mark it submitted by setting the transaction hash - this means even if the reply doesn't get through, + // anyone checking the receipt store will find the transaction hash and be able to call our API to + // check the chain directly for the receipt. + (*r)["transactionHash"] = inflight.tx.Hash + err = p.receiptStore.AddReceipt(inflight.msgID, r, true) + } + if err != nil { + log.Errorf("Failed to write dispatched record %s for idempotency checking: %s", inflight.msgID, err) + } +} + // newInflightWrapper uses the supplied transaction, the inflight txn list -// and the ethereum node's transction count to determine the right next +// and the ethereum node's transaction count to determine the right next // nonce for the transaction. // Builds a new wrapper containing this information, that can be added to // the inflight list if the transaction is submitted @@ -285,6 +352,7 @@ func (p *txnProcessor) addInflightWrapper(txnContext TxnContext, msg *messages.T // Hold the lock just while we're adding it to the map and dealing with nonce checking. p.inflightTxnsLock.Lock() + defer p.inflightTxnsLock.Unlock() // The user can supply a nonce and manage them externally, using their own // application-side list of transactions, to prevent the possibility of @@ -293,21 +361,33 @@ func (p *txnProcessor) addInflightWrapper(txnContext TxnContext, msg *messages.T highestID++ var highestNonce int64 = -1 suppliedNonce := msg.Nonce - inflightForAddr, exists := p.inflightTxns[inflight.from] + inflightForAddr, alreadyInflightForAddr := p.inflightTxns[inflight.from] // Add the inflight transaction to our tracking structure - newEntry := false - if !exists { + if !alreadyInflightForAddr { inflightForAddr = &inflightTxnState{} inflightForAddr.txnsInFlight = []*inflightTxn{} // We don't want this new structure to be added in the case of an early return on failure, so // we just mark here that it should be added, and it gets added only on the success path. - newEntry = true + } + + // We do an additional idempotency check before accepting the in-flight messages from Kafka, because + // it might be a redelivery. + // We can only do the idempotency check if: + // 1. We are co-located with the REST API Gateway in the same go process + // - Otherwise SetReceiptStoreForIdempotencyCheck() won't have been called to set p.receiptStore + // 2. The user specified fly-acktype=receipt on the REST API Gateway, which propagates into AckType on the message we receive + // - This is the (slightly awkward) spelling to enable an idempotency check on the REST API Gateway + if p.receiptStore != nil && msg.AckType == "receipt" { + submit, err := p.idempotencyCheck(inflight, inflightForAddr) + if !submit || err != nil { + return nil, err // note nil, nil now must be handled by callers + } } if !nodeAssignNonce && suppliedNonce == "" { // Check the currently inflight txns to see if we have a high nonce to use without // needing to query the node to find the highest nonce. - if exists { + if alreadyInflightForAddr { highestNonce = inflightForAddr.highestNonce } } @@ -328,7 +408,6 @@ func (p *txnProcessor) addInflightWrapper(txnContext TxnContext, msg *messages.T // Note: We do not have highestNonce calculation for in-flight private transactions, // so attempting to submit more than one per block currently will FAIL if inflight.nonce, err = eth.GetOrionTXCount(txnContext.Context(), p.rpc, &from, inflight.privacyGroupID); err != nil { - p.inflightTxnsLock.Unlock() return nil, err } fromNode = true @@ -347,7 +426,6 @@ func (p *txnProcessor) addInflightWrapper(txnContext TxnContext, msg *messages.T // (or if gas price is being varied by the submitter the potential of // overwriting a transaction) if inflight.nonce, err = eth.GetTransactionCount(txnContext.Context(), p.rpc, &from, "pending"); err != nil { - p.inflightTxnsLock.Unlock() return nil, err } inflightForAddr.highestNonce = inflight.nonce // store the nonce in our inflight txns state @@ -357,14 +435,11 @@ func (p *txnProcessor) addInflightWrapper(txnContext TxnContext, msg *messages.T before := len(inflightForAddr.txnsInFlight) inflightForAddr.txnsInFlight = append(inflightForAddr.txnsInFlight, inflight) inflight.initialWaitDelay = p.inflightTxnDelayer.GetInitialDelay() // Must call under lock - if newEntry { + if !alreadyInflightForAddr { p.inflightTxns[inflight.from] = inflightForAddr } - // Clear lock before logging - p.inflightTxnsLock.Unlock() - - log.Infof("In-flight %d added. nonce=%d addr=%s before=%d (node=%t)", inflight.id, inflight.nonce, inflight.from, before, fromNode) + log.Infof("In-flight %s added (%d). nonce=%d addr=%s before=%d (node=%t)", inflight.msgID, inflight.id, inflight.nonce, inflight.from, before, fromNode) return } @@ -403,9 +478,10 @@ func (p *txnProcessor) cancelInFlight(inflight *inflightTxn, submitted bool) { } } } + p.inflightTxnsLock.Unlock() - log.Infof("In-flight %d complete. nonce=%d addr=%s nan=%t sub=%t before=%d after=%d highest=%d", inflight.id, inflight.nonce, inflight.from, inflight.nodeAssignNonce, submitted, before, after, highestNonce) + log.Infof("In-flight %s complete (%d). nonce=%d addr=%s nan=%t sub=%t before=%d after=%d highest=%d", inflight.msgID, inflight.id, inflight.nonce, inflight.from, inflight.nodeAssignNonce, submitted, before, after, highestNonce) // If we've got a gap potential, we need to submit a gap-fill TX if !submitted && highestNonce > inflight.nonce && !inflight.nodeAssignNonce { @@ -474,6 +550,12 @@ func (p *txnProcessor) waitForCompletion(inflight *inflightTxn, initialWaitDelay } } + // If this request had the additional idempotency check enabled, then we need to write the reply + // in-line here, before we remove the entry from the in-flight list. + if inflight.idempotencyCheck { + p.idempotencyUpdateSubmitted(inflight) + } + if timedOut { if err != nil { inflight.txnContext.SendErrorReplyWithTX(500, errors.Errorf(errors.TransactionSendReceiptCheckError, retries, err), inflight.tx.Hash) @@ -538,7 +620,6 @@ func (p *txnProcessor) waitForCompletion(inflight *inflightTxn, initialWaitDelay if receipt.TransactionIndex != nil { reply.TransactionIndexStr = strconv.FormatUint(uint64(*receipt.TransactionIndex), 10) } - inflight.txnContext.Reply(&reply) } @@ -565,6 +646,10 @@ func (p *txnProcessor) OnDeployContractMessage(txnContext TxnContext, msg *messa txnContext.SendErrorReply(400, err) return } + if inflight == nil { + // Skip sending due to idempotency check - any reply is already handled + return + } inflight.registerAs = msg.RegisterAs msg.Nonce = inflight.nonceNumber() @@ -585,6 +670,10 @@ func (p *txnProcessor) OnSendTransactionMessage(txnContext TxnContext, msg *mess txnContext.SendErrorReply(400, err) return } + if inflight == nil { + // Skip sending due to idempotency check - any reply is already handled + return + } msg.Nonce = inflight.nonceNumber() tx, err := eth.NewSendTxn(msg, inflight.signer) diff --git a/internal/tx/txnprocessor_test.go b/internal/tx/txnprocessor_test.go index d002d3ba..594a3578 100644 --- a/internal/tx/txnprocessor_test.go +++ b/internal/tx/txnprocessor_test.go @@ -34,9 +34,11 @@ import ( "github.com/hyperledger/firefly-ethconnect/internal/eth" "github.com/hyperledger/firefly-ethconnect/internal/ethbind" "github.com/hyperledger/firefly-ethconnect/internal/messages" + "github.com/hyperledger/firefly-ethconnect/mocks/receiptsmocks" ethbinding "github.com/kaleido-io/ethbinding/pkg" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) type errorReply struct { @@ -106,6 +108,28 @@ var goodSendTxnJSONWithoutGas = "{" + " \"method\":{\"name\":\"test\"}" + "}" +var goodSendTxnJSONIdempotent = `{ + "headers": { + "type": "SendTransaction", + "id": "id12345-idempotent" + }, + "from": "` + testFromAddr + `", + "gas": "123", + "method": {"name":"test"}, + "acktype": "receipt" + }` + +var goodDeployTxnJSONIdempotent = `{ + "headers": { + "type": "DeployContract", + "id": "id12345-idempotent" + }, + "from": "` + testFromAddr + `", + "gas": "123", + "solidity": "pragma solidity >=0.4.22 <=0.8; contract t {constructor() public {}}", + "acktype": "receipt" + }` + var goodDeployTxnPrivateJSON = "{" + " \"headers\":{\"type\": \"DeployContract\"}," + " \"solidity\":\"pragma solidity >=0.4.22 <=0.8; contract t {constructor() public {}}\"," + @@ -754,6 +778,177 @@ func TestOnSendTransactionMessageTxnTimeout(t *testing.T) { } +func TestOnSendTransactionMessageTxnIdempotentStoreOK(t *testing.T) { + + zero := 0 + txnProcessor := NewTxnProcessor(&TxnProcessorConf{ + MaxTXWaitTime: 1, + SendRetryMax: &zero, + }, ð.RPCConf{}).(*txnProcessor) + testTxnContext := &testTxnContext{} + testTxnContext.jsonMsg = goodSendTxnJSONIdempotent + testRPC := goodMessageRPC() + txnProcessor.Init(testRPC) + + mr := &receiptsmocks.ReceiptStorePersistence{} + txnProcessor.SetReceiptStoreForIdempotencyCheck(mr) + mr.On("GetReceipt", "id12345-idempotent").Return(&map[string]interface{}{ + // no transactionHash field + }, nil) + mr.On("AddReceipt", "id12345-idempotent", mock.MatchedBy(func(r *map[string]interface{}) bool { + return (*r)["transactionHash"] == "0xe2215336b09f9b5b82e36e1144ed64f40a42e61b68fdaca82549fd98b8531a89" + }), true).Return(nil).Once() + + txnProcessor.OnMessage(testTxnContext) + for inMap := false; !inMap; _, inMap = txnProcessor.inflightTxns[strings.ToLower(testFromAddr)] { + time.Sleep(1 * time.Millisecond) + } + txnWG := &txnProcessor.inflightTxns[strings.ToLower(testFromAddr)].txnsInFlight[0].wg + txnWG.Wait() + + mr.AssertExpectations(t) +} + +func TestOnSendTransactionMessageTxnHandleNotFound(t *testing.T) { + + zero := 0 + txnProcessor := NewTxnProcessor(&TxnProcessorConf{ + MaxTXWaitTime: 1, + SendRetryMax: &zero, + }, ð.RPCConf{}).(*txnProcessor) + testTxnContext := &testTxnContext{} + testTxnContext.jsonMsg = goodSendTxnJSONIdempotent + testRPC := goodMessageRPC() + txnProcessor.Init(testRPC) + + mr := &receiptsmocks.ReceiptStorePersistence{} + txnProcessor.SetReceiptStoreForIdempotencyCheck(mr) + mr.On("GetReceipt", "id12345-idempotent").Return(nil, nil) // not found on the way in, so disabled + + txnProcessor.OnMessage(testTxnContext) + for inMap := false; !inMap; _, inMap = txnProcessor.inflightTxns[strings.ToLower(testFromAddr)] { + time.Sleep(1 * time.Millisecond) + } + txnWG := &txnProcessor.inflightTxns[strings.ToLower(testFromAddr)].txnsInFlight[0].wg + txnWG.Wait() + + mr.AssertExpectations(t) +} + +func TestOnSendTransactionMessageFail(t *testing.T) { + + zero := 0 + txnProcessor := NewTxnProcessor(&TxnProcessorConf{ + MaxTXWaitTime: 1, + SendRetryMax: &zero, + }, ð.RPCConf{}).(*txnProcessor) + testTxnContext := &testTxnContext{} + testTxnContext.jsonMsg = goodSendTxnJSONIdempotent + testRPC := goodMessageRPC() + txnProcessor.Init(testRPC) + + mr := &receiptsmocks.ReceiptStorePersistence{} + txnProcessor.SetReceiptStoreForIdempotencyCheck(mr) + mr.On("GetReceipt", "id12345-idempotent").Return(nil, fmt.Errorf("pop")) + + txnProcessor.OnMessage(testTxnContext) + assert.Empty(t, txnProcessor.inflightTxns) + + mr.AssertExpectations(t) +} + +func TestOnSendTransactionMessageTxnIdempotentStoreFail(t *testing.T) { + + zero := 0 + txnProcessor := NewTxnProcessor(&TxnProcessorConf{ + MaxTXWaitTime: 1, + SendRetryMax: &zero, + }, ð.RPCConf{}).(*txnProcessor) + testTxnContext := &testTxnContext{} + testTxnContext.jsonMsg = goodSendTxnJSONIdempotent + testRPC := goodMessageRPC() + txnProcessor.Init(testRPC) + + mr := &receiptsmocks.ReceiptStorePersistence{} + txnProcessor.SetReceiptStoreForIdempotencyCheck(mr) + mr.On("GetReceipt", "id12345-idempotent").Return(&map[string]interface{}{ + // no transactionHash field + }, nil) + mr.On("AddReceipt", "id12345-idempotent", mock.MatchedBy(func(r *map[string]interface{}) bool { + return (*r)["transactionHash"] == "0xe2215336b09f9b5b82e36e1144ed64f40a42e61b68fdaca82549fd98b8531a89" + }), true).Return(fmt.Errorf("pop")).Once() // swallowed with logging + + txnProcessor.OnMessage(testTxnContext) + for inMap := false; !inMap; _, inMap = txnProcessor.inflightTxns[strings.ToLower(testFromAddr)] { + time.Sleep(1 * time.Millisecond) + } + txnWG := &txnProcessor.inflightTxns[strings.ToLower(testFromAddr)].txnsInFlight[0].wg + txnWG.Wait() + + mr.AssertExpectations(t) +} + +func TestOnSendTransactionMessageTxnIdempotentSkipInflight(t *testing.T) { + + zero := 0 + txnProcessor := NewTxnProcessor(&TxnProcessorConf{ + MaxTXWaitTime: 1, + SendRetryMax: &zero, + }, ð.RPCConf{}).(*txnProcessor) + testTxnContext := &testTxnContext{} + testTxnContext.jsonMsg = goodDeployTxnJSONIdempotent + testRPC := goodMessageRPC() + txnProcessor.Init(testRPC) + + txnProcessor.inflightTxns = map[string]*inflightTxnState{ + strings.ToLower(testFromAddr): { + txnsInFlight: []*inflightTxn{ + {msgID: "id12345-idempotent"}, + }, + }, + } + + mr := &receiptsmocks.ReceiptStorePersistence{} + txnProcessor.SetReceiptStoreForIdempotencyCheck(mr) + + txnProcessor.OnMessage(testTxnContext) + assert.Len(t, txnProcessor.inflightTxns, 1) + + mr.AssertExpectations(t) +} + +func TestOnSendTransactionMessageTxnIdempotentSkipDuplicate(t *testing.T) { + + zero := 0 + txnProcessor := NewTxnProcessor(&TxnProcessorConf{ + MaxTXWaitTime: 1, + SendRetryMax: &zero, + }, ð.RPCConf{}).(*txnProcessor) + testTxnContext := &testTxnContext{} + testTxnContext.jsonMsg = goodSendTxnJSONIdempotent + testRPC := goodMessageRPC() + txnProcessor.Init(testRPC) + + mr := &receiptsmocks.ReceiptStorePersistence{} + txnProcessor.SetReceiptStoreForIdempotencyCheck(mr) + mr.On("GetReceipt", "id12345-idempotent").Return(&map[string]interface{}{ + "transactionHash": "0xe2215336b09f9b5b82e36e1144ed64f40a42e61b68fdaca82549fd98b8531a89", + }, nil).Once() + + txnProcessor.OnMessage(testTxnContext) + assert.Empty(t, txnProcessor.inflightTxns) + + mr.AssertExpectations(t) + + for { + if len(testTxnContext.replies) > 0 { + break + } + time.Sleep(10 * time.Microsecond) + } + assert.Equal(t, messages.MsgTypeTransactionRedeliveryPrevented, testTxnContext.replies[0].ReplyHeaders().MsgType) +} + func TestOnSendTransactionMessageFailedTxn(t *testing.T) { assert := assert.New(t) diff --git a/mocks/receiptsmocks/receipt_store_persistence.go b/mocks/receiptsmocks/receipt_store_persistence.go new file mode 100644 index 00000000..7f69a667 --- /dev/null +++ b/mocks/receiptsmocks/receipt_store_persistence.go @@ -0,0 +1,70 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package receiptsmocks + +import mock "github.com/stretchr/testify/mock" + +// ReceiptStorePersistence is an autogenerated mock type for the ReceiptStorePersistence type +type ReceiptStorePersistence struct { + mock.Mock +} + +// AddReceipt provides a mock function with given fields: requestID, receipt, overwriteAndRetry +func (_m *ReceiptStorePersistence) AddReceipt(requestID string, receipt *map[string]interface{}, overwriteAndRetry bool) error { + ret := _m.Called(requestID, receipt, overwriteAndRetry) + + var r0 error + if rf, ok := ret.Get(0).(func(string, *map[string]interface{}, bool) error); ok { + r0 = rf(requestID, receipt, overwriteAndRetry) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetReceipt provides a mock function with given fields: requestID +func (_m *ReceiptStorePersistence) GetReceipt(requestID string) (*map[string]interface{}, error) { + ret := _m.Called(requestID) + + var r0 *map[string]interface{} + if rf, ok := ret.Get(0).(func(string) *map[string]interface{}); ok { + r0 = rf(requestID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*map[string]interface{}) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(requestID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetReceipts provides a mock function with given fields: skip, limit, ids, sinceEpochMS, from, to, start +func (_m *ReceiptStorePersistence) GetReceipts(skip int, limit int, ids []string, sinceEpochMS int64, from string, to string, start string) (*[]map[string]interface{}, error) { + ret := _m.Called(skip, limit, ids, sinceEpochMS, from, to, start) + + var r0 *[]map[string]interface{} + if rf, ok := ret.Get(0).(func(int, int, []string, int64, string, string, string) *[]map[string]interface{}); ok { + r0 = rf(skip, limit, ids, sinceEpochMS, from, to, start) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*[]map[string]interface{}) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(int, int, []string, int64, string, string, string) error); ok { + r1 = rf(skip, limit, ids, sinceEpochMS, from, to, start) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +}