feat: improve writer latency #618
```clojure
(put! callback res)))
  (recur))
(log/debug "Writer thread gracefully closed")))))

[connection write-fn-map transaction-queue-size commit-queue-size commit-wait-time]
```
This is the main added logic. It first transacts in a loop and then commits in a second loop. Transactions are grouped into commits and all see the same `db-after`, which contains their transaction and the individual datoms they added and retracted.
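A minimal sketch of that two-loop shape, assuming a core.async transaction queue and a `commit!` step (names and batching details here are illustrative, not the PR's exact code):

```clojure
(require '[clojure.core.async :as async :refer [go-loop <! poll! put!]])

;; Hypothetical two-loop writer: the first loop drains whatever is
;; currently queued and transacts it in memory; the second step commits
;; the whole batch durably, so every transaction in the batch sees the
;; same db-after.
(defn writer-loop [transaction-queue commit!]
  (go-loop []
    (when-let [first-tx (<! transaction-queue)]
      (let [batch    (loop [batch [first-tx]]        ;; loop 1: transact in memory
                       (if-let [tx (poll! transaction-queue)]
                         (recur (conj batch tx))
                         batch))
            db-after (commit! batch)]                ;; loop 2: commit the batch
        (doseq [{:keys [callback]} batch]
          (put! callback db-after)))
      (recur))))
```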
```diff
 IStorage
 (store [_ node]
   (swap! stats update :writes inc)
   (let [address (gen-address node (:crypto-hash? config))
         _ (trace "writing storage: " address " crypto: " (:crypto-hash? config))]
-    (k/assoc store address node {:sync? true})
+    (swap! pending-writes conj (k/assoc store address node {:sync? false}))
```
Write operations are tracked here and collected by commit! later.
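Since each `store` call now only conjoins an async `k/assoc` channel onto `pending-writes`, something like the following hedged sketch can later await them all at once (assuming the replikativ convention that errors are delivered as values on the channel; the real collection happens in `commit!`):

```clojure
(require '[clojure.core.async :as async])

;; Await every collected write channel; the slowest write dominates.
;; Error values are re-thrown instead of being silently dropped.
(defn flush-pending-writes! [pending-writes]
  (let [chans @pending-writes]
    (reset! pending-writes [])
    (doseq [ch chans]
      (let [res (async/<!! ch)]
        (when (instance? Throwable res)
          (throw res))))))
```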
```diff
 (psset/set-branching-factor! BRANCHING_FACTOR)
-(let [^PersistentSortedSet pset (psset/sorted-set-by (index-type->cmp-quick index-type false))]
-  (set! (.-_storage pset) (:storage store))
+(let [^PersistentSortedSet pset (psset/sorted-set* {:cmp (index-type->cmp-quick index-type false)
```
This is for the new persistent sorted set version.
```clojure
(let [{:keys [hash max-tx max-eid meta]} db]
  (uuid [hash max-tx max-eid meta])))

(defn commit!
```
All durable changes happen here.
This takes the time of two roundtrips to the store: one for all writes in `flush-pending-writes` (where the slowest write operation dominates), and one for both `commit-log-op` and `branch-op` to update the roots of the indices.
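A hedged sketch of those two round trips, reusing `flush-pending-writes!` from the sketch above (the stored values and key names are illustrative assumptions; the real logic lives in `commit!`):

```clojure
(require '[konserve.core :as k]
         '[clojure.core.async :as async])

(defn commit-sketch! [store pending-writes db branch commit-id]
  ;; Round trip 1: flush all dirty index nodes; slowest write dominates.
  (flush-pending-writes! pending-writes)
  ;; Round trip 2: write the commit-log entry and the branch root
  ;; concurrently, then wait for both before the commit becomes visible.
  (let [commit-log-op (k/assoc store commit-id db {:sync? false})
        branch-op     (k/assoc store branch db {:sync? false})]
    (doseq [op [commit-log-op branch-op]]
      (let [res (async/<!! op)]
        (when (instance? Throwable res)
          (throw res))))))
```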
```clojure
(assoc-in [:meta :datahike/parents] parents)
(assoc-in [:meta :datahike/commit-id] commit-id)))))

(defn update-connection! [connection tx-data tx-meta update-fn]
```
Transactions (`transact` and `load-entities`) happen here.
@TimoKramer This PR is finally ready now. The necessary flush statement for the last commit was missing, which caused errors on machines with slow filesystems and in turn made the async IO hang. All tests now pass consistently on every machine I have access to. The problem was hard to see because assertion errors are not propagated and kaocha swallows all log output by default. The following konserve PR makes such read errors visible and no longer uses assertions: replikativ/konserve#115.
So far it looks good to me. One thing I do not understand is why you're using two loops in the writer...
```diff
@@ -166,6 +166,7 @@
 (if-let [cached (wrapped/lookup cache address)]
   cached
   (let [node (k/get store address nil {:sync? true})]
+    (assert (not (nil? node)) "Node not found in storage.")
```
You really want to throw an Error here?
This should never happen. All the nodes we refer to in our addresses/pointers must always exist; otherwise the store got corrupted or a write operation was inconsistent (e.g. the underlying store did not ensure its durability or mixed up the causal order of events; I think Redis does, but it does not immediately store durably, so something like this could go wrong in a faulty store backend). If your comment is about the assertion: yes, I will change that to an Exception now.

But we could also allow it and retry. In that case you would be able to sync the store from another system out of order (e.g. through datsync, rsync, file copying or whatever) and your operations would just stall instead of breaking. But I think it is better to copy everything except the roots first and overwrite the roots last in an atomic operation (in the same order commit writes them); that way you can sync without interrupting operations at all.
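For example, the assertion could become a proper exception along these lines (the `ex-info` data keys are illustrative assumptions):

```clojure
(let [node (k/get store address nil {:sync? true})]
  (when (nil? node)
    ;; A missing node means the store is corrupted or a write was
    ;; inconsistent, so fail loudly with context instead of asserting.
    (throw (ex-info "Node not found in storage."
                    {:type    :storage/node-not-found
                     :address address})))
  node)
```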
src/datahike/writer.cljc (outdated)
```clojure
(recur (poll! commit-queue))))
(log/trace "Batched transaction count: " (count @txs))
;; commit latest tx to disk
(let [db (:db-after (first (last @txs)))]
```
peek?
Fixed.
I think it's good.
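For reference, `peek` returns the last element of a vector in O(1), whereas `last` walks the whole sequence:

```clojure
(def txs (atom [[{:db-after :db1}] [{:db-after :db2}]]))

;; before: (:db-after (first (last @txs)))
(:db-after (first (peek @txs))) ;; => :db2, without traversing the vector
```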
src/datahike/writing.cljc (outdated)
```clojure
    :parent p}))
  commit-id))))))
(do
  (assert (not (nil? p)) "Parent cannot be nil.")
```
One more assert here, FYI.
Thanks, fixed.
```clojure
(<?- commit-log-op)
(<?- branch-op)
```
is this for catching exceptions?
This is waiting for the write operations in the asynchronous case and a no-op in the synchronous case (because the values have already been written then). It also re-throws any exceptions that end up in either of these channels.
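A hedged sketch of that semantics in plain core.async (the real `<?-` macro comes from the replikativ async utilities):

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as impl])

;; Async case: op is a channel, so take from it and re-throw error values.
;; Sync case: op is already the written value, so return it unchanged.
(defn await-op [op]
  (if (satisfies? impl/ReadPort op)
    (let [v (async/<!! op)]
      (if (instance? Throwable v) (throw v) v))
    op))
```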
Fixes #617. This pull request changes the write process to flush and sync the dirty indices in a two-stage process instead of waiting on each operation during execution. This approach reduces the transaction latency in the best case to two round trips to the underlying store, which is optimal if distributed snapshot consistency needs to be preserved; otherwise other processes could read DB records that point to tree fragments that are not yet written.