From 8f46d5d8a041e6c66037661c1c336d2c761ebf9e Mon Sep 17 00:00:00 2001 From: yito88 Date: Mon, 28 Oct 2024 14:51:56 +0100 Subject: [PATCH 1/7] fix read-records in transfer --- scalardb/src/scalardb/transfer.clj | 41 +++++++++++++++--------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index e7ea196..88324b0 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -125,16 +125,24 @@ (assoc op :type :fail :error {:results results}))))) (defn- read-record - "Read a record with a transaction. If read fails, an exception is thrown." - [tx storage i] - ;; Need Storage API to read the transaction metadata - (let [tx-result (.get tx (prepare-get i)) - result (.get storage (prepare-get i))] - ;; Put the same balance to check conflicts with in-flight transactions - (->> (calc-new-balance tx-result 0) - (prepare-put i) - (.put tx)) - result)) + "Read and update the specified record with a transaction. + Return nil if the read fails or a conflict happens." + [test storage i] + (try + (let [tx (scalar/start-transaction test) + tx-result (.get tx (prepare-get i)) + ;; Need Storage API to read the transaction metadata + result (.get storage (prepare-get i))] + ;; Put the same balance to check conflicts with in-flight transactions + (->> (calc-new-balance tx-result 0) + (prepare-put i) + (.put tx)) + (.commit tx) + result) + (catch Exception e + ;; Read failure or the transaction conflicted + (warn (.getMessage e)) + nil))) (defn read-all-with-retry [test n] @@ -145,16 +153,9 @@ (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test - (try - (let [tx (scalar/start-transaction test) - results (doall (map #(read-record tx @(:storage test) %) - (range n)))] - (.commit tx) - results) - (catch Exception e - ;; The transaction conflicted - (warn (.getMessage e)) - nil)))) + (let [results (pmap #(read-record test @(:storage test) %) (range n))] + (when (every? #(some? %) results) + results)))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client From 96b6a84c5bce64e7753b51649a9e3fc47bd8869e Mon Sep 17 00:00:00 2001 From: yito88 Date: Mon, 28 Oct 2024 15:03:56 +0100 Subject: [PATCH 2/7] fix scan-records for transfer-append --- scalardb/src/scalardb/transfer_append.clj | 38 ++++++++++++----------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/scalardb/src/scalardb/transfer_append.clj b/scalardb/src/scalardb/transfer_append.clj index 9197376..c325600 100644 --- a/scalardb/src/scalardb/transfer_append.clj +++ b/scalardb/src/scalardb/transfer_append.clj @@ -128,29 +128,31 @@ :start-fail)) (defn- scan-records - "Scan records with a transaction. If the scan fails, an exception is thrown." - [tx id] - (let [results (.scan tx (prepare-scan id))] - ;; Put the same balance to check conflicts with in-flight transactions - (->> (prepare-put id - (-> results first calc-new-age) - (-> results first (calc-new-balance 0))) - (.put tx)) - results)) + "Scan records and append a new record with a transaction. + Return nil if the read fails or a conflict happens." + [test id] + (try + (let [tx (scalar/start-transaction test) + results (.scan tx (prepare-scan id))] + ;; Put the same balance to check conflicts with in-flight transactions + (->> (prepare-put id + (-> results first calc-new-age) + (-> results first (calc-new-balance 0))) + (.put tx)) + (.commit tx) + results) + (catch Exception e + ;; Read failure or the transaction conflicted + (warn (.getMessage e)) + nil))) (defn scan-all-records-with-retry [test n] (scalar/check-transaction-connection! test) (scalar/with-retry scalar/prepare-transaction-service! test - (try - (let [tx (scalar/start-transaction test) - results (doall (map #(scan-records tx %) (range n)))] - (.commit tx) - results) - (catch Exception e - ;; The transaction conflicted - (warn (.getMessage e)) - nil)))) + (let [results (pmap #(scan-records test %) (range n))] + (when (every? #(some? %) results) + results)))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client From 40f5416586f67fc1d7d3164d944e40de87a9a1e7 Mon Sep 17 00:00:00 2001 From: yito88 Date: Tue, 29 Oct 2024 12:02:32 +0100 Subject: [PATCH 3/7] fix to handle an exception for each record --- scalardb/src/scalardb/transfer.clj | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index 88324b0..c24005a 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -127,12 +127,12 @@ (defn- read-record "Read and update the specified record with a transaction. Return nil if the read fails or a conflict happens." - [test storage i] + [test i] (try (let [tx (scalar/start-transaction test) tx-result (.get tx (prepare-get i)) ;; Need Storage API to read the transaction metadata - result (.get storage (prepare-get i))] + result (.get @(:storage test) (prepare-get i))] ;; Put the same balance to check conflicts with in-flight transactions (->> (calc-new-balance tx-result 0) (prepare-put i) @@ -144,18 +144,22 @@ (warn (.getMessage e)) nil))) -(defn read-all-with-retry - [test n] - (scalar/check-transaction-connection! test) - (scalar/check-storage-connection! test) +(defn- read-record-with-retry + [test i] (scalar/with-retry (fn [test] (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test - (let [results (pmap #(read-record test @(:storage test) %) (range n))] - (when (every? #(some? %) results) - results)))) + (read-record test i))) + +(defn read-all-with-retry + [test n] + (scalar/check-transaction-connection! test) + (scalar/check-storage-connection! test) + (try + (pmap #(read-record-with-retry test %) (range n)) + (catch Exception _ nil))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client From a6766a068fe951a9089dd6a957ea592d599beb97 Mon Sep 17 00:00:00 2001 From: yito88 Date: Thu, 31 Oct 2024 22:03:48 +0100 Subject: [PATCH 4/7] fix tests --- scalardb/src/scalardb/transfer.clj | 40 ++++++++++---------- scalardb/test/scalardb/transfer_2pc_test.clj | 18 ++++++--- scalardb/test/scalardb/transfer_test.clj | 16 +++++--- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index c24005a..b526113 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -125,24 +125,18 @@ (assoc op :type :fail :error {:results results}))))) (defn- read-record - "Read and update the specified record with a transaction. - Return nil if the read fails or a conflict happens." + "Read and update the specified record with a transaction" [test i] - (try - (let [tx (scalar/start-transaction test) - tx-result (.get tx (prepare-get i)) - ;; Need Storage API to read the transaction metadata - result (.get @(:storage test) (prepare-get i))] - ;; Put the same balance to check conflicts with in-flight transactions - (->> (calc-new-balance tx-result 0) - (prepare-put i) - (.put tx)) - (.commit tx) - result) - (catch Exception e - ;; Read failure or the transaction conflicted - (warn (.getMessage e)) - nil))) + (let [tx (scalar/start-transaction test) + tx-result (.get tx (prepare-get i)) + ;; Need Storage API to read the transaction metadata + result (.get @(:storage test) (prepare-get i))] + ;; Put the same balance to check conflicts with in-flight transactions + (->> (calc-new-balance tx-result 0) + (prepare-put i) + (.put tx)) + (.commit tx) + result)) (defn- read-record-with-retry [test i] @@ -151,15 +145,21 @@ (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test - (read-record test i))) + (try + (read-record test i) + (catch Exception e + ;; Read failure or the transaction conflicted + (warn (.getMessage e)) + nil)))) (defn read-all-with-retry [test n] (scalar/check-transaction-connection! test) (scalar/check-storage-connection! test) (try - (pmap #(read-record-with-retry test %) (range n)) - (catch Exception _ nil))) + (doall (pmap #(read-record-with-retry test %) (range n))) + ;; unwrap the exception + (catch java.util.concurrent.ExecutionException e (throw (.getCause e))))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client diff --git a/scalardb/test/scalardb/transfer_2pc_test.clj b/scalardb/test/scalardb/transfer_2pc_test.clj index 6ad953b..485d1e5 100644 --- a/scalardb/test/scalardb/transfer_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_2pc_test.clj @@ -227,18 +227,26 @@ scalar/prepare-transaction-service! (spy/spy) scalar/prepare-storage-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) - nil nil)] + (let [num-accounts 5 + client (client/open! (transfer-2pc/->TransferClient (atom false) + num-accounts + 100 1) + nil nil) + retries-reconnection (* num-accounts + (+ (quot scalar/RETRIES + scalar/RETRIES_FOR_RECONNECTION) + 1))] (is (thrown? clojure.lang.ExceptionInfo (client/invoke! client {:db mock-db :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff + (* scalar/RETRIES num-accounts))) (is (spy/called-n-times? scalar/prepare-transaction-service! - (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1))) + retries-reconnection)) (is (spy/called-n-times? scalar/prepare-storage-service! - (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1)))))) + retries-reconnection))))) (deftest transfer-client-check-tx-test (with-redefs [scalar/check-transaction-states (spy/stub 1)] diff --git a/scalardb/test/scalardb/transfer_test.clj b/scalardb/test/scalardb/transfer_test.clj index b750c60..70634b8 100644 --- a/scalardb/test/scalardb/transfer_test.clj +++ b/scalardb/test/scalardb/transfer_test.clj @@ -197,18 +197,24 @@ scalar/prepare-transaction-service! (spy/spy) scalar/prepare-storage-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) - nil nil)] + (let [num-accounts 5 + client (client/open! (transfer/->TransferClient (atom false) + num-accounts 100 1) + nil nil) + retries-reconnection (* num-accounts + (+ (quot scalar/RETRIES + scalar/RETRIES_FOR_RECONNECTION) + 1))] (is (thrown? clojure.lang.ExceptionInfo (client/invoke! client {:db mock-db :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff (* scalar/RETRIES 5))) (is (spy/called-n-times? scalar/prepare-transaction-service! - (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1))) + retries-reconnection)) (is (spy/called-n-times? scalar/prepare-storage-service! - (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1)))))) + retries-reconnection))))) (deftest transfer-client-check-tx-test (with-redefs [scalar/check-transaction-states (spy/stub 1)] From 000a11d8fb4daaf9f76865a0b97d0218ddcc2a49 Mon Sep 17 00:00:00 2001 From: yito88 Date: Thu, 31 Oct 2024 22:08:21 +0100 Subject: [PATCH 5/7] fix retry num --- scalardb/test/scalardb/transfer_test.clj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scalardb/test/scalardb/transfer_test.clj b/scalardb/test/scalardb/transfer_test.clj index 70634b8..f9e2e2c 100644 --- a/scalardb/test/scalardb/transfer_test.clj +++ b/scalardb/test/scalardb/transfer_test.clj @@ -210,7 +210,8 @@ :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? scalar/exponential-backoff (* scalar/RETRIES 5))) + (is (spy/called-n-times? scalar/exponential-backoff + (* scalar/RETRIES num-accounts))) (is (spy/called-n-times? scalar/prepare-transaction-service! retries-reconnection)) (is (spy/called-n-times? scalar/prepare-storage-service! From 188794eefc3f8a04164e8e25d18d876dc1fb76e1 Mon Sep 17 00:00:00 2001 From: yito88 Date: Thu, 31 Oct 2024 22:15:39 +0100 Subject: [PATCH 6/7] seq exec read txs --- scalardb/src/scalardb/transfer.clj | 5 +---- scalardb/test/scalardb/transfer_2pc_test.clj | 18 +++++------------- scalardb/test/scalardb/transfer_test.clj | 17 +++++------------ 3 files changed, 11 insertions(+), 29 deletions(-) diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index b526113..bc00d67 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -156,10 +156,7 @@ [test n] (scalar/check-transaction-connection! test) (scalar/check-storage-connection! test) - (try - (doall (pmap #(read-record-with-retry test %) (range n))) - ;; unwrap the exception - (catch java.util.concurrent.ExecutionException e (throw (.getCause e))))) + (doall (map #(read-record-with-retry test %) (range n)))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client diff --git a/scalardb/test/scalardb/transfer_2pc_test.clj b/scalardb/test/scalardb/transfer_2pc_test.clj index 485d1e5..6ad953b 100644 --- a/scalardb/test/scalardb/transfer_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_2pc_test.clj @@ -227,26 +227,18 @@ scalar/prepare-transaction-service! (spy/spy) scalar/prepare-storage-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [num-accounts 5 - client (client/open! (transfer-2pc/->TransferClient (atom false) - num-accounts - 100 1) - nil nil) - retries-reconnection (* num-accounts - (+ (quot scalar/RETRIES - scalar/RETRIES_FOR_RECONNECTION) - 1))] + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) + nil nil)] (is (thrown? clojure.lang.ExceptionInfo (client/invoke! client {:db mock-db :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? scalar/exponential-backoff - (* scalar/RETRIES num-accounts))) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! - retries-reconnection)) + (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1))) (is (spy/called-n-times? scalar/prepare-storage-service! - retries-reconnection))))) + (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1)))))) (deftest transfer-client-check-tx-test (with-redefs [scalar/check-transaction-states (spy/stub 1)] diff --git a/scalardb/test/scalardb/transfer_test.clj b/scalardb/test/scalardb/transfer_test.clj index f9e2e2c..b750c60 100644 --- a/scalardb/test/scalardb/transfer_test.clj +++ b/scalardb/test/scalardb/transfer_test.clj @@ -197,25 +197,18 @@ scalar/prepare-transaction-service! (spy/spy) scalar/prepare-storage-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [num-accounts 5 - client (client/open! (transfer/->TransferClient (atom false) - num-accounts 100 1) - nil nil) - retries-reconnection (* num-accounts - (+ (quot scalar/RETRIES - scalar/RETRIES_FOR_RECONNECTION) - 1))] + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) + nil nil)] (is (thrown? clojure.lang.ExceptionInfo (client/invoke! client {:db mock-db :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? scalar/exponential-backoff - (* scalar/RETRIES num-accounts))) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! - retries-reconnection)) + (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1))) (is (spy/called-n-times? scalar/prepare-storage-service! - retries-reconnection))))) + (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1)))))) (deftest transfer-client-check-tx-test (with-redefs [scalar/check-transaction-states (spy/stub 1)] From 0a0b7497fbbb5a5246b797f78a973fed7a7a38fb Mon Sep 17 00:00:00 2001 From: yito88 Date: Sat, 2 Nov 2024 21:45:17 +0100 Subject: [PATCH 7/7] fix for append --- scalardb/src/scalardb/transfer.clj | 12 +++---- scalardb/src/scalardb/transfer_append.clj | 41 ++++++++++++----------- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index bc00d67..ac20329 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -126,27 +126,27 @@ (defn- read-record "Read and update the specified record with a transaction" - [test i] + [test id] (let [tx (scalar/start-transaction test) - tx-result (.get tx (prepare-get i)) + tx-result (.get tx (prepare-get id)) ;; Need Storage API to read the transaction metadata - result (.get @(:storage test) (prepare-get i))] + result (.get @(:storage test) (prepare-get id))] ;; Put the same balance to check conflicts with in-flight transactions (->> (calc-new-balance tx-result 0) - (prepare-put i) + (prepare-put id) (.put tx)) (.commit tx) result)) (defn- read-record-with-retry - [test i] + [test id] (scalar/with-retry (fn [test] (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test (try - (read-record test i) + (read-record test id) (catch Exception e ;; Read failure or the transaction conflicted (warn (.getMessage e)) diff --git a/scalardb/src/scalardb/transfer_append.clj b/scalardb/src/scalardb/transfer_append.clj index c325600..198b969 100644 --- a/scalardb/src/scalardb/transfer_append.clj +++ b/scalardb/src/scalardb/transfer_append.clj @@ -128,31 +128,32 @@ :start-fail)) (defn- scan-records - "Scan records and append a new record with a transaction. - Return nil if the read fails or a conflict happens." + "Scan records and append a new record with a transaction" [test id] - (try - (let [tx (scalar/start-transaction test) - results (.scan tx (prepare-scan id))] - ;; Put the same balance to check conflicts with in-flight transactions - (->> (prepare-put id - (-> results first calc-new-age) - (-> results first (calc-new-balance 0))) - (.put tx)) - (.commit tx) - results) - (catch Exception e - ;; Read failure or the transaction conflicted - (warn (.getMessage e)) - nil))) + (let [tx (scalar/start-transaction test) + results (.scan tx (prepare-scan id))] + ;; Put the same balance to check conflicts with in-flight transactions + (->> (prepare-put id + (-> results first calc-new-age) + (-> results first (calc-new-balance 0))) + (.put tx)) + (.commit tx) + results)) + +(defn- scan-records-with-retry + [test id] + (scalar/with-retry scalar/prepare-transaction-service! test + (try + (scan-records test id) + (catch Exception e + ;; Scan failure or the transaction conflicted + (warn (.getMessage e)) + nil)))) (defn scan-all-records-with-retry [test n] (scalar/check-transaction-connection! test) - (scalar/with-retry scalar/prepare-transaction-service! test - (let [results (pmap #(scan-records test %) (range n))] - (when (every? #(some? %) results) - results)))) + (doall (map #(scan-records-with-retry test %) (range n)))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client