Skip to content

Commit

Permalink
[WIP] Implement the delete-history Command
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderkiel committed Sep 15, 2024
1 parent 708f732 commit a3436d5
Show file tree
Hide file tree
Showing 61 changed files with 1,372 additions and 324 deletions.
1 change: 1 addition & 0 deletions cljfmt.edn
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
if-ok [[:block 1]]
try-one [[:block 2]]
when-ok [[:block 1]]
with-open-coll [[:block 1]]
do-sync [[:block 1]]
do-async [[:block 1]]
has-form [[:block 1]]
Expand Down
5 changes: 4 additions & 1 deletion modules/coll/.clj-kondo/config.edn
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
{:config-paths
["../../../.clj-kondo/root"]}
["../../../.clj-kondo/root"]

:lint-as
{blaze.coll.core/with-open-coll clojure.core/with-open}}
7 changes: 5 additions & 2 deletions modules/coll/src/blaze/coll/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
[coll]
(identical? ::empty (reduce #(reduced %2) ::empty coll)))

(defn- inc-rf [sum _] (inc ^long sum))
(defn inc-rf [sum _] (inc ^long sum))

(defn eduction
"Like `clojure.core/eduction` but implements Counted instead of Iterable."
Expand Down Expand Up @@ -67,4 +67,7 @@
IReduceInit
(reduce [_ rf# init#]
(with-open ~bindings
(reduce rf# init# ~coll)))))
(reduce rf# init# ~coll)))
Counted
(count [coll#]
(.reduce coll# inc-rf 0))))
14 changes: 12 additions & 2 deletions modules/coll/test/blaze/coll/core_test.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
(ns blaze.coll.core-test
(:require
[blaze.coll.core :as coll]
[blaze.coll.core :as coll :refer [with-open-coll]]
[blaze.test-util :as tu]
[clojure.spec.test.alpha :as st]
[clojure.test :as test :refer [are deftest is testing]]))
[clojure.test :as test :refer [are deftest is testing]])
(:import [java.lang AutoCloseable]))

(st/instrument)
(set! *warn-on-reflection* true)

(test/use-fixtures :each tu/fixture)

Expand Down Expand Up @@ -93,3 +95,11 @@
[::x ::y] 0 ::x
[::x ::y] 1 ::y
[::x ::y] 2 ::not-found)))

(deftest with-open-coll-test
(let [state (volatile! false)
coll (with-open-coll [_ (reify AutoCloseable (close [_] (vreset! state true)))]
(coll/eduction (map inc) (range 10)))]
(is (= 10 (count coll)))
(is (= (range 1 11) (vec coll)))
(is (true? @state))))
3 changes: 1 addition & 2 deletions modules/cql/.clj-kondo/config.edn
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
"../../module-test-util/resources/clj-kondo.exports/blaze/module-test-util"]

:lint-as
{blaze.db.impl.macros/with-open-coll clojure.core/with-open
blaze.elm.compiler.macros/defunop clojure.core/defn
{blaze.elm.compiler.macros/defunop clojure.core/defn
blaze.elm.compiler.macros/defbinop clojure.core/defn
blaze.elm.compiler.macros/defternop clojure.core/defn
blaze.elm.compiler.macros/defnaryop clojure.core/defn
Expand Down
7 changes: 6 additions & 1 deletion modules/db-tx-log/src/blaze/db/tx_log/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
queue?)

(s/def :blaze.db.tx-cmd/op
#{"create" "put" "keep" "delete" "conditional-delete"})
#{"create" "put" "keep" "delete" "conditional-delete" "delete-history"})

(s/def :blaze.db.tx-cmd/type
:fhir.resource/type)
Expand Down Expand Up @@ -87,6 +87,11 @@
:blaze.db.tx-cmd/check-refs
:blaze.db.tx-cmd/allow-multiple]))

(defmethod tx-cmd "delete-history" [_]
(s/keys :req-un [:blaze.db.tx-cmd/op
:blaze.db.tx-cmd/type
:blaze.resource/id]))

(s/def :blaze.db/tx-cmd
(s/multi-spec tx-cmd :op))

Expand Down
32 changes: 29 additions & 3 deletions modules/db-tx-log/test/blaze/db/tx_log/spec_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
[taoensso.timbre :as log]))

(st/instrument)
(log/set-level! :trace)
(log/set-min-level! :trace)

(test/use-fixtures :each tu/fixture)

Expand All @@ -36,23 +36,49 @@
:id "0"
:hash observation-hash-0
:refs [["Patient" "0"]]}
{:op "put"
:type "Patient"
:id "0"
:hash patient-hash-0}
{:op "put"
:type "Patient"
:id "0"
:hash patient-hash-0
:if-match 1}
{:op "keep"
:type "Patient"
:id "0"
:hash patient-hash-0}
{:op "delete"
:type "Patient"
:id "0"
:if-match 1}
{:op "delete"
:type "Patient"
:id "0"
:check-refs true}))
:check-refs true}
{:op "conditional-delete"
:type "Patient"}
{:op "delete-history"
:type "Patient"
:id "0"}))

