Skip to content

Commit

Permalink
fix: processing pickup race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored and cisse21 committed Dec 17, 2024
1 parent aef8585 commit 5e40fa6
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Router struct {
inProgressMap map[workerIdentifierMapKey][]jobID
inProgressMapLock sync.RWMutex

processingMu sync.Mutex

scheduledTimesCache map[string][]int
scheduledTimesCacheLock sync.RWMutex

Expand Down Expand Up @@ -371,18 +373,22 @@ loop:
continue
}

r.processingMu.Lock()
inProgressNamespaces := r.getInProgressNamespaces()
r.logger.Debugf(`Current inProgress namespace identifiers for %s: %v`, r.destType, inProgressNamespaces)

uploadJobsToProcess, err := r.uploadsToProcess(ctx, availableWorkers, inProgressNamespaces)
if err != nil && ctx.Err() == nil {
r.logger.Errorn("Error getting uploads to process", logger.NewErrorField(err))
r.processingMu.Unlock()
return err
}

for _, uploadJob := range uploadJobsToProcess {
r.setDestInProgress(uploadJob.warehouse, uploadJob.upload.ID)
}
r.processingMu.Unlock()

for _, uploadJob := range uploadJobsToProcess {
workerName := r.workerIdentifier(uploadJob.warehouse)

r.workerChannelMapLock.RLock()
Expand Down Expand Up @@ -596,6 +602,9 @@ func (r *Router) handlePriorityForWaitingUploads(ctx context.Context, warehouse
return defaultUploadPriority, nil
}

r.processingMu.Lock()
defer r.processingMu.Unlock()

// If it is present do nothing else delete it
if _, inProgress := r.isUploadJobInProgress(warehouse, latestInfo.ID); !inProgress {
if err := r.uploadRepo.DeleteWaiting(ctx, latestInfo.ID); err != nil {
Expand Down

0 comments on commit 5e40fa6

Please sign in to comment.