Skip to content

Commit

Permalink
Remove TopicTransactions and TopicTransaction and fix `TopicTrans…
Browse files Browse the repository at this point in the history
…actionMetadata`
  • Loading branch information
muXxer committed Jan 25, 2024
1 parent d9cbec9 commit ca05227
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 242 deletions.
27 changes: 0 additions & 27 deletions pkg/mqtt/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,6 @@ func (s *Server) publishBlockIfSubscribed(block *iotago.Block, rawData []byte) e
}, blockTopics...)
}

func (s *Server) publishTransactionIfSubscribed(block *iotago.Block) error {
blk, ok := block.Body.(*iotago.BasicBlockBody)
if !ok {
return nil
}

signedTx, ok := blk.Payload.(*iotago.SignedTransaction)
if !ok {
return nil
}

return s.publishWithPayloadFuncsOnTopicsIfSubscribed(func() ([]byte, error) {
return signedTx.Transaction.API.JSONEncode(signedTx.Transaction)
}, func() ([]byte, error) {
return signedTx.Transaction.API.Encode(signedTx.Transaction)
}, TopicTransactions)
}

func (s *Server) publishBlockMetadataOnTopicsIfSubscribed(metadataFunc func() (*iotaapi.BlockMetadataResponse, error), topics ...string) error {
return s.publishPayloadOnTopicsIfSubscribed(
func() (iotago.API, error) { return s.NodeBridge.APIProvider().CommittedAPI(), nil },
Expand Down Expand Up @@ -206,15 +188,6 @@ func (s *Server) publishOutputIfSubscribed(ctx context.Context, output *nodebrid
return output.Metadata.BlockID, nil
},
)

s.fetchAndPublishTransaction(ctx,
output.OutputID.TransactionID(),
func() (iotago.BlockID, error) {
return output.Metadata.BlockID, nil
},
)

s.fetchAndPublishTransactionMetadata(ctx, output.OutputID.TransactionID())
}

bech32HRP := s.NodeBridge.APIProvider().CommittedAPI().ProtocolParameters().Bech32HRP()
Expand Down
105 changes: 9 additions & 96 deletions pkg/mqtt/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st
TopicBlocksBasic,
TopicBlocksBasicTransaction,
TopicBlocksBasicTransactionTaggedData,
TopicBlocksBasicTaggedData,
TopicTransactions:
TopicBlocksBasicTaggedData:
s.startListenIfNeeded(ctx, GrpcListenToBlocks, s.listenToBlocks)

case TopicBlockMetadataAccepted:
Expand Down Expand Up @@ -251,17 +250,13 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st
// topicOutputsByUnlockConditionAndAddress
// topicSpentOutputsByUnlockConditionAndAddress
// topicTransactionsIncludedBlock
// topicTransactions
// topicTransactionMetadata
s.startListenIfNeeded(ctx, GrpcListenToAcceptedTransactions, s.listenToAcceptedTransactions)
s.startListenIfNeeded(ctx, GrpcListenToLedgerUpdates, s.listenToLedgerUpdates)

if transactionID := TransactionIDFromTransactionsIncludedBlockTopic(topic); transactionID != iotago.EmptyTransactionID {
go s.fetchAndPublishTransactionInclusion(ctx, transactionID)
}
if transactionID := TransactionIDFromTransactionTopic(topic); transactionID != iotago.EmptyTransactionID {
go s.fetchAndPublishTransactions(ctx, transactionID)
}
if transactionID := TransactionIDFromTransactionMetadataTopic(topic); transactionID != iotago.EmptyTransactionID {
go s.fetchAndPublishTransactionMetadata(ctx, transactionID)
}
Expand Down Expand Up @@ -289,8 +284,7 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) {
TopicBlocksBasic,
TopicBlocksBasicTransaction,
TopicBlocksBasicTransactionTaggedData,
TopicBlocksBasicTaggedData,
TopicTransactions:
TopicBlocksBasicTaggedData:
s.stopListenIfNeeded(GrpcListenToBlocks)

case TopicBlockMetadataAccepted:
Expand Down Expand Up @@ -411,10 +405,6 @@ func (s *Server) listenToBlocks(ctx context.Context) error {
s.LogErrorf("failed to publish block: %v", err)
}

if err := s.publishTransactionIfSubscribed(block); err != nil {
s.LogErrorf("failed to publish block: %v", err)
}

// we don't return an error here, because we want to continue listening even if publishing fails once
return nil
})
Expand Down Expand Up @@ -462,6 +452,8 @@ func (s *Server) listenToAcceptedTransactions(ctx context.Context) error {
}
}

s.fetchAndPublishTransactionMetadata(ctx, payload.TransactionID)

// we don't return an error here, because we want to continue listening even if publishing fails once
return nil
})
Expand All @@ -479,6 +471,11 @@ func (s *Server) listenToLedgerUpdates(ctx context.Context) error {
if err := s.publishOutputIfSubscribed(ctx, created, true); err != nil {
s.LogErrorf("failed to publish created output in ledger update: %v", err)
}

// If this is the first output in a transaction (index 0), then check if someone is observing the transaction that generated this output
if created.OutputID.Index() == 0 {
s.fetchAndPublishTransactionMetadata(ctx, created.OutputID.TransactionID())
}
}

// we don't return an error here, because we want to continue listening even if publishing fails once
Expand Down Expand Up @@ -623,87 +620,3 @@ func (s *Server) fetchAndPublishTransactionInclusionWithBlock(ctx context.Contex
s.LogErrorf("failed to publish transaction inclusion %s: %v", transactionID.ToHex(), err)
}
}

