Skip to content

Commit

Permalink
Workaround Cassandra table setup issue (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 authored Feb 13, 2024
1 parent 17b6976 commit be76b87
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 30 deletions.
6 changes: 6 additions & 0 deletions cassandra/src/cassandra/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@
(clause/with {:compaction
{:class compaction-strategy}}))))

(defn open-cassandra
[test]
(let [cluster (alia/cluster {:contact-points (:nodes test)})
session (alia/connect cluster)]
[cluster session]))

(defn close-cassandra
[cluster session]
(some-> session alia/shutdown (.get 10 TimeUnit/SECONDS))
Expand Down
96 changes: 77 additions & 19 deletions scalardb/src/scalardb/core.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
(ns scalardb.core
(:require [cheshire.core :as cheshire]
(:require [cassandra.core :as cassandra]
[cheshire.core :as cheshire]
[clojure.string :as string]
[clojure.tools.logging :refer [info warn]]
[jepsen.checker :as checker]
[jepsen.independent :as independent]
Expand All @@ -23,28 +25,84 @@
[r]
(Thread/sleep (reduce * 1000 (repeat r 2))))

(defn- get-cassandra-schema
"Only the current test schemata are covered
because this is just a workaround for the schema loader issue."
[schema]
(assert (= (count schema) 1) "The schema should have only 1 entry")
(let [keyspace-table (-> schema keys first name)
schema (-> schema vals first)
[keyspace table] (string/split keyspace-table #"\.")
partition-key (mapv keyword (:partition-key schema))
clustering-key (mapv keyword (:clustering-key schema))
columns (assoc (reduce
(fn [r [k t]]
(let [val-type (-> t string/lower-case keyword)
result (assoc r k val-type)]
(if (or (.contains partition-key k)
(.contains clustering-key k))
result
(assoc result
(->> k name (str "before_") keyword)
val-type))))
{}
(:columns schema))
:tx_id :text
:tx_version :int
:tx_state :int
:tx_prepared_at :bigint
:tx_committed_at :bigint
:before_tx_id :text
:before_tx_version :int
:before_tx_state :int
:before_tx_prepared_at :bigint
:before_tx_committed_at :bigint
:primary-key (into partition-key clustering-key))]
{:keyspace keyspace
:table table
:schema columns}))

(defn- setup-cassandra-tables
[test schemata]
(let [[cluster session] (cassandra/open-cassandra test)
schemata (map get-cassandra-schema schemata)]
(doseq [schema schemata]
(cassandra/create-my-keyspace session test schema)
(cassandra/create-my-table session schema))
(cassandra/create-my-keyspace session test {:keyspace "coordinator"})
(cassandra/create-my-table session {:keyspace "coordinator"
:table "state"
:schema {:tx_id :text
:tx_state :int
:tx_created_at :bigint
:primary-key [:tx_id]}})
(cassandra/close-cassandra cluster session)))

(defn setup-transaction-tables
[test schemata]
(let [properties (ext/create-properties (:db test) test)
options (ext/create-table-opts (:db test) test)]
(doseq [schema (map cheshire/generate-string schemata)]
(loop [retries RETRIES]
(when (zero? retries)
(throw (ex-info "Failed to set up tables" {:schema schema})))
(when (< retries RETRIES)
(exponential-backoff (- RETRIES retries))
(try
(SchemaLoader/unload properties schema true)
(catch Exception e (warn (.getMessage e))))
(exponential-backoff (- RETRIES retries)))
(let [result (try
(SchemaLoader/load properties schema options true)
:success
(catch Exception e
(warn (.getMessage e))
:fail))]
(when (= result :fail)
(recur (dec retries))))))))
(if (= (.getProperty properties "scalar.db.username") "cassandra")
;; Workaround the issue of the schema loader for Cassandra
(setup-cassandra-tables test schemata)
(doseq [schema (map cheshire/generate-string schemata)]
(loop [retries RETRIES]
(when (zero? retries)
(throw (ex-info "Failed to set up tables" {:schema schema})))
(when (< retries RETRIES)
(exponential-backoff (- RETRIES retries))
(try
(SchemaLoader/unload properties schema true)
(catch Exception e (warn (.getMessage e))))
(exponential-backoff (- RETRIES retries)))
(let [result (try
(SchemaLoader/load properties schema options true)
:success
(catch Exception e
(warn (.getMessage e))
:fail))]
(when (= result :fail)
(recur (dec retries)))))))))

(defn- close-storage!
[test]
Expand Down
8 changes: 3 additions & 5 deletions scalardl/test/scalardl/cas_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
(:import (com.scalar.dl.client.exception ClientException)
(com.scalar.dl.client.service ClientService)
(com.scalar.dl.ledger.model ContractExecutionResult)
(com.scalar.dl.ledger.service StatusCode)
(javax.json Json)))
(com.scalar.dl.ledger.service StatusCode)))

(def ^:dynamic contract-count (atom 0))
(def ^:dynamic execute-count (atom 0))
Expand All @@ -22,9 +21,8 @@
nil)
(executeContract [& _]
(swap! execute-count inc)
(ContractExecutionResult. (-> (Json/createObjectBuilder)
(.add "value" 3)
.build)
(ContractExecutionResult. "{\"value\": 3}"
nil
nil
nil))))

Expand Down
9 changes: 3 additions & 6 deletions scalardl/test/scalardl/transfer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
(:import (com.scalar.dl.client.exception ClientException)
(com.scalar.dl.client.service ClientService)
(com.scalar.dl.ledger.model ContractExecutionResult)
(com.scalar.dl.ledger.service StatusCode)
(javax.json Json)))
(com.scalar.dl.ledger.service StatusCode)))

(def ^:dynamic contract-count (atom 0))
(def ^:dynamic execute-count (atom 0))
Expand All @@ -23,10 +22,8 @@
nil)
(executeContract [& _]
(swap! execute-count inc)
(ContractExecutionResult. (-> (Json/createObjectBuilder)
(.add "balance" 1000)
(.add "age" 111)
.build)
(ContractExecutionResult. "{\"balance\": 1000, \"age\": 111}"
nil
nil
nil))))

Expand Down

0 comments on commit be76b87

Please sign in to comment.