From 20f5b139b51bcf557cfd2eb6c02e6d9e3125854a Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Thu, 7 Sep 2023 11:50:31 +0200 Subject: [PATCH] break: Rename types.Backend -> neoq.Neoq --- .goreleaser.yaml | 6 +- Makefile | 1 - README.md | 4 +- backends/doc.go | 2 +- backends/memory/memory_backend.go | 21 ++-- backends/memory/memory_backend_helper_test.go | 10 +- backends/memory/memory_backend_test.go | 18 +++- backends/postgres/postgres_backend.go | 24 ++--- backends/redis/redis_backend.go | 29 +++--- config/config.go | 51 ---------- examples/add_future_postgres_job/main.go | 2 +- examples/add_postgres_job/main.go | 4 +- neoq.go | 95 ++++++++++++++++--- neoq_test.go | 17 ++-- types/types.go | 34 ------- 15 files changed, 158 insertions(+), 160 deletions(-) delete mode 100644 config/config.go delete mode 100644 types/types.go diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 89d7ef3..40af948 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -32,12 +32,14 @@ changelog: - '^docs:' - '^test:' groups: + - title: 'BREAKING CHANGES' + regexp: '^.*?break(\([[:word:]]+\))??!?:.+$' - title: Features regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$' - order: 0 + order: 1 - title: Maintenance regexp: '^.*?maint(\([[:word:]]+\))??!?:.+$' - order: 1 + order: 2 - title: "Bug fixes" regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$' - title: Others diff --git a/Makefile b/Makefile index 51764d8..5505c4e 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,3 @@ -SHELL := /bin/bash GOLANGCI_LINT_VERSION ?= v1.51.1 all: fmt vet mod diff --git a/README.md b/README.md index 0b1a65e..597c40d 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ Queue Handlers are simple Go functions that accept a `Context` parameter. ```go ctx := context.Background() -nq, _ := neoq.New(ctx) +nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend)) nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { j, _ := jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) @@ -61,7 +61,7 @@ Enqueuing adds jobs to the specified queue to be processed asynchronously. ```go ctx := context.Background() -nq, _ := neoq.New(ctx) +nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend)) nq.Enqueue(ctx, &jobs.Job{ Queue: "hello_world", Payload: map[string]interface{}{ diff --git a/backends/doc.go b/backends/doc.go index ef8a689..275c8ca 100644 --- a/backends/doc.go +++ b/backends/doc.go @@ -1,4 +1,4 @@ -// Package backends provides concrete implementations of [pkg/github.com/acaloiaro/neoq/types.Backend] +// Package backends provides concrete implementations of [pkg/github.com/acaloiaro/neoq/neoq.Neoq] // // These backends provide the bulk of Neoq's functionality. package backends diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index 920f231..c82465c 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -8,28 +8,27 @@ import ( "sync" "time" - "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq" "github.com/acaloiaro/neoq/handler" "github.com/acaloiaro/neoq/internal" "github.com/acaloiaro/neoq/jobs" "github.com/acaloiaro/neoq/logging" - "github.com/acaloiaro/neoq/types" "github.com/guregu/null" - "github.com/iancoleman/strcase" // TODO factor out - "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" // TODO factor out + "github.com/iancoleman/strcase" + "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" "github.com/robfig/cron" "golang.org/x/exp/slog" ) const ( - defaultMemQueueCapacity = 10000 // the default capacity of individual queues - emptyCapacity = 0 + defaultMemQueueCapacity = 10000 // default capacity of individual queues + emptyCapacity = 0 // queue size at which queues are considered empty ) // MemBackend is a memory-backed neoq backend type MemBackend struct { - types.Backend - config *config.Config + neoq.Neoq + config *neoq.Config logger logging.Logger handlers *sync.Map // map queue names [string] to queue handlers [Handler] fingerprints *sync.Map // map fingerprints [string] to job [Job] @@ -42,10 +41,10 @@ type MemBackend struct { initialized bool } -// Backend is a [config.BackendInitializer] that initializes a new memory-backed neoq backend -func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) { +// Backend is a [neoq.BackendInitializer] that initializes a new memory-backed neoq backend +func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) { mb := &MemBackend{ - config: config.New(), + config: neoq.NewConfig(), cron: cron.New(), mu: &sync.Mutex{}, queues: &sync.Map{}, diff --git a/backends/memory/memory_backend_helper_test.go b/backends/memory/memory_backend_helper_test.go index 82a72d6..c535484 100644 --- a/backends/memory/memory_backend_helper_test.go +++ b/backends/memory/memory_backend_helper_test.go @@ -6,18 +6,18 @@ import ( "context" "sync" - "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq" "github.com/acaloiaro/neoq/logging" - "github.com/acaloiaro/neoq/types" "github.com/robfig/cron" ) // TestingBackend initializes a backend for testing purposes -func TestingBackend(conf *config.Config, +func TestingBackend(conf *neoq.Config, c *cron.Cron, queues, h, futureJobs, fingerprints *sync.Map, - logger logging.Logger) config.BackendInitializer { - return func(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) { + logger logging.Logger, +) neoq.BackendInitializer { + return func(ctx context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) { mb := &MemBackend{ config: conf, cron: c, diff --git a/backends/memory/memory_backend_test.go b/backends/memory/memory_backend_test.go index cc5d3f3..b2fadeb 100644 --- a/backends/memory/memory_backend_test.go +++ b/backends/memory/memory_backend_test.go @@ -11,7 +11,6 @@ import ( "github.com/acaloiaro/neoq" "github.com/acaloiaro/neoq/backends/memory" - "github.com/acaloiaro/neoq/config" "github.com/acaloiaro/neoq/handler" "github.com/acaloiaro/neoq/jobs" "github.com/acaloiaro/neoq/logging" @@ -26,6 +25,19 @@ const ( var errPeriodicTimeout = errors.New("timed out waiting for periodic job") +func ExampleNew() { + ctx := context.Background() + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) + if err != nil { + fmt.Println("initializing a new Neoq with no params should not return an error:", err) + return + } + defer nq.Shutdown(ctx) + + fmt.Println("neoq initialized with default memory backend") + // Output: neoq initialized with default memory backend +} + // TestBasicJobProcessing tests that the memory backend is able to process the most basic jobs with the // most basic configuration. func TestBasicJobProcessing(t *testing.T) { @@ -158,7 +170,7 @@ var testFutureJobs = &sync.Map{} func TestFutureJobScheduling(t *testing.T) { ctx := context.Background() testBackend := memory.TestingBackend( - config.New(), + neoq.NewConfig(), cron.New(), &sync.Map{}, &sync.Map{}, @@ -203,7 +215,7 @@ func TestFutureJobScheduling(t *testing.T) { // nolint: gocognit, gocyclo func TestFutureJobSchedulingMultipleQueues(t *testing.T) { ctx := context.Background() - nq, err := neoq.New(ctx) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 0777747..46e9d0a 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -9,12 +9,11 @@ import ( "sync" "time" - "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq" "github.com/acaloiaro/neoq/handler" "github.com/acaloiaro/neoq/internal" "github.com/acaloiaro/neoq/jobs" "github.com/acaloiaro/neoq/logging" - "github.com/acaloiaro/neoq/types" "github.com/golang-migrate/migrate/v4" _ "github.com/golang-migrate/migrate/v4/database/postgres" // nolint: revive "github.com/golang-migrate/migrate/v4/source/iofs" @@ -69,8 +68,8 @@ var ( // PgBackend is a Postgres-based Neoq backend type PgBackend struct { - types.Backend - config *config.Config + neoq.Neoq + config *neoq.Config logger logging.Logger cron *cron.Cron mu *sync.RWMutex // mutex to protect mutating state on a pgWorker @@ -84,7 +83,7 @@ type PgBackend struct { // // If the database does not yet exist, Neoq will attempt to create the database and related tables by default. // -// Backend requires that one of the [config.ConfigOption] is [WithConnectionString] +// Backend requires that one of the [neoq.ConfigOption] is [WithConnectionString] // // Connection strings may be a URL or DSN-style connection strings. The connection string supports multiple // options detailed below. @@ -104,10 +103,13 @@ type PgBackend struct { // # Example URL // // postgres://worker:secret@workerdb.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10 -func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err error) { +func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err error) { + cfg := neoq.NewConfig() + cfg.IdleTransactionTimeout = neoq.DefaultIdleTxTimeout + p := &PgBackend{ mu: &sync.RWMutex{}, - config: config.New(), + config: cfg, handlers: make(map[string]handler.Handler), futureJobs: make(map[string]time.Time), cron: cron.New(), @@ -164,8 +166,8 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err } // WithConnectionString configures neoq postgres backend to use the specified connection string when connecting to a backend -func WithConnectionString(connectionString string) config.Option { - return func(c *config.Config) { +func WithConnectionString(connectionString string) neoq.ConfigOption { + return func(c *neoq.Config) { c.ConnectionString = connectionString } } @@ -175,8 +177,8 @@ func WithConnectionString(connectionString string) config.Option { // The timeout is the number of milliseconds that a transaction may sit idle before postgres terminates the // transaction's underlying connection. The timeout should be longer than your longest job takes to complete. If set // too short, job state will become unpredictable, e.g. retry counts may become incorrect. -func WithTransactionTimeout(txTimeout int) config.Option { - return func(c *config.Config) { +func WithTransactionTimeout(txTimeout int) neoq.ConfigOption { + return func(c *neoq.Config) { c.IdleTransactionTimeout = txTimeout } } diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index 05b8963..a2199fe 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -12,12 +12,11 @@ import ( "sync" "time" - "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq" "github.com/acaloiaro/neoq/handler" "github.com/acaloiaro/neoq/internal" "github.com/acaloiaro/neoq/jobs" "github.com/acaloiaro/neoq/logging" - "github.com/acaloiaro/neoq/types" "github.com/hibiken/asynq" "github.com/iancoleman/strcase" "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" @@ -34,12 +33,12 @@ var ErrInvalidAddr = errors.New("invalid connecton string: see documentation for // RedisBackend is a Redis-backed neoq backend // nolint: revive type RedisBackend struct { - types.Backend + neoq.Neoq client *asynq.Client server *asynq.Server inspector *asynq.Inspector mux *asynq.ServeMux - config *config.Config + config *neoq.Config logger logging.Logger mu *sync.Mutex // mutext to protect mutating backend state taskProvider *memoryTaskConfigProvider @@ -75,10 +74,10 @@ func (m *memoryTaskConfigProvider) addConfig(taskConfig *asynq.PeriodicTaskConfi m.mu.Unlock() } -// Backend is a [config.BackendInitializer] that initializes a new Redis-backed neoq backend -func Backend(_ context.Context, opts ...config.Option) (backend types.Backend, err error) { +// Backend is a [neoq.BackendInitializer] that initializes a new Redis-backed neoq backend +func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) { b := &RedisBackend{ - config: config.New(), + config: neoq.NewConfig(), mu: &sync.Mutex{}, taskProvider: newMemoryTaskConfigProvider(), } @@ -149,22 +148,22 @@ func Backend(_ context.Context, opts ...config.Option) (backend types.Backend, e } // WithAddr configures neoq to connect to Redis with the given address -func WithAddr(addr string) config.Option { - return func(c *config.Config) { +func WithAddr(addr string) neoq.ConfigOption { + return func(c *neoq.Config) { c.ConnectionString = addr } } // WithPassword configures neoq to connect to Redis with the given password -func WithPassword(password string) config.Option { - return func(c *config.Config) { +func WithPassword(password string) neoq.ConfigOption { + return func(c *neoq.Config) { c.BackendAuthPassword = password } } // WithConcurrency configures the number of workers available to process jobs across all queues -func WithConcurrency(concurrency int) config.Option { - return func(c *config.Config) { +func WithConcurrency(concurrency int) neoq.ConfigOption { + return func(c *neoq.Config) { c.BackendConcurrency = concurrency } } @@ -173,8 +172,8 @@ func WithConcurrency(concurrency int) config.Option { // before forcing them to abort durning Shutdown() // // If unset or zero, default timeout of 8 seconds is used. -func WithShutdownTimeout(timeout time.Duration) config.Option { - return func(c *config.Config) { +func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption { + return func(c *neoq.Config) { c.ShutdownTimeout = timeout } } diff --git a/config/config.go b/config/config.go deleted file mode 100644 index d94d1d0..0000000 --- a/config/config.go +++ /dev/null @@ -1,51 +0,0 @@ -package config - -import ( - "context" - "time" - - "github.com/acaloiaro/neoq/logging" - "github.com/acaloiaro/neoq/types" -) - -const ( - DefaultIdleTxTimeout = 30000 - // the window of time between time.Now() and when a job's RunAfter comes due that neoq will schedule a goroutine to - // schdule the job for execution. - // E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine to - // wait until the job's RunAfter, scheduling the job to be run exactly at RunAfter - DefaultFutureJobWindow = 30 * time.Second - DefaultJobCheckInterval = 1 * time.Second -) - -// Config configures neoq and its backends -// -// This configuration struct includes options for all backends. As such, some of its options are not implicable to all -// backends. [BackendConcurrency], for example, is only used by the redis backend. Other backends manage concurrency on a -// per-handler basis. -type Config struct { - BackendInitializer BackendInitializer - BackendAuthPassword string // password with which to authenticate to the backend's data provider - BackendConcurrency int // total number of backend processes available to process jobs - ConnectionString string // a string containing connection details for the backend - JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs - FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs - IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed - ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown - LogLevel logging.LogLevel // the log level of the default logger -} - -// Option is a function that sets optional backend configuration -type Option func(c *Config) - -// New initiailizes a new Config with defaults -func New() *Config { - return &Config{ - FutureJobWindow: DefaultFutureJobWindow, - JobCheckInterval: DefaultJobCheckInterval, - IdleTransactionTimeout: DefaultIdleTxTimeout, - } -} - -// BackendInitializer is a function that initializes a backend -type BackendInitializer func(ctx context.Context, opts ...Option) (backend types.Backend, err error) diff --git a/examples/add_future_postgres_job/main.go b/examples/add_future_postgres_job/main.go index 1e7f833..2481c04 100644 --- a/examples/add_future_postgres_job/main.go +++ b/examples/add_future_postgres_job/main.go @@ -15,7 +15,7 @@ func main() { ctx := context.Background() nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), - postgres.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq"), + postgres.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq?sslmode=disable"), postgres.WithTransactionTimeout(1000), // nolint: mnd, gomnd ) if err != nil { diff --git a/examples/add_postgres_job/main.go b/examples/add_postgres_job/main.go index 5aa3af0..e0078e9 100644 --- a/examples/add_postgres_job/main.go +++ b/examples/add_postgres_job/main.go @@ -12,11 +12,11 @@ import ( ) func main() { - var done = make(chan bool, 1) + done := make(chan bool, 1) const queue = "foobar" ctx := context.Background() nq, err := neoq.New(ctx, - postgres.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq"), + postgres.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq?sslmode=disable"), neoq.WithBackend(postgres.Backend), ) if err != nil { diff --git a/neoq.go b/neoq.go index 3f910e2..2a30d80 100644 --- a/neoq.go +++ b/neoq.go @@ -2,29 +2,98 @@ package neoq import ( "context" + "errors" "time" - "github.com/acaloiaro/neoq/backends/memory" - "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" "github.com/acaloiaro/neoq/logging" - "github.com/acaloiaro/neoq/types" ) +const ( + DefaultIdleTxTimeout = 30000 + // the window of time between time.Now() and when a job's RunAfter comes due that neoq will schedule a goroutine to + // schdule the job for execution. + // E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine + // to wait until the job's RunAfter, scheduling the job to be run exactly at RunAfter + DefaultFutureJobWindow = 30 * time.Second + DefaultJobCheckInterval = 1 * time.Second +) + +var ErrBackendNotSpecified = errors.New("a backend must be specified") + +// Config configures neoq and its backends +// +// This configuration struct includes options for all backends. As such, some of its options are not applicable to all +// backends. [BackendConcurrency], for example, is only used by the redis backend. Other backends manage concurrency on a +// per-handler basis. +type Config struct { + BackendInitializer BackendInitializer + BackendAuthPassword string // password with which to authenticate to the backend's data provider + BackendConcurrency int // total number of backend processes available to process jobs + ConnectionString string // a string containing connection details for the backend + JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs + FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs + IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed + ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown + LogLevel logging.LogLevel // the log level of the default logger +} + +// ConfigOption is a function that sets optional backend configuration +type ConfigOption func(c *Config) + +// NewConfig initiailizes a new Config with defaults +func NewConfig() *Config { + return &Config{ + FutureJobWindow: DefaultFutureJobWindow, + JobCheckInterval: DefaultJobCheckInterval, + } +} + +// BackendInitializer is a function that initializes a backend +type BackendInitializer func(ctx context.Context, opts ...ConfigOption) (backend Neoq, err error) + +// Neoq interface is Neoq's primary API +// +// Neoq is implemented by: +// - [pkg/github.com/acaloiaro/neoq/backends/memory.MemBackend] +// - [pkg/github.com/acaloiaro/neoq/backends/postgres.PgBackend] +// - [pkg/github.com/acaloiaro/neoq/backends/redis.RedisBackend] +type Neoq interface { + // Enqueue queues jobs to be executed asynchronously + Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) + + // Start starts processing jobs with the specified queue and handler + Start(ctx context.Context, queue string, h handler.Handler) (err error) + + // StartCron starts processing jobs with the specified cron schedule and handler + // + // See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format + StartCron(ctx context.Context, cron string, h handler.Handler) (err error) + + // SetLogger sets the backend logger + SetLogger(logger logging.Logger) + + // Shutdown halts job processing and releases resources + Shutdown(ctx context.Context) +} + // New creates a new backend instance for job processing. // // By default, neoq initializes [memory.Backend] if New() is called without a backend configuration option. // // Use [neoq.WithBackend] to initialize different backends. // -// For available configuration options see [config.ConfigOption]. -func New(ctx context.Context, opts ...config.Option) (b types.Backend, err error) { - c := config.Config{} +// For available configuration options see [neoq.ConfigOption]. +func New(ctx context.Context, opts ...ConfigOption) (b Neoq, err error) { + c := Config{} for _, opt := range opts { opt(&c) } if c.BackendInitializer == nil { - c.BackendInitializer = memory.Backend + err = ErrBackendNotSpecified + return } b, err = c.BackendInitializer(ctx, opts...) @@ -40,23 +109,23 @@ func New(ctx context.Context, opts ...config.Option) (b types.Backend, err error // Neoq provides two [config.BackendInitializer] that may be used with WithBackend // - [pkg/github.com/acaloiaro/neoq/backends/memory.Backend] // - [pkg/github.com/acaloiaro/neoq/backends/postgres.Backend] -func WithBackend(initializer config.BackendInitializer) config.Option { - return func(c *config.Config) { +func WithBackend(initializer BackendInitializer) ConfigOption { + return func(c *Config) { c.BackendInitializer = initializer } } // WithJobCheckInterval configures the duration of time between checking for future jobs -func WithJobCheckInterval(interval time.Duration) config.Option { - return func(c *config.Config) { +func WithJobCheckInterval(interval time.Duration) ConfigOption { + return func(c *Config) { c.JobCheckInterval = interval } } // WithLogLevel configures the log level for neoq's default logger. By default, log level is "INFO". // if SetLogger is used, WithLogLevel has no effect on the set logger -func WithLogLevel(level logging.LogLevel) config.Option { - return func(c *config.Config) { +func WithLogLevel(level logging.LogLevel) ConfigOption { + return func(c *Config) { c.LogLevel = level } } diff --git a/neoq_test.go b/neoq_test.go index 55d834b..53b3a54 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -1,4 +1,4 @@ -package neoq +package neoq_test import ( "context" @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/acaloiaro/neoq" "github.com/acaloiaro/neoq/backends/memory" "github.com/acaloiaro/neoq/backends/postgres" "github.com/acaloiaro/neoq/handler" @@ -23,7 +24,7 @@ var ( func ExampleNew() { ctx := context.Background() - nq, err := New(ctx, WithBackend(memory.Backend)) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { fmt.Println("initializing a new Neoq with no params should not return an error:", err) return @@ -43,7 +44,7 @@ func ExampleNew_postgres() { return } - nq, err := New(ctx, WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL)) + nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL)) if err != nil { fmt.Println("neoq's postgres backend failed to initialize:", err) return @@ -56,7 +57,7 @@ func ExampleNew_postgres() { func ExampleWithBackend() { ctx := context.Background() - nq, err := New(ctx, WithBackend(memory.Backend)) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { fmt.Println("initializing a new Neoq with no params should not return an error:", err) return @@ -76,7 +77,7 @@ func ExampleWithBackend_postgres() { return } - nq, err := New(ctx, WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL)) + nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL)) if err != nil { fmt.Println("initializing a new Neoq with no params should not return an error:", err) return @@ -95,7 +96,7 @@ func TestStart(t *testing.T) { done := make(chan bool, numJobs) ctx := context.TODO() - nq, err := New(ctx, WithBackend(memory.Backend)) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } @@ -157,7 +158,7 @@ func TestStart(t *testing.T) { func TestStartCron(t *testing.T) { const cron = "* * * * * *" ctx := context.TODO() - nq, err := New(ctx, WithBackend(memory.Backend)) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } @@ -199,7 +200,7 @@ func TestSetLogger(t *testing.T) { logsChan := make(chan string, 10) ctx := context.Background() - nq, err := New(ctx, WithBackend(memory.Backend)) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } diff --git a/types/types.go b/types/types.go deleted file mode 100644 index ff6c4d4..0000000 --- a/types/types.go +++ /dev/null @@ -1,34 +0,0 @@ -package types - -import ( - "context" - - "github.com/acaloiaro/neoq/handler" - "github.com/acaloiaro/neoq/jobs" - "github.com/acaloiaro/neoq/logging" -) - -// Backend interface is Neoq's primary API -// -// Backend is implemented by: -// - [pkg/github.com/acaloiaro/neoq/backends/memory.MemBackend] -// - [pkg/github.com/acaloiaro/neoq/backends/postgres.PgBackend] -// - [pkg/github.com/acaloiaro/neoq/backends/redis.RedisBackend] -type Backend interface { - // Enqueue queues jobs to be executed asynchronously - Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) - - // Start starts processing jobs with the specified queue and handler - Start(ctx context.Context, queue string, h handler.Handler) (err error) - - // StartCron starts processing jobs with the specified cron schedule and handler - // - // See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format - StartCron(ctx context.Context, cron string, h handler.Handler) (err error) - - // SetLogger sets the backend logger - SetLogger(logger logging.Logger) - - // Shutdown halts job processing and releases resources - Shutdown(ctx context.Context) -}