From 1f68a8410342339cc6a61acd977072746ed5e8d7 Mon Sep 17 00:00:00 2001 From: muXxer Date: Tue, 23 Apr 2024 13:36:38 +0200 Subject: [PATCH 1/2] Replace block metadata topics with `ListenToBlockMetadata` --- go.mod | 4 +-- go.sum | 8 ++--- pkg/mqtt/server.go | 12 +++++-- pkg/testsuite/mqtt_test.go | 8 ++--- pkg/testsuite/nodebridge_mock.go | 59 +++++++------------------------- pkg/testsuite/testsuite.go | 17 +++------ tools/gendoc/go.mod | 4 +-- tools/gendoc/go.sum | 8 ++--- 8 files changed, 44 insertions(+), 76 deletions(-) diff --git a/go.mod b/go.mod index b02cee5..40f9878 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/iotaledger/hive.go/log v0.0.0-20240419094509-31dbb7270ad9 github.com/iotaledger/hive.go/runtime v0.0.0-20240419094509-31dbb7270ad9 github.com/iotaledger/hive.go/web v0.0.0-20240216135101-261e99d9d84a - github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 - github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d + github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 + github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 github.com/labstack/echo/v4 v4.11.4 github.com/mochi-mqtt/server/v2 v2.4.6 diff --git a/go.sum b/go.sum index 50003ab..e4d6870 100644 --- a/go.sum +++ b/go.sum @@ -214,10 +214,10 @@ github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 h1:NJ github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:O4p7UmsfoeLqtAUwrKbq0lXMxjY/MLQSpZSavvvvGig= github.com/iotaledger/hive.go/web v0.0.0-20240216135101-261e99d9d84a h1:aqMFY9c+VMevIB/Hc3Om9OZsXViNAbl4xrF8yTiYn0I= github.com/iotaledger/hive.go/web v0.0.0-20240216135101-261e99d9d84a/go.mod h1:qDs633y8GLKk9HegRrvuD2edmyCbbkgiluqrG6ko0yo= -github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 h1:28gH76448EjukxCz1H0OIbM0Yeoq0HP2jk4+v1tDcWQ= -github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66/go.mod h1:aQWBB1p5CLWKFWBTXB6TwSGZu3piuNHTjhWYyE3H22I= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d h1:aTLIfyVtJHLMKgYEUY0tPNBv+B522JZbttH1DslX2ck= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic= +github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 h1:PBFs3UuwpCdd7jqHozVx2/UMJCQ6fwZeIzkedv1bum4= +github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850/go.mod h1:kk+TNI0FkHRkSHuLXMkAmnbdxZjmizZgVo1vE2fXXJ8= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 h1:vt8LvpthPv2iVgIDzHN0N3Gee5+KEmqm/3eeF5G6hyA= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic= github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 h1:vC1YXh2b8WleeAJvqf76PtBDvOXNIaI2Xdn0eLi2YFU= github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61/go.mod h1:ui1VcUuBHzN4eO0VR89UKqkiYR443VsznFMJgY2YRUQ= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go index cae2eda..79409c3 100644 --- a/pkg/mqtt/server.go +++ b/pkg/mqtt/server.go @@ -408,7 +408,11 @@ func (s *Server) listenToBlocks(ctx context.Context) error { } func (s *Server) listenToAcceptedBlocksMetadata(ctx context.Context) error { - return s.NodeBridge.ListenToAcceptedBlocks(ctx, func(blockMetadata *api.BlockMetadataResponse) error { + return s.NodeBridge.ListenToBlockMetadata(ctx, func(blockMetadata *api.BlockMetadataResponse) error { + if blockMetadata.BlockState != api.BlockStateAccepted { + return nil + } + if err := s.publishBlockMetadataOnTopicsIfSubscribed(func() (*api.BlockMetadataResponse, error) { return blockMetadata, nil }, api.EventAPITopicBlockMetadataAccepted, GetTopicBlockMetadata(blockMetadata.BlockID), @@ -422,7 +426,11 @@ func (s *Server) listenToAcceptedBlocksMetadata(ctx context.Context) error { } func (s *Server) listenToConfirmedBlocksMetadata(ctx context.Context) error { - return s.NodeBridge.ListenToConfirmedBlocks(ctx, func(blockMetadata *api.BlockMetadataResponse) error { + return s.NodeBridge.ListenToBlockMetadata(ctx, func(blockMetadata *api.BlockMetadataResponse) error { + if blockMetadata.BlockState != api.BlockStateConfirmed { + return nil + } + if err := s.publishBlockMetadataOnTopicsIfSubscribed(func() (*api.BlockMetadataResponse, error) { return blockMetadata, nil }, api.EventAPITopicBlockMetadataConfirmed, GetTopicBlockMetadata(blockMetadata.BlockID), diff --git a/pkg/testsuite/mqtt_test.go b/pkg/testsuite/mqtt_test.go index cc4a5c9..86d63cc 100644 --- a/pkg/testsuite/mqtt_test.go +++ b/pkg/testsuite/mqtt_test.go @@ -375,7 +375,7 @@ func TestMqttTopics(t *testing.T) { ts.MockAddBlockMetadata(blockMetadataResponse.BlockID, blockMetadataResponse) }, postSubscribeFunc: func() { - ts.ReceiveAcceptedBlock(lo.PanicOnErr(inx.WrapBlockMetadata(blockMetadataResponse))) + ts.ReceiveBlockMetadata(inx.WrapBlockMetadata(blockMetadataResponse)) }, } }(), @@ -402,7 +402,7 @@ func TestMqttTopics(t *testing.T) { jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(blockMetadataResponse)), rawTarget: lo.PanicOnErr(ts.API().Encode(blockMetadataResponse)), postSubscribeFunc: func() { - ts.ReceiveAcceptedBlock(lo.PanicOnErr(inx.WrapBlockMetadata(blockMetadataResponse))) + ts.ReceiveBlockMetadata(inx.WrapBlockMetadata(blockMetadataResponse)) }, } }(), @@ -411,7 +411,7 @@ func TestMqttTopics(t *testing.T) { func() *test { blockMetadataResponse := &api.BlockMetadataResponse{ BlockID: tpkg.RandBlockID(), - BlockState: api.BlockStateAccepted, + BlockState: api.BlockStateConfirmed, } return &test{ @@ -429,7 +429,7 @@ func TestMqttTopics(t *testing.T) { jsonTarget: lo.PanicOnErr(ts.API().JSONEncode(blockMetadataResponse)), rawTarget: lo.PanicOnErr(ts.API().Encode(blockMetadataResponse)), postSubscribeFunc: func() { - ts.ReceiveConfirmedBlock(lo.PanicOnErr(inx.WrapBlockMetadata(blockMetadataResponse))) + ts.ReceiveBlockMetadata(inx.WrapBlockMetadata(blockMetadataResponse)) }, } }(), diff --git a/pkg/testsuite/nodebridge_mock.go b/pkg/testsuite/nodebridge_mock.go index 6831586..eddd39b 100644 --- a/pkg/testsuite/nodebridge_mock.go +++ b/pkg/testsuite/nodebridge_mock.go @@ -33,8 +33,7 @@ type MockedNodeBridge struct { mockedOutputs map[iotago.OutputID]*nodebridge.Output mockedStreamListenToBlocks *MockedStream[MockedBlock] - mockedStreamListenToAcceptedBlocks *MockedStream[inx.BlockMetadata] - mockedStreamListenToConfirmedBlocks *MockedStream[inx.BlockMetadata] + mockedStreamListenToBlockMetadata *MockedStream[inx.BlockMetadata] mockedStreamListenToCommitments *MockedStream[MockedCommitment] mockedStreamListenToLedgerUpdates *MockedStream[nodebridge.LedgerUpdate] mockedStreamListenToAcceptedTransactions *MockedStream[nodebridge.AcceptedTransaction] @@ -164,38 +163,15 @@ func (m *MockedNodeBridge) ListenToBlocks(ctx context.Context, consumer func(blo return nil } -func (m *MockedNodeBridge) ListenToAcceptedBlocks(ctx context.Context, consumer func(blockMetadata *api.BlockMetadataResponse) error) error { - if m.mockedStreamListenToAcceptedBlocks == nil { - require.FailNow(m.t, "ListenToAcceptedBlocks mock not initialized") +func (m *MockedNodeBridge) ListenToBlockMetadata(ctx context.Context, consumer func(blockMetadata *api.BlockMetadataResponse) error) error { + if m.mockedStreamListenToBlockMetadata == nil { + require.FailNow(m.t, "ListenToBlockMetadata mock not initialized") } - err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToAcceptedBlocks.receiverFunc(), func(inxBlockMetadata *inx.BlockMetadata) error { - blockMetadata, err := inxBlockMetadata.Unwrap() - if err != nil { - return err - } - - return consumer(blockMetadata) - }) - require.NoError(m.t, err, "ListenToAcceptedBlocks failed") - - return nil -} - -func (m *MockedNodeBridge) ListenToConfirmedBlocks(ctx context.Context, consumer func(blockMetadata *api.BlockMetadataResponse) error) error { - if m.mockedStreamListenToConfirmedBlocks == nil { - require.FailNow(m.t, "ListenToConfirmedBlocks mock not initialized") - } - - err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToConfirmedBlocks.receiverFunc(), func(inxBlockMetadata *inx.BlockMetadata) error { - blockMetadata, err := inxBlockMetadata.Unwrap() - if err != nil { - return err - } - - return consumer(blockMetadata) + err := nodebridge.ListenToStream(ctx, m.mockedStreamListenToBlockMetadata.receiverFunc(), func(inxBlockMetadata *inx.BlockMetadata) error { + return consumer(inxBlockMetadata.Unwrap()) }) - require.NoError(m.t, err, "ListenToConfirmedBlocks failed") + require.NoError(m.t, err, "ListenToBlockMetadata failed") return nil } @@ -304,13 +280,9 @@ func (m *MockedNodeBridge) MockClear() { m.mockedStreamListenToBlocks.Close() m.mockedStreamListenToBlocks = nil } - if m.mockedStreamListenToAcceptedBlocks != nil { - m.mockedStreamListenToAcceptedBlocks.Close() - m.mockedStreamListenToAcceptedBlocks = nil - } - if m.mockedStreamListenToConfirmedBlocks != nil { - m.mockedStreamListenToConfirmedBlocks.Close() - m.mockedStreamListenToConfirmedBlocks = nil + if m.mockedStreamListenToBlockMetadata != nil { + m.mockedStreamListenToBlockMetadata.Close() + m.mockedStreamListenToBlockMetadata = nil } if m.mockedStreamListenToCommitments != nil { m.mockedStreamListenToCommitments.Close() @@ -376,14 +348,9 @@ func (m *MockedNodeBridge) MockListenToBlocks() *MockedStream[MockedBlock] { return m.mockedStreamListenToBlocks } -func (m *MockedNodeBridge) MockListenToAcceptedBlocks() *MockedStream[inx.BlockMetadata] { - m.mockedStreamListenToAcceptedBlocks = InitMockedStream[inx.BlockMetadata]() - return m.mockedStreamListenToAcceptedBlocks -} - -func (m *MockedNodeBridge) MockListenToConfirmedBlocks() *MockedStream[inx.BlockMetadata] { - m.mockedStreamListenToConfirmedBlocks = InitMockedStream[inx.BlockMetadata]() - return m.mockedStreamListenToConfirmedBlocks +func (m *MockedNodeBridge) MockListenToBlockMetadata() *MockedStream[inx.BlockMetadata] { + m.mockedStreamListenToBlockMetadata = InitMockedStream[inx.BlockMetadata]() + return m.mockedStreamListenToBlockMetadata } func (m *MockedNodeBridge) MockListenToCommitments() *MockedStream[MockedCommitment] { diff --git a/pkg/testsuite/testsuite.go b/pkg/testsuite/testsuite.go index db79374..81400c1 100644 --- a/pkg/testsuite/testsuite.go +++ b/pkg/testsuite/testsuite.go @@ -28,8 +28,7 @@ type TestSuite struct { server *mqtt.Server mockedStreamListenToBlocks *MockedStream[MockedBlock] - mockedStreamListenToAcceptedBlocks *MockedStream[inx.BlockMetadata] - mockedStreamListenToConfirmedBlocks *MockedStream[inx.BlockMetadata] + mockedStreamListenToBlockMetadata *MockedStream[inx.BlockMetadata] mockedStreamListenToCommitments *MockedStream[MockedCommitment] mockedStreamListenToLedgerUpdates *MockedStream[nodebridge.LedgerUpdate] mockedStreamListenToAcceptedTransactions *MockedStream[nodebridge.AcceptedTransaction] @@ -61,8 +60,7 @@ func NewTestSuite(t *testing.T) *TestSuite { server: server, mockedStreamListenToBlocks: bridge.MockListenToBlocks(), - mockedStreamListenToAcceptedBlocks: bridge.MockListenToAcceptedBlocks(), - mockedStreamListenToConfirmedBlocks: bridge.MockListenToConfirmedBlocks(), + mockedStreamListenToBlockMetadata: bridge.MockListenToBlockMetadata(), mockedStreamListenToCommitments: bridge.MockListenToCommitments(), mockedStreamListenToLedgerUpdates: bridge.MockListenToLedgerUpdates(), mockedStreamListenToAcceptedTransactions: bridge.MockListenToAcceptedTransactions(), @@ -90,8 +88,7 @@ func (ts *TestSuite) Reset() { ts.broker.MockClear() ts.mockedStreamListenToBlocks = ts.nodeBridge.MockListenToBlocks() - ts.mockedStreamListenToAcceptedBlocks = ts.nodeBridge.MockListenToAcceptedBlocks() - ts.mockedStreamListenToConfirmedBlocks = ts.nodeBridge.MockListenToConfirmedBlocks() + ts.mockedStreamListenToBlockMetadata = ts.nodeBridge.MockListenToBlockMetadata() ts.mockedStreamListenToCommitments = ts.nodeBridge.MockListenToCommitments() ts.mockedStreamListenToLedgerUpdates = ts.nodeBridge.MockListenToLedgerUpdates() ts.mockedStreamListenToAcceptedTransactions = ts.nodeBridge.MockListenToAcceptedTransactions() @@ -137,12 +134,8 @@ func (ts *TestSuite) ReceiveBlock(block *MockedBlock) { ts.mockedStreamListenToBlocks.Receive(block) } -func (ts *TestSuite) ReceiveAcceptedBlock(metadata *inx.BlockMetadata) { - ts.mockedStreamListenToAcceptedBlocks.Receive(metadata) -} - -func (ts *TestSuite) ReceiveConfirmedBlock(metadata *inx.BlockMetadata) { - ts.mockedStreamListenToConfirmedBlocks.Receive(metadata) +func (ts *TestSuite) ReceiveBlockMetadata(metadata *inx.BlockMetadata) { + ts.mockedStreamListenToBlockMetadata.Receive(metadata) } func (ts *TestSuite) ReceiveCommitment(commitment *MockedCommitment) { diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod index 969196e..fcba410 100644 --- a/tools/gendoc/go.mod +++ b/tools/gendoc/go.mod @@ -43,8 +43,8 @@ require ( github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20240419094509-31dbb7270ad9 // indirect github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 // indirect github.com/iotaledger/hive.go/web v0.0.0-20240216135101-261e99d9d84a // indirect - github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 // indirect - github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d // indirect + github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 // indirect + github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 // indirect github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum index 877457f..303a5fb 100644 --- a/tools/gendoc/go.sum +++ b/tools/gendoc/go.sum @@ -218,10 +218,10 @@ github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 h1:NJ github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:O4p7UmsfoeLqtAUwrKbq0lXMxjY/MLQSpZSavvvvGig= github.com/iotaledger/hive.go/web v0.0.0-20240216135101-261e99d9d84a h1:aqMFY9c+VMevIB/Hc3Om9OZsXViNAbl4xrF8yTiYn0I= github.com/iotaledger/hive.go/web v0.0.0-20240216135101-261e99d9d84a/go.mod h1:qDs633y8GLKk9HegRrvuD2edmyCbbkgiluqrG6ko0yo= -github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 h1:28gH76448EjukxCz1H0OIbM0Yeoq0HP2jk4+v1tDcWQ= -github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66/go.mod h1:aQWBB1p5CLWKFWBTXB6TwSGZu3piuNHTjhWYyE3H22I= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d h1:aTLIfyVtJHLMKgYEUY0tPNBv+B522JZbttH1DslX2ck= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic= +github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 h1:PBFs3UuwpCdd7jqHozVx2/UMJCQ6fwZeIzkedv1bum4= +github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850/go.mod h1:kk+TNI0FkHRkSHuLXMkAmnbdxZjmizZgVo1vE2fXXJ8= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 h1:vt8LvpthPv2iVgIDzHN0N3Gee5+KEmqm/3eeF5G6hyA= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic= github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 h1:vC1YXh2b8WleeAJvqf76PtBDvOXNIaI2Xdn0eLi2YFU= github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61/go.mod h1:ui1VcUuBHzN4eO0VR89UKqkiYR443VsznFMJgY2YRUQ= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= From e8f6b6347e3285b7d74d1d6e08587106c6e1e921 Mon Sep 17 00:00:00 2001 From: muXxer Date: Wed, 24 Apr 2024 09:52:44 +0200 Subject: [PATCH 2/2] Merge `listenToAcceptedBlocksMetadata` and `listenToConfirmedBlocksMetadata` --- pkg/mqtt/server.go | 61 +++++++++++++++++----------------------------- 1 file changed, 23 insertions(+), 38 deletions(-) diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go index 79409c3..3da2805 100644 --- a/pkg/mqtt/server.go +++ b/pkg/mqtt/server.go @@ -27,8 +27,7 @@ const ( const ( GrpcListenToBlocks = "INX.ListenToBlocks" - GrpcListenToAcceptedBlocks = "INX.ListenToAcceptedBlocks" - GrpcListenToConfirmedBlocks = "INX.ListenToConfirmedBlocks" + GrpcListenToBlockMetadata = "INX.ListenToBlockMetadata" GrpcListenToAcceptedTransactions = "INX.ListenToAcceptedTransactions" GrpcListenToLedgerUpdates = "INX.ListenToLedgerUpdates" ) @@ -216,10 +215,10 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st s.startListenIfNeeded(ctx, GrpcListenToBlocks, s.listenToBlocks) case api.EventAPITopicBlockMetadataAccepted: - s.startListenIfNeeded(ctx, GrpcListenToAcceptedBlocks, s.listenToAcceptedBlocksMetadata) + s.startListenIfNeeded(ctx, GrpcListenToBlockMetadata, s.listenToBlockMetadata) case api.EventAPITopicBlockMetadataConfirmed: - s.startListenIfNeeded(ctx, GrpcListenToConfirmedBlocks, s.listenToConfirmedBlocksMetadata) + s.startListenIfNeeded(ctx, GrpcListenToBlockMetadata, s.listenToBlockMetadata) default: switch { @@ -234,8 +233,7 @@ func (s *Server) onSubscribeTopic(ctx context.Context, clientID string, topic st // so it must be a blockID if blockID := BlockIDFromBlockMetadataTopic(topic); !blockID.Empty() { // start listening to accepted and confirmed blocks if not already done to get state updates for that blockID - s.startListenIfNeeded(ctx, GrpcListenToAcceptedBlocks, s.listenToAcceptedBlocksMetadata) - s.startListenIfNeeded(ctx, GrpcListenToConfirmedBlocks, s.listenToConfirmedBlocksMetadata) + s.startListenIfNeeded(ctx, GrpcListenToBlockMetadata, s.listenToBlockMetadata) go s.fetchAndPublishBlockMetadata(ctx, blockID) } @@ -286,10 +284,10 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) { s.stopListenIfNeeded(GrpcListenToBlocks) case api.EventAPITopicBlockMetadataAccepted: - s.stopListenIfNeeded(GrpcListenToAcceptedBlocks) + s.stopListenIfNeeded(GrpcListenToBlockMetadata) case api.EventAPITopicBlockMetadataConfirmed: - s.stopListenIfNeeded(GrpcListenToConfirmedBlocks) + s.stopListenIfNeeded(GrpcListenToBlockMetadata) default: switch { @@ -301,8 +299,7 @@ func (s *Server) onUnsubscribeTopic(clientID string, topic string) { case strings.HasPrefix(topic, "block-metadata/"): // topicBlockMetadata // it can't be topicBlockMetadataAccepted or topicBlockMetadataConfirmed because they are handled above - s.stopListenIfNeeded(GrpcListenToAcceptedBlocks) - s.stopListenIfNeeded(GrpcListenToConfirmedBlocks) + s.stopListenIfNeeded(GrpcListenToBlockMetadata) case strings.HasPrefix(topic, "outputs/") || strings.HasPrefix(topic, "transactions/") || strings.HasPrefix(topic, "transaction-metadata/"): // topicOutputs @@ -407,35 +404,23 @@ func (s *Server) listenToBlocks(ctx context.Context) error { }) } -func (s *Server) listenToAcceptedBlocksMetadata(ctx context.Context) error { +func (s *Server) listenToBlockMetadata(ctx context.Context) error { return s.NodeBridge.ListenToBlockMetadata(ctx, func(blockMetadata *api.BlockMetadataResponse) error { - if blockMetadata.BlockState != api.BlockStateAccepted { - return nil - } - - if err := s.publishBlockMetadataOnTopicsIfSubscribed(func() (*api.BlockMetadataResponse, error) { return blockMetadata, nil }, - api.EventAPITopicBlockMetadataAccepted, - GetTopicBlockMetadata(blockMetadata.BlockID), - ); err != nil { - s.LogErrorf("failed to publish accepted block metadata: %v", err) - } - - // we don't return an error here, because we want to continue listening even if publishing fails once - return nil - }) -} - -func (s *Server) listenToConfirmedBlocksMetadata(ctx context.Context) error { - return s.NodeBridge.ListenToBlockMetadata(ctx, func(blockMetadata *api.BlockMetadataResponse) error { - if blockMetadata.BlockState != api.BlockStateConfirmed { - return nil - } - - if err := s.publishBlockMetadataOnTopicsIfSubscribed(func() (*api.BlockMetadataResponse, error) { return blockMetadata, nil }, - api.EventAPITopicBlockMetadataConfirmed, - GetTopicBlockMetadata(blockMetadata.BlockID), - ); err != nil { - s.LogErrorf("failed to publish confirmed block metadata: %v", err) + switch blockMetadata.BlockState { + case api.BlockStateAccepted: + if err := s.publishBlockMetadataOnTopicsIfSubscribed(func() (*api.BlockMetadataResponse, error) { return blockMetadata, nil }, + api.EventAPITopicBlockMetadataAccepted, + GetTopicBlockMetadata(blockMetadata.BlockID), + ); err != nil { + s.LogErrorf("failed to publish accepted block metadata: %v", err) + } + case api.BlockStateConfirmed: + if err := s.publishBlockMetadataOnTopicsIfSubscribed(func() (*api.BlockMetadataResponse, error) { return blockMetadata, nil }, + api.EventAPITopicBlockMetadataConfirmed, + GetTopicBlockMetadata(blockMetadata.BlockID), + ); err != nil { + s.LogErrorf("failed to publish confirmed block metadata: %v", err) + } } // we don't return an error here, because we want to continue listening even if publishing fails once