Skip to content

Commit

Permalink
chore: minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Nov 21, 2024
1 parent f908e21 commit 86a7a84
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 20 deletions.
8 changes: 4 additions & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]*jobsdb.
var params map[string]interface{}
err := jsonfast.Unmarshal(job.Parameters, &params)
if err != nil {
proc.logger.Errorf("Error while UnMarshaling live event parameters: %w", err)
proc.logger.Errorf("Error while UnMarshaling live event parameters: %v", err)
continue
}

Expand All @@ -1075,14 +1075,14 @@ func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]*jobsdb.
events := make([]map[string]interface{}, 0)
err = jsonfast.Unmarshal(job.EventPayload, &events)
if err != nil {
proc.logger.Errorf("Error while UnMarshaling live event payload: %w", err)
proc.logger.Errorf("Error while UnMarshaling live event payload: %v", err)
continue
}
for i := range events {
event := &events[i]
eventPayload, err := jsonfast.Marshal(*event)
if err != nil {
proc.logger.Errorf("Error while Marshaling live event payload: %w", err)
proc.logger.Errorf("Error while Marshaling live event payload: %v", err)
continue
}

Expand Down Expand Up @@ -1813,7 +1813,7 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
dedupKey := event.dedupKey
ok := keyMap[dedupKey]
if !ok {
proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey)
proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey.Key)
sourceDupStats[dupStatKey{sourceID: event.eventParams.SourceId}] += 1
continue
}
Expand Down
2 changes: 1 addition & 1 deletion regulation-worker/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (j *JobAPI) Get(ctx context.Context) (model.Job, error) {
return model.Job{}, err
}
defer func() { httputil.CloseResponse(resp) }()
pkgLogger.Debugf("obtained response code: %v", resp.StatusCode, "response body: ", resp.Body)
pkgLogger.Debugf("obtained response code: %v wit resp body %v", resp.StatusCode, resp.Body)

// if successful
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
Expand Down
4 changes: 2 additions & 2 deletions regulation-worker/internal/delete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ func NewRouter(managers ...deleteManager) *Router {
}

func (r *Router) Delete(ctx context.Context, job model.Job, dest model.Destination) model.JobStatus {
pkgLogger.Debugf("deleting job: %v", job, "from destination: %v", dest)
pkgLogger.Debugf("deleting job: %v from destination: %v", job, dest)
r.once.Do(func() {
pkgLogger.Info("getting all the supported destination")
r.router = make(map[string]deleteManager, len(r.Managers))

for _, m := range r.Managers {
destinations := m.GetSupportedDestinations()
pkgLogger.Infof("got deletion manager supporting deletion from: %v", m, destinations)
pkgLogger.Infof("got deletion manager supporting deletion from: %v for destinations %v", m, destinations)
for _, d := range destinations {
r.router[d] = m
}
Expand Down
13 changes: 5 additions & 8 deletions regulation-worker/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
)

var (
ErrDestTypeNotFound = errors.New("destination type not found for the destination ID")
ErrDestDetail = errors.New("error while getting destination details")
ErrNoRunnableJob = errors.New("no runnable job found")
ErrInvalidDestination = errors.New("invalid destination")
ErrRequestTimeout = errors.New("request timeout")
Expand All @@ -22,12 +20,11 @@ type JobStatus struct {
}

const (
JobStatusUndefined Status = ""
JobStatusPending Status = "pending"
JobStatusRunning Status = "running"
JobStatusComplete Status = "complete"
JobStatusFailed Status = "failed"
JobStatusAborted Status = "aborted"
JobStatusPending Status = "pending"
JobStatusRunning Status = "running"
JobStatusComplete Status = "complete"
JobStatusFailed Status = "failed"
JobStatusAborted Status = "aborted"
)

type Job struct {
Expand Down
5 changes: 3 additions & 2 deletions regulation-worker/internal/service/looper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"errors"
"fmt"
"os"
"strconv"
Expand Down Expand Up @@ -34,7 +35,7 @@ func (l *Looper) Loop(ctx context.Context) error {

err := l.Svc.JobSvc(ctx)

if err == model.ErrNoRunnableJob {
if errors.Is(err, model.ErrNoRunnableJob) {
pkgLogger.Debugf("no runnable job found... sleeping")
if err := misc.SleepCtx(ctx, time.Duration(interval)*time.Minute); err != nil {
pkgLogger.Debugf("context cancelled... exiting infinite loop %v", err)
Expand All @@ -43,7 +44,7 @@ func (l *Looper) Loop(ctx context.Context) error {
continue
}
// this is to make sure that we don't panic when any of the API call fails with deadline exceeded error.
if err == model.ErrRequestTimeout {
if errors.Is(err, model.ErrRequestTimeout) {
pkgLogger.Errorf("context deadline exceeded... retrying after %d minute(s): %v", retryDelay, err)
if err := misc.SleepCtx(ctx, time.Duration(retryDelay)*time.Second); err != nil {
pkgLogger.Debugf("context cancelled... exiting infinite loop %v", err)
Expand Down
7 changes: 4 additions & 3 deletions regulation-worker/internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"errors"
"time"

"github.com/cenkalti/backoff"
Expand Down Expand Up @@ -30,7 +31,7 @@ type JobSvc struct {
MaxFailedAttempts int
}

// called by looper
// JobSvc called by looper
// calls api-client.getJob(workspaceID)
// calls api-client to get new job with workspaceID, which returns jobID.
func (js *JobSvc) JobSvc(ctx context.Context) error {
Expand All @@ -43,7 +44,7 @@ func (js *JobSvc) JobSvc(ctx context.Context) error {
return err
}

// once job is successfully received, calling updatestatus API to update the status of job to running.
// once job is successfully received, calling update-status API to update the status of job to running.
jobStatus := model.JobStatus{Status: model.JobStatusRunning}
err = js.updateStatus(ctx, jobStatus, job.ID)
if err != nil {
Expand All @@ -53,7 +54,7 @@ func (js *JobSvc) JobSvc(ctx context.Context) error {
destDetail, err := js.DestDetail.GetDestDetails(job.DestinationID)
if err != nil {
pkgLogger.Errorf("error while getting destination details: %v", err)
if err == model.ErrInvalidDestination {
if errors.Is(err, model.ErrInvalidDestination) {
return js.updateStatus(ctx, model.JobStatus{Status: model.JobStatusAborted, Error: model.ErrInvalidDestination}, job.ID)
}
return js.updateStatus(ctx, model.JobStatus{Status: model.JobStatusFailed, Error: err}, job.ID)
Expand Down

0 comments on commit 86a7a84

Please sign in to comment.