From 2a2982bd79c41ac612c4ea35113a54cdce8dba7c Mon Sep 17 00:00:00 2001 From: jkrvivian Date: Fri, 19 Jan 2024 17:26:45 +0800 Subject: [PATCH 1/5] Implement transactions topic --- pkg/mqtt/publish.go | 18 ++++++++++++++++++ pkg/mqtt/server.go | 10 ++++++++-- pkg/mqtt/topics.go | 1 + 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/pkg/mqtt/publish.go b/pkg/mqtt/publish.go index d55624d..3a40548 100644 --- a/pkg/mqtt/publish.go +++ b/pkg/mqtt/publish.go @@ -156,6 +156,24 @@ func (s *Server) publishBlockIfSubscribed(block *iotago.Block, rawData []byte) e }, blockTopics...) } +func (s *Server) publishTransactionIfSubscribed(block *iotago.Block) error { + blk, ok := block.Body.(*iotago.BasicBlockBody) + if !ok { + return nil + } + + signedTx, ok := blk.Payload.(*iotago.SignedTransaction) + if !ok { + return nil + } + + return s.publishWithPayloadFuncsOnTopicsIfSubscribed(func() ([]byte, error) { + return signedTx.Transaction.API.JSONEncode(signedTx.Transaction) + }, func() ([]byte, error) { + return signedTx.Transaction.API.Encode(signedTx.Transaction) + }, TopicTransactions) +} + func (s *Server) publishBlockMetadataOnTopicsIfSubscribed(metadataFunc func() (*iotaapi.BlockMetadataResponse, error), topics ...string) error { return s.publishPayloadOnTopicsIfSubscribed( func() (iotago.API, error) { return s.NodeBridge.APIProvider().CommittedAPI(), nil }, diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go index 8b6f180..01feb59 100644 --- a/pkg/mqtt/server.go +++ b/pkg/mqtt/server.go @@ -213,7 +213,8 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st TopicBlocksBasic, TopicBlocksBasicTransaction, TopicBlocksBasicTransactionTaggedData, - TopicBlocksBasicTaggedData: + TopicBlocksBasicTaggedData, + TopicTransactions: s.startListenIfNeeded(ctx, GrpcListenToBlocks, s.listenToBlocks) case TopicBlockMetadataAccepted: @@ -280,7 +281,8 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) { TopicBlocksBasic, TopicBlocksBasicTransaction, TopicBlocksBasicTransactionTaggedData, - TopicBlocksBasicTaggedData: + TopicBlocksBasicTaggedData, + TopicTransactions: s.stopListenIfNeeded(GrpcListenToBlocks) case TopicBlockMetadataAccepted: @@ -399,6 +401,10 @@ func (s *Server) listenToBlocks(ctx context.Context) error { s.LogErrorf("failed to publish block: %v", err) } + if err := s.publishTransactionIfSubscribed(block); err != nil { + s.LogErrorf("failed to publish block: %v", err) + } + // we don't return an error here, because we want to continue listening even if publishing fails once return nil }) diff --git a/pkg/mqtt/topics.go b/pkg/mqtt/topics.go index 228f1e1..8bccaf2 100644 --- a/pkg/mqtt/topics.go +++ b/pkg/mqtt/topics.go @@ -38,6 +38,7 @@ const ( TopicBlocksBasicTransactionTaggedData = "blocks/basic/transaction/tagged-data" // iotago.Block (track all incoming basic blocks with transactions and tagged data) TopicBlocksBasicTransactionTaggedDataTag = "blocks/basic/transaction/tagged-data/" + ParameterTag // iotago.Block (track all incoming basic blocks with transactions and specific tagged data) + TopicTransactions = "transactions" // iotago.Transaction (track all incoming transactions) // single block on subscribe and changes in it's metadata (accepted, confirmed). TopicTransactionsIncludedBlock = "transactions/" + ParameterTransactionID + "/included-block" // api.BlockWithMetadataResponse (track inclusion of a single transaction) From 47f7d4d6f8d9b6f2661e2a1d1d263fa8bb5a8311 Mon Sep 17 00:00:00 2001 From: jkrvivian Date: Fri, 19 Jan 2024 17:47:03 +0800 Subject: [PATCH 2/5] Add unit tests for transactions topic --- pkg/testsuite/mqtt_test.go | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/pkg/testsuite/mqtt_test.go b/pkg/testsuite/mqtt_test.go index 41b576a..63e43c3 100644 --- a/pkg/testsuite/mqtt_test.go +++ b/pkg/testsuite/mqtt_test.go @@ -199,6 +199,36 @@ func TestMqttTopics(t *testing.T) { } }(), + // ok - Transactions + func() *test { + testTx := ts.NewTestTransaction(true) + + return &test{ + name: "ok - Transactions", + topics: []*testTopic{ + { + topic: mqtt.TopicTransactions, + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.TopicBlocks, + mqtt.TopicBlocksBasic, + mqtt.TopicBlocksBasicTransaction, + mqtt.TopicBlocksBasicTransactionTaggedData, + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Transaction)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Transaction)), + postSubscribeFunc: func() { + ts.ReceiveBlock(&testsuite.MockedBlock{ + Block: testTx.Block, + RawBlockData: lo.PanicOnErr(ts.API().Encode(testTx.Block)), + }) + }, + } + }(), + // ok - Basic block with transaction and tagged data payload func() *test { testTx := ts.NewTestTransaction(true, tpkg.WithTxEssencePayload( @@ -237,9 +267,11 @@ func TestMqttTopics(t *testing.T) { isEventTarget: true, }, }, - topicsIgnore: []string{}, - jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), - rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), + topicsIgnore: []string{ + mqtt.TopicTransactions, + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), postSubscribeFunc: func() { ts.ReceiveBlock(&testsuite.MockedBlock{ Block: testTx.Block, From bf6a8bbbf9bd95172a4efcf160e50a9003b6ea33 Mon Sep 17 00:00:00 2001 From: jkrvivian Date: Tue, 23 Jan 2024 21:43:48 +0800 Subject: [PATCH 3/5] Add transaction topic for specific txID --- pkg/mqtt/publish.go | 7 +++ pkg/mqtt/server.go | 88 ++++++++++++++++++++++++++++++++++++++ pkg/mqtt/topics.go | 20 +++++++++ pkg/testsuite/mqtt_test.go | 59 +++++++++++++++++++++++++ 4 files changed, 174 insertions(+) diff --git a/pkg/mqtt/publish.go b/pkg/mqtt/publish.go index 3a40548..4e9bc6a 100644 --- a/pkg/mqtt/publish.go +++ b/pkg/mqtt/publish.go @@ -196,6 +196,13 @@ func (s *Server) publishOutputIfSubscribed(ctx context.Context, output *nodebrid return output.Metadata.BlockID, nil }, ) + + s.fetchAndPublishTransaction(ctx, + output.OutputID.TransactionID(), + func() (iotago.BlockID, error) { + return output.Metadata.BlockID, nil + }, + ) } bech32HRP := s.NodeBridge.APIProvider().CommittedAPI().ProtocolParameters().Bech32HRP() diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go index 01feb59..fcbec52 100644 --- a/pkg/mqtt/server.go +++ b/pkg/mqtt/server.go @@ -251,12 +251,16 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st // topicOutputsByUnlockConditionAndAddress // topicSpentOutputsByUnlockConditionAndAddress // topicTransactionsIncludedBlock + // topicTransactions s.startListenIfNeeded(ctx, GrpcListenToAcceptedTransactions, s.listenToAcceptedTransactions) s.startListenIfNeeded(ctx, GrpcListenToLedgerUpdates, s.listenToLedgerUpdates) if transactionID := TransactionIDFromTransactionsIncludedBlockTopic(topic); transactionID != iotago.EmptyTransactionID { go s.fetchAndPublishTransactionInclusion(ctx, transactionID) } + if transactionID := TransactionIDFromTransactionTopic(topic); transactionID != iotago.EmptyTransactionID { + go s.fetchAndPublishTransactions(ctx, transactionID) + } if outputID := OutputIDFromOutputsTopic(topic); outputID != iotago.EmptyOutputID { go s.fetchAndPublishOutput(ctx, outputID) } @@ -600,3 +604,87 @@ func (s *Server) fetchAndPublishTransactionInclusionWithBlock(ctx context.Contex s.LogErrorf("failed to publish transaction inclusion %s: %v", transactionID.ToHex(), err) } } + +func (s *Server) fetchAndPublishTransactions(ctx context.Context, transactionID iotago.TransactionID) { + + var blockID iotago.BlockID + blockIDFunc := func() (iotago.BlockID, error) { + if blockID.Empty() { + // get the output and then the blockID of the transaction that created the output + outputID := iotago.OutputID{} + copy(outputID[:], transactionID[:]) + + ctxFetch, cancelFetch := context.WithTimeout(ctx, fetchTimeout) + defer cancelFetch() + + output, err := s.NodeBridge.Output(ctxFetch, outputID) + if err != nil { + return iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to retrieve output of transaction %s", transactionID.ToHex()) + } + + return output.Metadata.BlockID, nil + } + + return blockID, nil + } + + s.fetchAndPublishTransaction(ctx, transactionID, blockIDFunc) +} + +func (s *Server) fetchAndPublishTransaction(ctx context.Context, transactionID iotago.TransactionID, blockIDFunc func() (iotago.BlockID, error)) { + ctxFetch, cancelFetch := context.WithTimeout(ctx, fetchTimeout) + defer cancelFetch() + + var tx *iotago.Transaction + txFunc := func() (*iotago.Transaction, error) { + if tx != nil { + return tx, nil + } + + blockID, err := blockIDFunc() + if err != nil { + return nil, err + } + + resp, err := s.NodeBridge.Block(ctxFetch, blockID) + if err != nil { + s.LogErrorf("failed to retrieve block %s :%v", blockID.ToHex(), err) + return nil, err + } + + basicBlk, ok := resp.Body.(*iotago.BasicBlockBody) + if !ok { + err := ierrors.Errorf("block body is not basic block %s ", blockID.ToHex()) + s.LogError(err.Error()) + return nil, err + } + + signedTx, ok := basicBlk.Payload.(*iotago.SignedTransaction) + if !ok { + err := ierrors.Errorf("block payload is not signed transaction %s ", blockID.ToHex()) + s.LogError(err.Error()) + return nil, err + } + + tx = signedTx.Transaction + + return tx, nil + } + + if err := s.publishPayloadOnTopicsIfSubscribed( + func() (iotago.API, error) { + tx, err := txFunc() + if err != nil { + return nil, err + } + + return tx.API, nil + }, + func() (any, error) { + return txFunc() + }, + GetTopicTransaction(transactionID), + ); err != nil { + s.LogErrorf("failed to publish transaction %s: %v", transactionID.ToHex(), err) + } +} diff --git a/pkg/mqtt/topics.go b/pkg/mqtt/topics.go index 8bccaf2..f570fc1 100644 --- a/pkg/mqtt/topics.go +++ b/pkg/mqtt/topics.go @@ -41,6 +41,7 @@ const ( TopicTransactions = "transactions" // iotago.Transaction (track all incoming transactions) // single block on subscribe and changes in it's metadata (accepted, confirmed). TopicTransactionsIncludedBlock = "transactions/" + ParameterTransactionID + "/included-block" // api.BlockWithMetadataResponse (track inclusion of a single transaction) + TopicTransaction = "transactions/" + ParameterTransactionID // iotago.Transaction (track a specific transaction) // single block on subscribe and changes in it's metadata (accepted, confirmed). TopicBlockMetadata = "block-metadata/" + ParameterBlockID // api.BlockMetadataResponse (track changes to a single block) @@ -103,6 +104,21 @@ func TransactionIDFromTransactionsIncludedBlockTopic(topic string) iotago.Transa return iotago.EmptyTransactionID } +func TransactionIDFromTransactionTopic(topic string) iotago.TransactionID { + if strings.HasPrefix(topic, "transactions/") && !strings.HasSuffix(topic, "/included-block") { + transactionIDHex := strings.Replace(topic, "transactions/", "", 1) + + transactionID, err := iotago.TransactionIDFromHexString(transactionIDHex) + if err != nil || len(transactionID) != iotago.TransactionIDLength { + return iotago.EmptyTransactionID + } + + return transactionID + } + + return iotago.EmptyTransactionID +} + func OutputIDFromOutputsTopic(topic string) iotago.OutputID { if strings.HasPrefix(topic, "outputs/") && strings.Count(topic, "/") == 1 { outputIDHex := strings.Replace(topic, "outputs/", "", 1) @@ -137,6 +153,10 @@ func GetTopicTransactionsIncludedBlock(transactionID iotago.TransactionID) strin return strings.ReplaceAll(TopicTransactionsIncludedBlock, ParameterTransactionID, transactionID.ToHex()) } +func GetTopicTransaction(transactionID iotago.TransactionID) string { + return strings.ReplaceAll(TopicTransaction, ParameterTransactionID, transactionID.ToHex()) +} + func GetTopicAccountOutputs(accountID iotago.AccountID, hrp iotago.NetworkPrefix) string { return strings.ReplaceAll(TopicAccountOutputs, ParameterAccountAddress, accountID.ToAddress().Bech32(hrp)) } diff --git a/pkg/testsuite/mqtt_test.go b/pkg/testsuite/mqtt_test.go index 63e43c3..35ed84f 100644 --- a/pkg/testsuite/mqtt_test.go +++ b/pkg/testsuite/mqtt_test.go @@ -229,6 +229,55 @@ func TestMqttTopics(t *testing.T) { } }(), + // ok - Transaction + func() *test { + testTx := ts.NewTestTransaction(true) + + return &test{ + name: "ok - Transaction - TransactionTopic", + topics: []*testTopic{ + { + topic: mqtt.GetTopicTransaction(testTx.TransactionID), + isPollingTarget: true, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Transaction)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Transaction)), + preSubscribeFunc: func() { + // we need to add the block to the nodebridge, so that it is available + // for the TransactionsIncludedBlockTopic + ts.MockAddBlock(testTx.BlockID, testTx.Block) + + // we also need to add the first output to the nodebridge, so that it is available. + // this is also used by the TransactionsIncludedBlockTopic to get the blockID of the block containing the transaction of that output + ts.MockAddOutput(testTx.OutputID, testTx.Output) + }, + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromTransaction(testTx.BlockID, testTx.Transaction), + }, + }) + }, + } + }(), + // ok - Basic block with transaction and tagged data payload func() *test { testTx := ts.NewTestTransaction(true, tpkg.WithTxEssencePayload( @@ -299,6 +348,7 @@ func TestMqttTopics(t *testing.T) { mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicTransaction(testTx.TransactionID), }, jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), @@ -446,6 +496,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -492,6 +543,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -535,6 +587,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -580,6 +633,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -624,6 +678,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -667,6 +722,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -751,6 +807,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.SenderAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -816,6 +873,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicAnchorOutputs(anchorOutput.AnchorID, ts.API().ProtocolParameters().Bech32HRP()), @@ -867,6 +925,7 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicFoundryOutputs(lo.PanicOnErr(foundryOutput.FoundryID())), From d9cbec95b8a411f125d097c75cec7854b008fcad Mon Sep 17 00:00:00 2001 From: jkrvivian Date: Wed, 24 Jan 2024 17:21:21 +0800 Subject: [PATCH 4/5] Add transaction-metadata topic --- pkg/mqtt/publish.go | 12 ++++++ pkg/mqtt/server.go | 23 ++++++++++- pkg/mqtt/topics.go | 20 ++++++++++ pkg/testsuite/mqtt_test.go | 66 ++++++++++++++++++++++++++++++++ pkg/testsuite/nodebridge_mock.go | 26 +++++++++---- pkg/testsuite/testsuite.go | 4 ++ 6 files changed, 141 insertions(+), 10 deletions(-) diff --git a/pkg/mqtt/publish.go b/pkg/mqtt/publish.go index 4e9bc6a..ee45aff 100644 --- a/pkg/mqtt/publish.go +++ b/pkg/mqtt/publish.go @@ -184,6 +184,16 @@ func (s *Server) publishBlockMetadataOnTopicsIfSubscribed(metadataFunc func() (* ) } +func (s *Server) publishTransactionMetadataOnTopicsIfSubscribed(metadataFunc func() (*iotaapi.TransactionMetadataResponse, error), topics ...string) error { + return s.publishPayloadOnTopicsIfSubscribed( + func() (iotago.API, error) { return s.NodeBridge.APIProvider().CommittedAPI(), nil }, + func() (any, error) { + return metadataFunc() + }, + topics..., + ) +} + func (s *Server) publishOutputIfSubscribed(ctx context.Context, output *nodebridge.Output, publishOnAllTopics bool) error { topics := []string{GetTopicOutput(output.OutputID)} @@ -203,6 +213,8 @@ func (s *Server) publishOutputIfSubscribed(ctx context.Context, output *nodebrid return output.Metadata.BlockID, nil }, ) + + s.fetchAndPublishTransactionMetadata(ctx, output.OutputID.TransactionID()) } bech32HRP := s.NodeBridge.APIProvider().CommittedAPI().ProtocolParameters().Bech32HRP() diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go index fcbec52..6ccea24 100644 --- a/pkg/mqtt/server.go +++ b/pkg/mqtt/server.go @@ -242,7 +242,7 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st go s.fetchAndPublishBlockMetadata(ctx, blockID) } - case strings.HasPrefix(topic, "outputs/") || strings.HasPrefix(topic, "transactions/"): + case strings.HasPrefix(topic, "outputs/") || strings.HasPrefix(topic, "transactions/") || strings.HasPrefix(topic, "transaction-metadata/"): // topicOutputs // topicAccountOutputs // topicAnchorOutputs @@ -252,6 +252,7 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st // topicSpentOutputsByUnlockConditionAndAddress // topicTransactionsIncludedBlock // topicTransactions + // topicTransactionMetadata s.startListenIfNeeded(ctx, GrpcListenToAcceptedTransactions, s.listenToAcceptedTransactions) s.startListenIfNeeded(ctx, GrpcListenToLedgerUpdates, s.listenToLedgerUpdates) @@ -261,6 +262,9 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st if transactionID := TransactionIDFromTransactionTopic(topic); transactionID != iotago.EmptyTransactionID { go s.fetchAndPublishTransactions(ctx, transactionID) } + if transactionID := TransactionIDFromTransactionMetadataTopic(topic); transactionID != iotago.EmptyTransactionID { + go s.fetchAndPublishTransactionMetadata(ctx, transactionID) + } if outputID := OutputIDFromOutputsTopic(topic); outputID != iotago.EmptyOutputID { go s.fetchAndPublishOutput(ctx, outputID) } @@ -308,7 +312,7 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) { s.stopListenIfNeeded(GrpcListenToAcceptedBlocks) s.stopListenIfNeeded(GrpcListenToConfirmedBlocks) - case strings.HasPrefix(topic, "outputs/") || strings.HasPrefix(topic, "transactions/"): + case strings.HasPrefix(topic, "outputs/") || strings.HasPrefix(topic, "transactions/") || strings.HasPrefix(topic, "transaction-metadata/"): // topicOutputs // topicAccountOutputs // topicAnchorOutputs @@ -317,6 +321,8 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) { // topicOutputsByUnlockConditionAndAddress // topicSpentOutputsByUnlockConditionAndAddress // topicTransactionsIncludedBlock + // topicTransactions + // topicTransactionMetadata s.stopListenIfNeeded(GrpcListenToAcceptedTransactions) s.stopListenIfNeeded(GrpcListenToLedgerUpdates) } @@ -536,6 +542,19 @@ func (s *Server) fetchAndPublishOutput(ctx context.Context, outputID iotago.Outp } } +func (s *Server) fetchAndPublishTransactionMetadata(ctx context.Context, transactionID iotago.TransactionID) { + if err := s.publishTransactionMetadataOnTopicsIfSubscribed(func() (*iotaapi.TransactionMetadataResponse, error) { + resp, err := s.NodeBridge.TransactionMetadata(ctx, transactionID) + if err != nil { + return nil, ierrors.Wrapf(err, "failed to retrieve transaction metadata %s", transactionID.ToHex()) + } + + return resp, nil + }, GetTopicTransactionMetadata(transactionID)); err != nil { + s.LogErrorf("failed to publish transaction metadata %s: %v", transactionID.ToHex(), err) + } +} + func (s *Server) fetchAndPublishTransactionInclusion(ctx context.Context, transactionID iotago.TransactionID) { var blockID iotago.BlockID diff --git a/pkg/mqtt/topics.go b/pkg/mqtt/topics.go index f570fc1..92b1a85 100644 --- a/pkg/mqtt/topics.go +++ b/pkg/mqtt/topics.go @@ -42,6 +42,7 @@ const ( // single block on subscribe and changes in it's metadata (accepted, confirmed). TopicTransactionsIncludedBlock = "transactions/" + ParameterTransactionID + "/included-block" // api.BlockWithMetadataResponse (track inclusion of a single transaction) TopicTransaction = "transactions/" + ParameterTransactionID // iotago.Transaction (track a specific transaction) + TopicTransactionMetadata = "transaction-metadata/" + ParameterTransactionID // api.TransactionMetadataResponse (track a specific transaction) // single block on subscribe and changes in it's metadata (accepted, confirmed). TopicBlockMetadata = "block-metadata/" + ParameterBlockID // api.BlockMetadataResponse (track changes to a single block) @@ -119,6 +120,21 @@ func TransactionIDFromTransactionTopic(topic string) iotago.TransactionID { return iotago.EmptyTransactionID } +func TransactionIDFromTransactionMetadataTopic(topic string) iotago.TransactionID { + if strings.HasPrefix(topic, "transaction-metadata/") { + transactionIDHex := strings.Replace(topic, "transaction-metadata/", "", 1) + + transactionID, err := iotago.TransactionIDFromHexString(transactionIDHex) + if err != nil || len(transactionID) != iotago.TransactionIDLength { + return iotago.EmptyTransactionID + } + + return transactionID + } + + return iotago.EmptyTransactionID +} + func OutputIDFromOutputsTopic(topic string) iotago.OutputID { if strings.HasPrefix(topic, "outputs/") && strings.Count(topic, "/") == 1 { outputIDHex := strings.Replace(topic, "outputs/", "", 1) @@ -157,6 +173,10 @@ func GetTopicTransaction(transactionID iotago.TransactionID) string { return strings.ReplaceAll(TopicTransaction, ParameterTransactionID, transactionID.ToHex()) } +func GetTopicTransactionMetadata(transactionID iotago.TransactionID) string { + return strings.ReplaceAll(TopicTransactionMetadata, ParameterTransactionID, transactionID.ToHex()) +} + func GetTopicAccountOutputs(accountID iotago.AccountID, hrp iotago.NetworkPrefix) string { return strings.ReplaceAll(TopicAccountOutputs, ParameterAccountAddress, accountID.ToAddress().Bech32(hrp)) } diff --git a/pkg/testsuite/mqtt_test.go b/pkg/testsuite/mqtt_test.go index 35ed84f..d147730 100644 --- a/pkg/testsuite/mqtt_test.go +++ b/pkg/testsuite/mqtt_test.go @@ -248,6 +248,7 @@ func TestMqttTopics(t *testing.T) { mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), }, jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Transaction)), rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Transaction)), @@ -278,6 +279,61 @@ func TestMqttTopics(t *testing.T) { } }(), + // ok - TransactionMetadata + func() *test { + testTx := ts.NewTestTransaction(true) + transactionMetadataResponse := &api.TransactionMetadataResponse{ + TransactionID: testTx.TransactionID, + TransactionState: api.TransactionStateAccepted, + } + + return &test{ + name: "ok - Transaction - TopicTransactionMetadata", + topics: []*testTopic{ + { + topic: mqtt.GetTopicTransactionMetadata(testTx.TransactionID), + isPollingTarget: true, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransaction(testTx.TransactionID), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(transactionMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(transactionMetadataResponse)), + preSubscribeFunc: func() { + // we need to add the block to the nodebridge, so that it is available + // for the TransactionsIncludedBlockTopic + ts.MockAddBlock(testTx.BlockID, testTx.Block) + + // we also need to add the first output to the nodebridge, so that it is available. + // this is also used by the TransactionsIncludedBlockTopic to get the blockID of the block containing the transaction of that output + ts.MockAddOutput(testTx.OutputID, testTx.Output) + ts.MockAddTransactionMetadata(transactionMetadataResponse.TransactionID, transactionMetadataResponse) + }, + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + // ok - Basic block with transaction and tagged data payload func() *test { testTx := ts.NewTestTransaction(true, tpkg.WithTxEssencePayload( @@ -349,6 +405,7 @@ func TestMqttTopics(t *testing.T) { mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), }, jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), @@ -497,6 +554,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -544,6 +602,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -588,6 +647,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -634,6 +694,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -679,6 +740,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -723,6 +785,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -808,6 +871,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.SenderAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -874,6 +938,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicAnchorOutputs(anchorOutput.AnchorID, ts.API().ProtocolParameters().Bech32HRP()), @@ -926,6 +991,7 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), mqtt.GetTopicTransaction(testTx.TransactionID), + mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicFoundryOutputs(lo.PanicOnErr(foundryOutput.FoundryID())), diff --git a/pkg/testsuite/nodebridge_mock.go b/pkg/testsuite/nodebridge_mock.go index 638b5e0..2cee565 100644 --- a/pkg/testsuite/nodebridge_mock.go +++ b/pkg/testsuite/nodebridge_mock.go @@ -27,9 +27,10 @@ type MockedNodeBridge struct { mockedLatestCommitment *nodebridge.Commitment mockedLatestFinalizedCommitment *nodebridge.Commitment - mockedBlocks map[iotago.BlockID]*iotago.Block - mockedBlockMetadata map[iotago.BlockID]*iotaapi.BlockMetadataResponse - mockedOutputs map[iotago.OutputID]*nodebridge.Output + mockedBlocks map[iotago.BlockID]*iotago.Block + mockedBlockMetadata map[iotago.BlockID]*iotaapi.BlockMetadataResponse + mockedTransactionMetadata map[iotago.TransactionID]*iotaapi.TransactionMetadataResponse + mockedOutputs map[iotago.OutputID]*nodebridge.Output mockedStreamListenToBlocks *MockedStream[MockedBlock] mockedStreamListenToAcceptedBlocks *MockedStream[inx.BlockMetadata] @@ -50,10 +51,11 @@ func NewMockedNodeBridge(t *testing.T, api iotago.API) *MockedNodeBridge { LatestCommitmentChanged: event.New1[*nodebridge.Commitment](), LatestFinalizedCommitmentChanged: event.New1[*nodebridge.Commitment](), }, - apiProvider: iotago.SingleVersionProvider(api), - mockedBlocks: make(map[iotago.BlockID]*iotago.Block), - mockedBlockMetadata: make(map[iotago.BlockID]*iotaapi.BlockMetadataResponse), - mockedOutputs: make(map[iotago.OutputID]*nodebridge.Output), + apiProvider: iotago.SingleVersionProvider(api), + mockedBlocks: make(map[iotago.BlockID]*iotago.Block), + mockedBlockMetadata: make(map[iotago.BlockID]*iotaapi.BlockMetadataResponse), + mockedTransactionMetadata: make(map[iotago.TransactionID]*iotaapi.TransactionMetadataResponse), + mockedOutputs: make(map[iotago.OutputID]*nodebridge.Output), } } @@ -200,7 +202,11 @@ func (m *MockedNodeBridge) ListenToConfirmedBlocks(ctx context.Context, consumer // TransactionMetadata returns the transaction metadata for the given transaction ID. func (m *MockedNodeBridge) TransactionMetadata(ctx context.Context, transactionID iotago.TransactionID) (*iotaapi.TransactionMetadataResponse, error) { - panic("not implemented") + if transactionMetadata, ok := m.mockedTransactionMetadata[transactionID]; ok { + return transactionMetadata, nil + } + + return nil, status.Errorf(codes.NotFound, "transaction %s not found", transactionID.ToHex()) } func (m *MockedNodeBridge) Output(ctx context.Context, outputID iotago.OutputID) (*nodebridge.Output, error) { @@ -346,6 +352,10 @@ func (m *MockedNodeBridge) MockAddBlockMetadata(blockID iotago.BlockID, blockMet m.mockedBlockMetadata[blockID] = blockMetadata } +func (m *MockedNodeBridge) MockAddTransactionMetadata(transactionID iotago.TransactionID, transactionMetadata *iotaapi.TransactionMetadataResponse) { + m.mockedTransactionMetadata[transactionID] = transactionMetadata +} + func (m *MockedNodeBridge) MockAddOutput(outputID iotago.OutputID, output *nodebridge.Output) { m.mockedOutputs[outputID] = output } diff --git a/pkg/testsuite/testsuite.go b/pkg/testsuite/testsuite.go index d87fadc..c89db6e 100644 --- a/pkg/testsuite/testsuite.go +++ b/pkg/testsuite/testsuite.go @@ -125,6 +125,10 @@ func (ts *TestSuite) MockAddBlockMetadata(blockID iotago.BlockID, blockMetadata ts.nodeBridge.MockAddBlockMetadata(blockID, blockMetadata) } +func (ts *TestSuite) MockAddTransactionMetadata(transactionID iotago.TransactionID, transactionMetadata *api.TransactionMetadataResponse) { + ts.nodeBridge.MockAddTransactionMetadata(transactionID, transactionMetadata) +} + func (ts *TestSuite) MockAddOutput(outputID iotago.OutputID, output *nodebridge.Output) { ts.nodeBridge.MockAddOutput(outputID, output) } From 44cea2831ee9c93dc25ebc5f33eecaef2d9efa65 Mon Sep 17 00:00:00 2001 From: muXxer Date: Thu, 25 Jan 2024 18:07:09 +0100 Subject: [PATCH 5/5] Remove `TopicTransactions` and `TopicTransaction` and fix `TopicTransactionMetadata` --- pkg/mqtt/publish.go | 27 -------- pkg/mqtt/server.go | 108 ++++--------------------------- pkg/mqtt/topics.go | 23 +------ pkg/testsuite/mqtt_test.go | 106 +----------------------------- pkg/testsuite/nodebridge_mock.go | 2 +- 5 files changed, 16 insertions(+), 250 deletions(-) diff --git a/pkg/mqtt/publish.go b/pkg/mqtt/publish.go index ee45aff..7d9d8bb 100644 --- a/pkg/mqtt/publish.go +++ b/pkg/mqtt/publish.go @@ -156,24 +156,6 @@ func (s *Server) publishBlockIfSubscribed(block *iotago.Block, rawData []byte) e }, blockTopics...) } -func (s *Server) publishTransactionIfSubscribed(block *iotago.Block) error { - blk, ok := block.Body.(*iotago.BasicBlockBody) - if !ok { - return nil - } - - signedTx, ok := blk.Payload.(*iotago.SignedTransaction) - if !ok { - return nil - } - - return s.publishWithPayloadFuncsOnTopicsIfSubscribed(func() ([]byte, error) { - return signedTx.Transaction.API.JSONEncode(signedTx.Transaction) - }, func() ([]byte, error) { - return signedTx.Transaction.API.Encode(signedTx.Transaction) - }, TopicTransactions) -} - func (s *Server) publishBlockMetadataOnTopicsIfSubscribed(metadataFunc func() (*iotaapi.BlockMetadataResponse, error), topics ...string) error { return s.publishPayloadOnTopicsIfSubscribed( func() (iotago.API, error) { return s.NodeBridge.APIProvider().CommittedAPI(), nil }, @@ -206,15 +188,6 @@ func (s *Server) publishOutputIfSubscribed(ctx context.Context, output *nodebrid return output.Metadata.BlockID, nil }, ) - - s.fetchAndPublishTransaction(ctx, - output.OutputID.TransactionID(), - func() (iotago.BlockID, error) { - return output.Metadata.BlockID, nil - }, - ) - - s.fetchAndPublishTransactionMetadata(ctx, output.OutputID.TransactionID()) } bech32HRP := s.NodeBridge.APIProvider().CommittedAPI().ProtocolParameters().Bech32HRP() diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go index 6ccea24..98c8944 100644 --- a/pkg/mqtt/server.go +++ b/pkg/mqtt/server.go @@ -213,8 +213,7 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st TopicBlocksBasic, TopicBlocksBasicTransaction, TopicBlocksBasicTransactionTaggedData, - TopicBlocksBasicTaggedData, - TopicTransactions: + TopicBlocksBasicTaggedData: s.startListenIfNeeded(ctx, GrpcListenToBlocks, s.listenToBlocks) case TopicBlockMetadataAccepted: @@ -251,7 +250,6 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st // topicOutputsByUnlockConditionAndAddress // topicSpentOutputsByUnlockConditionAndAddress // topicTransactionsIncludedBlock - // topicTransactions // topicTransactionMetadata s.startListenIfNeeded(ctx, GrpcListenToAcceptedTransactions, s.listenToAcceptedTransactions) s.startListenIfNeeded(ctx, GrpcListenToLedgerUpdates, s.listenToLedgerUpdates) @@ -259,9 +257,6 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st if transactionID := TransactionIDFromTransactionsIncludedBlockTopic(topic); transactionID != iotago.EmptyTransactionID { go s.fetchAndPublishTransactionInclusion(ctx, transactionID) } - if transactionID := TransactionIDFromTransactionTopic(topic); transactionID != iotago.EmptyTransactionID { - go s.fetchAndPublishTransactions(ctx, transactionID) - } if transactionID := TransactionIDFromTransactionMetadataTopic(topic); transactionID != iotago.EmptyTransactionID { go s.fetchAndPublishTransactionMetadata(ctx, transactionID) } @@ -289,8 +284,7 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) { TopicBlocksBasic, TopicBlocksBasicTransaction, TopicBlocksBasicTransactionTaggedData, - TopicBlocksBasicTaggedData, - TopicTransactions: + TopicBlocksBasicTaggedData: s.stopListenIfNeeded(GrpcListenToBlocks) case TopicBlockMetadataAccepted: @@ -321,7 +315,6 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) { // topicOutputsByUnlockConditionAndAddress // topicSpentOutputsByUnlockConditionAndAddress // topicTransactionsIncludedBlock - // topicTransactions // topicTransactionMetadata s.stopListenIfNeeded(GrpcListenToAcceptedTransactions) s.stopListenIfNeeded(GrpcListenToLedgerUpdates) @@ -411,10 +404,6 @@ func (s *Server) listenToBlocks(ctx context.Context) error { s.LogErrorf("failed to publish block: %v", err) } - if err := s.publishTransactionIfSubscribed(block); err != nil { - s.LogErrorf("failed to publish block: %v", err) - } - // we don't return an error here, because we want to continue listening even if publishing fails once return nil }) @@ -462,6 +451,9 @@ func (s *Server) listenToAcceptedTransactions(ctx context.Context) error { } } + // publish the transaction metadata for this transaction in case someone subscribed to it + s.fetchAndPublishTransactionMetadata(ctx, payload.TransactionID) + // we don't return an error here, because we want to continue listening even if publishing fails once return nil }) @@ -479,6 +471,12 @@ func (s *Server) listenToLedgerUpdates(ctx context.Context) error { if err := s.publishOutputIfSubscribed(ctx, created, true); err != nil { s.LogErrorf("failed to publish created output in ledger update: %v", err) } + + // If this is the first output in a transaction (index 0), publish the transaction + // metadata for the transaction that created this output in case someone subscribed to it. + if created.OutputID.Index() == 0 { + s.fetchAndPublishTransactionMetadata(ctx, created.OutputID.TransactionID()) + } } // we don't return an error here, because we want to continue listening even if publishing fails once @@ -623,87 +621,3 @@ func (s *Server) fetchAndPublishTransactionInclusionWithBlock(ctx context.Contex s.LogErrorf("failed to publish transaction inclusion %s: %v", transactionID.ToHex(), err) } } - -func (s *Server) fetchAndPublishTransactions(ctx context.Context, transactionID iotago.TransactionID) { - - var blockID iotago.BlockID - blockIDFunc := func() (iotago.BlockID, error) { - if blockID.Empty() { - // get the output and then the blockID of the transaction that created the output - outputID := iotago.OutputID{} - copy(outputID[:], transactionID[:]) - - ctxFetch, cancelFetch := context.WithTimeout(ctx, fetchTimeout) - defer cancelFetch() - - output, err := s.NodeBridge.Output(ctxFetch, outputID) - if err != nil { - return iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to retrieve output of transaction %s", transactionID.ToHex()) - } - - return output.Metadata.BlockID, nil - } - - return blockID, nil - } - - s.fetchAndPublishTransaction(ctx, transactionID, blockIDFunc) -} - -func (s *Server) fetchAndPublishTransaction(ctx context.Context, transactionID iotago.TransactionID, blockIDFunc func() (iotago.BlockID, error)) { - ctxFetch, cancelFetch := context.WithTimeout(ctx, fetchTimeout) - defer cancelFetch() - - var tx *iotago.Transaction - txFunc := func() (*iotago.Transaction, error) { - if tx != nil { - return tx, nil - } - - blockID, err := blockIDFunc() - if err != nil { - return nil, err - } - - resp, err := s.NodeBridge.Block(ctxFetch, blockID) - if err != nil { - s.LogErrorf("failed to retrieve block %s :%v", blockID.ToHex(), err) - return nil, err - } - - basicBlk, ok := resp.Body.(*iotago.BasicBlockBody) - if !ok { - err := ierrors.Errorf("block body is not basic block %s ", blockID.ToHex()) - s.LogError(err.Error()) - return nil, err - } - - signedTx, ok := basicBlk.Payload.(*iotago.SignedTransaction) - if !ok { - err := ierrors.Errorf("block payload is not signed transaction %s ", blockID.ToHex()) - s.LogError(err.Error()) - return nil, err - } - - tx = signedTx.Transaction - - return tx, nil - } - - if err := s.publishPayloadOnTopicsIfSubscribed( - func() (iotago.API, error) { - tx, err := txFunc() - if err != nil { - return nil, err - } - - return tx.API, nil - }, - func() (any, error) { - return txFunc() - }, - GetTopicTransaction(transactionID), - ); err != nil { - s.LogErrorf("failed to publish transaction %s: %v", transactionID.ToHex(), err) - } -} diff --git a/pkg/mqtt/topics.go b/pkg/mqtt/topics.go index 92b1a85..edeefdc 100644 --- a/pkg/mqtt/topics.go +++ b/pkg/mqtt/topics.go @@ -38,10 +38,8 @@ const ( TopicBlocksBasicTransactionTaggedData = "blocks/basic/transaction/tagged-data" // iotago.Block (track all incoming basic blocks with transactions and tagged data) TopicBlocksBasicTransactionTaggedDataTag = "blocks/basic/transaction/tagged-data/" + ParameterTag // iotago.Block (track all incoming basic blocks with transactions and specific tagged data) - TopicTransactions = "transactions" // iotago.Transaction (track all incoming transactions) // single block on subscribe and changes in it's metadata (accepted, confirmed). TopicTransactionsIncludedBlock = "transactions/" + ParameterTransactionID + "/included-block" // api.BlockWithMetadataResponse (track inclusion of a single transaction) - TopicTransaction = "transactions/" + ParameterTransactionID // iotago.Transaction (track a specific transaction) TopicTransactionMetadata = "transaction-metadata/" + ParameterTransactionID // api.TransactionMetadataResponse (track a specific transaction) // single block on subscribe and changes in it's metadata (accepted, confirmed). @@ -105,23 +103,8 @@ func TransactionIDFromTransactionsIncludedBlockTopic(topic string) iotago.Transa return iotago.EmptyTransactionID } -func TransactionIDFromTransactionTopic(topic string) iotago.TransactionID { - if strings.HasPrefix(topic, "transactions/") && !strings.HasSuffix(topic, "/included-block") { - transactionIDHex := strings.Replace(topic, "transactions/", "", 1) - - transactionID, err := iotago.TransactionIDFromHexString(transactionIDHex) - if err != nil || len(transactionID) != iotago.TransactionIDLength { - return iotago.EmptyTransactionID - } - - return transactionID - } - - return iotago.EmptyTransactionID -} - func TransactionIDFromTransactionMetadataTopic(topic string) iotago.TransactionID { - if strings.HasPrefix(topic, "transaction-metadata/") { + if strings.HasPrefix(topic, "transaction-metadata/") && strings.Count(topic, "/") == 1 { transactionIDHex := strings.Replace(topic, "transaction-metadata/", "", 1) transactionID, err := iotago.TransactionIDFromHexString(transactionIDHex) @@ -169,10 +152,6 @@ func GetTopicTransactionsIncludedBlock(transactionID iotago.TransactionID) strin return strings.ReplaceAll(TopicTransactionsIncludedBlock, ParameterTransactionID, transactionID.ToHex()) } -func GetTopicTransaction(transactionID iotago.TransactionID) string { - return strings.ReplaceAll(TopicTransaction, ParameterTransactionID, transactionID.ToHex()) -} - func GetTopicTransactionMetadata(transactionID iotago.TransactionID) string { return strings.ReplaceAll(TopicTransactionMetadata, ParameterTransactionID, transactionID.ToHex()) } diff --git a/pkg/testsuite/mqtt_test.go b/pkg/testsuite/mqtt_test.go index d147730..a9811ae 100644 --- a/pkg/testsuite/mqtt_test.go +++ b/pkg/testsuite/mqtt_test.go @@ -199,86 +199,6 @@ func TestMqttTopics(t *testing.T) { } }(), - // ok - Transactions - func() *test { - testTx := ts.NewTestTransaction(true) - - return &test{ - name: "ok - Transactions", - topics: []*testTopic{ - { - topic: mqtt.TopicTransactions, - isPollingTarget: false, - isEventTarget: true, - }, - }, - topicsIgnore: []string{ - mqtt.TopicBlocks, - mqtt.TopicBlocksBasic, - mqtt.TopicBlocksBasicTransaction, - mqtt.TopicBlocksBasicTransactionTaggedData, - }, - jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Transaction)), - rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Transaction)), - postSubscribeFunc: func() { - ts.ReceiveBlock(&testsuite.MockedBlock{ - Block: testTx.Block, - RawBlockData: lo.PanicOnErr(ts.API().Encode(testTx.Block)), - }) - }, - } - }(), - - // ok - Transaction - func() *test { - testTx := ts.NewTestTransaction(true) - - return &test{ - name: "ok - Transaction - TransactionTopic", - topics: []*testTopic{ - { - topic: mqtt.GetTopicTransaction(testTx.TransactionID), - isPollingTarget: true, - isEventTarget: true, - }, - }, - topicsIgnore: []string{ - mqtt.GetTopicOutput(testTx.ConsumedOutputID), - mqtt.GetTopicOutput(testTx.OutputID), - mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), - mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), - mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransactionMetadata(testTx.TransactionID), - }, - jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Transaction)), - rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Transaction)), - preSubscribeFunc: func() { - // we need to add the block to the nodebridge, so that it is available - // for the TransactionsIncludedBlockTopic - ts.MockAddBlock(testTx.BlockID, testTx.Block) - - // we also need to add the first output to the nodebridge, so that it is available. - // this is also used by the TransactionsIncludedBlockTopic to get the blockID of the block containing the transaction of that output - ts.MockAddOutput(testTx.OutputID, testTx.Output) - }, - postSubscribeFunc: func() { - ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ - API: ts.API(), - Slot: testTx.BlockID.Slot(), - TransactionID: testTx.TransactionID, - // the consumed input - Consumed: []*nodebridge.Output{ - ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), - }, - // the created output - Created: []*nodebridge.Output{ - ts.NewNodeBridgeOutputFromTransaction(testTx.BlockID, testTx.Transaction), - }, - }) - }, - } - }(), - // ok - TransactionMetadata func() *test { testTx := ts.NewTestTransaction(true) @@ -302,18 +222,10 @@ func TestMqttTopics(t *testing.T) { mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), }, jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(transactionMetadataResponse)), rawTarget: lo.PanicOnErr(ts.API().Encode(transactionMetadataResponse)), preSubscribeFunc: func() { - // we need to add the block to the nodebridge, so that it is available - // for the TransactionsIncludedBlockTopic - ts.MockAddBlock(testTx.BlockID, testTx.Block) - - // we also need to add the first output to the nodebridge, so that it is available. - // this is also used by the TransactionsIncludedBlockTopic to get the blockID of the block containing the transaction of that output - ts.MockAddOutput(testTx.OutputID, testTx.Output) ts.MockAddTransactionMetadata(transactionMetadataResponse.TransactionID, transactionMetadataResponse) }, postSubscribeFunc: func() { @@ -372,11 +284,9 @@ func TestMqttTopics(t *testing.T) { isEventTarget: true, }, }, - topicsIgnore: []string{ - mqtt.TopicTransactions, - }, - jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), - rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), + topicsIgnore: []string{}, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), postSubscribeFunc: func() { ts.ReceiveBlock(&testsuite.MockedBlock{ Block: testTx.Block, @@ -404,7 +314,6 @@ func TestMqttTopics(t *testing.T) { mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), }, jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), @@ -553,7 +462,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), @@ -601,7 +509,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), @@ -646,7 +553,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), @@ -693,7 +599,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), @@ -739,7 +644,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), @@ -784,7 +688,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), @@ -870,7 +773,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), @@ -937,7 +839,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), @@ -990,7 +891,6 @@ func TestMqttTopics(t *testing.T) { }, topicsIgnore: []string{ mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), - mqtt.GetTopicTransaction(testTx.TransactionID), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), diff --git a/pkg/testsuite/nodebridge_mock.go b/pkg/testsuite/nodebridge_mock.go index 2cee565..c2b05fc 100644 --- a/pkg/testsuite/nodebridge_mock.go +++ b/pkg/testsuite/nodebridge_mock.go @@ -206,7 +206,7 @@ func (m *MockedNodeBridge) TransactionMetadata(ctx context.Context, transactionI return transactionMetadata, nil } - return nil, status.Errorf(codes.NotFound, "transaction %s not found", transactionID.ToHex()) + return nil, status.Errorf(codes.NotFound, "metadata for transaction %s not found", transactionID.ToHex()) } func (m *MockedNodeBridge) Output(ctx context.Context, outputID iotago.OutputID) (*nodebridge.Output, error) {