func (s *Server) fetchAndPublishTransactions(ctx context.Context, transactionID iotago.TransactionID) {

var blockID iotago.BlockID
blockIDFunc := func() (iotago.BlockID, error) {
if blockID.Empty() {
// get the output and then the blockID of the transaction that created the output
outputID := iotago.OutputID{}
copy(outputID[:], transactionID[:])

ctxFetch, cancelFetch := context.WithTimeout(ctx, fetchTimeout)
defer cancelFetch()

output, err := s.NodeBridge.Output(ctxFetch, outputID)
if err != nil {
return iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to retrieve output of transaction %s", transactionID.ToHex())
}

return output.Metadata.BlockID, nil
}

return blockID, nil
}

s.fetchAndPublishTransaction(ctx, transactionID, blockIDFunc)
}

func (s *Server) fetchAndPublishTransaction(ctx context.Context, transactionID iotago.TransactionID, blockIDFunc func() (iotago.BlockID, error)) {
ctxFetch, cancelFetch := context.WithTimeout(ctx, fetchTimeout)
defer cancelFetch()

var tx *iotago.Transaction
txFunc := func() (*iotago.Transaction, error) {
if tx != nil {
return tx, nil
}

blockID, err := blockIDFunc()
if err != nil {
return nil, err
}

resp, err := s.NodeBridge.Block(ctxFetch, blockID)
if err != nil {
s.LogErrorf("failed to retrieve block %s :%v", blockID.ToHex(), err)
return nil, err
}

basicBlk, ok := resp.Body.(*iotago.BasicBlockBody)
if !ok {
err := ierrors.Errorf("block body is not basic block %s ", blockID.ToHex())
s.LogError(err.Error())
return nil, err
}

signedTx, ok := basicBlk.Payload.(*iotago.SignedTransaction)
if !ok {
err := ierrors.Errorf("block payload is not signed transaction %s ", blockID.ToHex())
s.LogError(err.Error())
return nil, err
}

tx = signedTx.Transaction

return tx, nil
}

if err := s.publishPayloadOnTopicsIfSubscribed(
func() (iotago.API, error) {
tx, err := txFunc()
if err != nil {
return nil, err
}

return tx.API, nil
},
func() (any, error) {
return txFunc()
},
GetTopicTransaction(transactionID),
); err != nil {
s.LogErrorf("failed to publish transaction %s: %v", transactionID.ToHex(), err)
}
}
23 changes: 1 addition & 22 deletions pkg/mqtt/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ const (
TopicBlocksBasicTransactionTaggedData = "blocks/basic/transaction/tagged-data" // iotago.Block (track all incoming basic blocks with transactions and tagged data)
TopicBlocksBasicTransactionTaggedDataTag = "blocks/basic/transaction/tagged-data/" + ParameterTag // iotago.Block (track all incoming basic blocks with transactions and specific tagged data)

TopicTransactions = "transactions" // iotago.Transaction (track all incoming transactions)
// single block on subscribe and changes in it's metadata (accepted, confirmed).
TopicTransactionsIncludedBlock = "transactions/" + ParameterTransactionID + "/included-block" // api.BlockWithMetadataResponse (track inclusion of a single transaction)
TopicTransaction = "transactions/" + ParameterTransactionID // iotago.Transaction (track a specific transaction)
TopicTransactionMetadata = "transaction-metadata/" + ParameterTransactionID // api.TransactionMetadataResponse (track a specific transaction)

// single block on subscribe and changes in it's metadata (accepted, confirmed).
Expand Down Expand Up @@ -105,23 +103,8 @@ func TransactionIDFromTransactionsIncludedBlockTopic(topic string) iotago.Transa
return iotago.EmptyTransactionID
}

func TransactionIDFromTransactionTopic(topic string) iotago.TransactionID {
if strings.HasPrefix(topic, "transactions/") && !strings.HasSuffix(topic, "/included-block") {
transactionIDHex := strings.Replace(topic, "transactions/", "", 1)

transactionID, err := iotago.TransactionIDFromHexString(transactionIDHex)
if err != nil || len(transactionID) != iotago.TransactionIDLength {
return iotago.EmptyTransactionID
}

return transactionID
}

return iotago.EmptyTransactionID
}

func TransactionIDFromTransactionMetadataTopic(topic string) iotago.TransactionID {
if strings.HasPrefix(topic, "transaction-metadata/") {
if strings.HasPrefix(topic, "transaction-metadata/") && strings.Count(topic, "/") == 1 {
transactionIDHex := strings.Replace(topic, "transaction-metadata/", "", 1)

transactionID, err := iotago.TransactionIDFromHexString(transactionIDHex)
Expand Down Expand Up @@ -169,10 +152,6 @@ func GetTopicTransactionsIncludedBlock(transactionID iotago.TransactionID) strin
return strings.ReplaceAll(TopicTransactionsIncludedBlock, ParameterTransactionID, transactionID.ToHex())
}

func GetTopicTransaction(transactionID iotago.TransactionID) string {
return strings.ReplaceAll(TopicTransaction, ParameterTransactionID, transactionID.ToHex())
}

func GetTopicTransactionMetadata(transactionID iotago.TransactionID) string {
return strings.ReplaceAll(TopicTransactionMetadata, ParameterTransactionID, transactionID.ToHex())
}
Expand Down
Loading

0 comments on commit ca05227

Please sign in to comment.