(testing "invalid"
(are [tx-cmd] (not (s/valid? :blaze.db/tx-cmd tx-cmd))
nil
1
{:op "create"
:type "Patient"
:id "0"}
{:op "put"
:type "Patient"
:id "0"}
{:op "delete"
:type "Patient"}
{:op "delete"
:type "Patient"
:id "0"
:check-refs "i should be a boolean"})))
:check-refs "i should be a boolean"}
{:op "conditional-delete"}
{:op "delete-history"
:type "Patient"})))
7 changes: 0 additions & 7 deletions modules/db/src/blaze/db/api_spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,6 @@
:start-t (s/? (s/nilable :blaze.db/t)))
:ret (cs/coll-of :blaze.db/resource-handle))

(s/fdef d/total-num-of-instance-changes
:args (s/cat :db :blaze.db/db
:type :fhir.resource/type
:id :blaze.resource/id
:since (s/? (s/nilable inst?)))
:ret nat-int?)

;; ---- Type-Level History Functions ------------------------------------------

(s/fdef d/type-history
Expand Down
6 changes: 3 additions & 3 deletions modules/db/src/blaze/db/impl/batch_db.clj
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@

(-instance-history [_ tid id start-t]
(let [start-t (if (some-> start-t (<= t)) start-t t)]
(rao/instance-history snapshot tid id start-t)))
(rao/instance-history snapshot tid id t start-t)))

(-total-num-of-instance-changes [_ tid id since]
(let [end-t (or (some->> since (t-by-instant/t-by-instant snapshot)) 0)]
Expand All @@ -156,7 +156,7 @@

(-type-history [_ tid start-t start-id]
(let [start-t (if (some-> start-t (<= t)) start-t t)]
(tao/type-history snapshot tid start-t start-id)))
(tao/type-history snapshot tid t start-t start-id)))

