Skip to content

Commit

Permalink
Merge pull request #1007 from swensone/master
Browse files Browse the repository at this point in the history
Add support for rqlite
  • Loading branch information
dhui authored Dec 20, 2023
2 parents 834fa39 + bfedabb commit f375aeb
Show file tree
Hide file tree
Showing 12 changed files with 695 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SOURCE ?= file go_bindata github github_ee bitbucket aws_s3 google_cloud_storage godoc_vfs gitlab
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite
DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
TEST_FLAGS ?=
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Database drivers run migrations. [Add a new database?](database/driver.go)
* [ClickHouse](database/clickhouse)
* [Firebird](database/firebird)
* [MS SQL Server](database/sqlserver)
* [RQLite](database/rqlite)

### Database URLs

Expand Down
18 changes: 18 additions & 0 deletions database/rqlite/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# rqlite

`rqlite://admin:[email protected]:4001/?level=strong&timeout=5`

The `rqlite` url scheme is used for both secure and insecure connections. If connecting to an insecure database, pass `x-connect-insecure` in your URL query, or use `WithInstance` to pass an established connection.

The migrations table name is configurable through the `x-migrations-table` URL query parameter, or by using `WithInstance` and passing `MigrationsTable` through `Config`.

Other connect parameters are directly passed through to the database driver. For examples of connection strings, see https://github.com/rqlite/gorqlite#examples.

| URL Query | WithInstance Config | Description |
|------------|---------------------|-------------|
| `x-connect-insecure` | n/a: set on instance | Boolean to indicate whether to use an insecure connection. Defaults to `false`. |
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table. Defaults to `schema_migrations`. |

## Notes

