WAL Proposal
Status: implementation moved to package stuffedio.
The in-memory implementation of EntroQ is extremely fast, even though it relies on whole-system locks to manage tasks. Tasks are easily found in memory, and queues are each given their own heap structure, making true multi-user parallel access unnecessary for any reasonable application. Indeed, Google's original Task Master (Workflow System) on which this idea is based (from the SRE Book) is a single-threaded singleton and still handles a great deal of load.
What is lacking in the in-memory implementation is persistence: it is truly, completely ephemeral. As of early 2021, to get persistence you have to use the PostgreSQL backend implementation. That implementation is fairly performant, but not everyone wants to manage a PostgreSQL instance.
To add persistence, a write-ahead log (WAL) would be helpful. This proposal describes the changes needed to implement a WAL in the existing in-memory implementation, including a discussion of some of the potential trade-offs inherent in tying file sync operations to EntroQ task management in what would otherwise be a completely in-memory (and thus very fast) solution.
Contrary to popular belief, WALs are not exactly rocket science. There are some subtleties hiding in them, such as how to handle end-of-file corruption after an early termination, but by and large they are quite simple: you write records, one at a time, to the end of a file. When the file reaches a certain capacity, you switch to a new file. With a standard naming convention that keeps file names lexicographically ordered, you can always tell which files came first. So long as only one process is accessing the WAL, correctness is relatively easy to achieve.
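To make the simple case concrete, a minimal naming and rotation sketch in Go might look like the following; the file-name pattern and size threshold are illustrative assumptions, not a prescribed convention.

```go
import (
	"fmt"
	"os"
)

// journalName produces a fixed-width, zero-padded name so that sorting the
// file names lexicographically also sorts them by creation order.
// (Hypothetical pattern, for illustration only.)
func journalName(ordinal uint64) string {
	return fmt.Sprintf("journal-%016d", ordinal)
}

// maybeRotate switches to a fresh journal file once the current one has
// reached maxBytes. Only one process is assumed to write to the WAL.
func maybeRotate(cur *os.File, maxBytes int64, nextOrdinal uint64) (*os.File, error) {
	info, err := cur.Stat()
	if err != nil {
		return nil, err
	}
	if info.Size() < maxBytes {
		return cur, nil // still room in the current journal
	}
	if err := cur.Close(); err != nil {
		return nil, err
	}
	return os.OpenFile(journalName(nextOrdinal),
		os.O_CREATE|os.O_EXCL|os.O_WRONLY|os.O_APPEND, 0o644)
}
```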
This description of its simplicity, of course, exposes a few things that are not simple:
- When more than one process writes to the log, there can be weeping and wailing involved with the level of intense corruption that results.
- File syncs are quite expensive, but without a sync for every entry written, there is no guarantee that the data actually reached the disk.
- End-of-file corruption must be handled gracefully so that restarts result in successful reads. Depending on how corrupt end-of-file states are handled (e.g., whether the process ignores bad records and just moves on with life), middle-of-file corruption may also need to be detected and handled appropriately.
- Log rotation and compaction are a little tricky as well. Several approaches exist, all with associated trade-offs.
We discuss an implementation below that we feel strikes a balance between implementation simplicity, efficiency, and safety.
An important technique for ensuring that corruption in a log can be detected and ignored is called Consistent Overhead Byte Stuffing, or COBS. The variant described in the blog post is COWS (word instead of byte stuffing), which is delightful. The idea is to use a self-synchronizing data representation. An excellent and common example of this idea is the UTF-8 standard, where continuation bytes and initial bytes differ in their high bits: you can thus tell them apart.
Essentially, a COBS strategy picks a byte as a separator, then encodes the data to avoid that byte. This isn't quite the same as simply "escaping" a certain byte, though. See the article for details. It is simple to implement, however, and it has some lovely properties, so we make use of it in this proposal.
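As an illustration of the idea, here is a sketch of the classic byte-oriented COBS algorithm (not the word-stuffed variant, and not the stuffedio implementation). Encoded output never contains the 0x00 delimiter, so a reader that lands in a corrupt region can scan forward to the next delimiter and resynchronize.

```go
import "fmt"

// cobsEncode applies classic Consistent Overhead Byte Stuffing: the output
// contains no 0x00 bytes, so 0x00 can safely be used as a record delimiter.
func cobsEncode(src []byte) []byte {
	out := make([]byte, 1, len(src)+len(src)/254+2) // room for code bytes
	codeIdx := 0                                    // index of the current block's code byte
	code := byte(1)

	finishBlock := func() {
		out[codeIdx] = code
		codeIdx = len(out)
		out = append(out, 0) // placeholder for the next code byte
		code = 1
	}

	for _, b := range src {
		if b == 0 {
			finishBlock()
			continue
		}
		out = append(out, b)
		code++
		if code == 0xFF {
			finishBlock()
		}
	}
	out[codeIdx] = code
	return out
}

// cobsDecode reverses cobsEncode for a single complete record (with the
// delimiter already stripped).
func cobsDecode(enc []byte) ([]byte, error) {
	var out []byte
	for i := 0; i < len(enc); {
		code := int(enc[i])
		if code == 0 {
			return nil, fmt.Errorf("unexpected delimiter byte at offset %d", i)
		}
		if i+code > len(enc) {
			return nil, fmt.Errorf("truncated block at offset %d", i)
		}
		out = append(out, enc[i+1:i+code]...)
		i += code
		if code < 0xFF && i < len(enc) {
			out = append(out, 0) // a short block implies an encoded zero
		}
	}
	return out, nil
}
```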
Note that an implementation of a word-stuffed WAL (just the essentials, including checksums and ordinals) can be found in the entrogo.com/stuffedio package. Its source is at github.com/shiblon/stuffedio.
Assuming a simple COBS log writer/reader implementation is created for this purpose, we have a self-synchronizing log at our disposal. With this at the ready, every mutating operation in EntroQ can be written to this log as it is also performed in memory. Mutating operations include claims, deletions, insertions, and changes. As a reminder, claims are mutating because they set the ETA, update the claim count, and increment a task's version. This last part, the version increment, is the most critical, as it ensures that lagging claimants cannot commit changes to a task that has been reassigned. Essentially, anything that increments a task's version number is considered a mutating operation. Plain reads are not mutating. Mutating operations may also include dependencies, which do not themselves mutate anything but do require that particular task ID/version pairs exist; that dependency information is included in the log, since it is an essential part of whether a transaction succeeds.
There is already a protocol buffer definition that describes a Claim or Modify operation, and this can be reused as part of the format for the WAL entries. That is convenient and makes it easy to handle different structures over time while retaining backward compatibility with existing logs. Protocol buffer deserialization is not incredibly fast, but it is likely to be good enough for our purposes. Using the existing ModifyRequest message should be sufficient as a storage format for each change in the log.
That said, two additional bits of information are needed to detect corruption in a log entry: a fingerprint and a counter. The fingerprint occupies a fixed number of bytes at the beginning of the record, and the counter is part of the surrounding "log entry" proto message. If the counter skips a value, that is an error. If it is duplicated, we can ignore the duplicates, or error out while reading if two entries with the same count contain different fingerprints (indicating that there was a write conflict).
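A minimal sketch of that framing, assuming a CRC32C fingerprint in a fixed four-byte prefix over the marshaled log-entry payload (the checksum choice and exact layout are illustrative assumptions, not the actual stuffedio format):

```go
import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

var crcTable = crc32.MakeTable(crc32.Castagnoli)

// frameRecord prefixes the marshaled log-entry payload (which carries the
// counter and the ModifyRequest) with a fixed-size fingerprint.
func frameRecord(payload []byte) []byte {
	rec := make([]byte, 4+len(payload))
	binary.LittleEndian.PutUint32(rec[:4], crc32.Checksum(payload, crcTable))
	copy(rec[4:], payload)
	return rec
}

// checkRecord verifies the fingerprint and returns the payload, flagging
// any record whose contents do not match its prefix.
func checkRecord(rec []byte) ([]byte, error) {
	if len(rec) < 4 {
		return nil, errors.New("record too short to hold a fingerprint")
	}
	payload := rec[4:]
	want := binary.LittleEndian.Uint32(rec[:4])
	if got := crc32.Checksum(payload, crcTable); got != want {
		return nil, fmt.Errorf("fingerprint mismatch: got %08x, want %08x", got, want)
	}
	return payload, nil
}
```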
Since all that must be stored in the data itself is the change to a task, even a Claim is merely a more complete modification: the log doesn't care about subtle issues like atomicity in claim behavior, only about what was successfully changed. In this case, the arrival time, version, modify time, claim count, and claimant are altered. Every change stored in the log implies a version increment (the existing version is the pre-change value, and that is what is stored in the log).
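Purely as an illustration of the fields involved, a logged claim might carry something like the following (the field names are hypothetical, not the actual proto schema):

```go
import "time"

// loggedClaim is a hypothetical view of the fields a claim changes when it
// is recorded as an ordinary modification in the log.
type loggedClaim struct {
	TaskID      string
	OldVersion  int32     // pre-change version, as stored in the log
	Claimant    string    // who now holds the task
	ClaimCount  int32     // incremented claim count
	ArrivalTime time.Time // new ETA after the claim
	ModifyTime  time.Time // when the change was made
	// Replay applies this as a normal change, incrementing OldVersion by one.
}
```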
Assuming good serialization of records and an existing WAL implementation, it remains to add WAL writes to the in-memory implementation of EntroQ. By relying on the filesystem cache and not issuing an explicit sync for every write (leaving flushing to the OS, which typically defaults to intervals of roughly 30 seconds), high throughput can be achieved even when log writes are performed synchronously with in-memory operations: an operation simply does not return until its log write has returned.
For particularly sensitive situations, a user option to sync on every write can also be added, drastically reducing performance but increasing the reliability of the system in the face of events like power loss. This is expected to be used only rarely, however; fault-tolerant systems such as LevelDB have found that they do not really require this level of fault tolerance in actual practice.
If desired, an option to sync every N commits can also be given, striking a balance between safety and performance. To ensure that these commits are serialized without incurring an unnecessary sync penalty on random transactions, they can be pushed into a write operation channel consumed by a single goroutine, and that goroutine can be responsible for counting the remaining steps to a forced filesystem sync operation. With an appropriately sized channel buffer, this can be relatively non-blocking. It will still result in slowdowns in some cases, as syncs are not free and systems under heavy load will likely fill the buffer quickly, but it can smooth out usage spikes.
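A sketch of that single-writer batching loop, with hypothetical names; acknowledgement back to waiting callers is omitted for brevity:

```go
import (
	"log"
	"os"
)

// walWriter is a hypothetical single-owner writer: all commits are funneled
// through a buffered channel and written by one goroutine.
type walWriter struct {
	f         *os.File
	records   chan []byte // already framed/stuffed records
	syncEvery int         // force a filesystem sync after this many records
}

func (w *walWriter) run() {
	pending := 0
	for rec := range w.records {
		if _, err := w.f.Write(rec); err != nil {
			log.Printf("wal write: %v", err)
			continue
		}
		pending++
		if pending >= w.syncEvery {
			if err := w.f.Sync(); err != nil {
				log.Printf("wal sync: %v", err)
			}
			pending = 0
		}
	}
}
```

Setting syncEvery to 1 degenerates to the sync-every-write option above, while a very large value approximates relying entirely on the OS flush interval.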
The purpose of a WAL is to provide persistence to a system that is otherwise ephemeral. The other half of writing operations down is reading them back when the system is in cold-start after a failure. This read operation is known as "replay", and it literally runs all of the operations in the log against an initially blank EntroQ in-memory system before it can accept new operations.
Thus, since every record in the WAL fully describes the mutating operation performed on the system, once the log is exhausted, the system will be in the same state (or close, modulo end-of-log corruption or permissive sync settings) as it was in when the system was originally stopped or failed.
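Replay itself can be a simple loop; the reader and applier interfaces below are placeholders for whatever the real journal iterator and in-memory backend expose:

```go
import (
	"fmt"
	"io"
)

// entryReader yields decoded log entries in order, returning io.EOF after
// the last valid entry in the final journal.
type entryReader interface {
	Next() ([]byte, error)
}

// applier reapplies one logged mutation to the in-memory state.
type applier interface {
	Apply(entry []byte) error
}

func replay(r entryReader, a applier) error {
	for {
		entry, err := r.Next()
		if err == io.EOF {
			return nil // log exhausted; system state is reconstructed
		}
		if err != nil {
			return fmt.Errorf("read journal entry: %w", err)
		}
		if err := a.Apply(entry); err != nil {
			return fmt.Errorf("apply journal entry: %w", err)
		}
	}
}
```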
Reading and performing all operations on the system can take a substantial amount of time. Because they are all serialized, there will be no lock contention, but acquiring and releasing locks is itself a somewhat slow process. Because the system is not yet accepting any external operation requests, it is safe to completely disable any mutex acquisition and release during log replay. This can substantially improve performance, at the expense of more complex code paths (a state to disable lock safety everywhere a lock is used).
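One lightweight way to express that state is a locker that is bypassed until the system begins serving; this is a sketch of the idea, not the existing code:

```go
import "sync"

// replayLocker behaves like a sync.Locker but skips the underlying mutex
// while the system is still replaying and not yet serving requests.
// The serving flag must be flipped exactly once, before any concurrent
// access begins.
type replayLocker struct {
	mu      sync.Mutex
	serving bool
}

func (l *replayLocker) Lock() {
	if l.serving {
		l.mu.Lock()
	}
}

func (l *replayLocker) Unlock() {
	if l.serving {
		l.mu.Unlock()
	}
}
```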
Even with faster application of replay from the WAL, a long-running system can take a long time to replay all of the operations that have ever happened in its history. Thus, a WAL compaction step might be added to create an "initial snapshot" that contains the full system state before remaining WAL entries are applied.
There are several possible approaches to this, all with their own relative merits. This part of the system is actually one of the most complex: with a working WAL implementation, writing and replay are simple, but keeping them efficient after the system has run for a long time is less so. A few possibilities present themselves, described below.
One option is to snapshot the system periodically and truncate logs when the snapshot succeeds. This can work well, but it causes the system to take a (potentially long) break to serialize its entire state to disk. During this time, nothing can progress, since the state of the system needs to stay consistent throughout the snapshotting process.
It is possible to mitigate this by freezing the in-memory implementation and keeping a separate "in-memory delta" system, and querying both of them for every operation, only making mutating changes to the delta. This works, but is quite complex, easy to get wrong, and its performance is hard to predict and highly dependent on current load and in-memory data size.
Thus, from the point of view of ongoing code maintenance, pausing during the snapshot is generally preferable to maintaining a delta, though good testing can mitigate the delta approach's complexity.
Another potential point of compaction is during replay. At this point in the evolution of the system, it is always safe to pause and serialize the current state of the system, as we have control over when it becomes available for external operations. As it is part of initialization, it can be snapshotted before any operations are allowed at all.
This has the benefit of simplicity: the system does not start up until a successful compaction has occurred. The downside is that compaction only occurs at startup, which undercuts at least some of its purported benefits: the intent is for compaction to make startup faster, but here it actually makes startup slower (at least at first), unless the system stops and restarts relatively frequently.
Finally, an interesting and potentially useful option is to have a process wake up periodically, load a snapshot into memory, replay a WAL fragment, and reserialize it. This has the downside of requiring a great deal more RAM during compaction events: essentially two copies of the system will exist at once, one in the current state, and one approaching a past state.
Note that no matter when compaction happens, the implementation will be similar: actual task data will need to be serialized to disk. This dump is quite simple: it is just another WAL-formatted structure that contains all tasks (each of which carries information about its own queue, ID, version, etc.), serialized using the Task proto message.
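A sketch of the dump, assuming the tasks arrive already marshaled as Task protos and are appended through the same kind of stuffed-record writer used for journals:

```go
// recordWriter stands in for the stuffed-record appender used for journals;
// the snapshot uses the same record format.
type recordWriter interface {
	Append(rec []byte) error
}

// writeSnapshot dumps every task, one record each; ordering does not matter,
// since each task carries its own queue, ID, and version.
func writeSnapshot(w recordWriter, marshaledTasks [][]byte) error {
	for _, t := range marshaledTasks {
		if err := w.Append(t); err != nil {
			return err
		}
	}
	return nil
}
```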
A special kind of replay will be implemented for snapshots, in which tasks are inserted without locks and without the usual automatic assignment of IDs, versions, or other resets. Compacting on replay is expected to take very little additional time compared to journal replay, particularly in a system that is basically functioning as intended (nominally zero tasks), but even for systems with a substantial backlog. Writing a long list of tasks to disk is efficient and can take advantage of operating system buffers.
Parallel compaction is also possible and perhaps desirable, e.g., on a daily basis. This can be done by reading in only finalized snapshots and journals, leaving any incomplete journals out (reading up to the penultimate journal file, for example). This can be a completely separate job, since snapshot creation is a non-destructive operation: it atomically creates a new file in the WAL directory, and the next system restart will find it and skip any journals it covers. The disadvantage is that creating a snapshot alongside the running process may take more memory. Since this compaction process only needs access to the same mount points as the in-use process, and does not otherwise have to run in the same place, its memory can be managed separately on a batch basis, which mitigates this concern.
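A sketch of selecting only finalized inputs, assuming the fixed-width naming scheme from the earlier sketch so that a lexical sort gives chronological order:

```go
import (
	"path/filepath"
	"sort"
)

// finalizedJournals returns every journal except the lexicographically last
// one, which may still be receiving writes from the live process.
func finalizedJournals(dir string) ([]string, error) {
	names, err := filepath.Glob(filepath.Join(dir, "journal-*"))
	if err != nil {
		return nil, err
	}
	if len(names) < 2 {
		return nil, nil // nothing safely finalized yet
	}
	sort.Strings(names) // Glob results are sorted, but be explicit about it
	return names[:len(names)-1], nil
}
```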
As for when compaction happens, after considering the above options, compaction on write is not at all attractive, compaction on replay is highly attractive, and compaction in parallel has potential, but may not be a sensible option if memory is constrained.
Compaction on replay will thus be enabled by default, and parallel compaction will be provided as an option, with documented caveats about its spiky RAM requirements.