diff --git a/api/groups/interface.go b/api/groups/interface.go index 2267421..acd2375 100644 --- a/api/groups/interface.go +++ b/api/groups/interface.go @@ -10,8 +10,7 @@ import ( // EventsFacadeHandler defines the behavior of a facade handler needed for events group type EventsFacadeHandler interface { - HandlePushEventsV2(events data.ArgsSaveBlockData) error - HandlePushEventsV1(events data.SaveBlockData) error + HandlePushEvents(events data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) GetConnectorUserAndPass() (string, string) diff --git a/api/shared/interface.go b/api/shared/interface.go index 84d53db..da4fc4d 100644 --- a/api/shared/interface.go +++ b/api/shared/interface.go @@ -24,8 +24,7 @@ type GroupHandler interface { // FacadeHandler defines the behavior of a notifier base facade handler type FacadeHandler interface { - HandlePushEventsV2(events data.ArgsSaveBlockData) error - HandlePushEventsV1(events data.SaveBlockData) error + HandlePushEvents(events data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) GetConnectorUserAndPass() (string, string) diff --git a/facade/errors.go b/facade/errors.go index dd16a5d..69c384f 100644 --- a/facade/errors.go +++ b/facade/errors.go @@ -7,6 +7,3 @@ var ErrNilEventsHandler = errors.New("nil events handler") // ErrNilWSHandler signals that a nil websocket handler was provided var ErrNilWSHandler = errors.New("nil websocket handler") - -// ErrNilEventsInterceptor signals that a nil events interceptor was provided -var ErrNilEventsInterceptor = errors.New("nil events interceptor") diff --git a/facade/interface.go b/facade/interface.go index bfb20ae..7507dca 100644 --- a/facade/interface.go +++ b/facade/interface.go @@ -8,12 +8,9 @@ import ( // EventsHandler defines the behavior of an events handler component. // This will handle push events from observer node. type EventsHandler interface { - HandlePushEvents(events data.BlockEvents) error + HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) - HandleBlockTxs(blockTxs data.BlockTxs) - HandleBlockScrs(blockScrs data.BlockScrs) - HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) IsInterfaceNil() bool } diff --git a/facade/notifierFacade.go b/facade/notifierFacade.go index 8fc9095..de3230a 100644 --- a/facade/notifierFacade.go +++ b/facade/notifierFacade.go @@ -18,16 +18,14 @@ type ArgsNotifierFacade struct { APIConfig config.ConnectorApiConfig EventsHandler EventsHandler WSHandler dispatcher.WSHandler - EventsInterceptor EventsInterceptor StatusMetricsHandler common.StatusMetricsHandler } type notifierFacade struct { - config config.ConnectorApiConfig - eventsHandler EventsHandler - wsHandler dispatcher.WSHandler - eventsInterceptor EventsInterceptor - statusMetrics common.StatusMetricsHandler + config config.ConnectorApiConfig + eventsHandler EventsHandler + wsHandler dispatcher.WSHandler + statusMetrics common.StatusMetricsHandler } // NewNotifierFacade creates a new notifier facade instance @@ -38,11 +36,10 @@ func NewNotifierFacade(args ArgsNotifierFacade) (*notifierFacade, error) { } return ¬ifierFacade{ - eventsHandler: args.EventsHandler, - config: args.APIConfig, - wsHandler: args.WSHandler, - eventsInterceptor: args.EventsInterceptor, - statusMetrics: args.StatusMetricsHandler, + eventsHandler: args.EventsHandler, + config: args.APIConfig, + wsHandler: args.WSHandler, + statusMetrics: args.StatusMetricsHandler, }, nil } @@ -53,9 +50,6 @@ func checkArgs(args ArgsNotifierFacade) error { if check.IfNil(args.WSHandler) { return ErrNilWSHandler } - if check.IfNil(args.EventsInterceptor) { - return ErrNilEventsInterceptor - } if check.IfNil(args.StatusMetricsHandler) { return common.ErrNilStatusMetricsHandler } @@ -63,76 +57,10 @@ func checkArgs(args ArgsNotifierFacade) error { return nil } -// HandlePushEventsV2 will handle push events received from observer -// It splits block data and handles log, txs and srcs events separately -func (nf *notifierFacade) HandlePushEventsV2(allEvents data.ArgsSaveBlockData) error { - eventsData, err := nf.eventsInterceptor.ProcessBlockEvents(&allEvents) - if err != nil { - return err - } - - pushEvents := data.BlockEvents{ - Hash: eventsData.Hash, - ShardID: eventsData.Header.GetShardID(), - TimeStamp: eventsData.Header.GetTimeStamp(), - Events: eventsData.LogEvents, - } - err = nf.eventsHandler.HandlePushEvents(pushEvents) - if err != nil { - return err - } - - txs := data.BlockTxs{ - Hash: eventsData.Hash, - Txs: eventsData.Txs, - } - nf.eventsHandler.HandleBlockTxs(txs) - - scrs := data.BlockScrs{ - Hash: eventsData.Hash, - Scrs: eventsData.Scrs, - } - nf.eventsHandler.HandleBlockScrs(scrs) - - txsWithOrder := data.BlockEventsWithOrder{ - Hash: eventsData.Hash, - ShardID: eventsData.Header.GetShardID(), - TimeStamp: eventsData.Header.GetTimeStamp(), - Txs: eventsData.TxsWithOrder, - Scrs: eventsData.ScrsWithOrder, - Events: eventsData.LogEvents, - } - nf.eventsHandler.HandleBlockEventsWithOrder(txsWithOrder) - - return nil -} - -// HandlePushEventsV1 will handle push events received from observer +// HandlePushEvents will handle push events received from observer // It splits block data and handles log, txs and srcs events separately -// TODO: remove this implementation -func (nf *notifierFacade) HandlePushEventsV1(eventsData data.SaveBlockData) error { - pushEvents := data.BlockEvents{ - Hash: eventsData.Hash, - Events: eventsData.LogEvents, - } - err := nf.eventsHandler.HandlePushEvents(pushEvents) - if err != nil { - return err - } - - txs := data.BlockTxs{ - Hash: eventsData.Hash, - Txs: eventsData.Txs, - } - nf.eventsHandler.HandleBlockTxs(txs) - - scrs := data.BlockScrs{ - Hash: eventsData.Hash, - Scrs: eventsData.Scrs, - } - nf.eventsHandler.HandleBlockScrs(scrs) - - return nil +func (nf *notifierFacade) HandlePushEvents(allEvents data.ArgsSaveBlockData) error { + return nf.eventsHandler.HandleSaveBlockEvents(allEvents) } // HandleRevertEvents will handle revents events received from observer diff --git a/facade/notifierFacade_test.go b/facade/notifierFacade_test.go index 42731d2..577fdbb 100644 --- a/facade/notifierFacade_test.go +++ b/facade/notifierFacade_test.go @@ -1,7 +1,6 @@ package facade_test import ( - "errors" "net/http" "net/http/httptest" "testing" @@ -9,7 +8,6 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" - "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/config" @@ -25,7 +23,6 @@ func createMockFacadeArgs() facade.ArgsNotifierFacade { EventsHandler: &mocks.EventsHandlerStub{}, APIConfig: config.ConnectorApiConfig{}, WSHandler: &mocks.WSHandlerStub{}, - EventsInterceptor: &mocks.EventsInterceptorStub{}, StatusMetricsHandler: &mocks.StatusMetricsStub{}, } } @@ -55,17 +52,6 @@ func TestNewNotifierFacade(t *testing.T) { require.Equal(t, facade.ErrNilWSHandler, err) }) - t.Run("nil events interceptor", func(t *testing.T) { - t.Parallel() - - args := createMockFacadeArgs() - args.EventsInterceptor = nil - - f, err := facade.NewNotifierFacade(args) - require.True(t, check.IfNil(f)) - require.Equal(t, facade.ErrNilEventsInterceptor, err) - }) - t.Run("nil status metrics handler", func(t *testing.T) { t.Parallel() @@ -90,29 +76,6 @@ func TestNewNotifierFacade(t *testing.T) { func TestHandlePushEvents(t *testing.T) { t.Parallel() - t.Run("process block events error, should fail", func(t *testing.T) { - t.Parallel() - - args := createMockFacadeArgs() - - expectedErr := errors.New("expected error") - args.EventsInterceptor = &mocks.EventsInterceptorStub{ - ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { - return nil, expectedErr - }, - } - - facade, err := facade.NewNotifierFacade(args) - require.Nil(t, err) - - blockData := data.ArgsSaveBlockData{ - HeaderHash: []byte("blockHash"), - Header: &block.HeaderV2{}, - } - err = facade.HandlePushEventsV2(blockData) - require.Equal(t, expectedErr, err) - }) - t.Run("should work", func(t *testing.T) { t.Parallel() @@ -127,138 +90,29 @@ func TestHandlePushEvents(t *testing.T) { ExecutionOrder: 1, }, } - scrs := map[string]*outport.SCRInfo{ - "hash2": { - SmartContractResult: &smartContractResult.SmartContractResult{ - Nonce: 2, - }, - }, - } - logData := []*outport.LogData{ - { - Log: &transaction.Log{ - Address: []byte("logaddr1"), - Events: []*transaction.Event{}, - }, - TxHash: "logHash1", - }, - } - - logEvents := []data.Event{ - { - Address: "addr1", - }, - } - - header := &block.HeaderV2{ - Header: &block.Header{ - ShardID: 2, - }, - } blockData := data.ArgsSaveBlockData{ HeaderHash: []byte(blockHash), TransactionsPool: &outport.TransactionPool{ - Transactions: txs, - SmartContractResults: scrs, - Logs: logData, + Transactions: txs, }, Header: &block.HeaderV2{}, } - expTxs := map[string]*transaction.Transaction{ - "hash1": { - Nonce: 1, - }, - } - expScrs := map[string]*smartContractResult.SmartContractResult{ - "hash2": { - Nonce: 2, - }, - } - - expTxsData := data.BlockTxs{ - Hash: blockHash, - Txs: expTxs, - } - expScrsData := data.BlockScrs{ - Hash: blockHash, - Scrs: expScrs, - } - expLogEvents := data.BlockEvents{ - Hash: blockHash, - Events: logEvents, - ShardID: 2, - } - - expTxsWithOrder := map[string]*outport.TxInfo{ - "hash1": { - Transaction: &transaction.Transaction{ - Nonce: 1, - }, - ExecutionOrder: 1, - }, - } - expScrsWithOrder := map[string]*outport.SCRInfo{ - "hash2": { - SmartContractResult: &smartContractResult.SmartContractResult{ - Nonce: 2, - }, - }, - } - expTxsWithOrderData := data.BlockEventsWithOrder{ - Hash: blockHash, - ShardID: 2, - Txs: expTxsWithOrder, - Scrs: expScrsWithOrder, - Events: logEvents, - } - - pushWasCalled := false - txsWasCalled := false - scrsWasCalled := false - blockEventsWithOrderWasCalled := false + saveBlockWasCalled := false args.EventsHandler = &mocks.EventsHandlerStub{ - HandlePushEventsCalled: func(events data.BlockEvents) error { - pushWasCalled = true - assert.Equal(t, expLogEvents, events) + HandleSaveBlockEventsCalled: func(allEvents data.ArgsSaveBlockData) error { + saveBlockWasCalled = true return nil }, - HandleBlockTxsCalled: func(blockTxs data.BlockTxs) { - txsWasCalled = true - assert.Equal(t, expTxsData, blockTxs) - }, - HandleBlockScrsCalled: func(blockScrs data.BlockScrs) { - scrsWasCalled = true - assert.Equal(t, expScrsData, blockScrs) - }, - HandleBlockEventsWithOrderCalled: func(blockTxs data.BlockEventsWithOrder) { - blockEventsWithOrderWasCalled = true - assert.Equal(t, expTxsWithOrderData, blockTxs) - }, - } - args.EventsInterceptor = &mocks.EventsInterceptorStub{ - ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { - return &data.InterceptorBlockData{ - Hash: blockHash, - Header: header, - Txs: expTxs, - Scrs: expScrs, - LogEvents: logEvents, - TxsWithOrder: expTxsWithOrder, - ScrsWithOrder: expScrsWithOrder, - }, nil - }, } facade, err := facade.NewNotifierFacade(args) require.Nil(t, err) - facade.HandlePushEventsV2(blockData) + err = facade.HandlePushEvents(blockData) + require.Nil(t, err) - assert.True(t, pushWasCalled) - assert.True(t, txsWasCalled) - assert.True(t, scrsWasCalled) - assert.True(t, blockEventsWithOrderWasCalled) + assert.True(t, saveBlockWasCalled) }) } diff --git a/factory/processFactory.go b/factory/processFactory.go index 49f4325..9fd41f6 100644 --- a/factory/processFactory.go +++ b/factory/processFactory.go @@ -16,30 +16,6 @@ var log = logger.GetOrCreate("factory") const bech32PubkeyConverterType = "bech32" -// ArgsEventsHandlerFactory defines the arguments needed for events handler creation -type ArgsEventsHandlerFactory struct { - CheckDuplicates bool - Locker process.LockService - Publisher process.Publisher - StatusMetricsHandler common.StatusMetricsHandler -} - -// CreateEventsHandler will create an events handler processor -func CreateEventsHandler(args ArgsEventsHandlerFactory) (process.EventsHandler, error) { - argsEventsHandler := process.ArgsEventsHandler{ - Locker: args.Locker, - Publisher: args.Publisher, - StatusMetricsHandler: args.StatusMetricsHandler, - CheckDuplicates: args.CheckDuplicates, - } - eventsHandler, err := process.NewEventsHandler(argsEventsHandler) - if err != nil { - return nil, err - } - - return eventsHandler, nil -} - // CreateEventsInterceptor will create the events interceptor func CreateEventsInterceptor(cfg config.GeneralConfig) (process.EventsInterceptor, error) { pubKeyConverter, err := getPubKeyConverter(cfg) @@ -63,7 +39,8 @@ func getPubKeyConverter(cfg config.GeneralConfig) (core.PubkeyConverter, error) } } -func createPayloadHandler(marshaller marshal.Marshalizer, facade process.EventsFacadeHandler) (websocket.PayloadHandler, error) { +// CreatePayloadHandler will create a new instance of payload handler +func CreatePayloadHandler(marshaller marshal.Marshalizer, facade process.EventsFacadeHandler) (websocket.PayloadHandler, error) { dataPreProcessorArgs := preprocess.ArgsEventsPreProcessor{ Marshaller: marshaller, Facade: facade, diff --git a/factory/webFactory.go b/factory/webFactory.go index e4d4d75..521db4f 100644 --- a/factory/webFactory.go +++ b/factory/webFactory.go @@ -14,7 +14,7 @@ func CreateWebServerHandler(facade shared.FacadeHandler, configs config.Configs) return nil, err } - payloadHandler, err := createPayloadHandler(marshaller, facade) + payloadHandler, err := CreatePayloadHandler(marshaller, facade) if err != nil { return nil, err } diff --git a/factory/wsFactory.go b/factory/wsFactory.go index c0e7249..4122a1d 100644 --- a/factory/wsFactory.go +++ b/factory/wsFactory.go @@ -70,7 +70,7 @@ func createWsObsConnector( return nil, err } - payloadHandler, err := createPayloadHandler(marshaller, facade) + payloadHandler, err := CreatePayloadHandler(marshaller, facade) if err != nil { return nil, err } diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 18ab63c..0c9f97e 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -21,10 +21,6 @@ import ( var log = logger.GetOrCreate("integrationTests/rabbitmq") func TestNotifierWithRabbitMQ(t *testing.T) { - t.Run("with http observer connnector + payload version 0", func(t *testing.T) { - testNotifierWithRabbitMQ(t, common.HTTPConnectorType, common.PayloadV0) - }) - t.Run("with http observer connnector", func(t *testing.T) { testNotifierWithRabbitMQ(t, common.HTTPConnectorType, common.PayloadV1) }) @@ -40,7 +36,7 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion notifier, err := integrationTests.NewTestNotifierWithRabbitMq(cfg.MainConfig) require.Nil(t, err) - client, err := integrationTests.CreateObserverConnector(notifier.Facade, observerType, common.MessageQueuePublisherType, common.PayloadV1) + client, err := integrationTests.CreateObserverConnector(notifier.Facade, observerType, common.MessageQueuePublisherType, payloadVersion) require.Nil(t, err) _ = notifier.Publisher.Run() @@ -59,7 +55,7 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion integrationTests.WaitTimeout(t, wg, time.Second*2) - assert.Equal(t, 6, len(notifier.RedisClient.GetEntries())) + assert.Equal(t, 3, len(notifier.RedisClient.GetEntries())) assert.Equal(t, 6, len(notifier.RabbitMQClient.GetEntries())) } diff --git a/integrationTests/testNotifierProxy.go b/integrationTests/testNotifierProxy.go index a98cfe9..0a0c839 100644 --- a/integrationTests/testNotifierProxy.go +++ b/integrationTests/testNotifierProxy.go @@ -54,11 +54,20 @@ func NewTestNotifierWithWS(cfg config.MainConfig) (*testNotifier, error) { statusMetricsHandler := metrics.NewStatusMetrics() + eventsInterceptorArgs := process.ArgsEventsInterceptor{ + PubKeyConverter: &mocks.PubkeyConverterMock{}, + } + eventsInterceptor, err := process.NewEventsInterceptor(eventsInterceptorArgs) + if err != nil { + return nil, err + } + argsEventsHandler := process.ArgsEventsHandler{ Locker: locker, Publisher: publisher, StatusMetricsHandler: statusMetricsHandler, CheckDuplicates: cfg.General.CheckDuplicates, + EventsInterceptor: eventsInterceptor, } eventsHandler, err := process.NewEventsHandler(argsEventsHandler) if err != nil { @@ -79,19 +88,10 @@ func NewTestNotifierWithWS(cfg config.MainConfig) (*testNotifier, error) { return nil, err } - eventsInterceptorArgs := process.ArgsEventsInterceptor{ - PubKeyConverter: &mocks.PubkeyConverterMock{}, - } - eventsInterceptor, err := process.NewEventsInterceptor(eventsInterceptorArgs) - if err != nil { - return nil, err - } - facadeArgs := facade.ArgsNotifierFacade{ EventsHandler: eventsHandler, APIConfig: cfg.ConnectorApi, WSHandler: wsHandler, - EventsInterceptor: eventsInterceptor, StatusMetricsHandler: statusMetricsHandler, } facade, err := facade.NewNotifierFacade(facadeArgs) @@ -139,31 +139,31 @@ func NewTestNotifierWithRabbitMq(cfg config.MainConfig) (*testNotifier, error) { return nil, err } + eventsInterceptorArgs := process.ArgsEventsInterceptor{ + PubKeyConverter: &mocks.PubkeyConverterMock{}, + } + eventsInterceptor, err := process.NewEventsInterceptor(eventsInterceptorArgs) + if err != nil { + return nil, err + } + argsEventsHandler := process.ArgsEventsHandler{ Locker: locker, Publisher: publisher, StatusMetricsHandler: statusMetricsHandler, CheckDuplicates: cfg.General.CheckDuplicates, + EventsInterceptor: eventsInterceptor, } eventsHandler, err := process.NewEventsHandler(argsEventsHandler) if err != nil { return nil, err } - eventsInterceptorArgs := process.ArgsEventsInterceptor{ - PubKeyConverter: &mocks.PubkeyConverterMock{}, - } - eventsInterceptor, err := process.NewEventsInterceptor(eventsInterceptorArgs) - if err != nil { - return nil, err - } - wsHandler := &disabled.WSHandler{} facadeArgs := facade.ArgsNotifierFacade{ EventsHandler: eventsHandler, APIConfig: cfg.ConnectorApi, WSHandler: wsHandler, - EventsInterceptor: eventsInterceptor, StatusMetricsHandler: statusMetricsHandler, } facade, err := facade.NewNotifierFacade(facadeArgs) diff --git a/integrationTests/testObserverConnector.go b/integrationTests/testObserverConnector.go index 7987b99..a550fb3 100644 --- a/integrationTests/testObserverConnector.go +++ b/integrationTests/testObserverConnector.go @@ -15,26 +15,12 @@ import ( "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/config" "github.com/multiversx/mx-chain-notifier-go/factory" - "github.com/multiversx/mx-chain-notifier-go/process" - "github.com/multiversx/mx-chain-notifier-go/process/preprocess" ) // CreateObserverConnector will create observer connector component func CreateObserverConnector(facade shared.FacadeHandler, connType string, apiType string, payloadVersion uint32) (ObserverConnector, error) { marshaller := &marshal.JsonMarshalizer{} - preProcessorArgs := preprocess.ArgsEventsPreProcessor{ - Marshaller: marshaller, - Facade: facade, - } - - eventsProcessors := make(map[uint32]process.DataProcessor) - dataPreProcessor, err := preprocess.NewEventsPreProcessorV1(preProcessorArgs) - if err != nil { - return nil, err - } - - eventsProcessors[payloadVersion] = dataPreProcessor - payloadHandler, err := process.NewPayloadHandler(eventsProcessors) + payloadHandler, err := factory.CreatePayloadHandler(marshaller, facade) if err != nil { return nil, err } diff --git a/integrationTests/websocket/testNotifierWithWebsockets_test.go b/integrationTests/websocket/testNotifierWithWebsockets_test.go index 128df49..a546161 100644 --- a/integrationTests/websocket/testNotifierWithWebsockets_test.go +++ b/integrationTests/websocket/testNotifierWithWebsockets_test.go @@ -715,6 +715,6 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { integrationTests.WaitTimeout(t, wg, time.Second*4) - assert.Equal(t, numEvents, len(notifier.RedisClient.GetEntries())) - assert.Equal(t, numEvents, len(notifier.RedisClient.GetEntries())) + expectedNumRedisEvents := 3 // one redis event per push, revert, finalized + assert.Equal(t, expectedNumRedisEvents, len(notifier.RedisClient.GetEntries())) } diff --git a/mocks/eventsHandlerStub.go b/mocks/eventsHandlerStub.go index e7da9d0..ab00d34 100644 --- a/mocks/eventsHandlerStub.go +++ b/mocks/eventsHandlerStub.go @@ -4,18 +4,15 @@ import "github.com/multiversx/mx-chain-notifier-go/data" // EventsHandlerStub implements EventsHandler interface type EventsHandlerStub struct { - HandlePushEventsCalled func(events data.BlockEvents) error - HandleRevertEventsCalled func(revertBlock data.RevertBlock) - HandleFinalizedEventsCalled func(finalizedBlock data.FinalizedBlock) - HandleBlockTxsCalled func(blockTxs data.BlockTxs) - HandleBlockScrsCalled func(blockScrs data.BlockScrs) - HandleBlockEventsWithOrderCalled func(blockTxs data.BlockEventsWithOrder) + HandleSaveBlockEventsCalled func(allEvents data.ArgsSaveBlockData) error + HandleRevertEventsCalled func(revertBlock data.RevertBlock) + HandleFinalizedEventsCalled func(finalizedBlock data.FinalizedBlock) } -// HandlePushEvents - -func (e *EventsHandlerStub) HandlePushEvents(events data.BlockEvents) error { - if e.HandlePushEventsCalled != nil { - return e.HandlePushEventsCalled(events) +// HandleSaveBlockEvents - +func (e *EventsHandlerStub) HandleSaveBlockEvents(events data.ArgsSaveBlockData) error { + if e.HandleSaveBlockEventsCalled != nil { + return e.HandleSaveBlockEventsCalled(events) } return nil @@ -35,27 +32,6 @@ func (e *EventsHandlerStub) HandleFinalizedEvents(finalizedBlock data.FinalizedB } } -// HandleBlockTxs - -func (e *EventsHandlerStub) HandleBlockTxs(blockTxs data.BlockTxs) { - if e.HandleBlockTxsCalled != nil { - e.HandleBlockTxsCalled(blockTxs) - } -} - -// HandleBlockScrs - -func (e *EventsHandlerStub) HandleBlockScrs(blockScrs data.BlockScrs) { - if e.HandleBlockScrsCalled != nil { - e.HandleBlockScrsCalled(blockScrs) - } -} - -// HandleBlockEventsWithOrder - -func (e *EventsHandlerStub) HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { - if e.HandleBlockEventsWithOrderCalled != nil { - e.HandleBlockEventsWithOrderCalled(blockTxs) - } -} - // IsInterfaceNil - func (e *EventsHandlerStub) IsInterfaceNil() bool { return e == nil diff --git a/mocks/facadeStub.go b/mocks/facadeStub.go index 5e20751..c8c1175 100644 --- a/mocks/facadeStub.go +++ b/mocks/facadeStub.go @@ -8,8 +8,7 @@ import ( // FacadeStub implements FacadeHandler interface type FacadeStub struct { - HandlePushEventsV2Called func(events data.ArgsSaveBlockData) error - HandlePushEventsV1Called func(eventsData data.SaveBlockData) error + HandlePushEventsCalled func(events data.ArgsSaveBlockData) error HandleRevertEventsCalled func(events data.RevertBlock) HandleFinalizedEventsCalled func(events data.FinalizedBlock) ServeCalled func(w http.ResponseWriter, r *http.Request) @@ -18,19 +17,10 @@ type FacadeStub struct { GetMetricsForPrometheusCalled func() string } -// HandlePushEventsV2 - -func (fs *FacadeStub) HandlePushEventsV2(events data.ArgsSaveBlockData) error { - if fs.HandlePushEventsV2Called != nil { - return fs.HandlePushEventsV2Called(events) - } - - return nil -} - -// HandlePushEventsV1 - -func (fs *FacadeStub) HandlePushEventsV1(events data.SaveBlockData) error { - if fs.HandlePushEventsV1Called != nil { - return fs.HandlePushEventsV1Called(events) +// HandlePushEvents - +func (fs *FacadeStub) HandlePushEvents(events data.ArgsSaveBlockData) error { + if fs.HandlePushEventsCalled != nil { + return fs.HandlePushEventsCalled(events) } return nil diff --git a/mocks/rabbitMqClientMock.go b/mocks/rabbitMqClientMock.go index 6f6389b..65242d9 100644 --- a/mocks/rabbitMqClientMock.go +++ b/mocks/rabbitMqClientMock.go @@ -54,8 +54,8 @@ func (rc *RabbitClientMock) ReopenChannel() { // GetEntries - func (rc *RabbitClientMock) GetEntries() map[string]amqp.Publishing { - rc.mut.Lock() - defer rc.mut.Unlock() + rc.mut.RLock() + defer rc.mut.RUnlock() return rc.events } diff --git a/notifier/notifierRunner.go b/notifier/notifierRunner.go index 0b06a1e..2e02a69 100644 --- a/notifier/notifierRunner.go +++ b/notifier/notifierRunner.go @@ -63,18 +63,19 @@ func (nr *notifierRunner) Start() error { statusMetricsHandler := metrics.NewStatusMetrics() - argsEventsHandler := factory.ArgsEventsHandlerFactory{ + eventsInterceptor, err := factory.CreateEventsInterceptor(nr.configs.MainConfig.General) + if err != nil { + return err + } + + argsEventsHandler := process.ArgsEventsHandler{ CheckDuplicates: nr.configs.MainConfig.General.CheckDuplicates, Locker: lockService, Publisher: publisher, StatusMetricsHandler: statusMetricsHandler, + EventsInterceptor: eventsInterceptor, } - eventsHandler, err := factory.CreateEventsHandler(argsEventsHandler) - if err != nil { - return err - } - - eventsInterceptor, err := factory.CreateEventsInterceptor(nr.configs.MainConfig.General) + eventsHandler, err := process.NewEventsHandler(argsEventsHandler) if err != nil { return err } @@ -83,7 +84,6 @@ func (nr *notifierRunner) Start() error { EventsHandler: eventsHandler, APIConfig: nr.configs.MainConfig.ConnectorApi, WSHandler: wsHandler, - EventsInterceptor: eventsInterceptor, StatusMetricsHandler: statusMetricsHandler, } facade, err := facade.NewNotifierFacade(facadeArgs) diff --git a/process/errors.go b/process/errors.go index 8ebc30a..51ed175 100644 --- a/process/errors.go +++ b/process/errors.go @@ -28,3 +28,6 @@ var ErrNilBlockHeader = errors.New("nil block header provided") // ErrNilPublisherHandler signals that a nil publisher handler has been provided var ErrNilPublisherHandler = errors.New("nil publisher handler provided") + +// ErrNilEventsInterceptor signals that a nil events interceptor was provided +var ErrNilEventsInterceptor = errors.New("nil events interceptor") diff --git a/process/eventsHandler.go b/process/eventsHandler.go index 5f84cbf..a5a131f 100644 --- a/process/eventsHandler.go +++ b/process/eventsHandler.go @@ -2,6 +2,7 @@ package process import ( "context" + "encoding/hex" "fmt" "time" @@ -19,9 +20,6 @@ const ( minRetries = 1 revertKeyPrefix = "revert_" finalizedKeyPrefix = "finalized_" - txsKeyPrefix = "txs_" - txsWithOrderKeyPrefix = "txsWithOrder_" - scrsKeyPrefix = "scrs_" rabbitmqMetricPrefix = "RabbitMQ" redisMetricPrefix = "Redis" @@ -32,14 +30,16 @@ type ArgsEventsHandler struct { Locker LockService Publisher Publisher StatusMetricsHandler common.StatusMetricsHandler + EventsInterceptor EventsInterceptor CheckDuplicates bool } type eventsHandler struct { - locker LockService - publisher Publisher - metricsHandler common.StatusMetricsHandler - checkDuplicates bool + locker LockService + publisher Publisher + metricsHandler common.StatusMetricsHandler + eventsInterceptor EventsInterceptor + checkDuplicates bool } // NewEventsHandler creates a new events handler component @@ -50,10 +50,11 @@ func NewEventsHandler(args ArgsEventsHandler) (*eventsHandler, error) { } return &eventsHandler{ - locker: args.Locker, - publisher: args.Publisher, - metricsHandler: args.StatusMetricsHandler, - checkDuplicates: args.CheckDuplicates, + locker: args.Locker, + publisher: args.Publisher, + metricsHandler: args.StatusMetricsHandler, + eventsInterceptor: args.EventsInterceptor, + checkDuplicates: args.CheckDuplicates, }, nil } @@ -67,42 +68,79 @@ func checkArgs(args ArgsEventsHandler) error { if check.IfNil(args.StatusMetricsHandler) { return common.ErrNilStatusMetricsHandler } + if check.IfNil(args.EventsInterceptor) { + return ErrNilEventsInterceptor + } return nil } -// HandlePushEvents will handle push events received from observer -func (eh *eventsHandler) HandlePushEvents(events data.BlockEvents) error { - if events.Hash == "" { - log.Debug("received empty hash", "event", common.PushLogsAndEvents, - "will process", false, - ) - return common.ErrReceivedEmptyEvents +// HandleSaveBlockEvents will handle save block events received from observer +func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) error { + blockHash := hex.EncodeToString(allEvents.HeaderHash) + shouldProcessPushEvents := eh.shouldProcessSaveBlockEvents(blockHash) + if !shouldProcessPushEvents { + return nil } - shouldProcessEvents := true - if eh.checkDuplicates { - shouldProcessEvents = eh.tryCheckProcessedWithRetry(common.PushLogsAndEvents, events.Hash) + eventsData, err := eh.eventsInterceptor.ProcessBlockEvents(&allEvents) + if err != nil { + return err } - if !shouldProcessEvents { - log.Info("received duplicated events", "event", common.PushLogsAndEvents, - "block hash", events.Hash, + pushEvents := data.BlockEvents{ + Hash: eventsData.Hash, + ShardID: eventsData.Header.GetShardID(), + TimeStamp: eventsData.Header.GetTimeStamp(), + Events: eventsData.LogEvents, + } + err = eh.handlePushEvents(pushEvents) + if err != nil { + return err + } + + txs := data.BlockTxs{ + Hash: eventsData.Hash, + Txs: eventsData.Txs, + } + eh.handleBlockTxs(txs) + + scrs := data.BlockScrs{ + Hash: eventsData.Hash, + Scrs: eventsData.Scrs, + } + eh.handleBlockScrs(scrs) + + txsWithOrder := data.BlockEventsWithOrder{ + Hash: eventsData.Hash, + ShardID: eventsData.Header.GetShardID(), + TimeStamp: eventsData.Header.GetTimeStamp(), + Txs: eventsData.TxsWithOrder, + Scrs: eventsData.ScrsWithOrder, + Events: eventsData.LogEvents, + } + eh.handleBlockEventsWithOrder(txsWithOrder) + + return nil +} + +// HandlePushEvents will handle push events received from observer +func (eh *eventsHandler) handlePushEvents(events data.BlockEvents) error { + if events.Hash == "" { + log.Debug("received empty hash", "event", common.PushLogsAndEvents, "will process", false, ) - return nil + return common.ErrReceivedEmptyEvents } if len(events.Events) == 0 { log.Warn("received empty events", "event", common.PushLogsAndEvents, "block hash", events.Hash, - "will process", shouldProcessEvents, ) events.Events = make([]data.Event, 0) } else { log.Info("received", "event", common.PushLogsAndEvents, "block hash", events.Hash, - "will process", shouldProcessEvents, ) } @@ -112,6 +150,24 @@ func (eh *eventsHandler) HandlePushEvents(events data.BlockEvents) error { return nil } +func (eh *eventsHandler) shouldProcessSaveBlockEvents(blockHash string) bool { + shouldProcessEvents := true + if eh.checkDuplicates { + shouldProcessEvents = eh.tryCheckProcessedWithRetry(common.PushLogsAndEvents, blockHash) + } + + if !shouldProcessEvents { + log.Info("received duplicated push events", + "block hash", blockHash, + "will process", false, + ) + + return false + } + + return true +} + // HandleRevertEvents will handle revents events received from observer func (eh *eventsHandler) HandleRevertEvents(revertBlock data.RevertBlock) { if revertBlock.Hash == "" { @@ -175,36 +231,22 @@ func (eh *eventsHandler) HandleFinalizedEvents(finalizedBlock data.FinalizedBloc eh.metricsHandler.AddRequest(getRabbitOpID(common.FinalizedBlockEvents), time.Since(t)) } -// HandleBlockTxs will handle txs events received from observer -func (eh *eventsHandler) HandleBlockTxs(blockTxs data.BlockTxs) { +// handleBlockTxs will handle txs events received from observer +func (eh *eventsHandler) handleBlockTxs(blockTxs data.BlockTxs) { if blockTxs.Hash == "" { log.Warn("received empty hash", "event", common.BlockTxs, "will process", false, ) return } - shouldProcessTxs := true - if eh.checkDuplicates { - shouldProcessTxs = eh.tryCheckProcessedWithRetry(common.BlockTxs, blockTxs.Hash) - } - - if !shouldProcessTxs { - log.Info("received duplicated events", "event", common.BlockTxs, - "block hash", blockTxs.Hash, - "will process", false, - ) - return - } if len(blockTxs.Txs) == 0 { log.Warn("received empty events", "event", common.BlockTxs, "block hash", blockTxs.Hash, - "will process", shouldProcessTxs, ) } else { log.Info("received", "event", common.BlockTxs, "block hash", blockTxs.Hash, - "will process", shouldProcessTxs, ) } @@ -213,36 +255,22 @@ func (eh *eventsHandler) HandleBlockTxs(blockTxs data.BlockTxs) { eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockTxs), time.Since(t)) } -// HandleBlockScrs will handle scrs events received from observer -func (eh *eventsHandler) HandleBlockScrs(blockScrs data.BlockScrs) { +// handleBlockScrs will handle scrs events received from observer +func (eh *eventsHandler) handleBlockScrs(blockScrs data.BlockScrs) { if blockScrs.Hash == "" { log.Warn("received empty hash", "event", common.BlockScrs, "will process", false, ) return } - shouldProcessScrs := true - if eh.checkDuplicates { - shouldProcessScrs = eh.tryCheckProcessedWithRetry(common.BlockScrs, blockScrs.Hash) - } - - if !shouldProcessScrs { - log.Info("received duplicated events", "event", common.BlockScrs, - "block hash", blockScrs.Hash, - "will process", false, - ) - return - } if len(blockScrs.Scrs) == 0 { log.Warn("received empty events", "event", common.BlockScrs, "block hash", blockScrs.Hash, - "will process", shouldProcessScrs, ) } else { log.Info("received", "event", common.BlockScrs, "block hash", blockScrs.Hash, - "will process", shouldProcessScrs, ) } @@ -251,30 +279,17 @@ func (eh *eventsHandler) HandleBlockScrs(blockScrs data.BlockScrs) { eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockScrs), time.Since(t)) } -// HandleBlockEventsWithOrder will handle full block events received from observer -func (eh *eventsHandler) HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { +// handleBlockEventsWithOrder will handle full block events received from observer +func (eh *eventsHandler) handleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { if blockTxs.Hash == "" { log.Warn("received empty hash", "event", common.BlockEvents, "will process", false, ) return } - shouldProcessTxs := true - if eh.checkDuplicates { - shouldProcessTxs = eh.tryCheckProcessedWithRetry(common.BlockEvents, blockTxs.Hash) - } - - if !shouldProcessTxs { - log.Info("received duplicated events", "event", common.BlockEvents, - "block hash", blockTxs.Hash, - "will process", false, - ) - return - } log.Info("received", "event", common.BlockEvents, "block hash", blockTxs.Hash, - "will process", shouldProcessTxs, ) t := time.Now() @@ -322,12 +337,6 @@ func getPrefixLockerKey(id string) string { return revertKeyPrefix case common.FinalizedBlockEvents: return finalizedKeyPrefix - case common.BlockTxs: - return txsKeyPrefix - case common.BlockScrs: - return scrsKeyPrefix - case common.BlockEvents: - return txsWithOrderKeyPrefix } return "" diff --git a/process/eventsHandler_test.go b/process/eventsHandler_test.go index a617362..6e46d95 100644 --- a/process/eventsHandler_test.go +++ b/process/eventsHandler_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/transaction" @@ -13,15 +14,23 @@ import ( "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/mocks" "github.com/multiversx/mx-chain-notifier-go/process" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func createMockEventsHandlerArgs() process.ArgsEventsHandler { return process.ArgsEventsHandler{ - CheckDuplicates: false, - Locker: &mocks.LockerStub{}, + Locker: &mocks.LockerStub{ + HasConnectionCalled: func(ctx context.Context) bool { + return true + }, + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return true, nil + }, + }, Publisher: &mocks.PublisherStub{}, StatusMetricsHandler: &mocks.StatusMetricsStub{}, + EventsInterceptor: &mocks.EventsInterceptorStub{}, } } @@ -61,6 +70,17 @@ func TestNewEventsHandler(t *testing.T) { require.Nil(t, eventsHandler) }) + t.Run("nil events interceptor", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + args.EventsInterceptor = nil + + eventsHandler, err := process.NewEventsHandler(args) + require.Equal(t, process.ErrNilEventsInterceptor, err) + require.Nil(t, eventsHandler) + }) + t.Run("should work", func(t *testing.T) { t.Parallel() @@ -71,39 +91,279 @@ func TestNewEventsHandler(t *testing.T) { }) } -func TestHandlePushEvents(t *testing.T) { +func TestHandleSaveBlockEvents(t *testing.T) { t.Parallel() - t.Run("broadcast event was called", func(t *testing.T) { + t.Run("duplicated events, should return early", func(t *testing.T) { t.Parallel() - events := data.BlockEvents{ - Hash: "hash1", - ShardID: 1, - TimeStamp: 1234, - Events: []data.Event{}, + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true + + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return false, nil + }, } - wasCalled := false + args.EventsInterceptor = &mocks.EventsInterceptorStub{ + ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + require.Fail(t, "should have not been called") + return nil, nil + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + err = eventsHandler.HandleSaveBlockEvents(data.ArgsSaveBlockData{}) + require.Nil(t, err) + }) + + t.Run("failed to pre-process events, should fail", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true + + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return true, nil + }, + } + + expectedErr := errors.New("expected err") + args.EventsInterceptor = &mocks.EventsInterceptorStub{ + ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + return nil, expectedErr + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + err = eventsHandler.HandleSaveBlockEvents(data.ArgsSaveBlockData{}) + require.Equal(t, expectedErr, err) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + blockHash := "blockHash1" + txs := map[string]*outport.TxInfo{ + "hash1": { + Transaction: &transaction.Transaction{ + Nonce: 1, + }, + ExecutionOrder: 1, + }, + } + scrs := map[string]*outport.SCRInfo{ + "hash2": { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 2, + }, + }, + } + logData := []*outport.LogData{ + { + Log: &transaction.Log{ + Address: []byte("logaddr1"), + Events: []*transaction.Event{}, + }, + TxHash: "logHash1", + }, + } + + logEvents := []data.Event{ + { + Address: "addr1", + }, + } + + header := &block.HeaderV2{ + Header: &block.Header{ + ShardID: 2, + }, + } + blockData := data.ArgsSaveBlockData{ + HeaderHash: []byte(blockHash), + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + Logs: logData, + }, + Header: &block.HeaderV2{}, + } + + expTxs := map[string]*transaction.Transaction{ + "hash1": { + Nonce: 1, + }, + } + expScrs := map[string]*smartContractResult.SmartContractResult{ + "hash2": { + Nonce: 2, + }, + } + + expTxsData := data.BlockTxs{ + Hash: blockHash, + Txs: expTxs, + } + expScrsData := data.BlockScrs{ + Hash: blockHash, + Scrs: expScrs, + } + expLogEvents := data.BlockEvents{ + Hash: blockHash, + Events: logEvents, + ShardID: 2, + } + + expTxsWithOrder := map[string]*outport.TxInfo{ + "hash1": { + Transaction: &transaction.Transaction{ + Nonce: 1, + }, + ExecutionOrder: 1, + }, + } + expScrsWithOrder := map[string]*outport.SCRInfo{ + "hash2": { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 2, + }, + }, + } + expTxsWithOrderData := data.BlockEventsWithOrder{ + Hash: blockHash, + ShardID: 2, + Txs: expTxsWithOrder, + Scrs: expScrsWithOrder, + Events: logEvents, + } + + pushWasCalled := false + txsWasCalled := false + scrsWasCalled := false + blockEventsWithOrderWasCalled := false + args := createMockEventsHandlerArgs() + + args.EventsInterceptor = &mocks.EventsInterceptorStub{ + ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + return &data.InterceptorBlockData{ + Hash: blockHash, + Header: header, + Txs: expTxs, + Scrs: expScrs, + LogEvents: logEvents, + TxsWithOrder: expTxsWithOrder, + ScrsWithOrder: expScrsWithOrder, + }, nil + }, + } + args.Publisher = &mocks.PublisherStub{ - BroadcastCalled: func(evs data.BlockEvents) { - require.Equal(t, events, evs) - wasCalled = true + BroadcastCalled: func(events data.BlockEvents) { + pushWasCalled = true + assert.Equal(t, expLogEvents, events) + }, + BroadcastTxsCalled: func(event data.BlockTxs) { + txsWasCalled = true + assert.Equal(t, expTxsData, event) + }, + BroadcastScrsCalled: func(event data.BlockScrs) { + scrsWasCalled = true + assert.Equal(t, expScrsData, event) + }, + BroadcastBlockEventsWithOrderCalled: func(event data.BlockEventsWithOrder) { + blockEventsWithOrderWasCalled = true + assert.Equal(t, expTxsWithOrderData, event) }, } eventsHandler, err := process.NewEventsHandler(args) require.Nil(t, err) - eventsHandler.HandlePushEvents(events) - require.True(t, wasCalled) + err = eventsHandler.HandleSaveBlockEvents(blockData) + require.Nil(t, err) + + assert.True(t, pushWasCalled) + assert.True(t, txsWasCalled) + assert.True(t, scrsWasCalled) + assert.True(t, blockEventsWithOrderWasCalled) }) +} - t.Run("check duplicates enabled, should not process event", func(t *testing.T) { +func TestShouldProcessSaveBlockEvents(t *testing.T) { + t.Parallel() + + t.Run("should process", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true + + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return true, nil + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + shouldProcess := eventsHandler.ShouldProcessSaveBlockEvents("blockHash1") + require.True(t, shouldProcess) + }) + + t.Run("duplicated events, should not process", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true + + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return false, nil + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + shouldProcess := eventsHandler.ShouldProcessSaveBlockEvents("blockHash1") + require.False(t, shouldProcess) + }) +} + +func TestHandlePushEvents(t *testing.T) { + t.Parallel() + + t.Run("empty hash should return error", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + events := data.BlockEvents{ + Hash: "", + ShardID: 1, + TimeStamp: 1234, + Events: []data.Event{}, + } + + err = eventsHandler.HandlePushEvents(events) + require.Equal(t, common.ErrReceivedEmptyEvents, err) + }) + + t.Run("broadcast event was called", func(t *testing.T) { t.Parallel() - blockEvents := data.BlockEvents{ + events := data.BlockEvents{ Hash: "hash1", ShardID: 1, TimeStamp: 1234, @@ -112,24 +372,19 @@ func TestHandlePushEvents(t *testing.T) { wasCalled := false args := createMockEventsHandlerArgs() - args.CheckDuplicates = true args.Publisher = &mocks.PublisherStub{ - BroadcastCalled: func(events data.BlockEvents) { - require.Equal(t, blockEvents, events) + BroadcastCalled: func(evs data.BlockEvents) { + require.Equal(t, events, evs) wasCalled = true }, } - args.Locker = &mocks.LockerStub{ - IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { - return false, nil - }, - } eventsHandler, err := process.NewEventsHandler(args) require.Nil(t, err) - eventsHandler.HandlePushEvents(blockEvents) - require.False(t, wasCalled) + err = eventsHandler.HandlePushEvents(events) + require.Nil(t, err) + require.True(t, wasCalled) }) } @@ -275,40 +530,6 @@ func TestHandleTxsEvents(t *testing.T) { eventsHandler.HandleBlockTxs(blockTxs) require.True(t, wasCalled) }) - - t.Run("check duplicates enabled, should not process event", func(t *testing.T) { - t.Parallel() - - blockTxs := data.BlockTxs{ - Hash: "hash1", - Txs: map[string]*transaction.Transaction{ - "hash1": { - Nonce: 1, - }, - }, - } - - wasCalled := false - args := createMockEventsHandlerArgs() - args.CheckDuplicates = true - args.Publisher = &mocks.PublisherStub{ - BroadcastTxsCalled: func(event data.BlockTxs) { - require.Equal(t, blockTxs, event) - wasCalled = true - }, - } - args.Locker = &mocks.LockerStub{ - IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { - return false, nil - }, - } - - eventsHandler, err := process.NewEventsHandler(args) - require.Nil(t, err) - - eventsHandler.HandleBlockTxs(blockTxs) - require.False(t, wasCalled) - }) } func TestHandleScrsEvents(t *testing.T) { @@ -341,39 +562,6 @@ func TestHandleScrsEvents(t *testing.T) { eventsHandler.HandleBlockScrs(blockScrs) require.True(t, wasCalled) }) - - t.Run("check duplicates enabled, should not process event", func(t *testing.T) { - t.Parallel() - - wasCalled := false - args := createMockEventsHandlerArgs() - args.CheckDuplicates = true - args.Publisher = &mocks.PublisherStub{ - BroadcastScrsCalled: func(event data.BlockScrs) { - wasCalled = true - }, - } - args.Locker = &mocks.LockerStub{ - IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { - return false, nil - }, - } - - eventsHandler, err := process.NewEventsHandler(args) - require.Nil(t, err) - - events := data.BlockScrs{ - Hash: "hash1", - Scrs: map[string]*smartContractResult.SmartContractResult{ - "hash2": { - Nonce: 2, - }, - }, - } - - eventsHandler.HandleBlockScrs(events) - require.False(t, wasCalled) - }) } func TestHandleBlockEventsWithOrderEvents(t *testing.T) { @@ -409,31 +597,6 @@ func TestHandleBlockEventsWithOrderEvents(t *testing.T) { eventsHandler.HandleBlockEventsWithOrder(events) require.True(t, wasCalled) }) - - t.Run("check duplicates enabled, should not process event", func(t *testing.T) { - t.Parallel() - - wasCalled := false - args := createMockEventsHandlerArgs() - args.CheckDuplicates = true - args.Publisher = &mocks.PublisherStub{ - BroadcastBlockEventsWithOrderCalled: func(event data.BlockEventsWithOrder) { - require.Equal(t, events, events) - wasCalled = true - }, - } - args.Locker = &mocks.LockerStub{ - IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { - return false, nil - }, - } - - eventsHandler, err := process.NewEventsHandler(args) - require.Nil(t, err) - - eventsHandler.HandleBlockEventsWithOrder(events) - require.False(t, wasCalled) - }) } func TestTryCheckProcessedWithRetry(t *testing.T) { diff --git a/process/export_test.go b/process/export_test.go index 6ff81d5..8e107de 100644 --- a/process/export_test.go +++ b/process/export_test.go @@ -10,6 +10,31 @@ func (eh *eventsHandler) TryCheckProcessedWithRetry(prefix, blockHash string) bo return eh.tryCheckProcessedWithRetry(prefix, blockHash) } +// HandlePushEvents - +func (eh *eventsHandler) HandlePushEvents(events data.BlockEvents) error { + return eh.handlePushEvents(events) +} + +// HandleBlockTxs - +func (eh *eventsHandler) HandleBlockTxs(blockTxs data.BlockTxs) { + eh.handleBlockTxs(blockTxs) +} + +// HandleBlockScrs - +func (eh *eventsHandler) HandleBlockScrs(blockScrs data.BlockScrs) { + eh.handleBlockScrs(blockScrs) +} + +// HandleBlockEventsWithOrder - +func (eh *eventsHandler) HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { + eh.handleBlockEventsWithOrder(blockTxs) +} + +// ShouldProcessSaveBlockEvents - +func (eh *eventsHandler) ShouldProcessSaveBlockEvents(blockHash string) bool { + return eh.shouldProcessSaveBlockEvents(blockHash) +} + // GetLogEventsFromTransactionsPool exports internal method for testing func (ei *eventsInterceptor) GetLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { return ei.getLogEventsFromTransactionsPool(logs) diff --git a/process/interface.go b/process/interface.go index a9b8f69..34c93fe 100644 --- a/process/interface.go +++ b/process/interface.go @@ -30,12 +30,9 @@ type Publisher interface { // EventsHandler defines the behaviour of an events handler component type EventsHandler interface { - HandlePushEvents(events data.BlockEvents) error + HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) - HandleBlockTxs(blockTxs data.BlockTxs) - HandleBlockScrs(blockScrs data.BlockScrs) - HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) IsInterfaceNil() bool } @@ -60,7 +57,7 @@ type DataProcessor interface { // EventsFacadeHandler defines the behavior of a facade handler needed for events group type EventsFacadeHandler interface { - HandlePushEventsV2(events data.ArgsSaveBlockData) error + HandlePushEvents(events data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) IsInterfaceNil() bool diff --git a/process/preprocess/eventsPreProcessorV0.go b/process/preprocess/eventsPreProcessorV0.go index 446d806..8d2398b 100644 --- a/process/preprocess/eventsPreProcessorV0.go +++ b/process/preprocess/eventsPreProcessorV0.go @@ -59,7 +59,7 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { Header: header, } - err = d.facade.HandlePushEventsV2(*saveBlockData) + err = d.facade.HandlePushEvents(*saveBlockData) if err != nil { return err } diff --git a/process/preprocess/eventsPreProcessorV0_test.go b/process/preprocess/eventsPreProcessorV0_test.go index f026650..84ff20c 100644 --- a/process/preprocess/eventsPreProcessorV0_test.go +++ b/process/preprocess/eventsPreProcessorV0_test.go @@ -43,7 +43,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { expectedErr := errors.New("exp error") args.Facade = &mocks.FacadeStub{ - HandlePushEventsV2Called: func(events data.ArgsSaveBlockData) error { + HandlePushEventsCalled: func(events data.ArgsSaveBlockData) error { return expectedErr }, } @@ -65,7 +65,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { wasCalled := false args.Facade = &mocks.FacadeStub{ - HandlePushEventsV2Called: func(events data.ArgsSaveBlockData) error { + HandlePushEventsCalled: func(events data.ArgsSaveBlockData) error { wasCalled = true return nil }, diff --git a/process/preprocess/eventsPreProcessorV1.go b/process/preprocess/eventsPreProcessorV1.go index e6a25fe..f4392b0 100644 --- a/process/preprocess/eventsPreProcessorV1.go +++ b/process/preprocess/eventsPreProcessorV1.go @@ -66,8 +66,7 @@ func (d *eventsPreProcessorV1) SaveBlock(marshalledData []byte) error { Header: header, } - // TODO: refactor to remove facade versioning - err = d.facade.HandlePushEventsV2(*saveBlockData) + err = d.facade.HandlePushEvents(*saveBlockData) if err != nil { return err } diff --git a/process/preprocess/eventsPreProcessorV1_test.go b/process/preprocess/eventsPreProcessorV1_test.go index fb7a6a9..fa30d37 100644 --- a/process/preprocess/eventsPreProcessorV1_test.go +++ b/process/preprocess/eventsPreProcessorV1_test.go @@ -82,7 +82,7 @@ func TestPreProcessorV1_SaveBlock(t *testing.T) { expectedErr := errors.New("exp error") args.Facade = &mocks.FacadeStub{ - HandlePushEventsV2Called: func(events data.ArgsSaveBlockData) error { + HandlePushEventsCalled: func(events data.ArgsSaveBlockData) error { return expectedErr }, }