Skip to content

Commit

Permalink
Scalardb/cleanup cassandra deps (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 authored Nov 21, 2023
1 parent 9613dc0 commit 83a32c8
Show file tree
Hide file tree
Showing 16 changed files with 312 additions and 251 deletions.
4 changes: 2 additions & 2 deletions scalardb/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
[net.java.dev.jna/jna-platform "5.11.0"]
[org.slf4j/slf4j-jdk14 "2.0.6"]
[cassandra "0.1.0-SNAPSHOT"]
[cc.qbits/alia "4.3.6"]
[cc.qbits/hayt "4.1.0"]]
[cheshire "5.12.0"]
[com.scalar-labs/scalardb-schema-loader "4.0.0-SNAPSHOT"]]
:repositories {"sonartype" "https://oss.sonatype.org/content/repositories/snapshots/"}
:profiles {:dev {:dependencies [[tortue/spy "2.0.0"]]
:plugins [[lein-cloverage "1.1.2"]]}
Expand Down
69 changes: 27 additions & 42 deletions scalardb/src/scalardb/core.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
(ns scalardb.core
(:require [cassandra.core :as c]
[clojure.string :as string]
(:require [cheshire.core :as cheshire]
[clojure.tools.logging :refer [info warn]]
[jepsen.checker :as checker]
[jepsen.independent :as independent]
[qbits.alia :as alia])
[scalardb.db-extend :as ext])
(:import (com.scalar.db.api TransactionState)
(com.scalar.db.schemaloader SchemaLoader)
(com.scalar.db.service TransactionFactory
StorageFactory)
(com.scalar.db.transaction.consensuscommit Coordinator)
(java.util Properties)))
(com.scalar.db.transaction.consensuscommit Coordinator)))

(def ^:const RETRIES 8)
(def ^:const RETRIES_FOR_RECONNECTION 3)
Expand All @@ -18,43 +17,31 @@
(def ^:const DEFAULT_TABLE_COUNT 3)

(def ^:const KEYSPACE "jepsen")
(def ^:private ^:const COORDINATOR "coordinator")
(def ^:private ^:const STATE_TABLE "state")
(def ^:const VERSION "tx_version")

(def ^:private ISOLATION_LEVELS {:snapshot "SNAPSHOT"
:serializable "SERIALIZABLE"})

(def ^:private SERIALIZABLE_STRATEGIES {:extra-read "EXTRA_READ"
:extra-write "EXTRA_WRITE"})
(defn exponential-backoff
[r]
(Thread/sleep (reduce * 1000 (repeat r 2))))

(defn setup-transaction-tables
[test schemata]
(let [cluster (alia/cluster {:contact-points (:nodes test)})
session (alia/connect cluster)]
(let [properties (ext/create-properties (:db test) test)
options (ext/create-table-opts (:db test) test)]
(doseq [schema schemata]
(c/create-my-keyspace session test schema)
(c/create-my-table session schema))

(c/create-my-keyspace session test {:keyspace COORDINATOR})
(c/create-my-table session {:keyspace COORDINATOR
:table STATE_TABLE
:schema {:tx_id :text
:tx_state :int
:tx_created_at :bigint
:primary-key [:tx_id]}})
(c/close-cassandra cluster session)))

(defn- create-properties
[test nodes]
(doto (Properties.)
(.setProperty "scalar.db.contact_points" (string/join "," nodes))
(.setProperty "scalar.db.username" "cassandra")
(.setProperty "scalar.db.password" "cassandra")
(.setProperty "scalar.db.isolation_level"
((:isolation-level test) ISOLATION_LEVELS))
(.setProperty "scalar.db.consensus_commit.serializable_strategy"
((:serializable-strategy test) SERIALIZABLE_STRATEGIES))))
(loop [retries RETRIES]
(when (zero? retries)
(throw (ex-info "Failed to set up tables" {:schema schema})))
(let [result (try
(SchemaLoader/load properties
(cheshire/generate-string schema)
options
true)
:success
(catch Exception e
(warn (.getMessage e))
:fail))]
(when (= result :fail)
(recur (dec retries))))))))

