Skip to content

Commit

Permalink
feat: allow custom route extractor on pg notify router
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Sep 11, 2024
1 parent a662432 commit 98b03a9
Showing 1 changed file with 42 additions and 9 deletions.
51 changes: 42 additions & 9 deletions postq/pg/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,49 @@ import (
"github.com/flanksource/duty/context"
)

type routeExtractorFn func(string) (string, string, error)

func defaultRouteExtractor(payload string) (string, string, error) {
// The original payload is expected to be in the form of
// <route> <...optional payload>
fields := strings.Fields(payload)
route := fields[0]
derivedPayload := strings.Join(fields[1:], " ")
return route, derivedPayload, nil
}

// notifyRouter distributes the pgNotify event to multiple channels
// based on the payload.
type notifyRouter struct {
registry map[string]chan string
registry map[string]chan string
routeExtractor routeExtractorFn
}

func NewNotifyRouter() *notifyRouter {
return &notifyRouter{
registry: make(map[string]chan string),
registry: make(map[string]chan string),
routeExtractor: defaultRouteExtractor,
}
}

func (t *notifyRouter) WithRouteExtractor(routeExtractor routeExtractorFn) *notifyRouter {
t.routeExtractor = routeExtractor
return t
}

// RegisterRoutes creates a single channel for the given routes and returns it.
func (t *notifyRouter) RegisterRoutes(routes ...string) <-chan string {
// If any of the routes already has a channel, we use that
// for all the routes.
// Caution: The caller needs to ensure that the route
// groups do not overlap.
pgNotifyChannel := make(chan string)
for _, we := range routes {
if existing, ok := t.registry[we]; ok {
pgNotifyChannel = existing
}
}

for _, we := range routes {
t.registry[we] = pgNotifyChannel
}
Expand All @@ -33,18 +61,23 @@ func (t *notifyRouter) Run(ctx context.Context, channel string) {
go Listen(ctx, channel, eventQueueNotifyChannel)

for payload := range eventQueueNotifyChannel {
if _, ok := t.registry[payload]; !ok || payload == "" {
if payload == "" {
continue
}

// The original payload is expected to be in the form of
// <route> <...optional payload>
fields := strings.Fields(payload)
route := fields[0]
derivedPayload := strings.Join(fields[1:], " ")
route, extractedPayload, err := t.routeExtractor(payload)
if err != nil {
continue
}

if _, ok := t.registry[route]; !ok {
continue
}

if ch, ok := t.registry[route]; ok {
ch <- derivedPayload
go func() {
ch <- extractedPayload
}()
}
}
}

0 comments on commit 98b03a9

Please sign in to comment.