Skip to content

Commit

Permalink
refactoring 2pc txs
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 committed Sep 3, 2023
1 parent bb087f3 commit 0bd69c9
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 32 deletions.
13 changes: 13 additions & 0 deletions scalardb/src/scalardb/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,19 @@
; use the second transaction manager to join a transaction
(some-> test :2pc deref second (.join tx-id)))

(defn prepare-validate-commit-txs
"Given transactions as a vector are prepared, validated,
then committed for 2pc."
[txs]
(doseq [f [#(.prepare %) #(.validate %) #(.commit %)]
tx txs]
(f tx)))

(defn rollback-txs
"Given transactions as a vector are rollbacked."
[txs]
(doseq [tx txs] (.rollback tx)))

(defmacro with-retry
"If the result of the body is nil, it retries it"
[connect-fn test & body]
Expand Down
10 changes: 2 additions & 8 deletions scalardb/src/scalardb/elle_append_2pc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,13 @@
;; add tables for the next sequence
(append/add-tables test (inc seq-id)))
(let [txn' (mapv (partial tx-execute seq-id tx1 tx2) txn)]
(.prepare tx1)
(.prepare tx2)
(.validate tx1)
(.validate tx2)
(.commit tx1)
(.commit tx2)
(scalar/prepare-validate-commit-txs [tx1 tx2])
(assoc op :type :ok :value (independent/tuple seq-id txn')))
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx1))
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
(catch Exception e
(.rollback tx1)
(.rollback tx2)
(scalar/rollback-txs [tx1 tx2])
(scalar/try-reconnection-for-2pc! test)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

Expand Down
10 changes: 2 additions & 8 deletions scalardb/src/scalardb/elle_write_read_2pc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,13 @@
;; add tables for the next sequence
(wr/add-tables test (inc seq-id)))
(let [txn' (mapv (partial tx-execute seq-id tx1 tx2) txn)]
(.prepare tx1)
(.prepare tx2)
(.validate tx1)
(.validate tx2)
(.commit tx1)
(.commit tx2)
(scalar/prepare-validate-commit-txs [tx1 tx2])
(assoc op :type :ok :value (independent/tuple seq-id txn')))
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx1))
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
(catch Exception e
(.rollback tx1)
(.rollback tx2)
(scalar/rollback-txs [tx1 tx2])
(scalar/try-reconnection-for-2pc! test)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

Expand Down
10 changes: 2 additions & 8 deletions scalardb/src/scalardb/transfer_2pc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,11 @@
(->> (transfer/calc-new-balance toResult amount)
(transfer/prepare-put to)
(.put tx2)))
(.prepare tx1)
(.prepare tx2)
(.validate tx1)
(.validate tx2)
(.commit tx1)
(.commit tx2)
(scalar/prepare-validate-commit-txs [tx1 tx2])
(catch UnknownTransactionStatusException e
(throw e))
(catch Exception e
(.rollback tx1)
(.rollback tx2)
(scalar/rollback-txs [tx1 tx2])
(throw e))))

(defrecord TransferClient [initialized? n initial-balance]
Expand Down
10 changes: 2 additions & 8 deletions scalardb/src/scalardb/transfer_append_2pc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,11 @@
(t-append/calc-new-age to-result)
(t-append/calc-new-balance to-result amount))
(.put tx2)))
(.prepare tx1)
(.prepare tx2)
(.validate tx1)
(.validate tx2)
(.commit tx1)
(.commit tx2)
(scalar/prepare-validate-commit-txs [tx1 tx2])
(catch UnknownTransactionStatusException e
(throw e))
(catch Exception e
(.rollback tx1)
(.rollback tx2)
(scalar/rollback-txs [tx1 tx2])
(throw e))))

(defrecord TransferClient [initialized? n initial-balance]
Expand Down

0 comments on commit 0bd69c9

Please sign in to comment.