(defn- close-storage!
[test]
Expand Down Expand Up @@ -90,9 +77,7 @@

(defn- create-service-instance
[test mode]
(when-let [properties (some->> (c/live-nodes test)
not-empty
(create-properties test))]
(when-let [properties (ext/create-properties (:db test) test)]
(try
(condp = mode
:storage (.getStorage (StorageFactory/create properties))
Expand All @@ -110,7 +95,7 @@
(info "reconnecting to the cluster")
(loop [tries RETRIES]
(when (< tries RETRIES)
(c/exponential-backoff (- RETRIES tries)))
(exponential-backoff (- RETRIES tries)))
(if-not (pos? tries)
(warn "Failed to connect to the cluster")
(if-let [instance (create-service-instance test mode)]
Expand Down Expand Up @@ -190,7 +175,7 @@
[connect-fn test & body]
`(loop [tries# RETRIES]
(when (< tries# RETRIES)
(c/exponential-backoff (- RETRIES tries#)))
(exponential-backoff (- RETRIES tries#)))
(when (zero? (mod tries# RETRIES_FOR_RECONNECTION))
(~connect-fn ~test))
(if-let [results# ~@body]
Expand All @@ -212,7 +197,7 @@
(do
(warn e)
(when fallback (fallback))
(c/exponential-backoff (- RETRIES tries))
(exponential-backoff (- RETRIES tries))
(recur (dec tries) f args fallback))
(:value res)))))

Expand Down
66 changes: 66 additions & 0 deletions scalardb/src/scalardb/db_extend.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
(ns scalardb.db-extend
(:require [cassandra.core :as cassandra]
[clojure.string :as string]
[jepsen.db :as db])
(:import (com.scalar.db.storage.cassandra CassandraAdmin
CassandraAdmin$ReplicationStrategy
CassandraAdmin$CompactionStrategy)
(java.util Properties)))

(def ^:private ISOLATION_LEVELS {:snapshot "SNAPSHOT"
:serializable "SERIALIZABLE"})

(def ^:private SERIALIZABLE_STRATEGIES {:extra-read "EXTRA_READ"
:extra-write "EXTRA_WRITE"})

(defprotocol DbExtension
(live-nodes [this test])
(wait-for-recovery [this test])
(create-table-opts [this test])
(create-properties [this test]))

(defrecord ExtCassandra []
DbExtension
(live-nodes [_ test] (cassandra/live-nodes test))
(wait-for-recovery [_ test] (cassandra/wait-rf-nodes test))
(create-table-opts
[_ test]
{(keyword CassandraAdmin/REPLICATION_STRATEGY)
(str CassandraAdmin$ReplicationStrategy/SIMPLE_STRATEGY)
(keyword CassandraAdmin/COMPACTION_STRATEGY)
(str CassandraAdmin$CompactionStrategy/LCS)
(keyword CassandraAdmin/REPLICATION_FACTOR) (:rf test)})
(create-properties
[_ test]
(let [nodes (cassandra/live-nodes test)]
(when (nil? nodes)
(throw (ex-info "No living node" {:test test})))
(doto (Properties.)
(.setProperty "scalar.db.contact_points" (string/join "," nodes))
(.setProperty "scalar.db.username" "cassandra")
(.setProperty "scalar.db.password" "cassandra")
(.setProperty "scalar.db.consensus_commit.isolation_level"
((:isolation-level test) ISOLATION_LEVELS))
(.setProperty "scalar.db.consensus_commit.serializable_strategy"
((:serializable-strategy test) SERIALIZABLE_STRATEGIES))))))

(def ^:private ext-dbs
{:cassandra (->ExtCassandra)})

(defn extend-db
[db db-type]
(let [ext-db (db-type ext-dbs)]
(reify
db/DB
(setup! [_ test node] (db/setup! db test node))
(teardown! [_ test node] (db/teardown! db test node))
db/Primary
(primaries [_ test] (db/primaries db test))
(setup-primary! [_ test node] (db/setup-primary! db test node))
db/LogFiles
(log-files [_ test node] (db/log-files db test node))
DbExtension
(live-nodes [_ test] (live-nodes ext-db test))
(wait-for-recovery [_ test] (wait-for-recovery ext-db test))
(create-table-opts [_ test] (create-table-opts ext-db test))
(create-properties [_ test] (create-properties ext-db test)))))
32 changes: 9 additions & 23 deletions scalardb/src/scalardb/elle_append.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,12 @@
UnknownTransactionStatusException)))

(def ^:const TABLE "txn")
(def ^:const SCHEMA {:id :int
:val :text
:tx_id :text
:tx_version :int
:tx_state :int
:tx_prepared_at :bigint
:tx_committed_at :bigint
:before_val :text
:before_tx_id :text
:before_tx_version :int
:before_tx_state :int
:before_tx_prepared_at :bigint
:before_tx_committed_at :bigint
:primary-key [:id]})
(def ^:private ^:const ID "id")
(def ^:private ^:const VALUE "val")
(def ^:const SCHEMA {:transaction true
:partition-key [ID]
:clustering-key []
:columns {(keyword ID) "INT" (keyword VALUE) "INT"}})

(defn prepare-get
[table id]
Expand Down Expand Up @@ -80,9 +70,8 @@
[test]
(doseq [id (range (inc INITIAL_TABLE_ID))
i (range DEFAULT_TABLE_COUNT)]
(scalar/setup-transaction-tables test [{:keyspace KEYSPACE
:table (str TABLE id \_ i)
:schema SCHEMA}])))
(scalar/setup-transaction-tables
test [{(keyword (str KEYSPACE \. TABLE id \_ i)) SCHEMA}])))

(defn add-tables
[test next-id]
Expand All @@ -92,12 +81,9 @@
(when (compare-and-set! (:table-id test) current-id next-id)
(info (str "Creating new tables for " next-id))
(doseq [i (range DEFAULT_TABLE_COUNT)]
(scalar/setup-transaction-tables test [{:keyspace KEYSPACE
:table (str TABLE
next-id
\_
i)
:schema SCHEMA}])))))))
(scalar/setup-transaction-tables
test
[{(keyword (str KEYSPACE \. TABLE next-id \_ i)) SCHEMA}])))))))

(defrecord AppendClient [initialized?]
client/Client
Expand Down
32 changes: 9 additions & 23 deletions scalardb/src/scalardb/elle_write_read.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,12 @@
UnknownTransactionStatusException)))

(def ^:const TABLE "txn")
(def ^:private ^:const SCHEMA {:id :int
:val :int
:tx_id :text
:tx_version :int
:tx_state :int
:tx_prepared_at :bigint
:tx_committed_at :bigint
:before_val :int
:before_tx_id :text
:before_tx_version :int
:before_tx_state :int
:before_tx_prepared_at :bigint
:before_tx_committed_at :bigint
:primary-key [:id]})
(def ^:private ^:const ID "id")
(def ^:private ^:const VALUE "val")
(def ^:const SCHEMA {:transaction true
:partition-key [ID]
:clustering-key []
:columns {(keyword ID) "INT" (keyword VALUE) "INT"}})

(defn prepare-get
[table id]
Expand Down Expand Up @@ -75,9 +65,8 @@
[test]
(doseq [id (range (inc INITIAL_TABLE_ID))
i (range DEFAULT_TABLE_COUNT)]
(scalar/setup-transaction-tables test [{:keyspace KEYSPACE
:table (str TABLE id \_ i)
:schema SCHEMA}])))
(scalar/setup-transaction-tables
test [{(keyword (str KEYSPACE \. TABLE id \_ i)) SCHEMA}])))

(defn add-tables
[test next-id]
Expand All @@ -87,12 +76,9 @@
(when (compare-and-set! (:table-id test) current-id next-id)
(info (str "Creating new tables for " next-id))
(doseq [i (range DEFAULT_TABLE_COUNT)]
(scalar/setup-transaction-tables test [{:keyspace KEYSPACE
:table (str TABLE
next-id
\_
i)
:schema SCHEMA}])))))))
(scalar/setup-transaction-tables
test
[{(keyword (str KEYSPACE \. TABLE next-id \_ i)) SCHEMA}])))))))

(defrecord WriteReadClient [initialized?]
client/Client
Expand Down
Loading

0 comments on commit 83a32c8

Please sign in to comment.