Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major API refactor #63

Merged
merged 1 commit into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ changelog:
- '^docs:'
- '^test:'
groups:
- title: 'BREAKING CHANGES'
regexp: '^.*?break(\([[:word:]]+\))??!?:.+$'
- title: Features
regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$'
order: 0
order: 1
- title: Maintenance
regexp: '^.*?maint(\([[:word:]]+\))??!?:.+$'
order: 1
order: 2
- title: "Bug fixes"
regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$'
- title: Others
Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
SHELL := /bin/bash
GOLANGCI_LINT_VERSION ?= v1.51.1

all: fmt vet mod
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Queue Handlers are simple Go functions that accept a `Context` parameter.

```go
ctx := context.Background()
nq, _ := neoq.New(ctx)
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
Expand All @@ -61,7 +61,7 @@ Enqueuing adds jobs to the specified queue to be processed asynchronously.

```go
ctx := context.Background()
nq, _ := neoq.New(ctx)
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Payload: map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion backends/doc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package backends provides concrete implementations of [pkg/github.com/acaloiaro/neoq/types.Backend]
// Package backends provides concrete implementations of [pkg/github.com/acaloiaro/neoq/neoq.Neoq]
//
// These backends provide the bulk of Neoq's functionality.
package backends
21 changes: 10 additions & 11 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,27 @@ import (
"sync"
"time"

"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/guregu/null"
"github.com/iancoleman/strcase" // TODO factor out
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" // TODO factor out
"github.com/iancoleman/strcase"
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
"github.com/robfig/cron"
"golang.org/x/exp/slog"
)

const (
defaultMemQueueCapacity = 10000 // the default capacity of individual queues
emptyCapacity = 0
defaultMemQueueCapacity = 10000 // default capacity of individual queues
emptyCapacity = 0 // queue size at which queues are considered empty
)

