From 52e118ebccf253256bcfd0b27c3c6b30a203b194 Mon Sep 17 00:00:00 2001
From: Elliot Courant
Date: Wed, 27 Sep 2023 16:45:48 -0500
Subject: [PATCH 1/2] chore(test): Adding a test for multiple consumers with
 the pg backend.

This test provides a minimal proof that jobs can be consumed by multiple
workers and that each job is consumed only once. If execCount does not match
the expected count, the test fails: either too many jobs were executed (for
example, one job running twice) or a job was dropped when it should not have
been.
---
 backends/postgres/postgres_backend_test.go | 77 ++++++++++++++++++++++
 1 file changed, 77 insertions(+)

diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go
index 45c69b5..5412617 100644
--- a/backends/postgres/postgres_backend_test.go
+++ b/backends/postgres/postgres_backend_test.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"os"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -114,6 +116,81 @@ func TestBasicJobProcessing(t *testing.T) {
 	})
 }
 
+func TestMultipleProcessors(t *testing.T) {
+	const queue = "testing"
+
+	connString := os.Getenv("TEST_DATABASE_URL")
+	if connString == "" {
+		t.Skip("Skipping: TEST_DATABASE_URL not set")
+		return
+	}
+
+	t.Cleanup(func() {
+		flushDB()
+	})
+
+	var execCount uint32
+	var wg sync.WaitGroup
+	count := 8
+	neos := make([]neoq.Neoq, 0, count)
+	// Create several neoq processors so that we can enqueue several jobs and have them consumed by multiple
+	// different workers. We want to make sure that a job is not processed twice in a pool of many neoq workers.
+	for i := 0; i < count; i++ {
+		ctx := context.Background()
+		nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Cleanup(func() {
+			nq.Shutdown(ctx)
+		})
+
+		h := handler.New(queue, func(_ context.Context) (err error) {
+			// Spend some time on each job so that a single worker does not consume two jobs back to back.
+			// This should give the other neoq clients enough time to grab a job as well.
+			time.Sleep(500 * time.Millisecond)
+			atomic.AddUint32(&execCount, 1)
+			wg.Done()
+			return
+		})
+		// Make sure that each neoq worker only works on one job at a time.
+		h.Concurrency = 1
+
+		err = nq.Start(ctx, h)
+		if err != nil {
+			t.Error(err)
+		}
+
+		neos = append(neos, nq)
+	}
+
+	// From one of the neoq clients, enqueue several jobs, one per processor registered above.
+	nq := neos[0]
+	for i := 0; i < count; i++ {
+		wg.Add(1)
+		ctx := context.Background()
+		deadline := time.Now().UTC().Add(10 * time.Second)
+		jid, e := nq.Enqueue(ctx, &jobs.Job{
+			Queue: queue,
+			Payload: map[string]interface{}{
+				"message": fmt.Sprintf("hello world: %d", i),
+			},
+			Deadline: &deadline,
+		})
+		if e != nil || jid == jobs.DuplicateJobID {
+			t.Error(e)
+		}
+	}
+
+	// Wait for all jobs to complete.
+	wg.Wait()
+
+	// Make sure that we executed the expected number of jobs.
+	if execCount != uint32(count) {
+		t.Fatalf("mismatched number of executions. Expected: %d Found: %d", count, execCount)
+	}
+}
+
 // TestDuplicateJobRejection tests that the backend rejects jobs that are duplicates
 func TestDuplicateJobRejection(t *testing.T) {
 	const queue = "testing"

From fab0736b2bed24a32c909156ac38b29e76c6cc41 Mon Sep 17 00:00:00 2001
From: Elliot Courant
Date: Wed, 27 Sep 2023 19:00:00 -0500
Subject: [PATCH 2/2] fix(ci): Fix GitHub Actions tests on PRs

---
 .github/workflows/test.yml | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index d8fdd5b..ae1f325 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -1,5 +1,11 @@
 name: test
-on: [push]
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
 
 concurrency:
   group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'