Unordered batch commit queue for compaction jobs #3686
Labels
enhancement (New feature or request)
parent-issue (An issue that is or should be split into multiple sub-issues)
Background
Split from:
Related to:
Description
We'd like to batch up applying many compaction jobs to the state store at once. We can create a separate queue and lambda that feed into the state store commit queue, building big batches of commits when we know they can be applied in any order.
This should allow for much higher throughput of compactions in the state store committer lambda.
Analysis
We can add a separate SQS queue and lambda to perform batching of the jobs into larger commit messages. The queue can be a normal, non-FIFO queue, because we don't care about ordering of compaction job commits as the affected files are already locked.
We want to take a large number of messages from the queue at once and combine them into one commit message, then forward that to the state store committer queue.
Batch size
The lambda SQS integration can wait for a larger batch of messages to arrive before it invokes the lambda. See the following documentation:
https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
When deploying an SQS event source, you can set a maximum batching window and a maximum batch size. The lambda will be invoked when any of these occur:
- The maximum batching window is reached. This can be up to 5 minutes, and is set in seconds.
- The maximum batch size is reached. This can be up to 10,000 messages.
- The invocation payload size limit is reached. This is set in a quota for the AWS account, based on the size of a synchronous invocation.
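As an illustration of where the first two settings would live (not taken from the issue; the class and method names are placeholders for wherever this sits in the CDK deployment code):

```java
import software.amazon.awscdk.Duration;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSourceProps;
import software.amazon.awscdk.services.sqs.IQueue;

public class CommitBatcherEventSourceSketch {

    // Wires the batching queue into the batching lambda with a large batch size and a
    // long batching window, so each invocation sees as many commits as possible.
    public static void addBatchingEventSource(IFunction batcherLambda, IQueue batchingQueue) {
        batcherLambda.addEventSource(new SqsEventSource(batchingQueue, SqsEventSourceProps.builder()
                .batchSize(10_000) // up to 10,000 messages for a standard queue
                .maxBatchingWindow(Duration.minutes(5)) // a batching window is required for batch sizes over 10
                .build()));
    }
}
```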
See the following documentation for the invocation payload size limit:
https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
By default this is 6MB for a synchronous invocation, which is what the lambda SQS integration uses. We can check whether a JSON array of 10,000 compaction jobs is likely to fit in that 6MB.
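As a rough check (illustrative numbers, not from the issue), dividing the payload limit by the maximum batch size gives the average space each message can take up in a full batch:

```java
public class PayloadSizeCheck {
    public static void main(String[] args) {
        long payloadLimitBytes = 6L * 1024 * 1024; // 6MB synchronous invocation limit
        int maxBatchSize = 10_000;                 // maximum SQS event source batch size
        // ~629 bytes per message, so the JSON for one compaction job commit needs to
        // stay comfortably under that for a full batch of 10,000 to fit.
        System.out.println("Average bytes per message: " + payloadLimitBytes / maxBatchSize);
    }
}
```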
Batching across Sleeper tables
There's a complication: compaction jobs from all Sleeper tables in the instance will end up on one queue, so batching will happen across all Sleeper tables as well. This is in contrast to the state store committer queue, where commits are split across message groups, one per Sleeper table; SQS only supports message groups on FIFO queues.
This means that a single invocation of the batching lambda can include compaction commits from any Sleeper table in the instance. When we batch up the state store commits, this can result in as many commits as there are Sleeper tables. That should only be a problem if there are enough messages that it reduces the effective batch size per Sleeper table. It does also considerably reduce our control over the size of each transaction when it reaches the committer lambda.
If we used a partitioned messaging system, e.g. Kinesis, we may be able to avoid processing multiple tables at once. If we had a persistent instance receiving messages we could raise the limit on messages we can receive at once, and process Sleeper tables more independently. For now we want to stick to SQS and Lambda.
Status store updates
The state store committer lambda currently makes a separate update to the status store once it's committed each job to the state store. This seems likely to pose a problem when dealing with a large batch of compactions.
In the next milestone, we're planning to remove the status store updates from the state store committer lambda. We have the following issue to make this work:
For now we could leave the status store update in the committer, but we could consider disabling it temporarily, or using an SQS queue to do the updates in a separate lambda. We'd like to minimise the amount of work done on this in this milestone.
Commit message vs transaction
We'll need to allow multiple compaction job commits in one compaction job commit request message. We'll want to apply all those commits in a single state store transaction.
For the current transaction types that can get quite large, we write the commit request to S3, send a pointer to it on SQS, then the committer reads the commit request from S3 and writes a new S3 object for the transaction.
With very large commits, we've seen the committer take a considerable amount of time to write the transaction to S3. We could avoid this by removing the commit request as a separate S3 object: before sending the commit request to SQS, we would write the actual transaction object to S3. The state store committer then only needs to read it, and doesn't need to write it back to S3. When it writes the transaction to the log, it will just validate it and point to the S3 object that was created outside of the committer.
We could also make this change for the transaction type assigning compaction input files. We could prioritise making the compaction commits work this way first.
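A minimal sketch of the sender side of that flow, with hypothetical types (Sleeper's real transaction, serde and client classes will differ):

```java
// Hypothetical types sketching the proposed flow where the transaction object is
// written to S3 before the commit request is sent.
public class CompactionCommitSenderSketch {

    interface TransactionBodyStore {
        /** Writes the serialised transaction to S3 and returns the object key. */
        String store(String tableId, Object transaction);
    }

    interface CommitQueueSender {
        void send(String tableId, String transactionType, String bodyObjectKey);
    }

    private final TransactionBodyStore bodyStore;
    private final CommitQueueSender commitQueue;

    CompactionCommitSenderSketch(TransactionBodyStore bodyStore, CommitQueueSender commitQueue) {
        this.bodyStore = bodyStore;
        this.commitQueue = commitQueue;
    }

    void send(String tableId, Object transaction) {
        // Write the transaction object to S3 first, so the committer only needs to read
        // and validate it, then point the transaction log at the object that already exists.
        String objectKey = bodyStore.store(tableId, transaction);
        commitQueue.send(tableId, "ReplaceFileReferences", objectKey); // hypothetical type label
    }
}
```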
Sub Issues
We can approach this by starting with the changes to the state store committer lambda, then adding the batching lambda, then updating the compaction task to send to the new queue. Any status store changes could be dealt with at the end if necessary.
State store committer
We can start by looking at the current transaction format and commit request format. The commit request has the job, and some metadata for the status store update. The transaction content for each job has just the relevant fields from the job. It currently holds the partition ID twice: it has a FileReference object that contains the partition ID, as well as a separate string for it.
We'll need to add the metadata for the status store update into the transaction, so that we can potentially derive the status store update from the transaction log in the future. We can also remove the duplicate partition ID.
We'll need to add a method on the state store to add a transaction that's held in S3. This method could take the metadata that we expect to be in the SQS message and the transaction log, as well as a pointer to the transaction body in S3. The state store can then read the body from S3 and apply it to the log.
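One possible shape for that method, sketched with hypothetical names; the real state store interface, the exact metadata fields and the transaction body type will differ:

```java
import java.time.Instant;

// Hypothetical sketch of the new state store method; names and metadata fields
// are placeholders, not Sleeper's real API.
public interface TransactionLogStateStoreSketch {

    /**
     * Adds a transaction whose body is already held in S3. The state store reads
     * the body from the given object key and appends the transaction to the log,
     * recording the metadata we expect to arrive in the SQS message.
     */
    void addTransactionFromS3(AddTransactionRequest request);

    record AddTransactionRequest(
            String transactionType, // which transaction implementation to deserialise
            String bodyObjectKey,   // pointer to the transaction body in S3
            Instant updateTime) {   // example of metadata carried into the log
    }
}
```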
We'll need to adjust how compaction commit transactions are applied so that any invalid commits are discarded while the transaction as a whole is still applied; that way, if a job is run more than once we ignore the second run. If this results in an empty transaction we can just not add it to the log.
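A minimal sketch of that filtering step, assuming a hypothetical per-commit validity check (in reality this would be checked against the file references in the state store):

```java
import java.util.List;

public class CompactionCommitFilterSketch {

    /** Hypothetical per-job commit within a batched transaction. */
    record JobCommit(String jobId, boolean filesStillAssignedToJob) {
    }

    /**
     * Drops commits that are no longer valid (e.g. the job was already committed by an
     * earlier run) so the rest of the transaction can still be applied. If the result
     * is empty, the transaction should not be added to the log at all.
     */
    static List<JobCommit> discardInvalidCommits(List<JobCommit> commits) {
        return commits.stream()
                .filter(JobCommit::filesStillAssignedToJob)
                .toList();
    }
}
```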
We could simplify the flow of the state store committer lambda by saying that the SQS message only contains the type of the transaction, and either the body of the transaction or a pointer to where it is held. If any status store update needs to be done, all the data for that should be held in the transaction, so that in principle we could apply the status store update based on the transaction log. To start refactoring towards this, we could begin by adding this as a model for one type of commit request.
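As a starting point for that model, a commit request could look something like this (hypothetical names, sketched here for discussion):

```java
import java.util.Optional;

/**
 * Hypothetical commit request model: the SQS message carries the transaction type and
 * either the transaction body inline or a pointer to where the body is held in S3.
 */
public record StateStoreCommitRequestSketch(
        String tableId,
        String transactionType,
        Optional<String> serialisedTransaction, // body inline, for small transactions
        Optional<String> bodyObjectKey) {       // pointer to S3, for large transactions

    public static StateStoreCommitRequestSketch withBody(String tableId, String transactionType, String body) {
        return new StateStoreCommitRequestSketch(tableId, transactionType, Optional.of(body), Optional.empty());
    }

    public static StateStoreCommitRequestSketch withBodyInS3(String tableId, String transactionType, String key) {
        return new StateStoreCommitRequestSketch(tableId, transactionType, Optional.empty(), Optional.of(key));
    }
}
```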
Compaction commit batching lambda
Once we enrich the transaction object to include the metadata for the status store update, we can think of the batching queue as providing a list of exactly that model (ReplaceFileReferencesRequest). That model is for the state store, and because the state store is for a specific Sleeper table, the model does not include the Sleeper table ID. We'll need to either wrap it and include the table ID, or make a separate model that we can convert.
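That wrapping could be as simple as a hypothetical record pairing the table ID with the existing model (ReplaceFileReferencesRequest here is the existing Sleeper class named above):

```java
// Hypothetical wrapper for messages on the batching queue. ReplaceFileReferencesRequest
// is the existing per-table model, so the Sleeper table ID is carried alongside it.
public record CompactionCommitMessage(String tableId, ReplaceFileReferencesRequest request) {
}
```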
The batcher can take a list of commit requests and split them up by Sleeper table. For each Sleeper table it will write the batch to S3 if needed and submit a request to the commit queue.
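A sketch of that grouping step, using the hypothetical wrapper above:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CompactionCommitBatcherSketch {

    /** Groups received commit requests by Sleeper table, giving one batch per table. */
    static Map<String, List<ReplaceFileReferencesRequest>> batchByTable(List<CompactionCommitMessage> messages) {
        return messages.stream()
                .collect(Collectors.groupingBy(
                        CompactionCommitMessage::tableId,
                        Collectors.mapping(CompactionCommitMessage::request, Collectors.toList())));
    }
    // Each per-table batch would then be written to S3 if needed and submitted as a
    // single commit request to the state store committer queue.
}
```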
We then need to deploy a lambda with this logic, along with the SQS queue for the requests. We'll need instance properties for the lambda and the batching parameters.
We can then change the compaction task code to send asynchronous commits to the new SQS queue rather than directly to the state store committer queue.