Skip to content

Commit

Permalink
break: Rename types.Backend -> neoq.Neoq
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro committed Sep 7, 2023
1 parent b7582da commit 5956e4a
Show file tree
Hide file tree
Showing 14 changed files with 153 additions and 158 deletions.
6 changes: 4 additions & 2 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ changelog:
- '^docs:'
- '^test:'
groups:
- title: 'BREAKING CHANGES'
regexp: '^.*?break(\([[:word:]]+\))??!?:.+$'
- title: Features
regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$'
order: 0
order: 1
- title: Maintenance
regexp: '^.*?maint(\([[:word:]]+\))??!?:.+$'
order: 1
order: 2
- title: "Bug fixes"
regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$'
- title: Others
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Queue Handlers are simple Go functions that accept a `Context` parameter.

```go
ctx := context.Background()
nq, _ := neoq.New(ctx)
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
Expand All @@ -61,7 +61,7 @@ Enqueuing adds jobs to the specified queue to be processed asynchronously.

```go
ctx := context.Background()
nq, _ := neoq.New(ctx)
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Payload: map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion backends/doc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package backends provides concrete implementations of [pkg/github.com/acaloiaro/neoq/types.Backend]
// Package backends provides concrete implementations of [pkg/github.com/acaloiaro/neoq/neoq.Neoq]
//
// These backends provide the bulk of Neoq's functionality.
package backends
19 changes: 9 additions & 10 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,27 @@ import (
"sync"
"time"

"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/guregu/null"
"github.com/iancoleman/strcase" // TODO factor out
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" // TODO factor out
"github.com/iancoleman/strcase"
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
"github.com/robfig/cron"
"golang.org/x/exp/slog"
)

const (
defaultMemQueueCapacity = 10000 // the default capacity of individual queues
emptyCapacity = 0
defaultMemQueueCapacity = 10000 // default capacity of individual queues
emptyCapacity = 0 // queue size at which queues are considered empty
)

