Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix read records #132

Merged
merged 7 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions scalardb/src/scalardb/transfer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -125,37 +125,39 @@
(assoc op :type :fail :error {:results results})))))

(defn- read-record
"Read a record with a transaction. If read fails, an exception is thrown."
[tx storage i]
;; Need Storage API to read the transaction metadata
(let [tx-result (.get tx (prepare-get i))
result (.get storage (prepare-get i))]
"Read and update the specified record with a transaction"
[test id]
(let [tx (scalar/start-transaction test)
tx-result (.get tx (prepare-get id))
;; Need Storage API to read the transaction metadata
result (.get @(:storage test) (prepare-get id))]
;; Put the same balance to check conflicts with in-flight transactions
(->> (calc-new-balance tx-result 0)
(prepare-put i)
(prepare-put id)
(.put tx))
(.commit tx)
result))

(defn read-all-with-retry
[test n]
(scalar/check-transaction-connection! test)
(scalar/check-storage-connection! test)
(defn- read-record-with-retry
[test id]
(scalar/with-retry
(fn [test]
(scalar/prepare-transaction-service! test)
(scalar/prepare-storage-service! test))
test
(try
(let [tx (scalar/start-transaction test)
results (doall (map #(read-record tx @(:storage test) %)
(range n)))]
(.commit tx)
results)
(read-record test id)
(catch Exception e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can catch CrudConflictException if it's for the lazy recovery. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brfrn169 Another exception could be thrown from the commit, couldn't it?

Copy link
Contributor

@brfrn169 brfrn169 Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yito88 Sorry, it seems I had the wrong idea. Please ignore the comment.

So currently, we retry all the record reads together, but after this PR, we will retry each record read individually. Is my understanding correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brfrn169 No problem. "read-record" tries to update, so it's a bit misleading.

we retry all the record reads together, but after this PR, we will retry each record read individually. Is my understanding correct?

Exactly!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

;; The transaction conflicted
;; Read failure or the transaction conflicted
(warn (.getMessage e))
nil))))

(defn read-all-with-retry
[test n]
(scalar/check-transaction-connection! test)
(scalar/check-storage-connection! test)
(doall (map #(read-record-with-retry test %) (range n))))

(defrecord TransferClient [initialized? n initial-balance max-txs]
client/Client
(open! [_ _ _]
Expand Down
25 changes: 14 additions & 11 deletions scalardb/src/scalardb/transfer_append.clj
Original file line number Diff line number Diff line change
Expand Up @@ -128,30 +128,33 @@
:start-fail))

(defn- scan-records
"Scan records with a transaction. If the scan fails, an exception is thrown."
[tx id]
(let [results (.scan tx (prepare-scan id))]
"Scan records and append a new record with a transaction"
[test id]
(let [tx (scalar/start-transaction test)
results (.scan tx (prepare-scan id))]
;; Put the same balance to check conflicts with in-flight transactions
(->> (prepare-put id
(-> results first calc-new-age)
(-> results first (calc-new-balance 0)))
(.put tx))
(.commit tx)
results))

(defn scan-all-records-with-retry
[test n]
(scalar/check-transaction-connection! test)
(defn- scan-records-with-retry
[test id]
(scalar/with-retry scalar/prepare-transaction-service! test
(try
(let [tx (scalar/start-transaction test)
results (doall (map #(scan-records tx %) (range n)))]
(.commit tx)
results)
(scan-records test id)
(catch Exception e
;; The transaction conflicted
;; Scan failure or the transaction conflicted
(warn (.getMessage e))
nil))))

(defn scan-all-records-with-retry
[test n]
(scalar/check-transaction-connection! test)
(doall (map #(scan-records-with-retry test %) (range n))))

(defrecord TransferClient [initialized? n initial-balance max-txs]
client/Client
(open! [_ _ _]
Expand Down
Loading