(-total-num-of-type-changes [_ type since]
(let [tid (codec/tid type)
Expand All @@ -168,7 +168,7 @@

(-system-history [_ start-t start-tid start-id]
(let [start-t (if (some-> start-t (<= t)) start-t t)]
(sao/system-history snapshot start-t start-tid start-id)))
(sao/system-history snapshot t start-t start-tid start-id)))

(-total-num-of-system-changes [_ since]
(let [end-t (some->> since (t-by-instant/t-by-instant snapshot))]
Expand Down
28 changes: 16 additions & 12 deletions modules/db/src/blaze/db/impl/index/resource_as_of.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
(def ^:private ^:const ^long max-key-size
(+ except-id-key-size codec/max-id-size))

(def ^:const ^long value-size
(def ^:const ^long min-value-size
(+ hash/size codec/state-size))

(def ^:const ^long max-value-size
(+ hash/size codec/state-size codec/t-size))

(defn- focus-id!
"Reduces the limit of `kb` in order to hide the t and focus on id solely."
[kb]
Expand Down Expand Up @@ -295,15 +298,16 @@
(rh/resource-handle! tid id (codec/descending-long (bb/get-long! kb tid-id-size)) vb)))

(defn instance-history
"Returns a reducible collection of all versions of the resource with `tid` and
`id` starting at `start-t`.
Versions are resource handles."
[snapshot tid id start-t]
"Returns a reducible collection of all historic resource handles of the
resource with `tid` and `id` of the database with the point in time `t`
starting at `start-t`."
[snapshot tid id t start-t]
(let [tid-id-size (+ codec/tid-size (bs/size id))]
(i/prefix-entries snapshot :resource-as-of-index
(map (decoder tid (codec/id-string id) tid-id-size))
tid-id-size (start-key tid id start-t))))
(i/prefix-entries
snapshot :resource-as-of-index
(comp (map (decoder tid (codec/id-string id) tid-id-size))
(take-while #(< (long t) (rh/purged-at %))))
tid-id-size (start-key tid id start-t))))

(defn- resource-handle* [iter target-buf key-buf value-buf tid id t]
(let [tid-id-size (+ codec/tid-size (bs/size id))
Expand Down Expand Up @@ -347,7 +351,7 @@
[snapshot tid id t]
(let [target-buf (bb/allocate max-key-size)
key-buf (bb/allocate max-key-size)
value-buf (bb/allocate value-size)]
value-buf (bb/allocate max-value-size)]
(with-open [iter (kv/new-iterator snapshot :resource-as-of-index)]
(resource-handle* iter target-buf key-buf value-buf tid id t))))

Expand All @@ -369,7 +373,7 @@
[snapshot t]
(let [target-buf (bb/allocate max-key-size)
key-buf (bb/allocate max-key-size)
value-buf (bb/allocate value-size)
value-buf (bb/allocate max-value-size)
iter (kv/new-iterator snapshot :resource-as-of-index)]
(comp
(keep
Expand All @@ -389,7 +393,7 @@
([snapshot t tid id-extractor matcher]
(let [target-buf (bb/allocate max-key-size)
key-buf (bb/allocate max-key-size)
value-buf (bb/allocate value-size)
value-buf (bb/allocate max-value-size)
iter (kv/new-iterator snapshot :resource-as-of-index)]
(comp
(keep
Expand Down
16 changes: 14 additions & 2 deletions modules/db/src/blaze/db/impl/index/resource_handle.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
(set! *warn-on-reflection* true)
(set! *unchecked-math* :warn-on-boxed)

(deftype ResourceHandle [^int tid id ^long t hash ^long num-changes op]
(deftype ResourceHandle [^int tid id ^long t hash ^long num-changes op ^long purged-at]
p/FhirType
(-type [_]
;; TODO: maybe cache this
Expand All @@ -29,6 +29,7 @@
:hash hash
:num-changes num-changes
:op op
:purged-at purged-at
not-found))

Object
Expand Down Expand Up @@ -61,6 +62,11 @@
:delete
:put)))

(defn- get-purged-at! [vb]
(if (<= 8 (bb/remaining vb))
(bb/get-long! vb)
Long/MAX_VALUE))

(defn resource-handle!
"Creates a new resource handle.
Expand All @@ -74,7 +80,8 @@
t
hash
(state->num-changes state)
(state->op state))))
(state->op state)
(get-purged-at! vb))))

(defn resource-handle? [x]
(instance? ResourceHandle x))
Expand Down Expand Up @@ -116,6 +123,11 @@
[rh]
(.-op ^ResourceHandle rh))

(defn purged-at
{:inline (fn [rh] `(.-purged-at ~(with-meta rh {:tag `ResourceHandle})))}
[rh]
(.-purged-at ^ResourceHandle rh))

