Skip to content

Commit

Permalink
Publish block metadata instead of block in transaction inclusion topic
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Feb 13, 2024
1 parent e4f35c5 commit a826e7e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/mqtt/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *Server) publishOutputIfSubscribed(ctx context.Context, output *nodebrid
if publishOnAllTopics {
// If this is the first output in a transaction (index 0), then check if someone is observing the transaction that generated this output
if output.Metadata.Spent == nil && output.OutputID.Index() == 0 {
s.fetchAndPublishTransactionInclusionWithBlock(ctx,
s.fetchAndPublishTransactionInclusionBlockMetadataWithBlockID(ctx,
output.OutputID.TransactionID(),
func() (iotago.BlockID, error) {
return output.Metadata.BlockID, nil
Expand Down
48 changes: 16 additions & 32 deletions pkg/mqtt/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st
// topicNFTOutputs
// topicOutputsByUnlockConditionAndAddress
// topicSpentOutputsByUnlockConditionAndAddress
// topicTransactionsIncludedBlock
// topicTransactionsIncludedBlockMetadata
// topicTransactionMetadata
s.startListenIfNeeded(ctx, GrpcListenToAcceptedTransactions, s.listenToAcceptedTransactions)
s.startListenIfNeeded(ctx, GrpcListenToLedgerUpdates, s.listenToLedgerUpdates)

if transactionID := TransactionIDFromTransactionsIncludedBlockTopic(topic); transactionID != iotago.EmptyTransactionID {
go s.fetchAndPublishTransactionInclusion(ctx, transactionID)
if transactionID := TransactionIDFromTransactionsIncludedBlockMetadataTopic(topic); transactionID != iotago.EmptyTransactionID {
go s.fetchAndPublishTransactionInclusionBlockMetadata(ctx, transactionID)
}
if transactionID := TransactionIDFromTransactionMetadataTopic(topic); transactionID != iotago.EmptyTransactionID {
go s.fetchAndPublishTransactionMetadata(ctx, transactionID)
Expand Down Expand Up @@ -314,7 +314,7 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) {
// topicNFTOutputs
// topicOutputsByUnlockConditionAndAddress
// topicSpentOutputsByUnlockConditionAndAddress
// topicTransactionsIncludedBlock
// topicTransactionsIncludedBlockMetadata
// topicTransactionMetadata
s.stopListenIfNeeded(GrpcListenToAcceptedTransactions)
s.stopListenIfNeeded(GrpcListenToLedgerUpdates)
Expand Down Expand Up @@ -553,7 +553,7 @@ func (s *Server) fetchAndPublishTransactionMetadata(ctx context.Context, transac
}
}

func (s *Server) fetchAndPublishTransactionInclusion(ctx context.Context, transactionID iotago.TransactionID) {
func (s *Server) fetchAndPublishTransactionInclusionBlockMetadata(ctx context.Context, transactionID iotago.TransactionID) {

var blockID iotago.BlockID
blockIDFunc := func() (iotago.BlockID, error) {
Expand All @@ -576,48 +576,32 @@ func (s *Server) fetchAndPublishTransactionInclusion(ctx context.Context, transa
return blockID, nil
}

s.fetchAndPublishTransactionInclusionWithBlock(ctx, transactionID, blockIDFunc)
s.fetchAndPublishTransactionInclusionBlockMetadataWithBlockID(ctx, transactionID, blockIDFunc)
}

func (s *Server) fetchAndPublishTransactionInclusionWithBlock(ctx context.Context, transactionID iotago.TransactionID, blockIDFunc func() (iotago.BlockID, error)) {
func (s *Server) fetchAndPublishTransactionInclusionBlockMetadataWithBlockID(ctx context.Context, transactionID iotago.TransactionID, blockIDFunc func() (iotago.BlockID, error)) {
ctxFetch, cancelFetch := context.WithTimeout(ctx, fetchTimeout)
defer cancelFetch()

var block *iotago.Block
blockFunc := func() (*iotago.Block, error) {
if block != nil {
return block, nil
var blockMetadata *api.BlockMetadataResponse
if err := s.publishBlockMetadataOnTopicsIfSubscribed(func() (*api.BlockMetadataResponse, error) {
if blockMetadata != nil {
return blockMetadata, nil
}

blockID, err := blockIDFunc()
if err != nil {
return nil, err
}

resp, err := s.NodeBridge.Block(ctxFetch, blockID)
resp, err := s.NodeBridge.BlockMetadata(ctxFetch, blockID)
if err != nil {
s.LogErrorf("failed to retrieve block %s :%v", blockID.ToHex(), err)
return nil, err
return nil, ierrors.Wrapf(err, "failed to retrieve block metadata %s", blockID.ToHex())
}
block = resp
blockMetadata = resp

return block, nil
}

if err := s.publishPayloadOnTopicsIfSubscribed(
func() (iotago.API, error) {
block, err := blockFunc()
if err != nil {
return nil, err
}

return block.API, nil
},
func() (any, error) {
return blockFunc()
},
GetTopicTransactionsIncludedBlock(transactionID),
); err != nil {
return blockMetadata, nil
}, GetTopicTransactionsIncludedBlockMetadata(transactionID)); err != nil {
s.LogErrorf("failed to publish transaction inclusion %s: %v", transactionID.ToHex(), err)
}
}
6 changes: 3 additions & 3 deletions pkg/mqtt/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ func BlockIDFromBlockMetadataTopic(topic string) iotago.BlockID {
return iotago.EmptyBlockID
}

func TransactionIDFromTransactionsIncludedBlockTopic(topic string) iotago.TransactionID {
if strings.HasPrefix(topic, "transactions/") && strings.HasSuffix(topic, "/included-block") {
func TransactionIDFromTransactionsIncludedBlockMetadataTopic(topic string) iotago.TransactionID {
if strings.HasPrefix(topic, "transactions/") && strings.HasSuffix(topic, "/included-block-metadata") {
transactionIDHex := strings.Replace(topic, "transactions/", "", 1)
transactionIDHex = strings.Replace(transactionIDHex, "/included-block", "", 1)
transactionIDHex = strings.Replace(transactionIDHex, "/included-block-metadata", "", 1)

transactionID, err := iotago.TransactionIDFromHexString(transactionIDHex)
if err != nil || len(transactionID) != iotago.TransactionIDLength {
Expand Down
33 changes: 22 additions & 11 deletions pkg/testsuite/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestMqttTopics(t *testing.T) {
mqtt.GetTopicOutput(testTx.OutputID),
mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.EventAPIUnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()),
mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.EventAPIUnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()),
mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID),
mqtt.GetTopicTransactionsIncludedBlockMetadata(testTx.TransactionID),
},
jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(transactionMetadataResponse)),
rawTarget: lo.PanicOnErr(ts.API().Encode(transactionMetadataResponse)),
Expand Down Expand Up @@ -296,35 +296,46 @@ func TestMqttTopics(t *testing.T) {
}
}(),

// ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockTopic
// ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockMetadata
func() *test {
testTx := ts.NewTestTransaction(true)

blockMetadataResponse := &api.BlockMetadataResponse{
BlockID: testTx.BlockID,
BlockState: api.BlockStateAccepted,
BlockFailureReason: api.BlockFailureNone,
TransactionMetadata: &api.TransactionMetadataResponse{
TransactionID: testTx.TransactionID,
TransactionState: api.TransactionStateFailed,
TransactionFailureReason: api.TxFailureBICInputReferenceInvalid,
},
}

return &test{
name: "ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockTopic",
name: "ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockMetadata",
topics: []*testTopic{
{
topic: mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID),
topic: mqtt.GetTopicTransactionsIncludedBlockMetadata(testTx.TransactionID),
isPollingTarget: true,
isEventTarget: true,
},
},
topicsIgnore: []string{
mqtt.GetTopicOutput(testTx.ConsumedOutputID),
mqtt.GetTopicOutput(testTx.OutputID),
mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()),
mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()),
mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.EventAPIUnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()),
mqtt.GetTopicOutputsByUnlockConditionAndAddress(api.EventAPIUnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()),
mqtt.GetTopicTransactionMetadata(testTx.TransactionID),
},
jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)),
rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)),
jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(blockMetadataResponse)),
rawTarget: lo.PanicOnErr(ts.API().Encode(blockMetadataResponse)),
preSubscribeFunc: func() {
// we need to add the block to the nodebridge, so that it is available
// for the TransactionsIncludedBlockTopic
ts.MockAddBlock(testTx.BlockID, testTx.Block)
// for the TransactionsIncludedBlockMetadataTopic
ts.MockAddBlockMetadata(blockMetadataResponse.BlockID, blockMetadataResponse)

// we also need to add the first output to the nodebridge, so that it is available.
// this is also used by the TransactionsIncludedBlockTopic to get the blockID of the block containing the transaction of that output
// this is also used by the TransactionsIncludedBlockMetadataTopic to get the blockID of the block containing the transaction of that output
ts.MockAddOutput(testTx.OutputID, testTx.Output)
},
postSubscribeFunc: func() {
Expand Down

0 comments on commit a826e7e

Please sign in to comment.