Skip to content

Commit

Permalink
Fix memory backend support for multiple queues
Browse files Browse the repository at this point in the history
Fixes #56
  • Loading branch information
acaloiaro committed Aug 20, 2023
1 parent 57c75de commit 631c508
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 29 deletions.
33 changes: 26 additions & 7 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type MemBackend struct {
mu *sync.Mutex // mutext to protect mutating state on a pgWorker
cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown()
jobCount int64 // number of jobs that have been queued since start
initialized bool
}

// Backend is a [config.BackendInitializer] that initializes a new memory-backed neoq backend
Expand Down Expand Up @@ -72,6 +73,8 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,
var qc any
var ok bool

m.logger.Debug("adding a new job", "queue", job.Queue)

if qc, ok = m.queues.Load(job.Queue); !ok {
return jobs.UnqueuedJobID, fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, job.Queue)
}
Expand Down Expand Up @@ -120,7 +123,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,

// Start starts processing jobs with the specified queue and handler
func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) {
var queueCapacity = h.QueueCapacity
queueCapacity := h.QueueCapacity
if queueCapacity == emptyCapacity {
queueCapacity = defaultMemQueueCapacity
}
Expand Down Expand Up @@ -200,14 +203,18 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {
var ok bool

if ht, ok = m.handlers.Load(queue); !ok {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
m.logger.Error("error loading handler for queue", queue)
return
}

if qc, ok = m.queues.Load(queue); !ok {
return fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue)
err = fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue)
m.logger.Error("error loading channel for queue", queue)
return err
}

go func() { m.scheduleFutureJobs(ctx, queue) }()
go func() { m.scheduleFutureJobs(ctx) }()

h = ht.(handler.Handler)
queueChan = qc.(chan *jobs.Job)
Expand Down Expand Up @@ -244,10 +251,20 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {
return nil
}

