Skip to content

Commit

Permalink
Implement batching transactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
whilo committed Mar 25, 2023
1 parent 9411d11 commit 33143bd
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 49 deletions.
99 changes: 70 additions & 29 deletions src/datahike/writer.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
[taoensso.timbre :as log]
[datahike.core]
[datahike.writing :as w]
[datahike.tools :as dt :refer [throwable-promise]]
[clojure.core.async :refer [chan close! promise-chan put! go go-loop <!]])
[datahike.tools :as dt :refer [throwable-promise get-time]]
[clojure.core.async :refer [chan close! promise-chan put! go go-loop <! >! poll!]])
(:import [clojure.lang ExceptionInfo]))

(defprotocol PWriter
(-dispatch! [_ arg-map] "Returns a channel that resolves when the transaction finalizes.")
(-shutdown [_] "Returns a channel that resolves when the writer has shut down.")
(-streaming? [_] "Returns whether the transactor is streaming updates directly into the connection, so it does not need to fetch from store on read."))

(defrecord LocalWriter [queue thread streaming?]
(defrecord LocalWriter [queue thread streaming? queue-size]
PWriter
(-dispatch! [_ arg-map]
(let [p (promise-chan)]
Expand All @@ -23,33 +23,70 @@
thread)
(-streaming? [_] streaming?))

(def ^:const default-queue-size 100000)

(defn create-thread
"Creates new transaction thread"
[connection queue write-fn-map]
[connection queue write-fn-map queue-size]
(thread-try
S
(go-loop []
(if-let [{:keys [op args callback] :as invocation} (<! queue)]
(do
(let [op-fn (write-fn-map op)
res (try
(<?- (apply op-fn connection args))
;; Only catch ExceptionInfo here (intentionally rejected transactions).
;; Any other exceptions should crash the writer and signal the supervisor.
(catch Exception e
(log/errorf "Error during invocation" invocation e)
;; take a guess that a NPE was triggered by an invalid connection
(if (= (type e) NullPointerException)
(ex-info "Null pointer encountered in invocation. Connection may have been invalidated, e.g. through db deletion, and needs to be released everywhere."
{:type :writer-error-during-invocation
:invocation invocation
:connection connection
:error e})
e)))]
(when (some? callback)
(put! callback res)))
(recur))
(log/debug "Writer thread gracefully closed")))))
(let [pending-txs-ch (chan queue-size)
store (:store @(:wrapped-atom connection))]
;; processing loop
(go-loop []
(if-let [{:keys [op args callback] :as invocation} (<! queue)]
(do
(let [op-fn (write-fn-map op)
res (try
(apply op-fn connection args)
;; Only catch ExceptionInfo here (intentionally rejected transactions).
;; Any other exceptions should crash the writer and signal the supervisor.
(catch Exception e
(log/errorf "Error during invocation" invocation e args)
;; take a guess that a NPE was triggered by an invalid connection
(when (some? callback)
;; short circuit on errors
(put! callback
(if (= (type e) NullPointerException)
(ex-info "Null pointer encountered in invocation. Connection may have been invalidated, e.g. through db deletion, and needs to be released everywhere."
{:type :writer-error-during-invocation
:invocation invocation
:connection connection
:error e})
e))
:error)))]
(when-not (= res :error)
(>! pending-txs-ch [res callback])))
(recur))
(do
(close! pending-txs-ch)
(log/debug "Writer thread gracefully closed"))))
;; commit loop
(go-loop [tx (<! pending-txs-ch)]
(when tx
(let [txs (atom [tx])]
;; empty channel of pending transactions
(loop [tx (poll! pending-txs-ch)]
(when tx
(swap! txs conj tx)
(recur (poll! pending-txs-ch))))
(log/trace "Batched transaction count: " (count @txs))
;; commit latest tx to disk
(let [db (:db-after (first (last @txs)))]
(try
(let [start-ts (.getTime (get-time))
{{:keys [datahike/commit-id]} :meta
:as _commit-db} (<?- (w/commit! store (:config db) db nil))]
(log/trace "Commit time (ms): " (- (.getTime (get-time)) start-ts))
;; notify all processes that transaction is complete
(doseq [[res callback] @txs]
(put! callback (assoc-in res [:tx-meta :db/commitId] commit-id))))
(catch Exception e
(doseq [[_ callback] @txs]
(put! callback e))
(log/error "Writer thread shutting down because of commit error " e)
(close! pending-txs-ch)))
(recur (<! pending-txs-ch)))))))))