// MemBackend is a memory-backed neoq backend
type MemBackend struct {
types.Backend
config *config.Config
neoq.Neoq
config *neoq.Config
logger logging.Logger
handlers *sync.Map // map queue names [string] to queue handlers [Handler]
fingerprints *sync.Map // map fingerprints [string] to job [Job]
Expand All @@ -42,10 +41,10 @@ type MemBackend struct {
initialized bool
}

// Backend is a [config.BackendInitializer] that initializes a new memory-backed neoq backend
func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) {
// Backend is a [neoq.BackendInitializer] that initializes a new memory-backed neoq backend
func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) {
mb := &MemBackend{
config: config.New(),
config: neoq.NewConfig(),
cron: cron.New(),
mu: &sync.Mutex{},
queues: &sync.Map{},
Expand Down
10 changes: 5 additions & 5 deletions backends/memory/memory_backend_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (
"context"
"sync"

"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/robfig/cron"
)

// TestingBackend initializes a backend for testing purposes
func TestingBackend(conf *config.Config,
func TestingBackend(conf *neoq.Config,
c *cron.Cron,
queues, h, futureJobs, fingerprints *sync.Map,
logger logging.Logger) config.BackendInitializer {
return func(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) {
logger logging.Logger,
) neoq.BackendInitializer {
return func(ctx context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) {
mb := &MemBackend{
config: conf,
cron: c,
Expand Down
18 changes: 15 additions & 3 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/backends/memory"
"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
Expand All @@ -26,6 +25,19 @@ const (

var errPeriodicTimeout = errors.New("timed out waiting for periodic job")

func ExampleNew() {
ctx := context.Background()
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
if err != nil {
fmt.Println("initializing a new Neoq with no params should not return an error:", err)
return
}
defer nq.Shutdown(ctx)

fmt.Println("neoq initialized with default memory backend")
// Output: neoq initialized with default memory backend
}

// TestBasicJobProcessing tests that the memory backend is able to process the most basic jobs with the
// most basic configuration.
func TestBasicJobProcessing(t *testing.T) {
Expand Down Expand Up @@ -158,7 +170,7 @@ var testFutureJobs = &sync.Map{}
func TestFutureJobScheduling(t *testing.T) {
ctx := context.Background()
testBackend := memory.TestingBackend(
config.New(),
neoq.NewConfig(),
cron.New(),
&sync.Map{},
&sync.Map{},
Expand Down Expand Up @@ -203,7 +215,7 @@ func TestFutureJobScheduling(t *testing.T) {
// nolint: gocognit, gocyclo
func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
ctx := context.Background()
nq, err := neoq.New(ctx)
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
if err != nil {
t.Fatal(err)
}
Expand Down
24 changes: 13 additions & 11 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"sync"
"time"

"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres" // nolint: revive
"github.com/golang-migrate/migrate/v4/source/iofs"
Expand Down Expand Up @@ -69,8 +68,8 @@ var (

// PgBackend is a Postgres-based Neoq backend
type PgBackend struct {
types.Backend
config *config.Config
neoq.Neoq
config *neoq.Config
logger logging.Logger
cron *cron.Cron
mu *sync.RWMutex // mutex to protect mutating state on a pgWorker
Expand All @@ -84,7 +83,7 @@ type PgBackend struct {
//
// If the database does not yet exist, Neoq will attempt to create the database and related tables by default.
//
// Backend requires that one of the [config.ConfigOption] is [WithConnectionString]
// Backend requires that one of the [neoq.ConfigOption] is [WithConnectionString]
//
// Connection strings may be a URL or DSN-style connection strings. The connection string supports multiple
// options detailed below.
Expand All @@ -104,10 +103,13 @@ type PgBackend struct {
// # Example URL
//
// postgres://worker:[email protected]:5432/mydb?sslmode=verify-ca&pool_max_conns=10
func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err error) {
func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err error) {
cfg := neoq.NewConfig()
cfg.IdleTransactionTimeout = neoq.DefaultIdleTxTimeout

p := &PgBackend{
mu: &sync.RWMutex{},
config: config.New(),
config: cfg,
handlers: make(map[string]handler.Handler),
futureJobs: make(map[string]time.Time),
cron: cron.New(),
Expand Down Expand Up @@ -164,8 +166,8 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err
}

// WithConnectionString configures neoq postgres backend to use the specified connection string when connecting to a backend
func WithConnectionString(connectionString string) config.Option {
return func(c *config.Config) {
func WithConnectionString(connectionString string) neoq.ConfigOption {
return func(c *neoq.Config) {
c.ConnectionString = connectionString
}
}
Expand All @@ -175,8 +177,8 @@ func WithConnectionString(connectionString string) config.Option {
// The timeout is the number of milliseconds that a transaction may sit idle before postgres terminates the
// transaction's underlying connection. The timeout should be longer than your longest job takes to complete. If set
// too short, job state will become unpredictable, e.g. retry counts may become incorrect.
func WithTransactionTimeout(txTimeout int) config.Option {
return func(c *config.Config) {
func WithTransactionTimeout(txTimeout int) neoq.ConfigOption {
return func(c *neoq.Config) {
c.IdleTransactionTimeout = txTimeout
}
}
Expand Down
29 changes: 14 additions & 15 deletions backends/redis/redis_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ import (
"sync"
"time"

"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/hibiken/asynq"
"github.com/iancoleman/strcase"
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
Expand All @@ -34,12 +33,12 @@ var ErrInvalidAddr = errors.New("invalid connecton string: see documentation for
// RedisBackend is a Redis-backed neoq backend
// nolint: revive
type RedisBackend struct {
types.Backend
neoq.Neoq
client *asynq.Client
server *asynq.Server
inspector *asynq.Inspector
mux *asynq.ServeMux
config *config.Config
config *neoq.Config
logger logging.Logger
mu *sync.Mutex // mutext to protect mutating backend state
taskProvider *memoryTaskConfigProvider
Expand Down Expand Up @@ -75,10 +74,10 @@ func (m *memoryTaskConfigProvider) addConfig(taskConfig *asynq.PeriodicTaskConfi
m.mu.Unlock()
}

// Backend is a [config.BackendInitializer] that initializes a new Redis-backed neoq backend
func Backend(_ context.Context, opts ...config.Option) (backend types.Backend, err error) {
// Backend is a [neoq.BackendInitializer] that initializes a new Redis-backed neoq backend
func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) {
b := &RedisBackend{
config: config.New(),
config: neoq.NewConfig(),
mu: &sync.Mutex{},
taskProvider: newMemoryTaskConfigProvider(),
}
Expand Down Expand Up @@ -149,22 +148,22 @@ func Backend(_ context.Context, opts ...config.Option) (backend types.Backend, e
}

// WithAddr configures neoq to connect to Redis with the given address
func WithAddr(addr string) config.Option {
return func(c *config.Config) {
func WithAddr(addr string) neoq.ConfigOption {
return func(c *neoq.Config) {
c.ConnectionString = addr
}
}

// WithPassword configures neoq to connect to Redis with the given password
func WithPassword(password string) config.Option {
return func(c *config.Config) {
func WithPassword(password string) neoq.ConfigOption {
return func(c *neoq.Config) {
c.BackendAuthPassword = password
}
}

// WithConcurrency configures the number of workers available to process jobs across all queues
func WithConcurrency(concurrency int) config.Option {
return func(c *config.Config) {
func WithConcurrency(concurrency int) neoq.ConfigOption {
return func(c *neoq.Config) {
c.BackendConcurrency = concurrency
}
}
Expand All @@ -173,8 +172,8 @@ func WithConcurrency(concurrency int) config.Option {
// before forcing them to abort durning Shutdown()
//
// If unset or zero, default timeout of 8 seconds is used.
func WithShutdownTimeout(timeout time.Duration) config.Option {
return func(c *config.Config) {
func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption {
return func(c *neoq.Config) {
c.ShutdownTimeout = timeout
}
}
Expand Down
51 changes: 0 additions & 51 deletions config/config.go

This file was deleted.

2 changes: 1 addition & 1 deletion examples/add_future_postgres_job/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
ctx := context.Background()
nq, err := neoq.New(ctx,
neoq.WithBackend(postgres.Backend),
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq"),
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq?sslmode=disable"),
postgres.WithTransactionTimeout(1000), // nolint: mnd, gomnd
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions examples/add_postgres_job/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
)

func main() {
var done = make(chan bool, 1)
done := make(chan bool, 1)
const queue = "foobar"
ctx := context.Background()
nq, err := neoq.New(ctx,
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq"),
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq?sslmode=disable"),
neoq.WithBackend(postgres.Backend),
)
if err != nil {
Expand Down
Loading
Loading