func (m *MemBackend) scheduleFutureJobs(ctx context.Context, queue string) {
func (m *MemBackend) scheduleFutureJobs(ctx context.Context) {
// check for new future jobs on an interval
ticker := time.NewTicker(m.config.JobCheckInterval)

// if the queues list is non-empty, then we've already started, and this function is a no-op
m.mu.Lock()
if !m.initialized {
m.initialized = true
m.mu.Unlock()
} else {
m.mu.Unlock()
return
}

for {
// loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds
m.futureJobs.Range(func(_, v any) bool {
Expand All @@ -257,14 +274,16 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context, queue string) {
timeUntilRunAfter := time.Until(job.RunAfter)
if timeUntilRunAfter <= m.config.FutureJobWindow {
m.removeFutureJob(job.ID)
m.logger.Debug("dequeued future job", "id", job.ID, "queue", job.Queue)
go func(j *jobs.Job) {
scheduleCh := time.After(timeUntilRunAfter)
<-scheduleCh
if qc, ok := m.queues.Load(queue); ok {
m.logger.Debug("loading job for queue", "queue", j.Queue)
if qc, ok := m.queues.Load(j.Queue); ok {
queueChan = qc.(chan *jobs.Job)
queueChan <- j
} else {
m.logger.Error(fmt.Sprintf("no queue processor for queue '%s'", queue), handler.ErrNoHandlerForQueue)
m.logger.Error(fmt.Sprintf("no queue processor for queue '%s'", j.Queue), handler.ErrNoHandlerForQueue)
}
}(job)
}
Expand Down
137 changes: 130 additions & 7 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package memory_test
import (
"context"
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand All @@ -15,7 +14,7 @@ import (
"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/testutils"
"github.com/acaloiaro/neoq/logging"
"github.com/pkg/errors"
"github.com/robfig/cron"
"golang.org/x/exp/slog"
Expand All @@ -33,7 +32,7 @@ func TestBasicJobProcessing(t *testing.T) {
numJobs := 1000
doneCnt := 0
done := make(chan bool)
var timeoutTimer = time.After(5 * time.Second)
timeoutTimer := time.After(5 * time.Second)

ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
Expand Down Expand Up @@ -158,8 +157,14 @@ var testFutureJobs = &sync.Map{}

func TestFutureJobScheduling(t *testing.T) {
ctx := context.Background()
testLogger := testutils.TestLogger{L: log.New(&strings.Builder{}, "", 0), Done: make(chan bool)}
testBackend := memory.TestingBackend(config.New(), cron.New(), &sync.Map{}, &sync.Map{}, testFutureJobs, &sync.Map{}, testLogger)
testBackend := memory.TestingBackend(
config.New(),
cron.New(),
&sync.Map{},
&sync.Map{},
testFutureJobs,
&sync.Map{},
slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: logging.LogLevelDebug})))

nq, err := neoq.New(ctx, neoq.WithBackend(testBackend))
if err != nil {
Expand Down Expand Up @@ -194,6 +199,124 @@ func TestFutureJobScheduling(t *testing.T) {
}
}

// This test was added in response to the following issue: https://github.com/acaloiaro/neoq/issues/56
// nolint: gocognit, gocyclo
func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
ctx := context.Background()
nq, err := neoq.New(ctx)
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

jobsPerQueueCount := 100

jobsProcessed1 := 0
jobsProcessed2 := 0

q1 := "queue1"
q2 := "queue2"

done1 := make(chan bool)
done2 := make(chan bool)

h1 := handler.New(func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
return
}

if j.Queue != q1 {
err = errors.New("handling job from queu2 with queue1 handler. this is a bug")
}

done1 <- true
return
})

h2 := handler.New(func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
return
}

if j.Queue != q2 {
err = errors.New("handling job from queue1 with queue2 handler. this is a bug")
}

done2 <- true
return
})

if err := nq.Start(ctx, q1, h1); err != nil {
t.Fatal(err)
}

if err := nq.Start(ctx, q2, h2); err != nil {
t.Fatal(err)
}

go func() {
for i := 1; i <= jobsPerQueueCount; i++ {
_, err = nq.Enqueue(ctx, &jobs.Job{
ID: int64(i),
Queue: q1,
Payload: map[string]interface{}{"ID": i},
RunAfter: time.Now().Add(1 * time.Millisecond),
})
if err != nil {
err = fmt.Errorf("job was not enqueued. either it was duplicate or this error caused it: %w", err)
t.Error(err)
}
}

for j := 1; j <= jobsPerQueueCount; j++ {
_, err = nq.Enqueue(ctx, &jobs.Job{
ID: int64(j),
Queue: q2,
Payload: map[string]interface{}{"ID": j},
RunAfter: time.Now().Add(1 * time.Millisecond),
})
if err != nil {
err = fmt.Errorf("job was not enqueued. either it was duplicate or this error caused it: %w", err)
t.Error(err)
}
}
}()

timeoutTimer := time.After(10 * time.Second)
results_loop:
for {
select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
break results_loop
case <-done1:
jobsProcessed1++
case <-done2:
jobsProcessed2++
default:
if jobsProcessed1 == jobsPerQueueCount && jobsProcessed2 == jobsPerQueueCount {
break results_loop
}
time.Sleep(10 * time.Millisecond)
}
}
if err != nil {
t.Error(err)
}
if jobsProcessed1 != jobsPerQueueCount {
// nolint: goerr113
t.Error(fmt.Errorf("handler1 should have handled %d jobs, but handled %d", jobsPerQueueCount, jobsProcessed1))
}
if jobsProcessed2 != jobsPerQueueCount {
// nolint: goerr113
t.Error(fmt.Errorf("handler2 should have handled %d jobs, but handled %d", jobsPerQueueCount, jobsProcessed2))
}
}

func TestCron(t *testing.T) {
const cronSpec = "* * * * * *"
ctx := context.TODO()
Expand All @@ -203,7 +326,7 @@ func TestCron(t *testing.T) {
}
defer nq.Shutdown(ctx)

var done = make(chan bool)
done := make(chan bool)
h := handler.New(func(ctx context.Context) (err error) {
done <- true
return
Expand Down
15 changes: 7 additions & 8 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func flushDB() {
_, err = conn.Query(context.Background(), "DELETE FROM neoq_jobs") // nolint: gocritic
if err != nil {
fmt.Fprintf(os.Stderr, "'neoq_jobs' table flush failed: %v\n", err)
os.Exit(1) // nolint: gocritic
}
}

Expand All @@ -56,9 +55,9 @@ func TestBasicJobProcessing(t *testing.T) {
done := make(chan bool)
defer close(done)

var timeoutTimer = time.After(5 * time.Second)
timeoutTimer := time.After(5 * time.Second)

var connString = os.Getenv("TEST_DATABASE_URL")
connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
Expand Down Expand Up @@ -113,9 +112,9 @@ func TestBasicJobMultipleQueue(t *testing.T) {
done := make(chan bool)
doneCnt := 0

var timeoutTimer = time.After(5 * time.Second)
timeoutTimer := time.After(5 * time.Second)

var connString = os.Getenv("TEST_DATABASE_URL")
connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
Expand Down Expand Up @@ -195,7 +194,7 @@ func TestCron(t *testing.T) {
done := make(chan bool, 1)
defer close(done)
const cron = "* * * * * *"
var connString = os.Getenv("TEST_DATABASE_URL")
connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
Expand Down Expand Up @@ -247,9 +246,9 @@ func TestBasicJobProcessingWithErrors(t *testing.T) {
done := make(chan bool, 10)
defer close(done)

var timeoutTimer = time.After(5 * time.Second)
timeoutTimer := time.After(5 * time.Second)

var connString = os.Getenv("TEST_DATABASE_URL")
connString := os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
// E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine to
// wait until the job's RunAfter, scheduling the job to be run exactly at RunAfter
DefaultFutureJobWindow = 30 * time.Second
DefaultJobCheckInterval = 5 * time.Second
DefaultJobCheckInterval = 1 * time.Second
)

// Config configures neoq and its backends
Expand Down
14 changes: 8 additions & 6 deletions neoq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"github.com/acaloiaro/neoq/testutils"
)

var errTrigger = errors.New("triggerering a log error")
var errPeriodicTimeout = errors.New("timed out waiting for periodic job")
var (
errTrigger = errors.New("triggerering a log error")
errPeriodicTimeout = errors.New("timed out waiting for periodic job")
)

func ExampleNew() {
ctx := context.Background()
Expand Down Expand Up @@ -91,7 +93,7 @@ func TestStart(t *testing.T) {
timeout := false
numJobs := 1
doneCnt := 0
var done = make(chan bool, numJobs)
done := make(chan bool, numJobs)

ctx := context.TODO()
nq, err := New(ctx, WithBackend(memory.Backend))
Expand Down Expand Up @@ -162,7 +164,7 @@ func TestStartCron(t *testing.T) {
}
defer nq.Shutdown(ctx)

var done = make(chan bool)
done := make(chan bool)
h := handler.New(func(ctx context.Context) (err error) {
done <- true
return
Expand Down Expand Up @@ -194,7 +196,7 @@ func TestStartCron(t *testing.T) {

func TestSetLogger(t *testing.T) {
const queue = "testing"
var done = make(chan bool)
done := make(chan bool, 1)
buf := &strings.Builder{}
ctx := context.TODO()

Expand Down Expand Up @@ -225,7 +227,7 @@ func TestSetLogger(t *testing.T) {
}

<-done
expectedLogMsg := "job failed [job failed to process: triggerering a log error job_id 1]"
expectedLogMsg := "adding a new job [queue testing]"
actualLogMsg := strings.Trim(buf.String(), "\n")
if actualLogMsg != expectedLogMsg {
t.Error(fmt.Errorf("%s != %s", actualLogMsg, expectedLogMsg)) //nolint:all
Expand Down

0 comments on commit 631c508

Please sign in to comment.