diff --git a/events/api_session.go b/events/api_session.go index fbe11b1ba..ca493483d 100644 --- a/events/api_session.go +++ b/events/api_session.go @@ -43,3 +43,8 @@ func (event *ApiSessionEvent) String() string { type ApiSessionEventHandler interface { AcceptApiSessionEvent(event *ApiSessionEvent) } + +type ApiSessionEventHandlerWrapper interface { + ApiSessionEventHandler + IsWrapping(value ApiSessionEventHandler) bool +} diff --git a/events/dispatcher_api_session.go b/events/dispatcher_api_session.go index 7805a19be..5835d0573 100644 --- a/events/dispatcher_api_session.go +++ b/events/dispatcher_api_session.go @@ -32,7 +32,15 @@ func (self *Dispatcher) AddApiSessionEventHandler(handler ApiSessionEventHandler } func (self *Dispatcher) RemoveApiSessionEventHandler(handler ApiSessionEventHandler) { - self.apiSessionEventHandlers.Delete(handler) + self.apiSessionEventHandlers.DeleteIf(func(val ApiSessionEventHandler) bool { + if val == handler { + return true + } + if w, ok := val.(ApiSessionEventHandlerWrapper); ok { + return w.IsWrapping(handler) + } + return false + }) } func (self *Dispatcher) initApiSessionEvents(stores *persistence.Stores) { @@ -148,3 +156,13 @@ func (adapter *apiSessionEventAdapter) AcceptApiSessionEvent(event *ApiSessionEv adapter.wrapped.AcceptApiSessionEvent(event) } } + +func (self *apiSessionEventAdapter) IsWrapping(value ApiSessionEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(ApiSessionEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} diff --git a/events/dispatcher_entity_counts.go b/events/dispatcher_entity_counts.go index 4aec24408..25dc487b8 100644 --- a/events/dispatcher_entity_counts.go +++ b/events/dispatcher_entity_counts.go @@ -19,14 +19,16 @@ package events import ( "github.com/pkg/errors" "reflect" + "strings" "time" ) -func (self *Dispatcher) AddEntityCountEventHandler(handler EntityCountEventHandler, interval time.Duration) { +func (self *Dispatcher) AddEntityCountEventHandler(handler EntityCountEventHandler, interval time.Duration, onlyLeaderEvents bool) { self.entityCountEventHandlers.Append(&entityCountState{ - handler: handler, - interval: interval, - nextRun: time.Now(), + handler: handler, + onlyLeaderEvents: onlyLeaderEvents, + interval: interval, + nextRun: time.Now(), }) } @@ -50,13 +52,16 @@ func (self *Dispatcher) generateEntityEvents() { select { case t := <-ticker.C: var event *EntityCountEvent + leader := self.network.Dispatcher.IsLeaderOrLeaderless() for _, state := range self.entityCountEventHandlers.Value() { - if t.After(state.nextRun) { - if event == nil { - event = self.generateEntityCountEvent() + if !state.onlyLeaderEvents || leader { + if t.After(state.nextRun) { + if event == nil { + event = self.generateEntityCountEvent() + } + state.handler.AcceptEntityCountEvent(event) + state.nextRun = state.nextRun.Add(state.interval) } - state.handler.AcceptEntityCountEvent(event) - state.nextRun = state.nextRun.Add(state.interval) } } case <-self.closeNotify: @@ -102,7 +107,18 @@ func (self *Dispatcher) registerEntityCountEventHandler(val interface{}, config } } - self.AddEntityCountEventHandler(handler, interval) + propagateAlways := false + if val, found := config["propagateAlways"]; found { + if b, ok := val.(bool); ok { + propagateAlways = b + } else if s, ok := val.(string); ok { + propagateAlways = strings.EqualFold(s, "true") + } else { + return errors.New("invalid value for propagateAlways, must be boolean or string") + } + } + + self.AddEntityCountEventHandler(handler, interval, !propagateAlways) return nil } @@ -114,7 +130,8 @@ func (self *Dispatcher) unregisterEntityCountEventHandler(val interface{}) { } type entityCountState struct { - handler EntityCountEventHandler - interval time.Duration - nextRun time.Time + handler EntityCountEventHandler + onlyLeaderEvents bool + interval time.Duration + nextRun time.Time } diff --git a/events/dispatcher_session.go b/events/dispatcher_session.go index 74b901643..0f87f968d 100644 --- a/events/dispatcher_session.go +++ b/events/dispatcher_session.go @@ -32,7 +32,15 @@ func (self *Dispatcher) AddSessionEventHandler(handler SessionEventHandler) { } func (self *Dispatcher) RemoveSessionEventHandler(handler SessionEventHandler) { - self.sessionEventHandlers.Delete(handler) + self.sessionEventHandlers.DeleteIf(func(val SessionEventHandler) bool { + if val == handler { + return true + } + if w, ok := val.(SessionEventHandlerWrapper); ok { + return w.IsWrapping(handler) + } + return false + }) } func (self *Dispatcher) initSessionEvents(stores *persistence.Stores) { @@ -151,3 +159,13 @@ func (adapter *sessionEventAdapter) AcceptSessionEvent(event *SessionEvent) { adapter.wrapped.AcceptSessionEvent(event) } } + +func (self *sessionEventAdapter) IsWrapping(value SessionEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(SessionEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} diff --git a/events/session.go b/events/session.go index 8deacc093..766b3c32e 100644 --- a/events/session.go +++ b/events/session.go @@ -45,3 +45,8 @@ func (event *SessionEvent) String() string { type SessionEventHandler interface { AcceptSessionEvent(event *SessionEvent) } + +type SessionEventHandlerWrapper interface { + SessionEventHandler + IsWrapping(value SessionEventHandler) bool +}