(defn reference [rh]
(str (codec/tid->type (tid rh)) "/" (id rh)))

Expand Down
33 changes: 23 additions & 10 deletions modules/db/src/blaze/db/impl/index/rts_as_of.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,27 @@
(identical? :create op) (Numbers/setBit 1)
(identical? :delete op) (Numbers/setBit 0)))

(defn encode-value [hash num-changes op]
(-> (bb/allocate rao/value-size)
(hash/into-byte-buffer! hash)
(bb/put-long! (state num-changes op))
bb/array))
(defn- encode-value
([hash num-changes op]
(-> (bb/allocate rao/min-value-size)
(hash/into-byte-buffer! hash)
(bb/put-long! (state num-changes op))
bb/array))
([hash num-changes op purged-at]
(-> (bb/allocate rao/max-value-size)
(hash/into-byte-buffer! hash)
(bb/put-long! (state num-changes op))
(bb/put-long! purged-at)
bb/array)))

(defn index-entries [tid id t hash num-changes op]
(let [value (encode-value hash num-changes op)]
[[:resource-as-of-index (rao/encode-key tid id t) value]
[:type-as-of-index (tao/encode-key tid t id) value]
[:system-as-of-index (sao/encode-key t tid id) value]]))
(defn index-entries
([tid id t hash num-changes op]
(let [value (encode-value hash num-changes op)]
[[:resource-as-of-index (rao/encode-key tid id t) value]
[:type-as-of-index (tao/encode-key tid t id) value]
[:system-as-of-index (sao/encode-key t tid id) value]]))
([tid id t hash num-changes op purged-at]
(let [value (encode-value hash num-changes op purged-at)]
[[:resource-as-of-index (rao/encode-key tid id t) value]
[:type-as-of-index (tao/encode-key tid t id) value]
[:system-as-of-index (sao/encode-key t tid id) value]])))
17 changes: 9 additions & 8 deletions modules/db/src/blaze/db/impl/index/system_as_of.clj
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@
(Longs/toByteArray (codec/descending-long ^long start-t))))

(defn system-history
"Returns a reducible collection of all versions between `start-t` (inclusive),
`start-tid` (optional, inclusive) and `start-id` (optional, inclusive) of all
resources.
Versions are resource handles."
[snapshot start-t start-tid start-id]
(i/entries snapshot :system-as-of-index (map (decoder))
(bs/from-byte-array (start-key start-t start-tid start-id))))
"Returns a reducible collection of all historic resource handles of the
database with the point in time `t` between `start-t` (inclusive), `start-tid`
(optional, inclusive) and `start-id` (optional, inclusive)."
[snapshot t start-t start-tid start-id]
(i/entries
snapshot :system-as-of-index
(comp (map (decoder))
(filter #(< (long t) (rh/purged-at %))))
(bs/from-byte-array (start-key start-t start-tid start-id))))

(defn changes
"Returns a reducible collection of all resource handles changed at `t`."
Expand Down
15 changes: 9 additions & 6 deletions modules/db/src/blaze/db/impl/index/type_as_of.clj
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@
bs/from-byte-buffer!)))

(defn type-history
"Returns a reducible collection of all historic resource handles between
`start-t` (inclusive) and `start-id` (optional, inclusive) of resources with
`tid`."
[snapshot tid start-t start-id]
(i/prefix-entries snapshot :type-as-of-index (map (decoder tid))
codec/tid-size (start-key tid start-t start-id)))
"Returns a reducible collection of all historic resource handles with type
`tid` of the database with the point in time `t` between `start-t` (inclusive)
and `start-id` (optional, inclusive)."
[snapshot tid t start-t start-id]
(i/prefix-entries
snapshot :type-as-of-index
(comp (map (decoder tid))
(filter #(< (long t) (rh/purged-at %))))
codec/tid-size (start-key tid start-t start-id)))
Loading

0 comments on commit a3436d5

Please sign in to comment.