// MemBackend is a memory-backed neoq backend
type MemBackend struct {
types.Backend
config *config.Config
neoq.Neoq
config *neoq.Config
logger logging.Logger
handlers *sync.Map // map queue names [string] to queue handlers [Handler]
fingerprints *sync.Map // map fingerprints [string] to job [Job]
Expand All @@ -43,9 +42,9 @@ type MemBackend struct {
}

// Backend is a [config.BackendInitializer] that initializes a new memory-backed neoq backend
func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) {
func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) {
mb := &MemBackend{
config: config.New(),
config: neoq.NewConfig(),
cron: cron.New(),
mu: &sync.Mutex{},
queues: &sync.Map{},
Expand Down
10 changes: 5 additions & 5 deletions backends/memory/memory_backend_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (
"context"
"sync"

"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/robfig/cron"
)

// TestingBackend initializes a backend for testing purposes
func TestingBackend(conf *config.Config,
func TestingBackend(conf *neoq.Config,
c *cron.Cron,
queues, h, futureJobs, fingerprints *sync.Map,
logger logging.Logger) config.BackendInitializer {
return func(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) {
logger logging.Logger,
) neoq.BackendInitializer {
return func(ctx context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) {
mb := &MemBackend{
config: conf,
cron: c,
Expand Down
18 changes: 15 additions & 3 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/backends/memory"
"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
Expand All @@ -26,6 +25,19 @@ const (

var errPeriodicTimeout = errors.New("timed out waiting for periodic job")

func ExampleNew() {
ctx := context.Background()
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
if err != nil {
fmt.Println("initializing a new Neoq with no params should not return an error:", err)
return
}
defer nq.Shutdown(ctx)

fmt.Println("neoq initialized with default memory backend")
// Output: neoq initialized with default memory backend
}

// TestBasicJobProcessing tests that the memory backend is able to process the most basic jobs with the
// most basic configuration.
func TestBasicJobProcessing(t *testing.T) {
Expand Down Expand Up @@ -158,7 +170,7 @@ var testFutureJobs = &sync.Map{}
func TestFutureJobScheduling(t *testing.T) {
ctx := context.Background()
testBackend := memory.TestingBackend(
config.New(),
neoq.NewConfig(),
cron.New(),
&sync.Map{},
&sync.Map{},
Expand Down Expand Up @@ -203,7 +215,7 @@ func TestFutureJobScheduling(t *testing.T) {
// nolint: gocognit, gocyclo
func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
ctx := context.Background()
nq, err := neoq.New(ctx)
nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
if err != nil {
t.Fatal(err)
}
Expand Down
21 changes: 10 additions & 11 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"sync"
"time"

"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres" // nolint: revive
"github.com/golang-migrate/migrate/v4/source/iofs"
Expand Down Expand Up @@ -69,8 +68,8 @@ var (

// PgBackend is a Postgres-based Neoq backend
type PgBackend struct {
types.Backend
config *config.Config
neoq.Neoq
config *neoq.Config
logger logging.Logger
cron *cron.Cron
mu *sync.RWMutex // mutex to protect mutating state on a pgWorker
Expand All @@ -84,7 +83,7 @@ type PgBackend struct {
//
// If the database does not yet exist, Neoq will attempt to create the database and related tables by default.
//
// Backend requires that one of the [config.ConfigOption] is [WithConnectionString]
// Backend requires that one of the [neoq.ConfigOption] is [WithConnectionString]
//
// Connection strings may be a URL or DSN-style connection strings. The connection string supports multiple
// options detailed below.
Expand All @@ -104,10 +103,10 @@ type PgBackend struct {
// # Example URL
//
// postgres://worker:[email protected]:5432/mydb?sslmode=verify-ca&pool_max_conns=10
func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err error) {
func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err error) {
p := &PgBackend{
mu: &sync.RWMutex{},
config: config.New(),
config: neoq.NewConfig(),
handlers: make(map[string]handler.Handler),
futureJobs: make(map[string]time.Time),
cron: cron.New(),
Expand Down Expand Up @@ -164,8 +163,8 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err
}

// WithConnectionString configures neoq postgres backend to use the specified connection string when connecting to a backend
func WithConnectionString(connectionString string) config.Option {
return func(c *config.Config) {
func WithConnectionString(connectionString string) neoq.ConfigOption {
return func(c *neoq.Config) {
c.ConnectionString = connectionString
}
}
Expand All @@ -175,8 +174,8 @@ func WithConnectionString(connectionString string) config.Option {
// The timeout is the number of milliseconds that a transaction may sit idle before postgres terminates the
// transaction's underlying connection. The timeout should be longer than your longest job takes to complete. If set
// too short, job state will become unpredictable, e.g. retry counts may become incorrect.
func WithTransactionTimeout(txTimeout int) config.Option {
return func(c *config.Config) {
func WithTransactionTimeout(txTimeout int) neoq.ConfigOption {
return func(c *neoq.Config) {
c.IdleTransactionTimeout = txTimeout
}
}
Expand Down
29 changes: 14 additions & 15 deletions backends/redis/redis_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ import (
"sync"
"time"

"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/hibiken/asynq"
"github.com/iancoleman/strcase"
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
Expand All @@ -34,12 +33,12 @@ var ErrInvalidAddr = errors.New("invalid connecton string: see documentation for
// RedisBackend is a Redis-backed neoq backend
// nolint: revive
type RedisBackend struct {
types.Backend
neoq.Neoq
client *asynq.Client
server *asynq.Server
inspector *asynq.Inspector
mux *asynq.ServeMux
config *config.Config
config *neoq.Config
logger logging.Logger
mu *sync.Mutex // mutext to protect mutating backend state
taskProvider *memoryTaskConfigProvider
Expand Down Expand Up @@ -75,10 +74,10 @@ func (m *memoryTaskConfigProvider) addConfig(taskConfig *asynq.PeriodicTaskConfi
m.mu.Unlock()
}

// Backend is a [config.BackendInitializer] that initializes a new Redis-backed neoq backend
func Backend(_ context.Context, opts ...config.Option) (backend types.Backend, err error) {
// Backend is a [neoq.BackendInitializer] that initializes a new Redis-backed neoq backend
func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, err error) {
b := &RedisBackend{
config: config.New(),
config: neoq.NewConfig(),
mu: &sync.Mutex{},
taskProvider: newMemoryTaskConfigProvider(),
}
Expand Down Expand Up @@ -149,22 +148,22 @@ func Backend(_ context.Context, opts ...config.Option) (backend types.Backend, e
}

// WithAddr configures neoq to connect to Redis with the given address
func WithAddr(addr string) config.Option {
return func(c *config.Config) {
func WithAddr(addr string) neoq.ConfigOption {
return func(c *neoq.Config) {
c.ConnectionString = addr
}
}

// WithPassword configures neoq to connect to Redis with the given password
func WithPassword(password string) config.Option {
return func(c *config.Config) {
func WithPassword(password string) neoq.ConfigOption {
return func(c *neoq.Config) {
c.BackendAuthPassword = password
}
}

// WithConcurrency configures the number of workers available to process jobs across all queues
func WithConcurrency(concurrency int) config.Option {
return func(c *config.Config) {
func WithConcurrency(concurrency int) neoq.ConfigOption {
return func(c *neoq.Config) {
c.BackendConcurrency = concurrency
}
}
Expand All @@ -173,8 +172,8 @@ func WithConcurrency(concurrency int) config.Option {
// before forcing them to abort durning Shutdown()
//
// If unset or zero, default timeout of 8 seconds is used.
func WithShutdownTimeout(timeout time.Duration) config.Option {
return func(c *config.Config) {
func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption {
return func(c *neoq.Config) {
c.ShutdownTimeout = timeout
}
}
Expand Down
51 changes: 0 additions & 51 deletions config/config.go

This file was deleted.

2 changes: 1 addition & 1 deletion examples/add_future_postgres_job/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
ctx := context.Background()
nq, err := neoq.New(ctx,
neoq.WithBackend(postgres.Backend),
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq"),
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq?sslmode=disable"),
postgres.WithTransactionTimeout(1000), // nolint: mnd, gomnd
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions examples/add_postgres_job/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
)

func main() {
var done = make(chan bool, 1)
done := make(chan bool, 1)
const queue = "foobar"
ctx := context.Background()
nq, err := neoq.New(ctx,
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq"),
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq?sslmode=disable"),
neoq.WithBackend(postgres.Backend),
)
if err != nil {
Expand Down
Loading

0 comments on commit 5956e4a

Please sign in to comment.