diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index e7ea196..ac20329 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -125,37 +125,39 @@ (assoc op :type :fail :error {:results results}))))) (defn- read-record - "Read a record with a transaction. If read fails, an exception is thrown." - [tx storage i] - ;; Need Storage API to read the transaction metadata - (let [tx-result (.get tx (prepare-get i)) - result (.get storage (prepare-get i))] + "Read and update the specified record with a transaction" + [test id] + (let [tx (scalar/start-transaction test) + tx-result (.get tx (prepare-get id)) + ;; Need Storage API to read the transaction metadata + result (.get @(:storage test) (prepare-get id))] ;; Put the same balance to check conflicts with in-flight transactions (->> (calc-new-balance tx-result 0) - (prepare-put i) + (prepare-put id) (.put tx)) + (.commit tx) result)) -(defn read-all-with-retry - [test n] - (scalar/check-transaction-connection! test) - (scalar/check-storage-connection! test) +(defn- read-record-with-retry + [test id] (scalar/with-retry (fn [test] (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test (try - (let [tx (scalar/start-transaction test) - results (doall (map #(read-record tx @(:storage test) %) - (range n)))] - (.commit tx) - results) + (read-record test id) (catch Exception e - ;; The transaction conflicted + ;; Read failure or the transaction conflicted (warn (.getMessage e)) nil)))) +(defn read-all-with-retry + [test n] + (scalar/check-transaction-connection! test) + (scalar/check-storage-connection! test) + (doall (map #(read-record-with-retry test %) (range n)))) + (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client (open! [_ _ _] diff --git a/scalardb/src/scalardb/transfer_append.clj b/scalardb/src/scalardb/transfer_append.clj index 9197376..198b969 100644 --- a/scalardb/src/scalardb/transfer_append.clj +++ b/scalardb/src/scalardb/transfer_append.clj @@ -128,30 +128,33 @@ :start-fail)) (defn- scan-records - "Scan records with a transaction. If the scan fails, an exception is thrown." - [tx id] - (let [results (.scan tx (prepare-scan id))] + "Scan records and append a new record with a transaction" + [test id] + (let [tx (scalar/start-transaction test) + results (.scan tx (prepare-scan id))] ;; Put the same balance to check conflicts with in-flight transactions (->> (prepare-put id (-> results first calc-new-age) (-> results first (calc-new-balance 0))) (.put tx)) + (.commit tx) results)) -(defn scan-all-records-with-retry - [test n] - (scalar/check-transaction-connection! test) +(defn- scan-records-with-retry + [test id] (scalar/with-retry scalar/prepare-transaction-service! test (try - (let [tx (scalar/start-transaction test) - results (doall (map #(scan-records tx %) (range n)))] - (.commit tx) - results) + (scan-records test id) (catch Exception e - ;; The transaction conflicted + ;; Scan failure or the transaction conflicted (warn (.getMessage e)) nil)))) +(defn scan-all-records-with-retry + [test n] + (scalar/check-transaction-connection! test) + (doall (map #(scan-records-with-retry test %) (range n)))) + (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client (open! [_ _ _]