Skip to content

Commit

Permalink
fix: Incorrectly passing pgx connections params to pq
Browse files Browse the repository at this point in the history
Packages `pq` and `pgx` allow different connection string parameters.
Since `pgx` is neoq's primary SQL library, but migrations use `pq`,
we had to ensure that pgx connection parameters do not get passed to
the pq library when running migrations.

This issue was introduced in version `0.20.0`.
  • Loading branch information
acaloiaro committed Aug 29, 2023
1 parent 7259486 commit 342aad6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
8 changes: 8 additions & 0 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ changelog:
exclude:
- '^docs:'
- '^test:'
groups:
- title: Features
regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$'
order: 0
- title: "Bug fixes"
regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$'
- title: Others
order: 999

# The lines beneath this are called `modelines`. See `:help modeline`
# Feel free to remove those if you don't want/use them.
Expand Down
33 changes: 28 additions & 5 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err
p.cancelFuncs = append(p.cancelFuncs, cancel)
p.mu.Unlock()

err = p.initializeDB(p.config.ConnectionString)
err = p.initializeDB()
if err != nil {
return
}
Expand Down Expand Up @@ -197,14 +197,37 @@ func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
// initializeDB initializes the tables, types, and indices necessary to operate Neoq
//
//nolint:funlen,gocyclo,cyclop
func (p *PgBackend) initializeDB(connStr string) (err error) {
func (p *PgBackend) initializeDB() (err error) {
d, err := iofs.New(migrationsFS, "migrations")
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
return
}

m, err := migrate.NewWithSourceInstance("iofs", d, connStr)
// `pgx` supports config params that `pq` does not. Since pgx is neoq's primary SQL interface, user often configure
// it with pgx-specific config params like `max_conn_count`. However, `go-migrate` uses `pq` under the hood, and
// these `pgx` config params cause `pq` to throw an "unknown config parameter" error when they're encountered.
// So we must first sanitize connection strings for pq
var pgxCfg *pgx.ConnConfig
pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString)
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
return
}

sslMode := "verify-ca"
// nil TLSConfig means "sslmode=disable" was set on the connection
if pgxCfg.TLSConfig == nil {
sslMode = "disable"
}

pqConnectionString := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=%s",
pgxCfg.User,
pgxCfg.Password,
pgxCfg.Host,
pgxCfg.Database,
sslMode)
m, err := migrate.NewWithSourceInstance("iofs", d, pqConnectionString)
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
return
Expand Down Expand Up @@ -282,9 +305,9 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
}
} else {
p.mu.Lock()
p.logger.Debug("added job to future jobs list", "job_id", jobID, "run_after", job.RunAfter)
p.futureJobs[jobID] = job.RunAfter
p.mu.Unlock()
p.logger.Debug("added job to future jobs list", "job_id", jobID, "run_after", job.RunAfter)
}

err = tx.Commit(ctx)
Expand Down Expand Up @@ -731,7 +754,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
// This will lead to jobs not getting processed until the worker is restarted.
// Implement disconnect handling.
func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, ready chan bool) {
c = make(chan string)
c = make(chan string, p.handlers[queue].Concurrency)
ready = make(chan bool)

go func(ctx context.Context) {
Expand Down

0 comments on commit 342aad6

Please sign in to comment.