* Uses the https://github.com/rqlite/gorqlite driver
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS pets;
3 changes: 3 additions & 0 deletions database/rqlite/examples/migrations/33_create_table.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE pets (
name string
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS pets;
1 change: 1 addition & 0 deletions database/rqlite/examples/migrations/44_alter_table.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE pets ADD predator bool;
334 changes: 334 additions & 0 deletions database/rqlite/rqlite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
package rqlite

import (
"fmt"
"io"
nurl "net/url"
"strconv"
"strings"

"go.uber.org/atomic"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/rqlite/gorqlite"
)

func init() {
database.Register("rqlite", &Rqlite{})
}

const (
// DefaultMigrationsTable defines the default rqlite migrations table
DefaultMigrationsTable = "schema_migrations"

// DefaultConnectInsecure defines the default setting for connect insecure
DefaultConnectInsecure = false
)

// ErrNilConfig is returned if no configuration was passed to WithInstance
var ErrNilConfig = fmt.Errorf("no config")

// ErrBadConfig is returned if configuration was invalid
var ErrBadConfig = fmt.Errorf("bad parameter")

// Config defines the driver configuration
type Config struct {
// ConnectInsecure sets whether the connection uses TLS. Ineffectual when using WithInstance
ConnectInsecure bool
// MigrationsTable configures the migrations table name
MigrationsTable string
}

type Rqlite struct {
db *gorqlite.Connection
isLocked atomic.Bool

config *Config
}

// WithInstance creates a rqlite database driver with an existing gorqlite database connection
// and a Config struct
func WithInstance(instance *gorqlite.Connection, config *Config) (database.Driver, error) {
if config == nil {
return nil, ErrNilConfig
}

// we use the consistency level check as a database ping
if _, err := instance.ConsistencyLevel(); err != nil {
return nil, err
}

if len(config.MigrationsTable) == 0 {
config.MigrationsTable = DefaultMigrationsTable
}

driver := &Rqlite{
db: instance,
config: config,
}

if err := driver.ensureVersionTable(); err != nil {
return nil, err
}

return driver, nil
}

// OpenURL creates a rqlite database driver from a connect URL
func OpenURL(url string) (database.Driver, error) {
d := &Rqlite{}
return d.Open(url)
}

func (r *Rqlite) ensureVersionTable() (err error) {
if err = r.Lock(); err != nil {
return err
}

defer func() {
if e := r.Unlock(); e != nil {
if err == nil {
err = e
} else {
err = multierror.Append(err, e)
}
}
}()

stmts := []string{
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (version uint64, dirty bool)`, r.config.MigrationsTable),
fmt.Sprintf(`CREATE UNIQUE INDEX IF NOT EXISTS version_unique ON %s (version)`, r.config.MigrationsTable),
}

if _, err := r.db.Write(stmts); err != nil {
return err
}

return nil
}

// Open returns a new driver instance configured with parameters
// coming from the URL string. Migrate will call this function
// only once per instance.
func (r *Rqlite) Open(url string) (database.Driver, error) {
dburl, config, err := parseUrl(url)
if err != nil {
return nil, err
}
r.config = config

r.db, err = gorqlite.Open(dburl.String())
if err != nil {
return nil, err
}

if err := r.ensureVersionTable(); err != nil {
return nil, err
}

return r, nil
}

// Close closes the underlying database instance managed by the driver.
// Migrate will call this function only once per instance.
func (r *Rqlite) Close() error {
r.db.Close()
return nil
}

// Lock should acquire a database lock so that only one migration process
// can run at a time. Migrate will call this function before Run is called.
// If the implementation can't provide this functionality, return nil.
// Return database.ErrLocked if database is already locked.
func (r *Rqlite) Lock() error {
if !r.isLocked.CAS(false, true) {
return database.ErrLocked
}
return nil
}

// Unlock should release the lock. Migrate will call this function after
// all migrations have been run.
func (r *Rqlite) Unlock() error {
if !r.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

// Run applies a migration to the database. migration is guaranteed to be not nil.
func (r *Rqlite) Run(migration io.Reader) error {
migr, err := io.ReadAll(migration)
if err != nil {
return err
}

query := string(migr[:])
if _, err := r.db.WriteOne(query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}

return nil
}

// SetVersion saves version and dirty state.
// Migrate will call this function before and after each call to Run.
// version must be >= -1. -1 means NilVersion.
func (r *Rqlite) SetVersion(version int, dirty bool) error {
deleteQuery := fmt.Sprintf(`DELETE FROM %s`, r.config.MigrationsTable)
statements := []gorqlite.ParameterizedStatement{
{
Query: deleteQuery,
},
}

// Also re-write the schema version for nil dirty versions to prevent
// empty schema version for failed down migration on the first migration
// See: https://github.com/golang-migrate/migrate/issues/330
insertQuery := fmt.Sprintf(`INSERT INTO %s (version, dirty) VALUES (?, ?)`, r.config.MigrationsTable)
if version >= 0 || (version == database.NilVersion && dirty) {
statements = append(statements, gorqlite.ParameterizedStatement{
Query: insertQuery,
Arguments: []interface{}{
version,
dirty,
},
})
}

wr, err := r.db.WriteParameterized(statements)
if err != nil {
for i, res := range wr {
if res.Err != nil {
return &database.Error{OrigErr: err, Query: []byte(statements[i].Query)}
}
}

// if somehow we're still here, return the original error with combined queries
return &database.Error{OrigErr: err, Query: []byte(deleteQuery + "\n" + insertQuery)}
}

return nil
}

// Version returns the currently active version and if the database is dirty.
// When no migration has been applied, it must return version -1.
// Dirty means, a previous migration failed and user interaction is required.
func (r *Rqlite) Version() (version int, dirty bool, err error) {
query := "SELECT version, dirty FROM " + r.config.MigrationsTable + " LIMIT 1"

qr, err := r.db.QueryOne(query)
if err != nil {
return database.NilVersion, false, nil
}

if !qr.Next() {
return database.NilVersion, false, nil
}

if err := qr.Scan(&version, &dirty); err != nil {
return database.NilVersion, false, &database.Error{OrigErr: err, Query: []byte(query)}
}

return version, dirty, nil
}

// Drop deletes everything in the database.
// Note that this is a breaking action, a new call to Open() is necessary to
// ensure subsequent calls work as expected.
func (r *Rqlite) Drop() error {
query := `SELECT name FROM sqlite_master WHERE type = 'table'`

tables, err := r.db.QueryOne(query)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}

statements := make([]string, 0)
for tables.Next() {
var tableName string
if err := tables.Scan(&tableName); err != nil {
return err
}

if len(tableName) > 0 {
statement := fmt.Sprintf(`DROP TABLE %s`, tableName)
statements = append(statements, statement)
}
}

// return if nothing to do
if len(statements) <= 0 {
return nil
}

wr, err := r.db.Write(statements)
if err != nil {
for i, res := range wr {
if res.Err != nil {
return &database.Error{OrigErr: err, Query: []byte(statements[i])}
}
}

// if somehow we're still here, return the original error with combined queries
return &database.Error{OrigErr: err, Query: []byte(strings.Join(statements, "\n"))}
}

return nil
}

func parseUrl(url string) (*nurl.URL, *Config, error) {
parsedUrl, err := nurl.Parse(url)
if err != nil {
return nil, nil, err
}

config, err := parseConfigFromQuery(parsedUrl.Query())
if err != nil {
return nil, nil, err
}

if parsedUrl.Scheme != "rqlite" {
return nil, nil, errors.Wrap(ErrBadConfig, "bad scheme")
}

// adapt from rqlite to http/https schemes
if config.ConnectInsecure {
parsedUrl.Scheme = "http"
} else {
parsedUrl.Scheme = "https"
}

filteredUrl := migrate.FilterCustomQuery(parsedUrl)

return filteredUrl, config, nil
}

func parseConfigFromQuery(queryVals nurl.Values) (*Config, error) {
c := Config{
ConnectInsecure: DefaultConnectInsecure,
MigrationsTable: DefaultMigrationsTable,
}

migrationsTable := queryVals.Get("x-migrations-table")
if migrationsTable != "" {
if strings.HasPrefix(migrationsTable, "sqlite_") {
return nil, errors.Wrap(ErrBadConfig, "invalid value for x-migrations-table")
}
c.MigrationsTable = migrationsTable
}

connectInsecureStr := queryVals.Get("x-connect-insecure")
if connectInsecureStr != "" {
connectInsecure, err := strconv.ParseBool(connectInsecureStr)
if err != nil {
return nil, errors.Wrap(ErrBadConfig, "invalid value for x-connect-insecure")
}
c.ConnectInsecure = connectInsecure
}

return &c, nil
}
Loading

0 comments on commit f375aeb

Please sign in to comment.