feat: Support for SQLite backend #130
base: main
Conversation
…ob; multiple tests
This is looking solid! Thanks for the contribution.
I've commented directly on code lines where I have questions and change requests.
Here are a few more thoughts and questions that don't directly relate to code.
- Can you help me understand how mutual exclusion of jobs is achieved? For Postgres it is achieved with `FOR UPDATE SKIP LOCKED` at the query level.
- Is mutual exclusion affected by `-DSQLITE_THREADSAFE=0`?
- Is mutual exclusion affected by WAL mode? https://www.sqlite.org/wal.html
- Would it make sense to get test coverage over sqlite in these alternative modes? Or is that being too paranoid?
Tests and lints
There are some tests (data races) and lints failing. Feel free to annotate some of the lints to be ignored if you don't think they make sense or seem too onerous.
You can lint and test locally:

```shell
make lint
go test backends/sqlite/sqlite_backend_test.go -tags testing -race
```
```sql
CREATE TABLE neoq_dead_jobs (
    id integer primary key not null,
```
Looks like your editor/autoformatter adds an extraneous indent at the start of these `CREATE TABLE` statements (same for the `neoq_jobs` table).
```go
handlers          map[string]handler.Handler // a map of queue names to queue handlers
queueListenerChan map[string]chan string     // each queue has a listener channel to process enqueued jobs
logger            logging.Logger             // backend-wide logger
dbMutex           *sync.RWMutex              // protects concurrent access to sqlite db on SqliteBackend
```
I think this may be how we're achieving job mutual exclusion. Though I do wonder if we should consider using SQLite's internal mutual exclusion instead and enforce `THREADSAFE=1`... is that possible?
I was not aware of this mode. I can try experimenting with this and see if it works well.
Hi @acaloiaro, I tried a couple of things and here's my understanding:

- We might not need custom job mutual exclusion (with `sync.RWMutex`). Apparently, SQLite by default works in `serialized` mode, aka `-DSQLITE_THREADSAFE=1`, meaning it handles it internally. It could handle around 3k concurrent writes (`neoq.Enqueue`) before getting a `database is locked` error. I think this is why I went ahead with the custom handling earlier.
  But alternatively (and perhaps a better solution), the go-sqlite3 driver FAQ recommends using `?cache=shared` mode with `db.SetMaxOpenConns(1)` (a related thread). It seems to handle as many as 10k concurrent writes, and probably more.
  I couldn't observe any negative implications from this as long as the transactions are very small, which brings me to the question: do we need to wrap the entire `handleJob` code in a transaction (like how it's done in postgres_backend.go), or do one small transaction before `handler.Exec` (for the deadline check) and one after (marking the job as processed)? Is there a reason why the former is done? Otherwise, the latter makes more sense to me, as it both follows the general rule of thumb and does not slow down writes. The enqueues otherwise get stuck beyond a point, as one of the `handleJob` calls will obtain exclusive access to the db. Let me know your thoughts.
- Mutual exclusion would most likely be affected by `-DSQLITE_THREADSAFE=0`. I didn't try this; it requires manually building the SQLite source code and linking it to the go-sqlite3 driver. Is this something necessary to consider?
- WAL mode works well too, without the need for custom handling. According to the documentation, it helps in achieving higher concurrency. But it introduces additional files, `-shm` and `-wal`, and the latter came to be around 3 times the size of the db.

Note: `QueueListenerChanBufferSize` and `handler.Concurrency` need to be correctly configured by the user, based on their specific needs, for things to work well.
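To make the driver-level alternative concrete, here is a minimal sketch of the shared-cache DSN form discussed above. The file name and the `buildDSN` helper are illustrative assumptions, and the `sql.Open`/`SetMaxOpenConns` calls are shown only in a comment because they require the cgo-based go-sqlite3 driver:

```go
package main

import "fmt"

// buildDSN is a hypothetical helper sketching the shared-cache connection
// string. In the real backend the path would come from the config, and the
// DSN would then be used roughly as:
//
//	db, err := sql.Open("sqlite3", dsn)
//	db.SetMaxOpenConns(1) // a single writer avoids "database is locked"
func buildDSN(path string) string {
	return "file:" + path + "?cache=shared"
}

func main() {
	fmt.Println(buildDSN("neoq.db")) // file:neoq.db?cache=shared
}
```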
Hi @acaloiaro, let me know what you think and which way to go. I'll make the changes accordingly and wrap this up.
> do we need to wrap entire handleJob code in a transaction (like how it's done in postgres_backend.go)
My recollection of why `handleJob` is wrapped in a transaction is that I was considering exposing neoq's transaction to the user, so users always have a transaction available within their jobs. Doing so would ensure that any database modifications within jobs that use the exposed transaction can be rolled back if the job fails. Ultimately, I haven't done any API work to expose the transaction, but I'd like to retain that as an option. So for that reason, I'd like sqlite to be able to do the same, if the time comes. I believe there are performance implications to this decision that I haven't fully thought out. If there's a good reason to push back on this decision, I think we should entertain any objections.
- Let's not do that.
- Ok. Maybe just think through whether we need any settings that are specific to WAL mode. If not, feel free to ignore this.
Maybe having a mutex isn't a bad idea. We certainly don't want people to have to have special builds of sqlite to have a mutual exclusion guarantee, and I doubt this mutex you've added will dominate performance behavior. I just wanted to make sure it was thoroughly thought through. If having a mutex here achieves the best result in terms of reliability, then we should keep it.
Have a look through any remaining, unresolved conversations on the PR. When everything is resolved, I'll give this another review.
Also have a look at the top level of this thread. Lints and tests with `-race` are failing, so you'll want to run those locally and get them resolved.
backends/sqlite/sqlite_backend.go
```go
dbURI := strings.Split(s.config.ConnectionString, "/")
dbPath := strings.Join(dbURI[1:], "/")
```
Let's make sure `len(dbURI) > 1`, and throw an error that the connection string is malformed, before indexing into its slice.
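A minimal sketch of that guard, assuming a hypothetical `parseDBPath` helper (the PR code does this inline):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseDBPath is a hypothetical helper illustrating the suggested guard:
// reject a malformed connection string before indexing into the split result.
func parseDBPath(connectionString string) (string, error) {
	dbURI := strings.Split(connectionString, "/")
	if len(dbURI) <= 1 {
		return "", errors.New("malformed connection string: " + connectionString)
	}
	return strings.Join(dbURI[1:], "/"), nil
}

func main() {
	if _, err := parseDBPath("not-a-uri"); err != nil {
		fmt.Println(err) // malformed input is rejected instead of panicking
	}
}
```

Note that on `"sqlite3://foo/bar.db"` this split-and-join still yields `"/foo/bar.db"` with a leading slash, which is the wrinkle discussed just below.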
Alternatively, I thought to have:

```go
dbPath := strings.TrimPrefix(s.config.ConnectionString, "sqlite3://")
```

When testing on Windows, I found that the connection string must be of the form `sqlite3://file:///<db_path>` for it to open the db successfully. If we do a split on "/", we get an extra "/" attached at the start of the resulting db path. What do you think?
Sounds better than splitting.
```go
// Rollback is safe to call even if the tx is already closed, so if
// the tx commits successfully, this is a no-op
```
I remember making this comment about Postgres transactions. Is it true of SQLite too?
The `pgx` lib clearly documents that `tx.Rollback` is safe to call multiple times, even after commit. But the same is not documented for the `sql` lib. Theoretically, rollback shouldn't have any effect after commit, right? But if it is a concern, then maybe we can do something like:

```go
defer func() {
	if err != nil {
		_ = tx.Rollback()
		s.logger.Error("Transaction rolled back due to error", slog.Any("error", err))
	}
}()
```
This confirms that it's a no-op for the `sql.DB` interface more generally.
Is that from the fact that in their example code they do a `defer tx.Rollback()`, like we are doing currently?
I was referring to the following from the example:

> Defer the transaction's rollback. If the transaction succeeds, it will be committed before the function exits, making the deferred rollback call a no-op. If the transaction fails it won't be committed, meaning that the rollback will be called as the function exits.
```go
func (s *SqliteBackend) updateJobToInProgress(ctx context.Context, h handler.Handler, job *jobs.Job) (err error) {
```
I think we should consider having this function receive a `*sql.Tx` rather than create one itself. That way, job handling can use a single transaction for all db operations.
In the PG backend, we do this by adding a `tx` to the `ctx`, e.g. from `handleJob`:

```go
ctx = context.WithValue(ctx, txCtxVarKey, tx)
```
In any case, we'll need at least 2 transactions, right? One for `in progress`/`failed` and one for `processed`/`failed`? Since `in progress` needs to be committed.
How important is `in progress` to you? Is there a specific use case you have in mind?
I intentionally avoided it for postgres because it's an additional database round trip, and the best use case I had for it was to be able to show in-progress jobs in a UI (which doesn't exist). As you can see, leaving in progress out also reduced complexity, because the "state machine" for determining its correct value also needs to be considered. I'd be inclined to remove the `in progress` requirement unless it's serving a specific goal of yours.
I had the very same use case for being able to show in-progress jobs in a UI. But maybe you're right, it adds more complexity. We can perhaps remove the requirement for now as it would also be consistent with other backends.
```go
// Make sure that each neoq worker only works on one thing at a time.
h.Concurrency = 1
```
This possibly places the test under unrealistic conditions. I would expect real world users to use default concurrency. Will this test work with concurrency > 1?
These are test cases borrowed from the postgres backend. I hadn't put much thought into this specific case. I can try your suggestion.
```go
const WaitForJobTime = 1100 * time.Millisecond

// allow time for listener to start and for at least one job to process
time.Sleep(WaitForJobTime)
```
Can we add a `done` channel here instead? `1.1s` is a pretty long fixed wait time. Some of my early tests did this, but I've tried to get away from any fixed wait times in favor of a `done` channel and a timeout instead.
Any chance of this getting merged? It would be very useful for me :)
@timaa2k If you or @pranavmodx want to get the pull request over the finish line, for sure.
Fixes #129

- ✅ Feature parity with existing backends
- ✅ Roughly the same amount of test coverage as existing backends

All of the major features are working with this new SQLite backend. Almost all of the tests in the other backends have been extended for this one. More can be added as required to capture missed or specific cases. Currently, the tests create test.db within `backends/sqlite`. Not sure if that is ideal; I am open to hearing your thoughts.

For listening to jobs, Go channels are used, as we don't have something similar to `pg_notify` for SQLite.

I added a new `in progress` job status, which was useful for our use case, but it's up to you to decide whether to keep it.