Skip to content

Commit

Permalink
Merge pull request #1 from pconstantinou/feature/bump-version
Browse files Browse the repository at this point in the history
Fix overwrite
  • Loading branch information
pconstantinou authored Feb 23, 2024
2 parents 9255606 + 675327e commit e7ef0a4
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 21 deletions.

This file was deleted.

This file was deleted.

42 changes: 26 additions & 16 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,18 @@ func (p *PgBackend) initializeDB() (err error) {
return nil
}

func isUniqueConflict(err error) bool {
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == pgerrcode.UniqueViolation {
return true
}
}
}
return false
}

// Enqueue adds jobs to the specified queue
func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
Expand Down Expand Up @@ -415,16 +427,17 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...ne
// the tx commits successfully, this is a no-op
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed
jobID, err = p.enqueueJob(ctx, tx, job, options)

if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == pgerrcode.UniqueViolation {
err = jobs.ErrJobFingerprintConflict
return
}
if isUniqueConflict(err) {
err = jobs.ErrJobFingerprintConflict
return
}
p.logger.Error("error enqueueing job", slog.String("queue", job.Queue), slog.Any("error", err))
err = fmt.Errorf("error enqueuing job: %w", err)
if err != nil {
return
}
}

err = tx.Commit(ctx)
Expand Down Expand Up @@ -558,19 +571,16 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job, opti
err = jobs.FingerprintJob(j)
if err != nil {
return jobID,
fmt.Errorf("%w: %v", jobs.ErrCantGenerateFingerprint, err)
fmt.Errorf("%w: %s", jobs.ErrCantGenerateFingerprint, err.Error())
}
p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue))
if !options.Override {
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
} else {
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (queue, status, fingerprint, ran_at) DO
UPDATE SET
payload=$3, run_after=$4, deadline=$5, max_retries=$6
RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
if isUniqueConflict(err) && options.Override {
err = tx.QueryRow(ctx, `UPDATE neoq_jobs set payload=$3, run_after=$4, deadline=$5, max_retries=$6, retries=0
WHERE queue = $1 and fingerprint = $2 and status != "processed"
RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
if err != nil {
p.logger.Error("error enqueueing override job", slog.Any("error", err))
Expand Down

0 comments on commit e7ef0a4

Please sign in to comment.