Skip to content

Commit

Permalink
Merge pull request #86 from multiversx/feat/publisher-refactor
Browse files Browse the repository at this point in the history
Feat/publisher refactor
  • Loading branch information
ssd04 authored Mar 7, 2024
2 parents f35eb11 + 09ed246 commit 8455f53
Show file tree
Hide file tree
Showing 53 changed files with 1,309 additions and 1,216 deletions.
3 changes: 1 addition & 2 deletions api/groups/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (

// EventsFacadeHandler defines the behavior of a facade handler needed for events group
type EventsFacadeHandler interface {
HandlePushEventsV2(events data.ArgsSaveBlockData) error
HandlePushEventsV1(events data.SaveBlockData) error
HandlePushEvents(events data.ArgsSaveBlockData) error
HandleRevertEvents(revertBlock data.RevertBlock)
HandleFinalizedEvents(finalizedBlock data.FinalizedBlock)
GetConnectorUserAndPass() (string, string)
Expand Down
3 changes: 1 addition & 2 deletions api/shared/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ type GroupHandler interface {

// FacadeHandler defines the behavior of a notifier base facade handler
type FacadeHandler interface {
HandlePushEventsV2(events data.ArgsSaveBlockData) error
HandlePushEventsV1(events data.SaveBlockData) error
HandlePushEvents(events data.ArgsSaveBlockData) error
HandleRevertEvents(revertBlock data.RevertBlock)
HandleFinalizedEvents(finalizedBlock data.FinalizedBlock)
GetConnectorUserAndPass() (string, string)
Expand Down
6 changes: 6 additions & 0 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ var ErrNilFacadeHandler = errors.New("nil facade handler")

// ErrNilStatusMetricsHandler signals that a nil status metrics handler has been provided
var ErrNilStatusMetricsHandler = errors.New("nil status metrics handler")

// ErrWrongTypeAssertion signals a wrong type assertion
var ErrWrongTypeAssertion = errors.New("wrong type assertion")

// ErrLoopAlreadyStarted signals that a loop has already been started
var ErrLoopAlreadyStarted = errors.New("loop already started")
28 changes: 12 additions & 16 deletions disabled/disabledHub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,28 @@ import (
type Hub struct {
}

// Run does nothing
func (h *Hub) Run() {
// Publish does nothing
func (h *Hub) Publish(events data.BlockEvents) {
}

// Broadcast does nothing
func (h *Hub) Broadcast(_ data.BlockEvents) {
// PublishRevert does nothing
func (h *Hub) PublishRevert(revertBlock data.RevertBlock) {
}

// BroadcastRevert does nothing
func (h *Hub) BroadcastRevert(_ data.RevertBlock) {
// PublishFinalized does nothing
func (h *Hub) PublishFinalized(finalizedBlock data.FinalizedBlock) {
}

// BroadcastFinalized does nothing
func (h *Hub) BroadcastFinalized(_ data.FinalizedBlock) {
// PublishTxs does nothing
func (h *Hub) PublishTxs(blockTxs data.BlockTxs) {
}

// BroadcastTxs does nothing
func (h *Hub) BroadcastTxs(_ data.BlockTxs) {
// PublishScrs does nothing
func (h *Hub) PublishScrs(blockScrs data.BlockScrs) {
}

// BroadcastScrs does nothing
func (h *Hub) BroadcastScrs(_ data.BlockScrs) {
}

// BroadcastBlockEventsWithOrder does nothing
func (h *Hub) BroadcastBlockEventsWithOrder(_ data.BlockEventsWithOrder) {
// PublishBlockEventsWithOrder does nothing
func (h *Hub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) {
}

// RegisterEvent does nothing
Expand Down
205 changes: 30 additions & 175 deletions dispatcher/hub/commonHub.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package hub

import (
"context"
"sync"

"github.com/google/uuid"
Expand All @@ -22,20 +21,10 @@ type ArgsCommonHub struct {
}

type commonHub struct {
filter filters.EventFilter
subscriptionMapper dispatcher.SubscriptionMapperHandler
mutDispatchers sync.RWMutex
dispatchers map[uuid.UUID]dispatcher.EventDispatcher
register chan dispatcher.EventDispatcher
unregister chan dispatcher.EventDispatcher
broadcast chan data.BlockEvents
broadcastRevert chan data.RevertBlock
broadcastFinalized chan data.FinalizedBlock
broadcastTxs chan data.BlockTxs
broadcastBlockEventsWithOrder chan data.BlockEventsWithOrder
broadcastScrs chan data.BlockScrs
closeChan chan struct{}
cancelFunc func()
filter filters.EventFilter
subscriptionMapper dispatcher.SubscriptionMapperHandler
mutDispatchers sync.RWMutex
dispatchers map[uuid.UUID]dispatcher.EventDispatcher
}

// NewCommonHub creates a new commonHub instance
Expand All @@ -46,19 +35,10 @@ func NewCommonHub(args ArgsCommonHub) (*commonHub, error) {
}

return &commonHub{
mutDispatchers: sync.RWMutex{},
filter: args.Filter,
subscriptionMapper: args.SubscriptionMapper,
dispatchers: make(map[uuid.UUID]dispatcher.EventDispatcher),
register: make(chan dispatcher.EventDispatcher),
unregister: make(chan dispatcher.EventDispatcher),
broadcast: make(chan data.BlockEvents),
broadcastRevert: make(chan data.RevertBlock),
broadcastFinalized: make(chan data.FinalizedBlock),
broadcastTxs: make(chan data.BlockTxs),
broadcastBlockEventsWithOrder: make(chan data.BlockEventsWithOrder),
broadcastScrs: make(chan data.BlockScrs),
closeChan: make(chan struct{}),
mutDispatchers: sync.RWMutex{},
filter: args.Filter,
subscriptionMapper: args.SubscriptionMapper,
dispatchers: make(map[uuid.UUID]dispatcher.EventDispatcher),
}, nil
}

Expand All @@ -73,131 +53,27 @@ func checkArgs(args ArgsCommonHub) error {
return nil
}

// Run is launched as a goroutine and listens for events on the exposed channels
func (ch *commonHub) Run() {
var ctx context.Context
ctx, ch.cancelFunc = context.WithCancel(context.Background())

go ch.run(ctx)
}

func (ch *commonHub) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Debug("commonHub is stopping...")
return

case events := <-ch.broadcast:
ch.handleBroadcast(events)

case revertEvent := <-ch.broadcastRevert:
ch.handleRevertBroadcast(revertEvent)

case finalizedEvent := <-ch.broadcastFinalized:
ch.handleFinalizedBroadcast(finalizedEvent)

case txsEvent := <-ch.broadcastTxs:
ch.handleTxsBroadcast(txsEvent)

case txsEvent := <-ch.broadcastBlockEventsWithOrder:
ch.handleBlockEventsWithOrderBroadcast(txsEvent)

case scrsEvent := <-ch.broadcastScrs:
ch.handleScrsBroadcast(scrsEvent)

case dispatcherClient := <-ch.register:
ch.registerDispatcher(dispatcherClient)

case dispatcherClient := <-ch.unregister:
ch.unregisterDispatcher(dispatcherClient)
}
}
}

// Subscribe is used by a dispatcher to send a dispatcher.SubscribeEvent
func (ch *commonHub) Subscribe(event data.SubscribeEvent) {
ch.subscriptionMapper.MatchSubscribeEvent(event)
}

// Broadcast handles block events pushed by producers into the broadcast channel
// Upon reading the channel, the hub notifies the registered dispatchers, if any
func (ch *commonHub) Broadcast(events data.BlockEvents) {
select {
case ch.broadcast <- events:
case <-ch.closeChan:
}
}

// BroadcastRevert handles revert event pushed by producers into the broadcast channel
// Upon reading the channel, the hub notifies the registered dispatchers, if any
func (ch *commonHub) BroadcastRevert(event data.RevertBlock) {
select {
case ch.broadcastRevert <- event:
case <-ch.closeChan:
}
}

// BroadcastFinalized handles finalized event pushed by producers into the broadcast channel
// Upon reading the channel, the hub notifies the registered dispatchers, if any
func (ch *commonHub) BroadcastFinalized(event data.FinalizedBlock) {
select {
case ch.broadcastFinalized <- event:
case <-ch.closeChan:
}
}

// BroadcastTxs handles block txs event pushed by producers into the broadcast channel
// Upon reading the channel, the hub notifies the registered dispatchers, if any
func (ch *commonHub) BroadcastTxs(event data.BlockTxs) {
select {
case ch.broadcastTxs <- event:
case <-ch.closeChan:
}
}

// BroadcastScrs handles block scrs event pushed by producers into the broadcast channel
// Upon reading the channel, the hub notifies the registered dispatchers, if any
func (ch *commonHub) BroadcastScrs(event data.BlockScrs) {
select {
case ch.broadcastScrs <- event:
case <-ch.closeChan:
}
}

// BroadcastBlockEventsWithOrder handles full block events pushed by producers into the channel
func (ch *commonHub) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) {
select {
case ch.broadcastBlockEventsWithOrder <- event:
case <-ch.closeChan:
}
}

// RegisterEvent will send event to a receive-only channel used to register dispatchers
func (ch *commonHub) RegisterEvent(event dispatcher.EventDispatcher) {
select {
case ch.register <- event:
case <-ch.closeChan:
}
ch.registerDispatcher(event)
}

// UnregisterEvent will send event to a receive-only channel used by a dispatcher to signal it has disconnected
func (ch *commonHub) UnregisterEvent(event dispatcher.EventDispatcher) {
select {
case ch.unregister <- event:
case <-ch.closeChan:
}
ch.unregisterDispatcher(event)
}

func (ch *commonHub) handleBroadcast(blockEvents data.BlockEvents) {
// Publish will publish logs and events to dispatcher
func (ch *commonHub) Publish(blockEvents data.BlockEvents) {
subscriptions := ch.subscriptionMapper.Subscriptions()

for _, subscription := range subscriptions {
if subscription.EventType != common.PushLogsAndEvents {
continue
}

ch.handlePushBlockEvents(blockEvents, subscription)
for _, sub := range subscriptions[common.PushLogsAndEvents] {
ch.handlePushBlockEvents(blockEvents, sub)
}
}

Expand All @@ -217,17 +93,14 @@ func (ch *commonHub) handlePushBlockEvents(blockEvents data.BlockEvents, subscri
ch.mutDispatchers.RUnlock()
}

func (ch *commonHub) handleRevertBroadcast(revertBlock data.RevertBlock) {
// PublishRevert will publish revert event to dispatcher
func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) {
subscriptions := ch.subscriptionMapper.Subscriptions()

dispatchersMap := make(map[uuid.UUID]data.RevertBlock)

for _, subscription := range subscriptions {
if subscription.EventType != common.RevertBlockEvents {
continue
}

dispatchersMap[subscription.DispatcherID] = revertBlock
for _, sub := range subscriptions[common.RevertBlockEvents] {
dispatchersMap[sub.DispatcherID] = revertBlock
}

ch.mutDispatchers.RLock()
Expand All @@ -239,16 +112,13 @@ func (ch *commonHub) handleRevertBroadcast(revertBlock data.RevertBlock) {
}
}

func (ch *commonHub) handleFinalizedBroadcast(finalizedBlock data.FinalizedBlock) {
// PublishFinalized will publish finalized event to dispatcher
func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) {
subscriptions := ch.subscriptionMapper.Subscriptions()

dispatchersMap := make(map[uuid.UUID]data.FinalizedBlock)

for _, subscription := range subscriptions {
if subscription.EventType != common.FinalizedBlockEvents {
continue
}

for _, subscription := range subscriptions[common.FinalizedBlockEvents] {
dispatchersMap[subscription.DispatcherID] = finalizedBlock
}

Expand All @@ -261,16 +131,13 @@ func (ch *commonHub) handleFinalizedBroadcast(finalizedBlock data.FinalizedBlock
}
}

func (ch *commonHub) handleTxsBroadcast(blockTxs data.BlockTxs) {
// PublishTxs will publish txs event to dispatcher
func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) {
subscriptions := ch.subscriptionMapper.Subscriptions()

dispatchersMap := make(map[uuid.UUID]data.BlockTxs)

for _, subscription := range subscriptions {
if subscription.EventType != common.BlockTxs {
continue
}

for _, subscription := range subscriptions[common.BlockTxs] {
dispatchersMap[subscription.DispatcherID] = blockTxs
}

Expand All @@ -283,16 +150,13 @@ func (ch *commonHub) handleTxsBroadcast(blockTxs data.BlockTxs) {
}
}

func (ch *commonHub) handleBlockEventsWithOrderBroadcast(blockTxs data.BlockEventsWithOrder) {
// PublishBlockEventsWithOrder will publish block events with order to dispatcher
func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) {
subscriptions := ch.subscriptionMapper.Subscriptions()

dispatchersMap := make(map[uuid.UUID]data.BlockEventsWithOrder)

for _, subscription := range subscriptions {
if subscription.EventType != common.BlockEvents {
continue
}

for _, subscription := range subscriptions[common.BlockEvents] {
dispatchersMap[subscription.DispatcherID] = blockTxs
}

Expand All @@ -305,16 +169,13 @@ func (ch *commonHub) handleBlockEventsWithOrderBroadcast(blockTxs data.BlockEven
}
}

func (ch *commonHub) handleScrsBroadcast(blockScrs data.BlockScrs) {
// PublishScrs will publish scrs events to dispatcher
func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) {
subscriptions := ch.subscriptionMapper.Subscriptions()

dispatchersMap := make(map[uuid.UUID]data.BlockScrs)

for _, subscription := range subscriptions {
if subscription.EventType != common.BlockScrs {
continue
}

for _, subscription := range subscriptions[common.BlockScrs] {
dispatchersMap[subscription.DispatcherID] = blockScrs
}

Expand Down Expand Up @@ -355,12 +216,6 @@ func (ch *commonHub) unregisterDispatcher(d dispatcher.EventDispatcher) {

// Close will close the goroutine and channels
func (ch *commonHub) Close() error {
if ch.cancelFunc != nil {
ch.cancelFunc()
}

close(ch.closeChan)

return nil
}

Expand Down
Loading

0 comments on commit 8455f53

Please sign in to comment.