Skip to content

Commit

Permalink
Support postgres backend schema migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro committed Aug 27, 2023
1 parent 631c508 commit 1e3fbd6
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 148 deletions.
19 changes: 19 additions & 0 deletions backends/postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Migrations

Migrations are implemented with https://github.com/golang-migrate/migrate

## Install the CLI

To add new migrations, install the CLI. The version of the CLI is not particularly important.

`go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/[email protected]`

## Adding migrations

Adding new migrations is done with the `migrate create` command

`migrate create -dir backends/postgres/migrations -ext sql <descriptive_migration_name>`

## Running migrations

Migrations are run every time `neoq` initializes the `postgres` backend. There is no need to run migrations explicitly.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
DROP TABLE IF EXISTS neoq_dead_jobs;
DROP TABLE IF EXISTS neoq_jobs;

DROP TYPE IF EXISTS job_status;

DROP INDEX IF EXISTS neoq_job_fetcher_idx;
DROP INDEX IF EXISTS neoq_jobs_fetcher_idx;
DROP INDEX IF EXISTS neoq_jobs_fingerprint_idx;

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
DO $$ BEGIN
CREATE TYPE job_status AS ENUM (
'new',
'processed',
'failed'
);
EXCEPTION
WHEN duplicate_object THEN null;
END $$;

CREATE TABLE IF NOT EXISTS neoq_dead_jobs (
id SERIAL NOT NULL,
fingerprint text NOT NULL,
queue text NOT NULL,
status job_status NOT NULL default 'failed',
payload jsonb,
retries integer,
max_retries integer,
created_at timestamp with time zone DEFAULT now(),
error text
);

CREATE TABLE IF NOT EXISTS neoq_jobs (
id SERIAL NOT NULL,
fingerprint text NOT NULL,
queue text NOT NULL,
status job_status NOT NULL default 'new',
payload jsonb,
retries integer default 0,
max_retries integer default 23,
run_after timestamp with time zone DEFAULT now(),
ran_at timestamp with time zone,
created_at timestamp with time zone DEFAULT now(),
error text
);

CREATE INDEX IF NOT EXISTS neoq_job_fetcher_idx ON neoq_jobs (id, status, run_after);
CREATE INDEX IF NOT EXISTS neoq_jobs_fetcher_idx ON neoq_jobs (queue, status, run_after);
CREATE INDEX IF NOT EXISTS neoq_jobs_fingerprint_idx ON neoq_jobs (fingerprint, status);

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE neoq_jobs DROP COLUMN IF EXISTS deadline;
ALTER TABLE neoq_dead_jobs DROP COLUMN IF EXISTS deadline;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE neoq_jobs ADD COLUMN IF NOT EXISTS deadline timestamp with time zone;
ALTER TABLE neoq_dead_jobs ADD COLUMN IF NOT EXISTS deadline timestamp with time zone;
132 changes: 16 additions & 116 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package postgres

