diff --git a/pkg/mqtt/publish.go b/pkg/mqtt/publish.go index 12e5c75..a8f58fa 100644 --- a/pkg/mqtt/publish.go +++ b/pkg/mqtt/publish.go @@ -182,7 +182,7 @@ func (s *Server) publishOutputIfSubscribed(ctx context.Context, output *nodebrid if publishOnAllTopics { // If this is the first output in a transaction (index 0), then check if someone is observing the transaction that generated this output if output.Metadata.Spent == nil && output.OutputID.Index() == 0 { - s.fetchAndPublishTransactionInclusionWithBlock(ctx, + s.fetchAndPublishTransactionInclusionBlockMetadataWithBlockID(ctx, output.OutputID.TransactionID(), func() (iotago.BlockID, error) { return output.Metadata.BlockID, nil diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go index 02812cc..a968fe4 100644 --- a/pkg/mqtt/server.go +++ b/pkg/mqtt/server.go @@ -249,13 +249,13 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st // topicNFTOutputs // topicOutputsByUnlockConditionAndAddress // topicSpentOutputsByUnlockConditionAndAddress - // topicTransactionsIncludedBlock + // topicTransactionsIncludedBlockMetadata // topicTransactionMetadata s.startListenIfNeeded(ctx, GrpcListenToAcceptedTransactions, s.listenToAcceptedTransactions) s.startListenIfNeeded(ctx, GrpcListenToLedgerUpdates, s.listenToLedgerUpdates) - if transactionID := TransactionIDFromTransactionsIncludedBlockTopic(topic); transactionID != iotago.EmptyTransactionID { - go s.fetchAndPublishTransactionInclusion(ctx, transactionID) + if transactionID := TransactionIDFromTransactionsIncludedBlockMetadataTopic(topic); transactionID != iotago.EmptyTransactionID { + go s.fetchAndPublishTransactionInclusionBlockMetadata(ctx, transactionID) } if transactionID := TransactionIDFromTransactionMetadataTopic(topic); transactionID != iotago.EmptyTransactionID { go s.fetchAndPublishTransactionMetadata(ctx, transactionID) @@ -314,7 +314,7 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) { // topicNFTOutputs // topicOutputsByUnlockConditionAndAddress // topicSpentOutputsByUnlockConditionAndAddress - // topicTransactionsIncludedBlock + // topicTransactionsIncludedBlockMetadata // topicTransactionMetadata s.stopListenIfNeeded(GrpcListenToAcceptedTransactions) s.stopListenIfNeeded(GrpcListenToLedgerUpdates) @@ -553,7 +553,7 @@ func (s *Server) fetchAndPublishTransactionMetadata(ctx context.Context, transac } } -func (s *Server) fetchAndPublishTransactionInclusion(ctx context.Context, transactionID iotago.TransactionID) { +func (s *Server) fetchAndPublishTransactionInclusionBlockMetadata(ctx context.Context, transactionID iotago.TransactionID) { var blockID iotago.BlockID blockIDFunc := func() (iotago.BlockID, error) { @@ -576,17 +576,17 @@ func (s *Server) fetchAndPublishTransactionInclusion(ctx context.Context, transa return blockID, nil } - s.fetchAndPublishTransactionInclusionWithBlock(ctx, transactionID, blockIDFunc) + s.fetchAndPublishTransactionInclusionBlockMetadataWithBlockID(ctx, transactionID, blockIDFunc) } -func (s *Server) fetchAndPublishTransactionInclusionWithBlock(ctx context.Context, transactionID iotago.TransactionID, blockIDFunc func() (iotago.BlockID, error)) { +func (s *Server) fetchAndPublishTransactionInclusionBlockMetadataWithBlockID(ctx context.Context, transactionID iotago.TransactionID, blockIDFunc func() (iotago.BlockID, error)) { ctxFetch, cancelFetch := context.WithTimeout(ctx, fetchTimeout) defer cancelFetch() - var block *iotago.Block - blockFunc := func() (*iotago.Block, error) { - if block != nil { - return block, nil + var blockMetadata *api.BlockMetadataResponse + if err := s.publishBlockMetadataOnTopicsIfSubscribed(func() (*api.BlockMetadataResponse, error) { + if blockMetadata != nil { + return blockMetadata, nil } blockID, err := blockIDFunc() @@ -594,30 +594,14 @@ func (s *Server) fetchAndPublishTransactionInclusionWithBlock(ctx context.Contex return nil, err } - resp, err := s.NodeBridge.Block(ctxFetch, blockID) + resp, err := s.NodeBridge.BlockMetadata(ctxFetch, blockID) if err != nil { - s.LogErrorf("failed to retrieve block %s :%v", blockID.ToHex(), err) - return nil, err + return nil, ierrors.Wrapf(err, "failed to retrieve block metadata %s", blockID.ToHex()) } - block = resp + blockMetadata = resp - return block, nil - } - - if err := s.publishPayloadOnTopicsIfSubscribed( - func() (iotago.API, error) { - block, err := blockFunc() - if err != nil { - return nil, err - } - - return block.API, nil - }, - func() (any, error) { - return blockFunc() - }, - GetTopicTransactionsIncludedBlock(transactionID), - ); err != nil { + return blockMetadata, nil + }, GetTopicTransactionsIncludedBlockMetadata(transactionID)); err != nil { s.LogErrorf("failed to publish transaction inclusion %s: %v", transactionID.ToHex(), err) } } diff --git a/pkg/mqtt/topics.go b/pkg/mqtt/topics.go index c14c492..213516d 100644 --- a/pkg/mqtt/topics.go +++ b/pkg/mqtt/topics.go @@ -23,10 +23,10 @@ func BlockIDFromBlockMetadataTopic(topic string) iotago.BlockID { return iotago.EmptyBlockID } -func TransactionIDFromTransactionsIncludedBlockTopic(topic string) iotago.TransactionID { - if strings.HasPrefix(topic, "transactions/") && strings.HasSuffix(topic, "/included-block") { +func TransactionIDFromTransactionsIncludedBlockMetadataTopic(topic string) iotago.TransactionID { + if strings.HasPrefix(topic, "transactions/") && strings.HasSuffix(topic, "/included-block-metadata") { transactionIDHex := strings.Replace(topic, "transactions/", "", 1) - transactionIDHex = strings.Replace(transactionIDHex, "/included-block", "", 1) + transactionIDHex = strings.Replace(transactionIDHex, "/included-block-metadata", "", 1) transactionID, err := iotago.TransactionIDFromHexString(transactionIDHex) if err != nil || len(transactionID) != iotago.TransactionIDLength { diff --git a/pkg/testsuite/mqtt_test.go b/pkg/testsuite/mqtt_test.go index 482f607..53ab64b 100644 --- a/pkg/testsuite/mqtt_test.go +++ b/pkg/testsuite/mqtt_test.go @@ -221,7 +221,7 @@ func TestMqttTopics(t *testing.T) { mqtt.GetTopicOutput(testTx.OutputID), mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.EventAPIUnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.EventAPIUnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), - mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicTransactionsIncludedBlockMetadata(testTx.TransactionID), }, jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(transactionMetadataResponse)), rawTarget: lo.PanicOnErr(ts.API().Encode(transactionMetadataResponse)), @@ -296,15 +296,26 @@ func TestMqttTopics(t *testing.T) { } }(), - // ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockTopic + // ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockMetadata func() *test { testTx := ts.NewTestTransaction(true) + blockMetadataResponse := &api.BlockMetadataResponse{ + BlockID: testTx.BlockID, + BlockState: api.BlockStateAccepted, + BlockFailureReason: api.BlockFailureNone, + TransactionMetadata: &api.TransactionMetadataResponse{ + TransactionID: testTx.TransactionID, + TransactionState: api.TransactionStateFailed, + TransactionFailureReason: api.TxFailureBICInputReferenceInvalid, + }, + } + return &test{ - name: "ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockTopic", + name: "ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockMetadata", topics: []*testTopic{ { - topic: mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + topic: mqtt.GetTopicTransactionsIncludedBlockMetadata(testTx.TransactionID), isPollingTarget: true, isEventTarget: true, }, @@ -312,19 +323,19 @@ func TestMqttTopics(t *testing.T) { topicsIgnore: []string{ mqtt.GetTopicOutput(testTx.ConsumedOutputID), mqtt.GetTopicOutput(testTx.OutputID), - mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), - mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.EventAPIUnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.EventAPIUnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), mqtt.GetTopicTransactionMetadata(testTx.TransactionID), }, - jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), - rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(blockMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(blockMetadataResponse)), preSubscribeFunc: func() { // we need to add the block to the nodebridge, so that it is available - // for the TransactionsIncludedBlockTopic - ts.MockAddBlock(testTx.BlockID, testTx.Block) + // for the TransactionsIncludedBlockMetadataTopic + ts.MockAddBlockMetadata(blockMetadataResponse.BlockID, blockMetadataResponse) // we also need to add the first output to the nodebridge, so that it is available. - // this is also used by the TransactionsIncludedBlockTopic to get the blockID of the block containing the transaction of that output + // this is also used by the TransactionsIncludedBlockMetadataTopic to get the blockID of the block containing the transaction of that output ts.MockAddOutput(testTx.OutputID, testTx.Output) }, postSubscribeFunc: func() {