Skip to content

Commit

Permalink
Ensure entity counts are on leader only by default. Fix handler remov…
Browse files Browse the repository at this point in the history
…al. Fixes #1282
  • Loading branch information
plorenz committed Jan 4, 2023
1 parent 4380a3b commit ce7f84e
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 15 deletions.
5 changes: 5 additions & 0 deletions events/api_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ func (event *ApiSessionEvent) String() string {
type ApiSessionEventHandler interface {
AcceptApiSessionEvent(event *ApiSessionEvent)
}

type ApiSessionEventHandlerWrapper interface {
ApiSessionEventHandler
IsWrapping(value ApiSessionEventHandler) bool
}
20 changes: 19 additions & 1 deletion events/dispatcher_api_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ func (self *Dispatcher) AddApiSessionEventHandler(handler ApiSessionEventHandler
}

func (self *Dispatcher) RemoveApiSessionEventHandler(handler ApiSessionEventHandler) {
self.apiSessionEventHandlers.Delete(handler)
self.apiSessionEventHandlers.DeleteIf(func(val ApiSessionEventHandler) bool {
if val == handler {
return true
}
if w, ok := val.(ApiSessionEventHandlerWrapper); ok {
return w.IsWrapping(handler)
}
return false
})
}

func (self *Dispatcher) initApiSessionEvents(stores *persistence.Stores) {
Expand Down Expand Up @@ -148,3 +156,13 @@ func (adapter *apiSessionEventAdapter) AcceptApiSessionEvent(event *ApiSessionEv
adapter.wrapped.AcceptApiSessionEvent(event)
}
}

func (self *apiSessionEventAdapter) IsWrapping(value ApiSessionEventHandler) bool {
if self.wrapped == value {
return true
}
if w, ok := self.wrapped.(ApiSessionEventHandlerWrapper); ok {
return w.IsWrapping(value)
}
return false
}
43 changes: 30 additions & 13 deletions events/dispatcher_entity_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ package events
import (
"github.com/pkg/errors"
"reflect"
"strings"
"time"
)

func (self *Dispatcher) AddEntityCountEventHandler(handler EntityCountEventHandler, interval time.Duration) {
func (self *Dispatcher) AddEntityCountEventHandler(handler EntityCountEventHandler, interval time.Duration, onlyLeaderEvents bool) {
self.entityCountEventHandlers.Append(&entityCountState{
handler: handler,
interval: interval,
nextRun: time.Now(),
handler: handler,
onlyLeaderEvents: onlyLeaderEvents,
interval: interval,
nextRun: time.Now(),
})
}

Expand All @@ -50,13 +52,16 @@ func (self *Dispatcher) generateEntityEvents() {
select {
case t := <-ticker.C:
var event *EntityCountEvent
leader := self.network.Dispatcher.IsLeaderOrLeaderless()
for _, state := range self.entityCountEventHandlers.Value() {
if t.After(state.nextRun) {
if event == nil {
event = self.generateEntityCountEvent()
if !state.onlyLeaderEvents || leader {
if t.After(state.nextRun) {
if event == nil {
event = self.generateEntityCountEvent()
}
state.handler.AcceptEntityCountEvent(event)
state.nextRun = state.nextRun.Add(state.interval)
}
state.handler.AcceptEntityCountEvent(event)
state.nextRun = state.nextRun.Add(state.interval)
}
}
case <-self.closeNotify:
Expand Down Expand Up @@ -102,7 +107,18 @@ func (self *Dispatcher) registerEntityCountEventHandler(val interface{}, config
}
}

self.AddEntityCountEventHandler(handler, interval)
propagateAlways := false
if val, found := config["propagateAlways"]; found {
if b, ok := val.(bool); ok {
propagateAlways = b
} else if s, ok := val.(string); ok {
propagateAlways = strings.EqualFold(s, "true")
} else {
return errors.New("invalid value for propagateAlways, must be boolean or string")
}
}

self.AddEntityCountEventHandler(handler, interval, !propagateAlways)

return nil
}
Expand All @@ -114,7 +130,8 @@ func (self *Dispatcher) unregisterEntityCountEventHandler(val interface{}) {
}

type entityCountState struct {
handler EntityCountEventHandler
interval time.Duration
nextRun time.Time
handler EntityCountEventHandler
onlyLeaderEvents bool
interval time.Duration
nextRun time.Time
}
20 changes: 19 additions & 1 deletion events/dispatcher_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ func (self *Dispatcher) AddSessionEventHandler(handler SessionEventHandler) {
}

func (self *Dispatcher) RemoveSessionEventHandler(handler SessionEventHandler) {
self.sessionEventHandlers.Delete(handler)
self.sessionEventHandlers.DeleteIf(func(val SessionEventHandler) bool {
if val == handler {
return true
}
if w, ok := val.(SessionEventHandlerWrapper); ok {
return w.IsWrapping(handler)
}
return false
})
}

func (self *Dispatcher) initSessionEvents(stores *persistence.Stores) {
Expand Down Expand Up @@ -151,3 +159,13 @@ func (adapter *sessionEventAdapter) AcceptSessionEvent(event *SessionEvent) {
adapter.wrapped.AcceptSessionEvent(event)
}
}

func (self *sessionEventAdapter) IsWrapping(value SessionEventHandler) bool {
if self.wrapped == value {
return true
}
if w, ok := self.wrapped.(SessionEventHandlerWrapper); ok {
return w.IsWrapping(value)
}
return false
}
5 changes: 5 additions & 0 deletions events/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,8 @@ func (event *SessionEvent) String() string {
type SessionEventHandler interface {
AcceptSessionEvent(event *SessionEvent)
}

type SessionEventHandlerWrapper interface {
SessionEventHandler
IsWrapping(value SessionEventHandler) bool
}

0 comments on commit ce7f84e

Please sign in to comment.