Skip to content

Commit

Permalink
chore: make upstream reconciler public
Browse files Browse the repository at this point in the history
  • Loading branch information
moshloop committed Jan 1, 2024
1 parent b434d63 commit 6c1779b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 44 deletions.
45 changes: 12 additions & 33 deletions upstream/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,22 @@ package upstream

import (
"fmt"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/context"
"github.com/flanksource/postq"
)

const EventPushQueueCreate = "push_queue.create"

// NewPushQueueConsumer creates a new push queue consumer
func NewPushQueueConsumer(config UpstreamConfig) postq.AsyncEventConsumer {
return postq.AsyncEventConsumer{
WatchEvents: []string{EventPushQueueCreate},
Consumer: getPushUpstreamConsumer(config),
BatchSize: 50,
ConsumerOption: &postq.ConsumerOption{
NumConsumers: 5,
ErrorHandler: func(err error) bool {
logger.Errorf("error consuming upstream push_queue.create events: %v", err)
time.Sleep(time.Second)
return true
},
},
}
}

// getPushUpstreamConsumer acts as an adapter to supply PushToUpstream event consumer.
func getPushUpstreamConsumer(config UpstreamConfig) func(ctx postq.Context, events postq.Events) postq.Events {
return func(ctx postq.Context, events postq.Events) postq.Events {
dbCtx, ok := ctx.(duty.DBContext)
if !ok {
return addErrorToFailedEvents(events, fmt.Errorf("invalid context type: %T. Need duty.DBContext", ctx))
}

return PushToUpstream(dbCtx, config, events)
func NewPushUpstreamConsumer(config UpstreamConfig) func(ctx context.Context, events postq.Events) postq.Events {
return func(ctx context.Context, events postq.Events) postq.Events {
return PushToUpstream(ctx, config, events)
}
}

// PushToUpstream fetches records specified in events from this instance and sends them to the upstream instance.
func PushToUpstream(ctx duty.DBContext, config UpstreamConfig, events []postq.Event) []postq.Event {
func PushToUpstream(ctx context.Context, config UpstreamConfig, events []postq.Event) []postq.Event {
upstreamMsg := &PushData{
AgentName: config.AgentName,
}
Expand Down Expand Up @@ -134,18 +110,21 @@ func PushToUpstream(ctx duty.DBContext, config UpstreamConfig, events []postq.Ev
if len(events) == 1 {
errMsg := fmt.Errorf("failed to push to upstream: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(events, errMsg)...)
return failedEvents
} else {
// Error encountered while pushing could be an SQL or Application error
// Since we do not know which event in the bulk is failing
// Process each event individually since upsteam.Push is idempotent
var failedIndividualEvents []postq.Event

for _, e := range events {
failedIndividualEvents = append(failedIndividualEvents, PushToUpstream(ctx, config, []postq.Event{e})...)
failedEvents = append(failedEvents, PushToUpstream(ctx, config, []postq.Event{e})...)
}
}

return failedIndividualEvents
if len(events) > 0 || len(failedEvents) > 0 {
ctx.Tracef("processed %d events, %d errors", len(events), len(failedEvents))
}
return failedEvents

}

func addErrorToFailedEvents(events []postq.Event, err error) []postq.Event {
Expand Down
22 changes: 11 additions & 11 deletions upstream/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (p PaginateResponse) String() string {
return fmt.Sprintf("hash=%s, next=%s, count=%d", p.Hash, p.Next, p.Total)
}

// upstreamReconciler pushes missing resources from an agent to the upstream.
type upstreamReconciler struct {
// UpstreamReconciler pushes missing resources from an agent to the upstream.
type UpstreamReconciler struct {
upstreamConf UpstreamConfig
upstreamClient *UpstreamClient

Expand All @@ -43,8 +43,8 @@ type upstreamReconciler struct {
pageSize int
}

func NewUpstreamReconciler(upstreamConf UpstreamConfig, pageSize int) *upstreamReconciler {
return &upstreamReconciler{
func NewUpstreamReconciler(upstreamConf UpstreamConfig, pageSize int) *UpstreamReconciler {
return &UpstreamReconciler{
upstreamConf: upstreamConf,
pageSize: pageSize,
upstreamClient: NewUpstreamClient(upstreamConf),
Expand All @@ -53,15 +53,15 @@ func NewUpstreamReconciler(upstreamConf UpstreamConfig, pageSize int) *upstreamR

// Sync compares all the resource of the given table against
// the upstream server and pushes any missing resources to the upstream.
func (t *upstreamReconciler) Sync(ctx context.Context, table string) (int, error) {
func (t *UpstreamReconciler) Sync(ctx context.Context, table string) (int, error) {
logger.Debugf("Reconciling table %q with upstream", table)

// Empty starting cursor, so we sync everything
return t.sync(ctx, table, uuid.Nil.String())
}

// SyncAfter pushes all the records of the given table that were updated in the given duration
func (t *upstreamReconciler) SyncAfter(ctx context.Context, table string, after time.Duration) (int, error) {
func (t *UpstreamReconciler) SyncAfter(ctx context.Context, table string, after time.Duration) (int, error) {
logger.WithValues("since", time.Now().Add(-after).Format(time.RFC3339Nano)).Debugf("Reconciling table %q with upstream", table)

// We find the item that falls just before the requested duration & begin from there
Expand All @@ -80,7 +80,7 @@ func (t *upstreamReconciler) SyncAfter(ctx context.Context, table string, after

// Sync compares all the resource of the given table against
// the upstream server and pushes any missing resources to the upstream.
func (t *upstreamReconciler) sync(ctx context.Context, table, next string) (int, error) {
func (t *UpstreamReconciler) sync(ctx context.Context, table, next string) (int, error) {
var errorList []error
// We keep this counter to keep a track of attempts for a batch
pushed := 0
Expand Down Expand Up @@ -133,7 +133,7 @@ func (t *upstreamReconciler) sync(ctx context.Context, table, next string) (int,
}
pushed += pushData.Length()
}
if upstreamStatus.Next == "" {
if next == "" {
break
}
}
Expand All @@ -143,7 +143,7 @@ func (t *upstreamReconciler) sync(ctx context.Context, table, next string) (int,

// fetchUpstreamResourceIDs requests all the existing resource ids from the upstream
// that were sent by this agent.
func (t *upstreamReconciler) fetchUpstreamResourceIDs(ctx context.Context, request PaginateRequest) ([]string, error) {
func (t *UpstreamReconciler) fetchUpstreamResourceIDs(ctx context.Context, request PaginateRequest) ([]string, error) {
httpReq := t.createPaginateRequest(ctx, request)
httpResponse, err := httpReq.Get(fmt.Sprintf("pull/%s", t.upstreamConf.AgentName))
if err != nil {
Expand All @@ -167,7 +167,7 @@ func (t *upstreamReconciler) fetchUpstreamResourceIDs(ctx context.Context, reque
return response, nil
}

func (t *upstreamReconciler) fetchUpstreamStatus(ctx gocontext.Context, request PaginateRequest) (*PaginateResponse, error) {
func (t *UpstreamReconciler) fetchUpstreamStatus(ctx gocontext.Context, request PaginateRequest) (*PaginateResponse, error) {
httpReq := t.createPaginateRequest(ctx, request)
httpResponse, err := httpReq.Get(fmt.Sprintf("status/%s", t.upstreamConf.AgentName))
if err != nil {
Expand All @@ -190,7 +190,7 @@ func (t *upstreamReconciler) fetchUpstreamStatus(ctx gocontext.Context, request
return &response, nil
}

func (t *upstreamReconciler) createPaginateRequest(ctx gocontext.Context, request PaginateRequest) *http.Request {
func (t *UpstreamReconciler) createPaginateRequest(ctx gocontext.Context, request PaginateRequest) *http.Request {
return t.upstreamClient.R(ctx).
QueryParam("table", request.Table).
QueryParam("from", request.From).
Expand Down

0 comments on commit 6c1779b

Please sign in to comment.