diff --git a/pkg/testsuite/broker_mock.go b/pkg/testsuite/broker_mock.go new file mode 100644 index 0000000..30f1018 --- /dev/null +++ b/pkg/testsuite/broker_mock.go @@ -0,0 +1,176 @@ +//nolint:revive // skip linter for this package name +package testsuite + +import ( + "sync" + "testing" + + "github.com/mochi-co/mqtt/server/system" + "github.com/stretchr/testify/require" + + "github.com/iotaledger/hive.go/web/subscriptionmanager" + "github.com/iotaledger/inx-mqtt/pkg/broker" +) + +type MockedBroker struct { + t *testing.T + + hasSubscribersCallback func(topic string) + subscriptionmanager *subscriptionmanager.SubscriptionManager[string, string] + + mockedSubscribedTopicsAndClientsLock sync.RWMutex + mockedSubscribedTopicsAndClients map[string]map[string]func(topic string, payload []byte) +} + +var _ broker.Broker = &MockedBroker{} + +func NewMockedBroker(t *testing.T) *MockedBroker { + t.Helper() + + broker := &MockedBroker{ + t: t, + hasSubscribersCallback: nil, + subscriptionmanager: subscriptionmanager.New[string, string](), + mockedSubscribedTopicsAndClients: make(map[string]map[string]func(topic string, payload []byte)), + } + + return broker +} + +// +// Broker interface +// + +func (m *MockedBroker) Events() *subscriptionmanager.Events[string, string] { + return m.subscriptionmanager.Events() +} + +func (m *MockedBroker) Start() error { + return nil +} + +func (m *MockedBroker) Stop() error { + return nil +} + +func (m *MockedBroker) HasSubscribers(topic string) bool { + // this callback is used in the testsuite to check if a message is + // about to be sent to a topic that was not expected to have subscribers + if m.hasSubscribersCallback != nil { + m.hasSubscribersCallback(topic) + } + + return m.subscriptionmanager.TopicHasSubscribers(topic) +} + +func (m *MockedBroker) Send(topic string, payload []byte) error { + m.mockedSubscribedTopicsAndClientsLock.RLock() + defer m.mockedSubscribedTopicsAndClientsLock.RUnlock() + + if _, ok := m.mockedSubscribedTopicsAndClients[topic]; ok { + // send to all subscribers + for _, callback := range m.mockedSubscribedTopicsAndClients[topic] { + if callback != nil { + callback(topic, payload) + } + } + } + + return nil +} + +func (m *MockedBroker) SystemInfo() *system.Info { + panic("not implemented") +} + +func (m *MockedBroker) SubscribersSize() int { + return m.subscriptionmanager.SubscribersSize() +} + +func (m *MockedBroker) TopicsSize() int { + return m.subscriptionmanager.TopicsSize() +} + +// +// Mock functions +// + +func (m *MockedBroker) MockClear() { + m.hasSubscribersCallback = nil + + // we can't replace the subscriptionmanager, otherwise the events will not be wired correctly + // so we need to manually disconnect all clients and remove all subscriptions + clientIDs := make(map[string]struct{}) + for _, clients := range m.mockedSubscribedTopicsAndClients { + for clientID := range clients { + clientIDs[clientID] = struct{}{} + } + } + + for clientID := range clientIDs { + m.MockClientDisconnected(clientID) + } + require.Equal(m.t, m.subscriptionmanager.TopicsSize(), 0, "topics not empty") + require.Equal(m.t, m.subscriptionmanager.SubscribersSize(), 0, "subscribers not empty") + + m.mockedSubscribedTopicsAndClients = make(map[string]map[string]func(topic string, payload []byte)) +} +func (m *MockedBroker) MockSetHasSubscribersCallback(hasSubscribersCallback func(topic string)) { + m.hasSubscribersCallback = hasSubscribersCallback +} + +func (m *MockedBroker) MockClientConnected(clientID string) { + m.subscriptionmanager.Connect(clientID) +} + +func (m *MockedBroker) MockClientDisconnected(clientID string) { + m.mockedSubscribedTopicsAndClientsLock.Lock() + defer m.mockedSubscribedTopicsAndClientsLock.Unlock() + + if !m.subscriptionmanager.Disconnect(clientID) { + require.FailNow(m.t, "client was not connected") + return + } + + // client was disconnected, so we need to remove all subscriptions + for topic, clients := range m.mockedSubscribedTopicsAndClients { + if _, exists := clients[clientID]; exists { + delete(clients, clientID) + if len(clients) == 0 { + delete(m.mockedSubscribedTopicsAndClients, topic) + } + } + } +} + +func (m *MockedBroker) MockTopicSubscribed(clientID string, topic string, callback func(topic string, payload []byte)) { + m.mockedSubscribedTopicsAndClientsLock.Lock() + defer m.mockedSubscribedTopicsAndClientsLock.Unlock() + + if !m.subscriptionmanager.Subscribe(clientID, topic) { + require.FailNow(m.t, "subscription failed") + return + } + + // topic was subscribed, so we need to add the callback + if _, ok := m.mockedSubscribedTopicsAndClients[topic]; !ok { + m.mockedSubscribedTopicsAndClients[topic] = make(map[string]func(topic string, payload []byte)) + } + m.mockedSubscribedTopicsAndClients[topic][clientID] = callback +} + +func (m *MockedBroker) MockTopicUnsubscribed(clientID string, topic string) { + m.mockedSubscribedTopicsAndClientsLock.Lock() + defer m.mockedSubscribedTopicsAndClientsLock.Unlock() + + if !m.subscriptionmanager.Unsubscribe(clientID, topic) { + require.FailNow(m.t, "unsubscription failed") + return + } + + // topic was unsubscribed, so we need to remove the callback + delete(m.mockedSubscribedTopicsAndClients[topic], clientID) + if len(m.mockedSubscribedTopicsAndClients[topic]) == 0 { + delete(m.mockedSubscribedTopicsAndClients, topic) + } +} diff --git a/pkg/testsuite/mqtt_test.go b/pkg/testsuite/mqtt_test.go new file mode 100644 index 0000000..77394c0 --- /dev/null +++ b/pkg/testsuite/mqtt_test.go @@ -0,0 +1,981 @@ +//nolint:forcetypeassert,scopelint,goconst,dupl +package testsuite_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/iotaledger/hive.go/lo" + "github.com/iotaledger/inx-app/pkg/nodebridge" + "github.com/iotaledger/inx-mqtt/pkg/mqtt" + "github.com/iotaledger/inx-mqtt/pkg/testsuite" + inx "github.com/iotaledger/inx/go" + iotago "github.com/iotaledger/iota.go/v4" + "github.com/iotaledger/iota.go/v4/api" + "github.com/iotaledger/iota.go/v4/tpkg" +) + +func TestMqttTopics(t *testing.T) { + ts := testsuite.NewTestSuite(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts.Run(ctx) + + type testTopic struct { + // the topic to subscribe to + topic string + // indicates that the message is received during the subscription to the topic. + // this is used to test the "polling mode". + isPollingTarget bool + // indicates that the message is received after the subscription to the topic. + // this is used to test the "event driven mode". + isEventTarget bool + } + + type test struct { + // the name of the test + name string + // the topics the test should subscribe to ("/raw" topics will be checked automatically) + topics []*testTopic + // the topics that should be ignored by the test (it's legit to receive messages on these topics) + topicsIgnore []string + // the expected JSON result received by the client on the subscribed topic + jsonTarget []byte + // the expected raw result received by the client on the subscribed topic + rawTarget []byte + // the function is called by the test before the MQTT topic is subscribed to (e.g. to inject test data) + preSubscribeFunc func() + // the function is called by the test after the MQTT topic is subscribed to (e.g. to inject test data) + postSubscribeFunc func() + } + + tests := []*test{ + + // ok - LatestCommitment + func() *test { + commitment := tpkg.RandCommitment() + + return &test{ + name: "ok - LatestCommitment", + topics: []*testTopic{ + { + topic: mqtt.TopicCommitmentsLatest, + // we receive the topic once during the subscription + // and a second time when the commitment is received + isPollingTarget: true, + isEventTarget: true, + }, + }, + topicsIgnore: []string{}, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(commitment)), + rawTarget: lo.PanicOnErr(ts.API().Encode(commitment)), + preSubscribeFunc: func() { + ts.SetLatestCommitment(&nodebridge.Commitment{ + CommitmentID: commitment.MustID(), + Commitment: commitment, + }) + }, + postSubscribeFunc: func() { + ts.ReceiveLatestCommitment(&nodebridge.Commitment{ + CommitmentID: commitment.MustID(), + Commitment: commitment, + }) + }, + } + }(), + + // ok - LatestFinalizedCommitment + func() *test { + commitment := tpkg.RandCommitment() + + return &test{ + name: "ok - LatestFinalizedCommitment", + topics: []*testTopic{ + { + topic: mqtt.TopicCommitmentsFinalized, + // we receive the topic once during the subscription + // and a second time when the commitment is received + isPollingTarget: true, + isEventTarget: true, + }, + }, + topicsIgnore: []string{}, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(commitment)), + rawTarget: lo.PanicOnErr(ts.API().Encode(commitment)), + preSubscribeFunc: func() { + ts.SetLatestFinalizedCommitment(&nodebridge.Commitment{ + CommitmentID: commitment.MustID(), + Commitment: commitment, + }) + }, + postSubscribeFunc: func() { + ts.ReceiveLatestFinalizedCommitment(&nodebridge.Commitment{ + CommitmentID: commitment.MustID(), + Commitment: commitment, + }) + }, + } + }(), + + // ok - Validation block + func() *test { + block := tpkg.RandBlock(tpkg.RandValidationBlockBody(ts.API()), ts.API(), iotago.Mana(500)) + + return &test{ + name: "ok - Validation block", + topics: []*testTopic{ + { + topic: mqtt.TopicBlocks, + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.TopicBlocksValidation, + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{}, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(block)), + rawTarget: lo.PanicOnErr(ts.API().Encode(block)), + postSubscribeFunc: func() { + ts.ReceiveBlock(&testsuite.MockedBlock{ + Block: block, + RawBlockData: lo.PanicOnErr(ts.API().Encode(block)), + }) + }, + } + }(), + + // ok - Basic block with tagged data + func() *test { + block := tpkg.RandBlock( + tpkg.RandBasicBlockBodyWithPayload(ts.API(), + &iotago.TaggedData{ + Tag: []byte("my tagged data payload"), + Data: []byte("some nice data"), + }, + ), ts.API(), iotago.Mana(500)) + + return &test{ + name: "ok - Basic block with tagged data", + topics: []*testTopic{ + { + topic: mqtt.TopicBlocks, + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.TopicBlocksBasic, + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.TopicBlocksBasicTaggedData, + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicBlocksBasicTaggedDataTag([]byte("my tagged data payload")), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{}, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(block)), + rawTarget: lo.PanicOnErr(ts.API().Encode(block)), + postSubscribeFunc: func() { + ts.ReceiveBlock(&testsuite.MockedBlock{ + Block: block, + RawBlockData: lo.PanicOnErr(ts.API().Encode(block)), + }) + }, + } + }(), + + // ok - Basic block with transaction and tagged data payload + func() *test { + testTx := ts.NewTestTransaction(true, tpkg.WithTxEssencePayload( + &iotago.TaggedData{ + Tag: []byte("my tagged data payload"), + Data: []byte("some nice data"), + }, + )) + + return &test{ + name: "ok - Basic block with transaction and tagged data payload", + topics: []*testTopic{ + { + topic: mqtt.TopicBlocks, + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.TopicBlocksBasic, + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.TopicBlocksBasicTransaction, + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.TopicBlocksBasicTransactionTaggedData, + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicBlocksBasicTransactionTaggedDataTag([]byte("my tagged data payload")), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{}, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), + postSubscribeFunc: func() { + ts.ReceiveBlock(&testsuite.MockedBlock{ + Block: testTx.Block, + RawBlockData: lo.PanicOnErr(ts.API().Encode(testTx.Block)), + }) + }, + } + }(), + + // ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockTopic + func() *test { + testTx := ts.NewTestTransaction(true) + + return &test{ + name: "ok - Basic block with transaction and tagged data payload - TransactionsIncludedBlockTopic", + topics: []*testTopic{ + { + topic: mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + isPollingTarget: true, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.Block)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.Block)), + preSubscribeFunc: func() { + // we need to add the block to the nodebridge, so that it is available + // for the TransactionsIncludedBlockTopic + ts.MockAddBlock(testTx.BlockID, testTx.Block) + + // we also need to add the first output to the nodebridge, so that it is available. + // this is also used by the TransactionsIncludedBlockTopic to get the blockID of the block containing the transaction of that output + ts.MockAddOutput(testTx.OutputID, testTx.Output) + }, + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromTransaction(testTx.BlockID, testTx.Transaction), + }, + }) + }, + } + }(), + + // ok - Basic block with tagged data - TopicBlockMetadata + func() *test { + blockMetadataResponse := &api.BlockMetadataResponse{ + BlockID: tpkg.RandBlockID(), + BlockState: api.BlockStateAccepted, + BlockFailureReason: api.BlockFailureNone, + TransactionMetadata: &api.TransactionMetadataResponse{ + TransactionID: tpkg.RandTransactionID(), + TransactionState: api.TransactionStateFailed, + TransactionFailureReason: api.TxFailureBICInputInvalid, + }, + } + + return &test{ + name: "ok - Basic block with tagged data - TopicBlockMetadata", + topics: []*testTopic{ + { + topic: mqtt.GetTopicBlockMetadata(blockMetadataResponse.BlockID), + isPollingTarget: true, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.TopicBlockMetadataAccepted, + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(blockMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(blockMetadataResponse)), + preSubscribeFunc: func() { + ts.MockAddBlockMetadata(blockMetadataResponse.BlockID, blockMetadataResponse) + }, + postSubscribeFunc: func() { + ts.ReceiveAcceptedBlock(lo.PanicOnErr(inx.WrapBlockMetadata(blockMetadataResponse))) + }, + } + }(), + + // ok - Basic block with tagged data - TopicBlockMetadataAccepted + func() *test { + blockMetadataResponse := &api.BlockMetadataResponse{ + BlockID: tpkg.RandBlockID(), + BlockState: api.BlockStateAccepted, + BlockFailureReason: api.BlockFailureNone, + TransactionMetadata: &api.TransactionMetadataResponse{ + TransactionID: tpkg.RandTransactionID(), + TransactionState: api.TransactionStateFailed, + TransactionFailureReason: api.TxFailureBICInputInvalid, + }, + } + + return &test{ + name: "ok - Basic block with tagged data - TopicBlockMetadataAccepted", + topics: []*testTopic{ + { + topic: mqtt.TopicBlockMetadataAccepted, + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicBlockMetadata(blockMetadataResponse.BlockID), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(blockMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(blockMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedBlock(lo.PanicOnErr(inx.WrapBlockMetadata(blockMetadataResponse))) + }, + } + }(), + + // ok - Basic block with tagged data - TopicBlockMetadataConfirmed + func() *test { + blockMetadataResponse := &api.BlockMetadataResponse{ + BlockID: tpkg.RandBlockID(), + BlockState: api.BlockStateAccepted, + BlockFailureReason: api.BlockFailureNone, + TransactionMetadata: &api.TransactionMetadataResponse{ + TransactionID: tpkg.RandTransactionID(), + TransactionState: api.TransactionStateFailed, + TransactionFailureReason: api.TxFailureBICInputInvalid, + }, + } + + return &test{ + name: "ok - Basic block with tagged data - TopicBlockMetadataConfirmed", + topics: []*testTopic{ + { + topic: mqtt.TopicBlockMetadataConfirmed, + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicBlockMetadata(blockMetadataResponse.BlockID), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(blockMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(blockMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveConfirmedBlock(lo.PanicOnErr(inx.WrapBlockMetadata(blockMetadataResponse))) + }, + } + }(), + + // ok - TopicOutputs + func() *test { + testTx := ts.NewTestTransaction(true) + + return &test{ + name: "ok - TopicOutputs", + topics: []*testTopic{ + { + topic: mqtt.GetTopicOutput(testTx.OutputID), + isPollingTarget: true, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + preSubscribeFunc: func() { + output := ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse) + ts.MockAddOutput(testTx.OutputID, output) + }, + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + + // ok - TopicAccountOutputs + func() *test { + accountOutput := tpkg.RandOutput(iotago.OutputAccount).(*iotago.AccountOutput) + testTx := ts.NewTestTransaction(true, tpkg.WithOutputs(iotago.Outputs[iotago.TxEssenceOutput]{ + accountOutput, + })) + + return &test{ + name: "ok - TopicAccountOutputs", + topics: []*testTopic{ + { + topic: mqtt.GetTopicAccountOutputs(accountOutput.AccountID, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + + // ok - TopicAnchorOutputs + func() *test { + anchorOutput := tpkg.RandOutput(iotago.OutputAnchor).(*iotago.AnchorOutput) + testTx := ts.NewTestTransaction(true, tpkg.WithOutputs(iotago.Outputs[iotago.TxEssenceOutput]{ + anchorOutput, + })) + + return &test{ + name: "ok - TopicAnchorOutputs", + topics: []*testTopic{ + { + topic: mqtt.GetTopicAnchorOutputs(anchorOutput.AnchorID, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionStateController, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionGovernor, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + + // ok - TopicFoundryOutputs + func() *test { + foundryOutput := tpkg.RandOutput(iotago.OutputFoundry).(*iotago.FoundryOutput) + testTx := ts.NewTestTransaction(true, tpkg.WithOutputs(iotago.Outputs[iotago.TxEssenceOutput]{ + foundryOutput, + })) + + return &test{ + name: "ok - TopicFoundryOutputs", + topics: []*testTopic{ + { + topic: mqtt.GetTopicFoundryOutputs(lo.PanicOnErr(foundryOutput.FoundryID())), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionImmutableAccount, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + + // ok - TopicNFTOutputs + func() *test { + nftOutput := tpkg.RandOutput(iotago.OutputNFT).(*iotago.NFTOutput) + testTx := ts.NewTestTransaction(true, tpkg.WithOutputs(iotago.Outputs[iotago.TxEssenceOutput]{ + nftOutput, + })) + + return &test{ + name: "ok - TopicNFTOutputs", + topics: []*testTopic{ + { + topic: mqtt.GetTopicNFTOutputs(nftOutput.NFTID, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + + // ok - TopicDelegationOutputs + func() *test { + delegationOutput := tpkg.RandOutput(iotago.OutputDelegation).(*iotago.DelegationOutput) + testTx := ts.NewTestTransaction(true, tpkg.WithOutputs(iotago.Outputs[iotago.TxEssenceOutput]{ + delegationOutput, + })) + + return &test{ + name: "ok - TopicDelegationOutputs", + topics: []*testTopic{ + { + topic: mqtt.GetTopicDelegationOutputs(delegationOutput.DelegationID), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.OwnerAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + + // ok - TopicOutputsByUnlockConditionAndAddress - Address/StorageReturn/Expiration + func() *test { + unlockAddress := tpkg.RandEd25519Address() + returnAddress := tpkg.RandEd25519Address() + + basicOutput := &iotago.BasicOutput{ + Amount: 1337, + Mana: 1337, + UnlockConditions: iotago.BasicOutputUnlockConditions{ + &iotago.AddressUnlockCondition{ + Address: unlockAddress, + }, + &iotago.StorageDepositReturnUnlockCondition{ + ReturnAddress: returnAddress, + Amount: 1337, + }, + &iotago.ExpirationUnlockCondition{ + ReturnAddress: returnAddress, + Slot: 1337, + }, + }, + Features: iotago.BasicOutputFeatures{}, + } + + testTx := ts.NewTestTransaction(false, tpkg.WithOutputs(iotago.Outputs[iotago.TxEssenceOutput]{ + basicOutput, + })) + + return &test{ + name: "ok - TopicOutputsByUnlockConditionAndAddress - Address/StorageReturn/Expiration", + topics: []*testTopic{ + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, unlockAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, returnAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, unlockAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionStorageReturn, returnAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionExpiration, returnAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.SenderAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.SenderAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + + // ok - TopicOutputsByUnlockConditionAndAddress - StateController/Governor + func() *test { + anchorOutput := tpkg.RandOutput(iotago.OutputAnchor).(*iotago.AnchorOutput) + + // we want to have different addresses for the state controller and governor to check the "any" topic + anchorOutput.UnlockConditionSet().GovernorAddress().Address = tpkg.RandAddress(iotago.AddressEd25519) + + stateControllerAddress := anchorOutput.UnlockConditionSet().StateControllerAddress().Address + governorAddress := anchorOutput.UnlockConditionSet().GovernorAddress().Address + + testTx := ts.NewTestTransaction(false, tpkg.WithOutputs(iotago.Outputs[iotago.TxEssenceOutput]{ + anchorOutput, + })) + + return &test{ + name: "ok - TopicOutputsByUnlockConditionAndAddress - StateController/Governor", + topics: []*testTopic{ + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, stateControllerAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, governorAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionStateController, stateControllerAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionGovernor, governorAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicAnchorOutputs(anchorOutput.AnchorID, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.SenderAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.SenderAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + + // ok - TopicOutputsByUnlockConditionAndAddress - ImmutableAccount + func() *test { + foundryOutput := tpkg.RandOutput(iotago.OutputFoundry).(*iotago.FoundryOutput) + immutableAccountAddress := foundryOutput.UnlockConditionSet().ImmutableAccount().Address + + testTx := ts.NewTestTransaction(false, tpkg.WithOutputs(iotago.Outputs[iotago.TxEssenceOutput]{ + foundryOutput, + })) + + return &test{ + name: "ok - TopicOutputsByUnlockConditionAndAddress - ImmutableAccount", + topics: []*testTopic{ + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, immutableAccountAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + { + topic: mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionImmutableAccount, immutableAccountAddress, ts.API().ProtocolParameters().Bech32HRP()), + isPollingTarget: false, + isEventTarget: true, + }, + }, + topicsIgnore: []string{ + mqtt.GetTopicTransactionsIncludedBlock(testTx.TransactionID), + mqtt.GetTopicOutput(testTx.ConsumedOutputID), + mqtt.GetTopicOutput(testTx.OutputID), + mqtt.GetTopicFoundryOutputs(lo.PanicOnErr(foundryOutput.FoundryID())), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAny, testTx.SenderAddress, ts.API().ProtocolParameters().Bech32HRP()), + mqtt.GetTopicOutputsByUnlockConditionAndAddress(mqtt.UnlockConditionAddress, testTx.SenderAddress, ts.API().ProtocolParameters().Bech32HRP()), + }, + jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(testTx.OutputWithMetadataResponse)), + rawTarget: lo.PanicOnErr(ts.API().Encode(testTx.OutputWithMetadataResponse)), + postSubscribeFunc: func() { + ts.ReceiveAcceptedTransaction(&nodebridge.AcceptedTransaction{ + API: ts.API(), + Slot: testTx.BlockID.Slot(), + TransactionID: testTx.TransactionID, + // the consumed input + Consumed: []*nodebridge.Output{ + ts.NewSpentNodeBridgeOutputFromTransaction(tpkg.RandBlockID(), testTx.ConsumedOutputCreationTransaction, testTx.BlockID.Slot(), testTx.TransactionID), + }, + // the created output + Created: []*nodebridge.Output{ + ts.NewNodeBridgeOutputFromOutputWithMetadata(testTx.OutputWithMetadataResponse), + }, + }) + }, + } + }(), + } + + for _, test := range tests { + + t.Run(test.name, func(t *testing.T) { + ts.Reset() + + ts.MQTTClientConnect("client1") + defer ts.MQTTClientDisconnect("client1") + + topicsReceived := make(map[string]struct{}) + subscriptionDone := false + + // wrap it in a function to avoid the topic variable to be overwritten by the loop + subscribeToMQTTTopic := func(topic *testTopic) { + ts.MQTTSubscribe("client1", topic.topic, func(topicName string, payloadBytes []byte) { + if !subscriptionDone && !topic.isPollingTarget { + // if the subscription is not done and it is not a polled target, we don't want to receive it. + return + } + + if subscriptionDone && !topic.isEventTarget { + // if the subscription is done and it is not an event target, we don't want to receive it. + return + } + + require.Equal(t, topic.topic, topicName, "topic mismatch") + require.Equal(t, test.jsonTarget, payloadBytes, "JSON payload mismatch") + topicsReceived[topicName] = struct{}{} + }) + + // also subscribe to the raw topics + topicNameRaw := topic.topic + "/raw" + ts.MQTTSubscribe("client1", topicNameRaw, func(topicName string, payloadBytes []byte) { + if !subscriptionDone && !topic.isPollingTarget { + // if the subscription is not done and it is not a polled target, we don't want to receive it. + return + } + + if subscriptionDone && !topic.isEventTarget { + // if the subscription is done and it is not an event target, we don't want to receive it. + return + } + + require.Equal(t, topicNameRaw, topicName, "topic mismatch") + require.Equal(t, test.rawTarget, payloadBytes, "raw payload mismatch") + topicsReceived[topicName] = struct{}{} + }) + } + + // check that we don't receive topics we don't subscribe to + receivedTopics := make(map[string]int) + ts.MQTTSetHasSubscribersCallback(func(topicName string) { + for _, ignoredTopic := range test.topicsIgnore { + if topicName == ignoredTopic { + return + } + if topicName == ignoredTopic+"/raw" { + return + } + } + + receivedTopics[topicName]++ + }) + + // collect all topics for later comparison with the received topics + collectedTopics := lo.Reduce(test.topics, func(collectedTopics map[string]int, topic *testTopic) map[string]int { + if topic.isPollingTarget { + collectedTopics[topic.topic]++ + collectedTopics[topic.topic+"/raw"]++ + } + if topic.isEventTarget { + collectedTopics[topic.topic]++ + collectedTopics[topic.topic+"/raw"]++ + } + + return collectedTopics + }, make(map[string]int)) + + // this step can be used to receive "polled" topics (inject the payload before subscribing) + if test.preSubscribeFunc != nil { + test.preSubscribeFunc() + } + + // subscribe to the topics + for _, testTopic := range test.topics { + subscribeToMQTTTopic(testTopic) + } + + // unfortunately we need to wait a bit here, because the MQTT broker is running in a separate goroutine + time.Sleep(50 * time.Millisecond) + + // everything we receive now is "event driven" + subscriptionDone = true + + // this step can be used to receive "event driven" topics + if test.postSubscribeFunc != nil { + test.postSubscribeFunc() + + // unfortunately we need to wait a bit here, because the MQTT broker is running in a separate goroutine + time.Sleep(50 * time.Millisecond) + } + + // check if all topics were received + for _, testTopic := range test.topics { + require.Containsf(t, topicsReceived, testTopic.topic, "topic not received: %s", testTopic.topic) + require.Containsf(t, topicsReceived, testTopic.topic+"/raw", "topic not received: %s", testTopic.topic+"/raw") + } + + // check that we don't receive topics we don't subscribe to + for topic, count := range receivedTopics { + if _, ok := collectedTopics[topic]; !ok { + require.Failf(t, "received topic that was not subscribed to", "topic: %s, count: %d", topic, count) + } + + require.Equalf(t, collectedTopics[topic], count, "topic count mismatch: %s", topic) + } + }) + } +} diff --git a/pkg/testsuite/nodebridge_mock.go b/pkg/testsuite/nodebridge_mock.go new file mode 100644 index 0000000..638b5e0 --- /dev/null +++ b/pkg/testsuite/nodebridge_mock.go @@ -0,0 +1,434 @@ +//nolint:revive,nilnil,structcheck,containedctx // skip linter for this package name +package testsuite + +import ( + "context" + "io" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/iotaledger/hive.go/runtime/event" + "github.com/iotaledger/inx-app/pkg/nodebridge" + inx "github.com/iotaledger/inx/go" + iotago "github.com/iotaledger/iota.go/v4" + iotaapi "github.com/iotaledger/iota.go/v4/api" + "github.com/iotaledger/iota.go/v4/nodeclient" +) + +type MockedNodeBridge struct { + t *testing.T + + events *nodebridge.Events + apiProvider iotago.APIProvider + + mockedLatestCommitment *nodebridge.Commitment + mockedLatestFinalizedCommitment *nodebridge.Commitment + + mockedBlocks map[iotago.BlockID]*iotago.Block + mockedBlockMetadata map[iotago.BlockID]*iotaapi.BlockMetadataResponse + mockedOutputs map[iotago.OutputID]*nodebridge.Output + + mockedStreamListenToBlocks *MockedStream[MockedBlock] + mockedStreamListenToAcceptedBlocks *MockedStream[inx.BlockMetadata] + mockedStreamListenToConfirmedBlocks *MockedStream[inx.BlockMetadata] + mockedStreamListenToCommitments *MockedStream[MockedCommitment] + mockedStreamListenToLedgerUpdates *MockedStream[nodebridge.LedgerUpdate] + mockedStreamListenToAcceptedTransactions *MockedStream[nodebridge.AcceptedTransaction] +} + +var _ nodebridge.NodeBridge = &MockedNodeBridge{} + +func NewMockedNodeBridge(t *testing.T, api iotago.API) *MockedNodeBridge { + t.Helper() + + return &MockedNodeBridge{ + t: t, + events: &nodebridge.Events{ + LatestCommitmentChanged: event.New1[*nodebridge.Commitment](), + LatestFinalizedCommitmentChanged: event.New1[*nodebridge.Commitment](), + }, + apiProvider: iotago.SingleVersionProvider(api), + mockedBlocks: make(map[iotago.BlockID]*iotago.Block), + mockedBlockMetadata: make(map[iotago.BlockID]*iotaapi.BlockMetadataResponse), + mockedOutputs: make(map[iotago.OutputID]*nodebridge.Output), + } +} + +// +// NodeBridge interface +// + +func (m *MockedNodeBridge) Events() *nodebridge.Events { + return m.events +} + +func (m *MockedNodeBridge) Connect(ctx context.Context, address string, maxConnectionAttempts uint) error { + panic("not implemented") +} + +func (m *MockedNodeBridge) Run(ctx context.Context) { + panic("not implemented") +} + +func (m *MockedNodeBridge) Client() inx.INXClient { + panic("not implemented") +} + +func (m *MockedNodeBridge) NodeConfig() *inx.NodeConfiguration { + panic("not implemented") +} + +func (m *MockedNodeBridge) APIProvider() iotago.APIProvider { + return m.apiProvider +} + +func (m *MockedNodeBridge) INXNodeClient() (*nodeclient.Client, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) Management(ctx context.Context) (nodeclient.ManagementClient, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) Indexer(ctx context.Context) (nodeclient.IndexerClient, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) EventAPI(ctx context.Context) (*nodeclient.EventAPIClient, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) BlockIssuer(ctx context.Context) (nodeclient.BlockIssuerClient, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) ReadIsCandidate(ctx context.Context, id iotago.AccountID, slot iotago.SlotIndex) (bool, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) ReadIsCommitteeMember(ctx context.Context, id iotago.AccountID, slot iotago.SlotIndex) (bool, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) ReadIsValidatorAccount(ctx context.Context, id iotago.AccountID, slot iotago.SlotIndex) (bool, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) RegisterAPIRoute(ctx context.Context, route string, bindAddress string, path string) error { + return nil +} + +func (m *MockedNodeBridge) UnregisterAPIRoute(ctx context.Context, route string) error { + return nil +} + +func (m *MockedNodeBridge) ActiveRootBlocks(ctx context.Context) (map[iotago.BlockID]iotago.CommitmentID, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) SubmitBlock(ctx context.Context, block *iotago.Block) (iotago.BlockID, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) Block(ctx context.Context, blockID iotago.BlockID) (*iotago.Block, error) { + if block, ok := m.mockedBlocks[blockID]; ok { + return block, nil + } + + return nil, status.Errorf(codes.NotFound, "block %s not found", blockID.ToHex()) +} + +func (m *MockedNodeBridge) BlockMetadata(ctx context.Context, blockID iotago.BlockID) (*iotaapi.BlockMetadataResponse, error) { + if blockMetadata, ok := m.mockedBlockMetadata[blockID]; ok { + return blockMetadata, nil + } + + return nil, status.Errorf(codes.NotFound, "metadata for block %s not found", blockID.ToHex()) +} + +func (m *MockedNodeBridge) ListenToBlocks(ctx context.Context, consumer func(block *iotago.Block, rawData []byte) error) error { + if m.mockedStreamListenToBlocks == nil { + require.FailNow(m.t, "ListenToBlocks mock not initialized") + } + + err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToBlocks.receiverFunc(), func(block *MockedBlock) error { + return consumer(block.Block, block.RawBlockData) + }) + require.NoError(m.t, err, "ListenToBlocks failed") + + return nil +} + +func (m *MockedNodeBridge) ListenToAcceptedBlocks(ctx context.Context, consumer func(blockMetadata *iotaapi.BlockMetadataResponse) error) error { + if m.mockedStreamListenToAcceptedBlocks == nil { + require.FailNow(m.t, "ListenToAcceptedBlocks mock not initialized") + } + + err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToAcceptedBlocks.receiverFunc(), func(inxBlockMetadata *inx.BlockMetadata) error { + blockMetadata, err := inxBlockMetadata.Unwrap() + if err != nil { + return err + } + + return consumer(blockMetadata) + }) + require.NoError(m.t, err, "ListenToAcceptedBlocks failed") + + return nil +} + +func (m *MockedNodeBridge) ListenToConfirmedBlocks(ctx context.Context, consumer func(blockMetadata *iotaapi.BlockMetadataResponse) error) error { + if m.mockedStreamListenToConfirmedBlocks == nil { + require.FailNow(m.t, "ListenToConfirmedBlocks mock not initialized") + } + + err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToConfirmedBlocks.receiverFunc(), func(inxBlockMetadata *inx.BlockMetadata) error { + blockMetadata, err := inxBlockMetadata.Unwrap() + if err != nil { + return err + } + + return consumer(blockMetadata) + }) + require.NoError(m.t, err, "ListenToConfirmedBlocks failed") + + return nil +} + +// TransactionMetadata returns the transaction metadata for the given transaction ID. +func (m *MockedNodeBridge) TransactionMetadata(ctx context.Context, transactionID iotago.TransactionID) (*iotaapi.TransactionMetadataResponse, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) Output(ctx context.Context, outputID iotago.OutputID) (*nodebridge.Output, error) { + if output, ok := m.mockedOutputs[outputID]; ok { + return output, nil + } + + return nil, status.Errorf(codes.NotFound, "output %s not found", outputID.ToHex()) +} + +func (m *MockedNodeBridge) ForceCommitUntil(ctx context.Context, slot iotago.SlotIndex) error { + panic("not implemented") +} + +func (m *MockedNodeBridge) Commitment(ctx context.Context, slot iotago.SlotIndex) (*nodebridge.Commitment, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) CommitmentByID(ctx context.Context, id iotago.CommitmentID) (*nodebridge.Commitment, error) { + panic("not implemented") +} + +func (m *MockedNodeBridge) ListenToCommitments(ctx context.Context, startSlot, endSlot iotago.SlotIndex, consumer func(commitment *nodebridge.Commitment, rawData []byte) error) error { + if m.mockedStreamListenToCommitments == nil { + require.FailNow(m.t, "ListenToCommitments mock not initialized") + } + + err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToCommitments.receiverFunc(), func(commitment *MockedCommitment) error { + return consumer(&nodebridge.Commitment{ + CommitmentID: commitment.CommitmentID, + Commitment: commitment.Commitment, + }, commitment.RawCommitmentData) + }) + require.NoError(m.t, err, "ListenToCommitments failed") + + return nil +} + +func (m *MockedNodeBridge) ListenToLedgerUpdates(ctx context.Context, startSlot, endSlot iotago.SlotIndex, consumer func(update *nodebridge.LedgerUpdate) error) error { + if m.mockedStreamListenToLedgerUpdates == nil { + require.FailNow(m.t, "ListenToLedgerUpdates mock not initialized") + } + + err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToLedgerUpdates.receiverFunc(), consumer) + require.NoError(m.t, err, "ListenToLedgerUpdates failed") + + return nil +} + +func (m *MockedNodeBridge) ListenToAcceptedTransactions(ctx context.Context, consumer func(tx *nodebridge.AcceptedTransaction) error) error { + if m.mockedStreamListenToAcceptedTransactions == nil { + require.FailNow(m.t, "ListenToAcceptedTransactions mock not initialized") + } + + err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToAcceptedTransactions.receiverFunc(), consumer) + require.NoError(m.t, err, "ListenToAcceptedTransactions failed") + + return nil +} + +func (m *MockedNodeBridge) NodeStatus() *inx.NodeStatus { + panic("not implemented") +} + +func (m *MockedNodeBridge) IsNodeHealthy() bool { + panic("not implemented") +} + +func (m *MockedNodeBridge) LatestCommitment() *nodebridge.Commitment { + return m.mockedLatestCommitment +} + +func (m *MockedNodeBridge) LatestFinalizedCommitment() *nodebridge.Commitment { + return m.mockedLatestFinalizedCommitment +} + +func (m *MockedNodeBridge) PruningEpoch() iotago.EpochIndex { + panic("not implemented") +} + +func (m *MockedNodeBridge) RequestTips(ctx context.Context, count uint32) (strong iotago.BlockIDs, weak iotago.BlockIDs, shallowLike iotago.BlockIDs, err error) { + panic("not implemented") +} + +// +// Mock functions +// + +func (m *MockedNodeBridge) MockClear() { + m.mockedBlocks = make(map[iotago.BlockID]*iotago.Block) + m.mockedBlockMetadata = make(map[iotago.BlockID]*iotaapi.BlockMetadataResponse) + m.mockedOutputs = make(map[iotago.OutputID]*nodebridge.Output) + + if m.mockedStreamListenToBlocks != nil { + m.mockedStreamListenToBlocks.Close() + m.mockedStreamListenToBlocks = nil + } + if m.mockedStreamListenToAcceptedBlocks != nil { + m.mockedStreamListenToAcceptedBlocks.Close() + m.mockedStreamListenToAcceptedBlocks = nil + } + if m.mockedStreamListenToConfirmedBlocks != nil { + m.mockedStreamListenToConfirmedBlocks.Close() + m.mockedStreamListenToConfirmedBlocks = nil + } + if m.mockedStreamListenToCommitments != nil { + m.mockedStreamListenToCommitments.Close() + m.mockedStreamListenToCommitments = nil + } + if m.mockedStreamListenToLedgerUpdates != nil { + m.mockedStreamListenToLedgerUpdates.Close() + m.mockedStreamListenToLedgerUpdates = nil + } + if m.mockedStreamListenToAcceptedTransactions != nil { + m.mockedStreamListenToAcceptedTransactions.Close() + m.mockedStreamListenToAcceptedTransactions = nil + } +} + +func (m *MockedNodeBridge) MockSetLatestCommitment(commitment *nodebridge.Commitment) { + m.mockedLatestCommitment = commitment +} + +func (m *MockedNodeBridge) MockSetLatestFinalizedCommitment(commitment *nodebridge.Commitment) { + m.mockedLatestFinalizedCommitment = commitment +} + +func (m *MockedNodeBridge) MockReceiveLatestCommitment(commitment *nodebridge.Commitment) { + m.mockedLatestCommitment = commitment + m.Events().LatestCommitmentChanged.Trigger(commitment) +} + +func (m *MockedNodeBridge) MockReceiveLatestFinalizedCommitment(commitment *nodebridge.Commitment) { + m.mockedLatestFinalizedCommitment = commitment + m.Events().LatestFinalizedCommitmentChanged.Trigger(commitment) +} + +func (m *MockedNodeBridge) MockAddBlock(blockID iotago.BlockID, block *iotago.Block) { + m.mockedBlocks[blockID] = block +} + +func (m *MockedNodeBridge) MockAddBlockMetadata(blockID iotago.BlockID, blockMetadata *iotaapi.BlockMetadataResponse) { + m.mockedBlockMetadata[blockID] = blockMetadata +} + +func (m *MockedNodeBridge) MockAddOutput(outputID iotago.OutputID, output *nodebridge.Output) { + m.mockedOutputs[outputID] = output +} + +type MockedBlock struct { + Block *iotago.Block + RawBlockData []byte +} + +type MockedCommitment struct { + CommitmentID iotago.CommitmentID + Commitment *iotago.Commitment + RawCommitmentData []byte +} + +func (m *MockedNodeBridge) MockListenToBlocks() *MockedStream[MockedBlock] { + m.mockedStreamListenToBlocks = InitMockedStream[MockedBlock]() + return m.mockedStreamListenToBlocks +} + +func (m *MockedNodeBridge) MockListenToAcceptedBlocks() *MockedStream[inx.BlockMetadata] { + m.mockedStreamListenToAcceptedBlocks = InitMockedStream[inx.BlockMetadata]() + return m.mockedStreamListenToAcceptedBlocks +} + +func (m *MockedNodeBridge) MockListenToConfirmedBlocks() *MockedStream[inx.BlockMetadata] { + m.mockedStreamListenToConfirmedBlocks = InitMockedStream[inx.BlockMetadata]() + return m.mockedStreamListenToConfirmedBlocks +} + +func (m *MockedNodeBridge) MockListenToCommitments() *MockedStream[MockedCommitment] { + m.mockedStreamListenToCommitments = InitMockedStream[MockedCommitment]() + return m.mockedStreamListenToCommitments +} + +func (m *MockedNodeBridge) MockListenToLedgerUpdates() *MockedStream[nodebridge.LedgerUpdate] { + m.mockedStreamListenToLedgerUpdates = InitMockedStream[nodebridge.LedgerUpdate]() + return m.mockedStreamListenToLedgerUpdates +} + +func (m *MockedNodeBridge) MockListenToAcceptedTransactions() *MockedStream[nodebridge.AcceptedTransaction] { + m.mockedStreamListenToAcceptedTransactions = InitMockedStream[nodebridge.AcceptedTransaction]() + return m.mockedStreamListenToAcceptedTransactions +} + +type MockedStream[T any] struct { + ctx context.Context + cancel context.CancelFunc + receiverChan chan *T +} + +func InitMockedStream[T any]() *MockedStream[T] { + ctx, cancel := context.WithCancel(context.Background()) + receiverChan := make(chan *T) + + return &MockedStream[T]{ + ctx: ctx, + cancel: cancel, + receiverChan: receiverChan, + } +} + +func (m *MockedStream[T]) receiverFunc() func() (*T, error) { + return func() (*T, error) { + select { + case <-m.ctx.Done(): + return nil, io.EOF + + case obj, ok := <-m.receiverChan: + if !ok { + return nil, io.EOF + } + + return obj, nil + } + } +} + +func (m *MockedStream[T]) Receive(obj *T) { + m.receiverChan <- obj +} + +func (m *MockedStream[T]) Close() { + m.cancel() + close(m.receiverChan) +} diff --git a/pkg/testsuite/testsuite.go b/pkg/testsuite/testsuite.go new file mode 100644 index 0000000..e090594 --- /dev/null +++ b/pkg/testsuite/testsuite.go @@ -0,0 +1,421 @@ +//nolint:contextcheck,forcetypeassert,exhaustive +package testsuite + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/iotaledger/hive.go/lo" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/runtime/options" + "github.com/iotaledger/inx-app/pkg/nodebridge" + "github.com/iotaledger/inx-mqtt/pkg/mqtt" + inx "github.com/iotaledger/inx/go" + iotago "github.com/iotaledger/iota.go/v4" + "github.com/iotaledger/iota.go/v4/api" + "github.com/iotaledger/iota.go/v4/tpkg" +) + +type TestSuite struct { + T *testing.T + + api iotago.API + nodeBridge *MockedNodeBridge + broker *MockedBroker + server *mqtt.Server + + mockedStreamListenToBlocks *MockedStream[MockedBlock] + mockedStreamListenToAcceptedBlocks *MockedStream[inx.BlockMetadata] + mockedStreamListenToConfirmedBlocks *MockedStream[inx.BlockMetadata] + mockedStreamListenToCommitments *MockedStream[MockedCommitment] + mockedStreamListenToLedgerUpdates *MockedStream[nodebridge.LedgerUpdate] + mockedStreamListenToAcceptedTransactions *MockedStream[nodebridge.AcceptedTransaction] +} + +func NewTestSuite(t *testing.T) *TestSuite { + t.Helper() + + rootLogger, err := logger.NewRootLogger(logger.DefaultCfg) + require.NoError(t, err) + + api := iotago.V3API(iotago.NewV3ProtocolParameters()) + + bridge := NewMockedNodeBridge(t, api) + broker := NewMockedBroker(t) + server, err := mqtt.NewServer( + rootLogger.Named(t.Name()), + bridge, + broker, + nil, + ) + require.NoError(t, err) + + return &TestSuite{ + T: t, + api: api, + nodeBridge: bridge, + broker: broker, + server: server, + + mockedStreamListenToBlocks: bridge.MockListenToBlocks(), + mockedStreamListenToAcceptedBlocks: bridge.MockListenToAcceptedBlocks(), + mockedStreamListenToConfirmedBlocks: bridge.MockListenToConfirmedBlocks(), + mockedStreamListenToCommitments: bridge.MockListenToCommitments(), + mockedStreamListenToLedgerUpdates: bridge.MockListenToLedgerUpdates(), + mockedStreamListenToAcceptedTransactions: bridge.MockListenToAcceptedTransactions(), + } +} + +func (ts *TestSuite) Run(ctx context.Context) { + err := ts.server.Start(ctx) + require.NoError(ts.T, err) + + go func() { + <-ctx.Done() + + err = ts.server.Stop() + require.NoError(ts.T, err) + }() +} + +func (ts *TestSuite) API() iotago.API { + return ts.api +} + +func (ts *TestSuite) Reset() { + ts.nodeBridge.MockClear() + ts.broker.MockClear() + + ts.mockedStreamListenToBlocks = ts.nodeBridge.MockListenToBlocks() + ts.mockedStreamListenToAcceptedBlocks = ts.nodeBridge.MockListenToAcceptedBlocks() + ts.mockedStreamListenToConfirmedBlocks = ts.nodeBridge.MockListenToConfirmedBlocks() + ts.mockedStreamListenToCommitments = ts.nodeBridge.MockListenToCommitments() + ts.mockedStreamListenToLedgerUpdates = ts.nodeBridge.MockListenToLedgerUpdates() + ts.mockedStreamListenToAcceptedTransactions = ts.nodeBridge.MockListenToAcceptedTransactions() +} + +// +// NodeBridge +// + +func (ts *TestSuite) SetLatestCommitment(commitment *nodebridge.Commitment) { + ts.nodeBridge.MockSetLatestCommitment(commitment) +} + +func (ts *TestSuite) SetLatestFinalizedCommitment(commitment *nodebridge.Commitment) { + ts.nodeBridge.MockSetLatestFinalizedCommitment(commitment) +} + +func (ts *TestSuite) ReceiveLatestCommitment(commitment *nodebridge.Commitment) { + ts.nodeBridge.MockReceiveLatestCommitment(commitment) +} + +func (ts *TestSuite) ReceiveLatestFinalizedCommitment(commitment *nodebridge.Commitment) { + ts.nodeBridge.MockReceiveLatestFinalizedCommitment(commitment) +} + +func (ts *TestSuite) MockAddBlock(blockID iotago.BlockID, block *iotago.Block) { + ts.nodeBridge.MockAddBlock(blockID, block) +} + +func (ts *TestSuite) MockAddBlockMetadata(blockID iotago.BlockID, blockMetadata *api.BlockMetadataResponse) { + ts.nodeBridge.MockAddBlockMetadata(blockID, blockMetadata) +} + +func (ts *TestSuite) MockAddOutput(outputID iotago.OutputID, output *nodebridge.Output) { + ts.nodeBridge.MockAddOutput(outputID, output) +} + +func (ts *TestSuite) ReceiveBlock(block *MockedBlock) { + ts.mockedStreamListenToBlocks.Receive(block) +} + +func (ts *TestSuite) ReceiveAcceptedBlock(metadata *inx.BlockMetadata) { + ts.mockedStreamListenToAcceptedBlocks.Receive(metadata) +} + +func (ts *TestSuite) ReceiveConfirmedBlock(metadata *inx.BlockMetadata) { + ts.mockedStreamListenToConfirmedBlocks.Receive(metadata) +} + +func (ts *TestSuite) ReceiveCommitment(commitment *MockedCommitment) { + ts.mockedStreamListenToCommitments.Receive(commitment) +} + +func (ts *TestSuite) ReceiveLedgerUpdate(update *nodebridge.LedgerUpdate) { + ts.mockedStreamListenToLedgerUpdates.Receive(update) +} + +func (ts *TestSuite) ReceiveAcceptedTransaction(tx *nodebridge.AcceptedTransaction) { + ts.mockedStreamListenToAcceptedTransactions.Receive(tx) +} + +// +// MQTT Broker +// + +func (ts *TestSuite) MQTTSetHasSubscribersCallback(callback func(topic string)) { + ts.broker.MockSetHasSubscribersCallback(callback) +} + +func (ts *TestSuite) MQTTClientConnect(clientID string) { + ts.broker.MockClientConnected(clientID) +} + +func (ts *TestSuite) MQTTClientDisconnect(clientID string) { + ts.broker.MockClientDisconnected(clientID) +} + +func (ts *TestSuite) MQTTSubscribe(clientID string, topic string, callback func(topic string, payload []byte)) (unsubscribe func()) { + ts.broker.MockTopicSubscribed(clientID, topic, callback) + + return func() { + ts.broker.MockTopicUnsubscribed(clientID, topic) + } +} + +func (ts *TestSuite) MQTTUnsubscribe(clientID string, topic string) { + ts.broker.MockTopicUnsubscribed(clientID, topic) +} + +// +// Utility functions +// + +type TestTransaction struct { + ConsumedOutputCreationTransaction *iotago.Transaction + ConsumedOutputID iotago.OutputID + Transaction *iotago.Transaction + TransactionID iotago.TransactionID + Output *nodebridge.Output + OutputID iotago.OutputID + OutputWithMetadataResponse *api.OutputWithMetadataResponse + SenderAddress iotago.Address + OwnerAddress iotago.Address + Block *iotago.Block + BlockID iotago.BlockID +} + +func (ts *TestSuite) NewTestTransaction(fromSameAddress bool, opts ...options.Option[iotago.Transaction]) *TestTransaction { + // we need to create the transaction first to apply the options, so we can simplify the test by sending from the same address + transaction := &iotago.Transaction{ + API: ts.API(), + TransactionEssence: &iotago.TransactionEssence{ + NetworkID: ts.API().ProtocolParameters().NetworkID(), + CreationSlot: tpkg.RandSlot(), + ContextInputs: nil, + // we set those later + Inputs: iotago.TxEssenceInputs{}, + Allotments: nil, + Capabilities: nil, + Payload: nil, + }, + Outputs: iotago.Outputs[iotago.TxEssenceOutput]{ + tpkg.RandOutput(iotago.OutputBasic).(*iotago.BasicOutput), + }, + } + options.Apply(transaction, opts) + + createdOutput := transaction.Outputs[0] + + var ownerAddress iotago.Address + switch createdOutput.Type() { + case iotago.OutputAnchor: + ownerAddress = createdOutput.UnlockConditionSet().StateControllerAddress().Address + case iotago.OutputFoundry: + ownerAddress = createdOutput.UnlockConditionSet().ImmutableAccount().Address + default: + ownerAddress = createdOutput.UnlockConditionSet().Address().Address + } + + // simplify the test by sending from the same address (less topics to ignore) + consumedOutput := tpkg.RandOutput(iotago.OutputBasic).(*iotago.BasicOutput) + if fromSameAddress { + consumedOutput.UnlockConditionSet().Address().Address = ownerAddress + } + senderAddress := consumedOutput.UnlockConditionSet().Address().Address + + consumedOutputCreationTransaction := &iotago.Transaction{ + API: ts.API(), + TransactionEssence: &iotago.TransactionEssence{ + NetworkID: ts.API().ProtocolParameters().NetworkID(), + CreationSlot: tpkg.RandSlot(), + ContextInputs: nil, + Inputs: iotago.TxEssenceInputs{ + tpkg.RandUTXOInput(), + }, + Allotments: nil, + Capabilities: nil, + Payload: nil, + }, + Outputs: iotago.Outputs[iotago.TxEssenceOutput]{ + consumedOutput, + }, + } + consumedOutputID := iotago.OutputIDFromTransactionIDAndIndex(lo.PanicOnErr(consumedOutputCreationTransaction.ID()), 0) + + // now we can set the correct inputs + transaction.TransactionEssence.Inputs = iotago.TxEssenceInputs{ + consumedOutputID.UTXOInput(), + } + transactionID := lo.PanicOnErr(transaction.ID()) + + block := tpkg.RandBlock( + tpkg.RandBasicBlockBodyWithPayload(ts.API(), + tpkg.RandSignedTransactionWithTransaction(ts.API(), + transaction, + ), + ), ts.API(), iotago.Mana(500)) + blockID := block.MustID() + + output := ts.NewNodeBridgeOutputFromTransaction(blockID, transaction) + + return &TestTransaction{ + ConsumedOutputCreationTransaction: consumedOutputCreationTransaction, + ConsumedOutputID: consumedOutputID, + Transaction: transaction, + TransactionID: transactionID, + Output: output, + OutputID: output.OutputID, + OutputWithMetadataResponse: ts.NewOutputWithMetadataResponseFromTransaction(blockID, transaction), + SenderAddress: senderAddress, + OwnerAddress: ownerAddress, + Block: block, + BlockID: blockID, + } +} + +func (ts *TestSuite) NewOutputWithMetadataResponseFromTransaction(blockID iotago.BlockID, transaction *iotago.Transaction) *api.OutputWithMetadataResponse { + transactionID := lo.PanicOnErr(transaction.ID()) + output := transaction.Outputs[0] + outputID := iotago.OutputIDFromTransactionIDAndIndex(transactionID, 0) + outputIDProof := lo.PanicOnErr(iotago.OutputIDProofFromTransaction(transaction, 0)) + + return &api.OutputWithMetadataResponse{ + Output: output, + OutputIDProof: outputIDProof, + Metadata: &api.OutputMetadata{ + OutputID: outputID, + BlockID: blockID, + Included: &api.OutputInclusionMetadata{ + Slot: blockID.Slot(), + TransactionID: transactionID, + CommitmentID: tpkg.RandCommitmentID(), + }, + Spent: nil, + LatestCommitmentID: tpkg.RandCommitmentID(), + }, + } +} + +func (ts *TestSuite) NewSpentOutputWithMetadataResponseFromTransaction(creationBlockID iotago.BlockID, creationTx *iotago.Transaction, spendBlockID iotago.BlockID, spendTx *iotago.Transaction) *api.OutputWithMetadataResponse { + creationTransactionID := lo.PanicOnErr(creationTx.ID()) + spendTransactionID := lo.PanicOnErr(spendTx.ID()) + + output := creationTx.Outputs[0] + outputID := iotago.OutputIDFromTransactionIDAndIndex(creationTransactionID, 0) + outputIDProof := lo.PanicOnErr(iotago.OutputIDProofFromTransaction(creationTx, 0)) + + return &api.OutputWithMetadataResponse{ + Output: output, + OutputIDProof: outputIDProof, + Metadata: &api.OutputMetadata{ + OutputID: outputID, + BlockID: creationBlockID, + Included: &api.OutputInclusionMetadata{ + Slot: creationBlockID.Slot(), + TransactionID: creationTransactionID, + CommitmentID: tpkg.RandCommitmentID(), + }, + Spent: &api.OutputConsumptionMetadata{ + Slot: spendBlockID.Slot(), + TransactionID: spendTransactionID, + CommitmentID: tpkg.RandCommitmentID(), + }, + LatestCommitmentID: tpkg.RandCommitmentID(), + }, + } +} + +func (ts *TestSuite) NewNodeBridgeOutputFromTransaction(blockID iotago.BlockID, transaction *iotago.Transaction) *nodebridge.Output { + transactionID := lo.PanicOnErr(transaction.ID()) + output := transaction.Outputs[0] + outputID := iotago.OutputIDFromTransactionIDAndIndex(transactionID, 0) + outputIDProof := lo.PanicOnErr(iotago.OutputIDProofFromTransaction(transaction, 0)) + + return &nodebridge.Output{ + OutputID: outputID, + Output: output, + OutputIDProof: outputIDProof, + Metadata: &api.OutputMetadata{ + OutputID: outputID, + BlockID: blockID, + Included: &api.OutputInclusionMetadata{ + Slot: blockID.Slot(), + TransactionID: transactionID, + CommitmentID: tpkg.RandCommitmentID(), + }, + Spent: nil, + LatestCommitmentID: tpkg.RandCommitmentID(), + }, + RawOutputData: lo.PanicOnErr(ts.API().Encode(output)), + } +} + +func (ts *TestSuite) NewNodeBridgeOutputFromOutputWithMetadata(outputWithMetadata *api.OutputWithMetadataResponse) *nodebridge.Output { + return &nodebridge.Output{ + OutputID: outputWithMetadata.Metadata.OutputID, + Output: outputWithMetadata.Output, + OutputIDProof: outputWithMetadata.OutputIDProof, + Metadata: outputWithMetadata.Metadata, + RawOutputData: lo.PanicOnErr(ts.API().Encode(outputWithMetadata.Output)), + } +} + +func (ts *TestSuite) NewSpentNodeBridgeOutputFromTransaction(creationBlockID iotago.BlockID, creationTx *iotago.Transaction, spendSlot iotago.SlotIndex, spendTransactionID iotago.TransactionID) *nodebridge.Output { + creationTransactionID := lo.PanicOnErr(creationTx.ID()) + + output := creationTx.Outputs[0] + outputID := iotago.OutputIDFromTransactionIDAndIndex(creationTransactionID, 0) + outputIDProof := lo.PanicOnErr(iotago.OutputIDProofFromTransaction(creationTx, 0)) + + return &nodebridge.Output{ + OutputID: outputID, + Output: output, + OutputIDProof: outputIDProof, + Metadata: &api.OutputMetadata{ + OutputID: outputID, + BlockID: creationBlockID, + Included: &api.OutputInclusionMetadata{ + Slot: creationBlockID.Slot(), + TransactionID: creationTransactionID, + CommitmentID: tpkg.RandCommitmentID(), + }, + Spent: &api.OutputConsumptionMetadata{ + Slot: spendSlot, + TransactionID: spendTransactionID, + CommitmentID: tpkg.RandCommitmentID(), + }, + LatestCommitmentID: tpkg.RandCommitmentID(), + }, + RawOutputData: lo.PanicOnErr(ts.API().Encode(output)), + } +} + +func (ts *TestSuite) NewSpentNodeBridgeOutputFromOutputWithMetadata(outputWithMetadata *api.OutputWithMetadataResponse, spendSlot iotago.SlotIndex, spendTransactionID iotago.TransactionID) *nodebridge.Output { + outputWithMetadata.Metadata.Spent = &api.OutputConsumptionMetadata{ + Slot: spendSlot, + TransactionID: spendTransactionID, + CommitmentID: tpkg.RandCommitmentID(), + } + + return &nodebridge.Output{ + OutputID: outputWithMetadata.Metadata.OutputID, + Output: outputWithMetadata.Output, + OutputIDProof: outputWithMetadata.OutputIDProof, + Metadata: outputWithMetadata.Metadata, + RawOutputData: lo.PanicOnErr(ts.API().Encode(outputWithMetadata.Output)), + } +}