;; public API

Expand All @@ -62,14 +99,18 @@
(fn [writer-config _]
(:backend writer-config)))


(defmethod create-writer :self
[{:keys [buffer-size write-fn-map]} connection]
(let [queue (chan buffer-size)
[{:keys [queue-size write-fn-map]} connection]
(let [queue-size (or queue-size default-queue-size)
queue (chan queue-size)
thread (create-thread connection queue
(merge default-write-fn-map
write-fn-map))]
write-fn-map)
queue-size)]
(map->LocalWriter
{:queue queue
:queue-size queue-size
:thread thread
:streaming? true})))

Expand Down
42 changes: 22 additions & 20 deletions src/datahike/writing.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
[konserve.core :as k]
[taoensso.timbre :as log]
[hasch.core :refer [uuid]]
[superv.async :refer [go-try- <?-]]))
[superv.async :refer [go-try- <?-]]
[clojure.core.async :refer [go <!]]))

;; mapping to storage

Expand Down Expand Up @@ -111,6 +112,10 @@
(assoc-in [:meta :datahike/commit-id] cid)
(assoc-in [:meta :datahike/parents] parents))
db-to-store (db->stored db true)
_ (when-let [pending-futures (:pending-writes (:storage store))]
(loop [[f & r] @pending-futures]
(when f (<?- f) (recur r)))
(reset! (:pending-writes (:storage store)) []))
commit-log-op (k/assoc store cid db-to-store {:sync? false})
branch-op (k/assoc store (:branch config) db-to-store {:sync? false})]
(<?- commit-log-op)
Expand All @@ -121,25 +126,22 @@
([connection tx-data tx-meta update-fn]
(update-and-flush-db connection tx-data tx-meta update-fn nil))
([connection tx-data tx-meta update-fn parents]
(go-try-
(let [{:keys [db/noCommit]} tx-meta
{:keys [db-after]
{:keys [db/txInstant]}
:tx-meta
:as tx-report} @(update-fn connection tx-data tx-meta)
{:keys [config meta]} db-after
meta (assoc meta :datahike/updated-at txInstant)
db (assoc db-after :meta meta)
store (:store @(:wrapped-atom connection))
_ (loop [[f & r] @(:pending-writes (:storage store))]
(when f (<?- f) (recur r)))
_ (reset! (:pending-writes (:storage store)) [])
db (if noCommit db (<?- (commit! store config db parents)))]
(reset! connection db)
(if noCommit
tx-report
(assoc-in tx-report [:tx-meta :db/commitId]
(get-in db [:meta :datahike/commit-id])))))))
(let [{:keys [db/noCommit]} tx-meta
{:keys [db-after]
{:keys [db/txInstant]}
:tx-meta
:as tx-report} @(update-fn connection tx-data tx-meta)
{:keys [config meta]} db-after
meta (assoc meta :datahike/updated-at txInstant)
db (assoc db-after :meta meta)
#_#_ store (:store @(:wrapped-atom connection))
#_db #_ (if noCommit db (<?- (commit! store config db parents)))]
(reset! connection db)
tx-report
#_(if noCommit
tx-report
(assoc-in tx-report [:tx-meta :db/commitId]
(get-in db [:meta :datahike/commit-id]))))))

(defprotocol PDatabaseManager
(-create-database [config opts])
Expand Down

0 comments on commit 33143bd

Please sign in to comment.