From 33143bdbd91d628034a637288ac9ae5e6d76a08c Mon Sep 17 00:00:00 2001 From: Christian Weilbach Date: Sat, 25 Mar 2023 04:03:57 -0700 Subject: [PATCH] Implement batching transactor. --- src/datahike/writer.cljc | 99 +++++++++++++++++++++++++++------------ src/datahike/writing.cljc | 42 +++++++++-------- 2 files changed, 92 insertions(+), 49 deletions(-) diff --git a/src/datahike/writer.cljc b/src/datahike/writer.cljc index 6aa124bf6..1fe91f5b5 100644 --- a/src/datahike/writer.cljc +++ b/src/datahike/writer.cljc @@ -3,8 +3,8 @@ [taoensso.timbre :as log] [datahike.core] [datahike.writing :as w] - [datahike.tools :as dt :refer [throwable-promise]] - [clojure.core.async :refer [chan close! promise-chan put! go go-loop ! poll!]]) (:import [clojure.lang ExceptionInfo])) (defprotocol PWriter @@ -12,7 +12,7 @@ (-shutdown [_] "Returns a channel that resolves when the writer has shut down.") (-streaming? [_] "Returns whether the transactor is streaming updates directly into the connection, so it does not need to fetch from store on read.")) -(defrecord LocalWriter [queue thread streaming?] +(defrecord LocalWriter [queue thread streaming? queue-size] PWriter (-dispatch! [_ arg-map] (let [p (promise-chan)] @@ -23,33 +23,70 @@ thread) (-streaming? [_] streaming?)) +(def ^:const default-queue-size 100000) + (defn create-thread "Creates new transaction thread" - [connection queue write-fn-map] + [connection queue write-fn-map queue-size] (thread-try S - (go-loop [] - (if-let [{:keys [op args callback] :as invocation} (! pending-txs-ch [res callback]))) + (recur)) + (do + (close! pending-txs-ch) + (log/debug "Writer thread gracefully closed")))) + ;; commit loop + (go-loop [tx (LocalWriter {:queue queue + :queue-size queue-size :thread thread :streaming? true}))) diff --git a/src/datahike/writing.cljc b/src/datahike/writing.cljc index 49a128b0c..e9929c32e 100644 --- a/src/datahike/writing.cljc +++ b/src/datahike/writing.cljc @@ -10,7 +10,8 @@ [konserve.core :as k] [taoensso.timbre :as log] [hasch.core :refer [uuid]] - [superv.async :refer [go-try- stored db true) + _ (when-let [pending-futures (:pending-writes (:storage store))] + (loop [[f & r] @pending-futures] + (when f (