import (
"context"
"embed"
"errors"
"fmt"
"os"
Expand All @@ -14,6 +15,9 @@ import (
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/types"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres" // nolint: revive
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/iancoleman/strcase"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand All @@ -22,6 +26,9 @@ import (
"golang.org/x/exp/slog"
)

//go:embed migrations/*.sql
var migrationsFS embed.FS

const (
postgresBackendName = "postgres"
DefaultPgConnectionString = "postgres://postgres:[email protected]:5432/neoq"
Expand Down Expand Up @@ -119,7 +126,7 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err
p.cancelFuncs = append(p.cancelFuncs, cancel)
p.mu.Unlock()

err = p.initializeDB(ctx)
err = p.initializeDB(p.config.ConnectionString)
if err != nil {
return
}
Expand Down Expand Up @@ -190,132 +197,25 @@ func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
// initializeDB initializes the tables, types, and indices necessary to operate Neoq
//
//nolint:funlen,gocyclo,cyclop
func (p *PgBackend) initializeDB(ctx context.Context) (err error) {
var pgxCfg *pgx.ConnConfig
var tx pgx.Tx
pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString)
func (p *PgBackend) initializeDB(connStr string) (err error) {
d, err := iofs.New(migrationsFS, "migrations")
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
return
}

connectStr := fmt.Sprintf("postgres://%s:%s@%s/%s", pgxCfg.User, pgxCfg.Password, pgxCfg.Host, pgxCfg.Database)
conn, err := pgx.Connect(ctx, connectStr)
m, err := migrate.NewWithSourceInstance("iofs", d, connStr)
if err != nil {
p.logger.Error("unableto connect to database", err)
p.logger.Error("unable to run migrations", "error", err)
return
}
defer conn.Close(ctx)

var dbExists bool
dbExistsQ := fmt.Sprintf(`SELECT EXISTS (SELECT datname FROM pg_catalog.pg_database WHERE datname = '%s');`, pgxCfg.Database)
rows, err := conn.Query(ctx, dbExistsQ)
if err != nil {
return fmt.Errorf("unable to determne if jobs table exists: %w", err)
}
for rows.Next() {
err = rows.Scan(&dbExists)
if err != nil {
return fmt.Errorf("unable to determine if jobs table exists: %w", err)
}
}
defer rows.Close()

conn.Close(ctx)
conn, err = pgx.Connect(ctx, connectStr)
if err != nil {
return fmt.Errorf("unable to connect to database: %w", err)
}
defer conn.Close(ctx)

if !dbExists {
createDBQ := fmt.Sprintf("CREATE DATABASE %s", pgxCfg.Database)
if _, err = conn.Exec(ctx, createDBQ); err != nil {
return fmt.Errorf("unable to create neoq database: %w", err)
}
}

conn, err = pgx.Connect(ctx, connectStr)
if err != nil {
err = m.Up()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
p.logger.Error("unable to run migrations", "error", err)
return
}

tx, err = conn.Begin(ctx)
if err != nil {
return
}
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

jobsTableExistsQ := `SELECT EXISTS (SELECT FROM
pg_tables
WHERE
schemaname = 'public' AND
tablename = 'neoq_jobs'
);`
rows, err = tx.Query(ctx, jobsTableExistsQ)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to determne if jobs table exists: %v", err)
return
}

var tablesInitialized bool
for rows.Next() {
err = rows.Scan(&tablesInitialized)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to determine if jobs table exists: %v", err)
return
}
}
defer rows.Close()

if !tablesInitialized {
createTablesQ := `
CREATE TYPE job_status AS ENUM (
'new',
'processed',
'failed'
);
CREATE TABLE neoq_dead_jobs (
id SERIAL NOT NULL,
fingerprint text NOT NULL,
queue text NOT NULL,
status job_status NOT NULL default 'failed',
payload jsonb,
retries integer,
max_retries integer,
created_at timestamp with time zone DEFAULT now(),
error text
);
CREATE TABLE neoq_jobs (
id SERIAL NOT NULL,
fingerprint text NOT NULL,
queue text NOT NULL,
status job_status NOT NULL default 'new',
payload jsonb,
retries integer default 0,
max_retries integer default 23,
run_after timestamp with time zone DEFAULT now(),
ran_at timestamp with time zone,
created_at timestamp with time zone DEFAULT now(),
error text
);
CREATE INDEX neoq_job_fetcher_idx ON neoq_jobs (id, status, run_after);
CREATE INDEX neoq_jobs_fetcher_idx ON neoq_jobs (queue, status, run_after);
CREATE INDEX neoq_jobs_fingerprint_idx ON neoq_jobs (fingerprint, status);
`

_, err = tx.Exec(ctx, createTablesQ)
if err != nil {
return fmt.Errorf("unable to create job status enum: %w", err)
}

if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("error committing transaction: %w", err)
}
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion env.sample
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
TEST_DATABASE_URL=<TEST_DATABASE_URL>
TEST_DATABASE_URL=<DATABASE_URL>
DATABASE_URL=<DATABASE_URL>
TEST_REDIS_URL=<TEST_REDIS_URL>
REDIS_PASSWORD=<REDIS_PASSWORD>
24 changes: 14 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,38 @@ module github.com/acaloiaro/neoq
go 1.20

require (
github.com/golang-migrate/migrate/v4 v4.16.2
github.com/guregu/null v4.0.0+incompatible
github.com/hibiken/asynq v0.24.0
github.com/iancoleman/strcase v0.2.0
github.com/jackc/pgx/v5 v5.3.0
github.com/jackc/pgx/v5 v5.3.1
github.com/jsuar/go-cron-descriptor v0.1.0
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/robfig/cron v1.2.0
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
)

require (
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.2 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/lib/pq v1.10.2 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/cast v1.3.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/protobuf v1.25.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
Loading

0 comments on commit 1e3fbd6

Please sign in to comment.