diff --git a/.github/scripts/admin-api/run-prune-job.sh b/.github/scripts/admin-api/run-prune-job.sh new file mode 100755 index 000000000..e06a7a82e --- /dev/null +++ b/.github/scripts/admin-api/run-prune-job.sh @@ -0,0 +1,59 @@ +#!/bin/bash -e + +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +. "$SCRIPT_DIR/../util.sh" + +BASE="http://localhost:8080/fhir" + +prune-job() { +cat <=6.5" + } + }, "node_modules/agent-base": { "version": "7.1.1", "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-7.1.1.tgz", @@ -99,9 +111,9 @@ "license": "MIT" }, "node_modules/axios": { - "version": "1.7.5", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.5.tgz", - "integrity": "sha512-fZu86yCo+svH3uqJ/yTdQ0QHpQu5oL+/QE+QPSv6BZSkDAoky9vytxp7u5qk83OJFS3kEBcesWni9WTZAv3tSw==", + "version": "1.7.7", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.7.tgz", + "integrity": "sha512-S4kL7XrjgBmvdGut0sN3yJxqYzrDOnivkBiN0OFs6hLiUam3UPvswUo0kqGyhqUZGEOytHyumEdXsAkgCOUf3Q==", "license": "MIT", "dependencies": { "follow-redirects": "^1.15.6", @@ -115,6 +127,26 @@ "integrity": "sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==", "license": "MIT" }, + "node_modules/base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, "node_modules/brace-expansion": { "version": "1.1.11", "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.11.tgz", @@ -125,6 +157,30 @@ "concat-map": "0.0.1" } }, + "node_modules/buffer": { + "version": "6.0.3", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz", + "integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT", + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.2.1" + } + }, "node_modules/buffer-from": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz", @@ -269,12 +325,12 @@ "license": "MIT" }, "node_modules/debug": { - "version": "4.3.6", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.6.tgz", - "integrity": "sha512-O/09Bd4Z1fBrU4VzkhFqVgpPzaGbw6Sm9FEkBT1A/YBXQFGuuSxa1dN2nxgxS34JmKXqYx8CZAwEVoJFImUXIg==", + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", "license": "MIT", "dependencies": { - "ms": "2.1.2" + "ms": "^2.1.3" }, "engines": { "node": ">=6.0" @@ -310,6 +366,24 @@ "integrity": "sha512-AKrN98kuwOzMIdAizXGI86UFBoo26CL21UM763y1h/GMSJ4/OHU9k2YlsmBpyScFo/wbLzWQJBMCW4+IO3/+OQ==", "license": "MIT" }, + "node_modules/event-target-shim": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", + "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==", + "license": "MIT", + "engines": { + "node": ">=6" + } + }, + "node_modules/events": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", + "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==", + "license": "MIT", + "engines": { + "node": ">=0.8.x" + } + }, "node_modules/fast-deep-equal": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", @@ -317,9 +391,9 @@ "license": "MIT" }, "node_modules/fast-uri": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/fast-uri/-/fast-uri-3.0.1.tgz", - "integrity": "sha512-MWipKbbYiYI0UC7cl8m/i/IWTqfC8YXsqjzybjddLsFjStroQzsHXkc73JutMvBiXmOvapk+axIl79ig5t55Bw==", + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/fast-uri/-/fast-uri-3.0.2.tgz", + "integrity": "sha512-GR6f0hD7XXyNJa25Tb9BuIdN0tdr+0BMi6/CJPH3wJO1JjNG3n/VsSw38AwRdKZABm8lGbPfakLRkYzx2V9row==", "license": "MIT" }, "node_modules/fecha": { @@ -444,9 +518,9 @@ "license": "MIT" }, "node_modules/follow-redirects": { - "version": "1.15.6", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz", - "integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==", + "version": "1.15.9", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.9.tgz", + "integrity": "sha512-gew4GsXizNgdoRyqmyfMHyAmXsZDk6mHkSxZFCzW9gwlbtOW44CDtYavM+y+72qD/Vq2l550kMF52DT8fOLJqQ==", "funding": [ { "type": "individual", @@ -522,9 +596,9 @@ "license": "ISC" }, "node_modules/fsh-sushi": { - "version": "3.11.1", - "resolved": "https://registry.npmjs.org/fsh-sushi/-/fsh-sushi-3.11.1.tgz", - "integrity": "sha512-rQtA/mDktM8uH2aekRKLCLOnpGNWylygFo5PqM5Q5iMu8/O7DvQ/QOwbbsVr7ZGE5FPwz9jnX1QzXRZoqZK5MA==", + "version": "3.12.0", + "resolved": "https://registry.npmjs.org/fsh-sushi/-/fsh-sushi-3.12.0.tgz", + "integrity": "sha512-VFIWj1w7YH35rKeRic0NwDffh7JEvv+jXu4isFdxMxZvE6wcmpwgf8Fu35H0LuyZXLYoG4IvG5FFTxHTPp5lZw==", "license": "Apache-2.0", "dependencies": { "ajv": "^8.17.1", @@ -642,6 +716,26 @@ "node": ">= 14" } }, + "node_modules/ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "BSD-3-Clause" + }, "node_modules/inflight": { "version": "1.0.6", "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", @@ -864,9 +958,9 @@ } }, "node_modules/ms": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", "license": "MIT" }, "node_modules/no-case": { @@ -926,6 +1020,15 @@ "node": ">=0.10.0" } }, + "node_modules/process": { + "version": "0.11.10", + "resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz", + "integrity": "sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==", + "license": "MIT", + "engines": { + "node": ">= 0.6.0" + } + }, "node_modules/proxy-from-env": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", @@ -1273,19 +1376,35 @@ } }, "node_modules/winston-transport": { - "version": "4.7.1", - "resolved": "https://registry.npmjs.org/winston-transport/-/winston-transport-4.7.1.tgz", - "integrity": "sha512-wQCXXVgfv/wUPOfb2x0ruxzwkcZfxcktz6JIMUaPLmcNhO4bZTwA/WtDWK74xV3F2dKu8YadrFv0qhwYjVEwhA==", + "version": "4.8.0", + "resolved": "https://registry.npmjs.org/winston-transport/-/winston-transport-4.8.0.tgz", + "integrity": "sha512-qxSTKswC6llEMZKgCQdaWgDuMJQnhuvF5f2Nk3SNXc4byfQ+voo2mX1Px9dkNOuR8p0KAjfPG29PuYUSIb+vSA==", "license": "MIT", "dependencies": { "logform": "^2.6.1", - "readable-stream": "^3.6.2", + "readable-stream": "^4.5.2", "triple-beam": "^1.3.0" }, "engines": { "node": ">= 12.0.0" } }, + "node_modules/winston-transport/node_modules/readable-stream": { + "version": "4.5.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.5.2.tgz", + "integrity": "sha512-yjavECdqeZ3GLXNgRXgeQEdz9fvDDkNKyHnbHRFtOr7/LcfgBcmct7t/ET+HaCTqfh06OzoAxrkN/IfjJBVe+g==", + "license": "MIT", + "dependencies": { + "abort-controller": "^3.0.0", + "buffer": "^6.0.3", + "events": "^3.3.0", + "process": "^0.11.10", + "string_decoder": "^1.3.0" + }, + "engines": { + "node": "^12.22.0 || ^14.17.0 || >=16.0.0" + } + }, "node_modules/wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", diff --git a/job-ig/package.json b/job-ig/package.json index bad3c99e9..16d3c77ae 100644 --- a/job-ig/package.json +++ b/job-ig/package.json @@ -1,5 +1,5 @@ { "dependencies": { - "fsh-sushi": "^3.11.1" + "fsh-sushi": "^3.12.0" } } diff --git a/modules/admin-api/deps.edn b/modules/admin-api/deps.edn index 840c7d0d9..3f52ee9af 100644 --- a/modules/admin-api/deps.edn +++ b/modules/admin-api/deps.edn @@ -19,6 +19,9 @@ blaze/job-compact {:local/root "../job-compact"} + blaze/job-prune + {:local/root "../job-prune"} + blaze/job-re-index {:local/root "../job-re-index"} diff --git a/modules/admin-api/src/blaze/admin_api.clj b/modules/admin-api/src/blaze/admin_api.clj index 6a0389f7a..24b38b6f4 100644 --- a/modules/admin-api/src/blaze/admin_api.clj +++ b/modules/admin-api/src/blaze/admin_api.clj @@ -18,6 +18,7 @@ [blaze.interaction.util :as iu] [blaze.job-scheduler :as js] [blaze.job-scheduler.spec] + [blaze.job.prune] [blaze.job.re-index] [blaze.middleware.fhir.db :as db] [blaze.middleware.fhir.output :as fhir-output] @@ -184,7 +185,8 @@ :wrap link-headers/wrap-link-headers}) (def ^:private allowed-profiles - #{#fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/ReIndexJob" + #{#fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/PruneJob" + #fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/ReIndexJob" #fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/CompactJob" #fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/AsyncInteractionJob"}) @@ -468,12 +470,17 @@ "blaze/job/async_interaction/StructureDefinition-AsyncInteractionJob.json" "blaze/job/async_interaction/StructureDefinition-AsyncInteractionRequestBundle.json" "blaze/job/async_interaction/StructureDefinition-AsyncInteractionResponseBundle.json" - "blaze/job/async_interaction/CodeSystem-AsyncInteractionJobParameter.json" "blaze/job/async_interaction/CodeSystem-AsyncInteractionJobOutput.json" + "blaze/job/async_interaction/CodeSystem-AsyncInteractionJobParameter.json" "blaze/job/compact/StructureDefinition-CompactJob.json" + "blaze/job/prune/StructureDefinition-PruneJob.json" + "blaze/job/prune/CodeSystem-PruneIndices.json" + "blaze/job/prune/CodeSystem-PruneJobOutput.json" + "blaze/job/prune/CodeSystem-PruneJobParameter.json" + "blaze/job/prune/ValueSet-PruneIndices.json" "blaze/job/re_index/StructureDefinition-ReIndexJob.json" - "blaze/job/re_index/CodeSystem-ReIndexJobParameter.json" - "blaze/job/re_index/CodeSystem-ReIndexJobOutput.json"]) + "blaze/job/re_index/CodeSystem-ReIndexJobOutput.json" + "blaze/job/re_index/CodeSystem-ReIndexJobParameter.json"]) s)) (defn- create-validator* [] diff --git a/modules/db/src/blaze/db/api.clj b/modules/db/src/blaze/db/api.clj index b4440a57c..4e77d2847 100644 --- a/modules/db/src/blaze/db/api.clj +++ b/modules/db/src/blaze/db/api.clj @@ -526,6 +526,11 @@ ;; ---- (Re) Index ------------------------------------------------------------ (defn re-index-total + "Returns the total number of resources that have to be processed when + (re)indexing the search parameter with `search-param-url`. + + Returns an anomaly if the search parameter with `search-param-url` was not + found." [db search-param-url] (p/-re-index-total db search-param-url)) @@ -541,3 +546,25 @@ (p/-re-index db search-param-url)) ([db search-param-url start-type start-id] (p/-re-index db search-param-url start-type start-id))) + +;; ---- Prune ----------------------------------------------------------------- + +(defn prune-total + "Returns the estimated total number of index entries that have to be processed + during pruning." + [node] + (np/-prune-total node)) + +(defn prune + "Removes purged and outdated index entries from `node` which were purged at or + before `t`. + + Processes at most `n` index entries. The function `prune-total` can be used to + determine to total number of index entries needed to process. + + Returns a map of :index, :type, :id and :t which can supplied to the optional + `start` argument to continue the pruning." + ([node n t] + (np/-prune node n t nil)) + ([node n t start] + (np/-prune node n t start))) diff --git a/modules/db/src/blaze/db/api_spec.clj b/modules/db/src/blaze/db/api_spec.clj index 304f704c9..dea7d1947 100644 --- a/modules/db/src/blaze/db/api_spec.clj +++ b/modules/db/src/blaze/db/api_spec.clj @@ -263,3 +263,12 @@ :start (s/? (s/cat :start-type :fhir.resource/type :start-id :blaze.resource/id))) :ret ac/completable-future?) + +(s/fdef d/prune-total + :args (s/cat :node :blaze.db/node) + :ret nat-int?) + +(s/fdef d/prune + :args (s/cat :node :blaze.db/node :n pos-int? :t :blaze.db/t + :start (s/? :blaze.db.prune/start)) + :ret ac/completable-future?) diff --git a/modules/db/src/blaze/db/impl/index/resource_as_of.clj b/modules/db/src/blaze/db/impl/index/resource_as_of.clj index 6eeadee16..7df93ecf5 100644 --- a/modules/db/src/blaze/db/impl/index/resource_as_of.clj +++ b/modules/db/src/blaze/db/impl/index/resource_as_of.clj @@ -151,11 +151,16 @@ (search-entry! kb vb) result)))))))) -(defn- encode-key-buf [tid id t] - (-> (bb/allocate (unchecked-add-int except-id-key-size (bs/size id))) - (bb/put-int! tid) - (bb/put-byte-string! id) - (bb/put-long! (codec/descending-long t)))) +(defn- encode-key-buf + ([tid id] + (-> (bb/allocate (unchecked-add-int codec/tid-size (bs/size id))) + (bb/put-int! tid) + (bb/put-byte-string! id))) + ([tid id t] + (-> (bb/allocate (unchecked-add-int except-id-key-size (bs/size id))) + (bb/put-int! tid) + (bb/put-byte-string! id) + (bb/put-long! (codec/descending-long t))))) (defn encode-key "Encodes the key of the ResourceAsOf index from `tid`, `id` and `t`." @@ -179,6 +184,10 @@ (defn- start-key ([tid] (-> (Ints/toByteArray tid) bs/from-byte-array)) + ([tid start-id] + (-> (encode-key-buf tid start-id) + bb/flip! + bs/from-byte-buffer!)) ([tid start-id t] (-> (encode-key-buf tid start-id t) bb/flip! @@ -262,6 +271,7 @@ iteration. The state and t which are both longs are read from the off-heap key and value buffer. The hash and state which are read from the value buffer are only read once for each resource handle." + {:arglists '([batch-db tid] [batch-db tid start-id])} ([{:keys [snapshot t]} tid] (i/entries snapshot :resource-as-of-index (type-list-xf t tid) (start-key tid))) @@ -282,6 +292,7 @@ tid and resource id. The list starts at the optional `start-tid` and `start-id`." + {:arglists '([batch-db] [batch-db start-tid start-id])} ([{:keys [snapshot t]}] (i/entries snapshot :resource-as-of-index (system-list-xf t nil))) ([{:keys [snapshot t]} start-tid start-id] @@ -405,3 +416,52 @@ (when (matcher input handle) handle)))) (closer iter))))) + +(defn- delete-entry! [kb] + (bb/set-position! kb 0) + (let [key (byte-array (bb/limit kb))] + (bb/copy-into-byte-array! kb key) + [:resource-as-of-index key])) + +(defn- prune-rf [n] + (fn [ret {:keys [idx delete-entry] [tid id t] :key}] + (if (= idx n) + (reduced (assoc ret :next {:tid tid :id id :t t})) + (cond-> (update ret :num-entries-processed inc) + delete-entry + (update :delete-entries conj delete-entry))))) + +(defn- prune-key! [kb] + [(bb/get-int! kb) + (bs/from-byte-buffer! kb (- (bb/remaining kb) codec/t-size)) + (codec/descending-long (bb/get-long! kb))]) + +(defn- prune-xf [t] + (map-indexed + (fn [idx [kb vb]] + (bb/set-position! vb (+ hash/size codec/state-size)) + (cond-> + {:idx idx :key (prune-key! kb)} + (rh/purged!? vb t) + (assoc :delete-entry (delete-entry! kb)))))) + +(defn prune + "Scans the ResourceAsOf index for entries which were purged at or before `t`. + + Processes at most `n` entries and optionally starts at the entry with + `start-tid` and `start-id`. + + Returns a map with :delete-entries and :next where :delete-entries is a + vector of all index entries to delete and :next is a map of :tid and :id of + the index entry to start with in the next iteration if necessary." + ([snapshot n t] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :resource-as-of-index (prune-xf t)))) + ([snapshot n t start-tid start-id start-t] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :resource-as-of-index (prune-xf t) + (start-key start-tid start-id start-t))))) diff --git a/modules/db/src/blaze/db/impl/index/resource_handle.clj b/modules/db/src/blaze/db/impl/index/resource_handle.clj index b025b860a..5819f2715 100644 --- a/modules/db/src/blaze/db/impl/index/resource_handle.clj +++ b/modules/db/src/blaze/db/impl/index/resource_handle.clj @@ -61,6 +61,16 @@ :delete :put))) +(defn purged!? + "Returns true if the index entry with `vb` has an encoded purged-at t at or + before `t`. + + The position of `vb` has to be after reading the state and will be incremented + by 8 byte in case the end isn't reached." + [vb t] + (and (= (bb/remaining vb) codec/t-size) + (<= (bb/get-long! vb) (long t)))) + (defn resource-handle! "Creates a new resource handle when not purged at `base-t`. @@ -68,7 +78,7 @@ [tid id t base-t vb] (let [hash (hash/from-byte-buffer! vb) state (bb/get-long! vb)] - (when (or (< (bb/remaining vb) 8) (< (long base-t) (bb/get-long! vb))) + (when-not (purged!? vb base-t) (ResourceHandle. tid id diff --git a/modules/db/src/blaze/db/impl/index/system_as_of.clj b/modules/db/src/blaze/db/impl/index/system_as_of.clj index 05f5809b9..cbc293152 100644 --- a/modules/db/src/blaze/db/impl/index/system_as_of.clj +++ b/modules/db/src/blaze/db/impl/index/system_as_of.clj @@ -5,7 +5,8 @@ [blaze.byte-string :as bs] [blaze.db.impl.codec :as codec] [blaze.db.impl.index.resource-handle :as rh] - [blaze.db.impl.iterators :as i]) + [blaze.db.impl.iterators :as i] + [blaze.fhir.hash :as hash]) (:import [com.google.common.primitives Longs])) @@ -45,22 +46,21 @@ (bb/put-byte-string! id) bb/array)) -(defn- encode-t-tid [start-t start-tid] - (-> (bb/allocate t-tid-size) - (bb/put-long! (codec/descending-long ^long start-t)) - (bb/put-int! start-tid) - bb/array)) - (defn- start-key [start-t start-tid start-id] (cond start-id - (encode-key start-t start-tid start-id) + (bs/from-byte-array (encode-key start-t start-tid start-id)) start-tid - (encode-t-tid start-t start-tid) + (-> (bb/allocate t-tid-size) + (bb/put-long! (codec/descending-long ^long start-t)) + (bb/put-int! start-tid) + bb/flip! + bs/from-byte-buffer!) :else - (Longs/toByteArray (codec/descending-long ^long start-t)))) + (-> (Longs/toByteArray (codec/descending-long ^long start-t)) + bs/from-byte-array))) (defn system-history "Returns a reducible collection of all historic resource handles of the @@ -70,10 +70,59 @@ (i/entries snapshot :system-as-of-index (keep (decoder t)) - (bs/from-byte-array (start-key start-t start-tid start-id)))) + (start-key start-t start-tid start-id))) (defn changes "Returns a reducible collection of all resource handles changed at `t`." [snapshot t] (i/prefix-entries snapshot :system-as-of-index (keep (decoder t)) codec/t-size - (bs/from-byte-array (start-key t nil nil)))) + (start-key t nil nil))) + +(defn- delete-entry! [kb] + (bb/set-position! kb 0) + (let [key (byte-array (bb/limit kb))] + (bb/copy-into-byte-array! kb key) + [:system-as-of-index key])) + +(defn- prune-rf [n] + (fn [ret {:keys [idx delete-entry] [t tid id] :key}] + (if (= idx n) + (reduced (assoc ret :next {:t t :tid tid :id id})) + (cond-> (update ret :num-entries-processed inc) + delete-entry + (update :delete-entries conj delete-entry))))) + +(defn- prune-key! [kb] + [(codec/descending-long (bb/get-long! kb)) + (bb/get-int! kb) + (bs/from-byte-buffer! kb (bb/remaining kb))]) + +(defn- prune-xf [t] + (map-indexed + (fn [idx [kb vb]] + (bb/set-position! vb (+ hash/size codec/state-size)) + (cond-> + {:idx idx :key (prune-key! kb)} + (rh/purged!? vb t) + (assoc :delete-entry (delete-entry! kb)))))) + +(defn prune + "Scans the SystemAsOf index for entries which were purged at or before `t`. + + Processes at most `n` entries and optionally starts at the entry with + `start-t`, `start-tid` and `start-id`. + + Returns a map with :delete-entries and :next where :delete-entries is a + vector of all index entries to delete and :next is a map of :tid, :t and :id + of the index entry to start with in the next iteration if necessary." + ([snapshot n t] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :system-as-of-index (prune-xf t)))) + ([snapshot n t start-t start-tid start-id] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :system-as-of-index (prune-xf t) + (start-key start-t start-tid start-id))))) diff --git a/modules/db/src/blaze/db/impl/index/type_as_of.clj b/modules/db/src/blaze/db/impl/index/type_as_of.clj index 59ff36d41..d18ef3db1 100644 --- a/modules/db/src/blaze/db/impl/index/type_as_of.clj +++ b/modules/db/src/blaze/db/impl/index/type_as_of.clj @@ -5,7 +5,8 @@ [blaze.byte-string :as bs] [blaze.db.impl.codec :as codec] [blaze.db.impl.index.resource-handle :as rh] - [blaze.db.impl.iterators :as i])) + [blaze.db.impl.iterators :as i] + [blaze.fhir.hash :as hash])) (set! *warn-on-reflection* true) (set! *unchecked-math* :warn-on-boxed) @@ -62,3 +63,52 @@ snapshot :type-as-of-index (keep (decoder tid t)) codec/tid-size (start-key tid start-t start-id))) + +(defn- delete-entry! [kb] + (bb/set-position! kb 0) + (let [key (byte-array (bb/limit kb))] + (bb/copy-into-byte-array! kb key) + [:type-as-of-index key])) + +(defn- prune-rf [n] + (fn [ret {:keys [idx delete-entry] [tid t id] :key}] + (if (= idx n) + (reduced (assoc ret :next {:tid tid :t t :id id})) + (cond-> (update ret :num-entries-processed inc) + delete-entry + (update :delete-entries conj delete-entry))))) + +(defn- prune-key! [kb] + [(bb/get-int! kb) + (codec/descending-long (bb/get-long! kb)) + (bs/from-byte-buffer! kb (bb/remaining kb))]) + +(defn- prune-xf [t] + (map-indexed + (fn [idx [kb vb]] + (bb/set-position! vb (+ hash/size codec/state-size)) + (cond-> + {:idx idx :key (prune-key! kb)} + (rh/purged!? vb t) + (assoc :delete-entry (delete-entry! kb)))))) + +(defn prune + "Scans the TypeAsOf index for entries which were purged at or before `t`. + + Processes at most `n` entries and optionally starts at the entry with + `start-tid`, `start-t` and `start-id`. + + Returns a map with :delete-entries and :next where :delete-entries is a + vector of all index entries to delete and :next is a map of :tid, :t and :id + of the index entry to start with in the next iteration if necessary." + ([snapshot n t] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :type-as-of-index (prune-xf t)))) + ([snapshot n t start-tid start-t start-id] + (reduce + (prune-rf n) + {:delete-entries [] :num-entries-processed 0} + (i/entries snapshot :type-as-of-index (prune-xf t) + (start-key start-tid start-t start-id))))) diff --git a/modules/db/src/blaze/db/node.clj b/modules/db/src/blaze/db/node.clj index 7cf43cdb4..cdc97645c 100644 --- a/modules/db/src/blaze/db/node.clj +++ b/modules/db/src/blaze/db/node.clj @@ -10,10 +10,13 @@ [blaze.db.impl.db :as db] [blaze.db.impl.index :as index] [blaze.db.impl.index.patient-last-change :as plc] + [blaze.db.impl.index.resource-as-of :as rao] [blaze.db.impl.index.resource-handle :as rh] + [blaze.db.impl.index.system-as-of :as sao] [blaze.db.impl.index.t-by-instant :as t-by-instant] [blaze.db.impl.index.tx-error :as tx-error] [blaze.db.impl.index.tx-success :as tx-success] + [blaze.db.impl.index.type-as-of :as tao] [blaze.db.impl.protocols :as p] [blaze.db.kv :as kv] [blaze.db.node.patient-last-change-index :as node-plc] @@ -249,6 +252,38 @@ (with-open [db (batch-db/new-batch-db node t t)] (into [] (take-while #(= t (rh/t %))) (d/type-history db type)))) +(defn- prune-next [{:keys [tid id t] :as next} index next-index] + (cond + next + {:index index + :type (codec/tid->type tid) + :id (codec/id-string id) + :t t} + next-index + {:index next-index})) + +(defmulti prune (fn [_ _ _ {:keys [index]}] index)) + +(defmethod prune nil [snapshot n t _] + (-> (rao/prune snapshot n t) + (update :next prune-next :resource-as-of-index :type-as-of-index))) + +(defmethod prune :resource-as-of-index [snapshot n t {:keys [type id] :as start}] + (-> (rao/prune snapshot n t (codec/tid type) (codec/id-byte-string id) (:t start)) + (update :next prune-next :resource-as-of-index :type-as-of-index))) + +(defmethod prune :type-as-of-index [snapshot n t {:keys [type id] :as start}] + (-> (if type + (tao/prune snapshot n t (codec/tid type) (:t start) (codec/id-byte-string id)) + (tao/prune snapshot n t)) + (update :next prune-next :type-as-of-index :system-as-of-index))) + +(defmethod prune :system-as-of-index [snapshot n t {:keys [type id] :as start}] + (-> (if type + (sao/prune snapshot n t (:t start) (codec/tid type) (codec/id-byte-string id)) + (sao/prune snapshot n t)) + (update :next prune-next :system-as-of-index nil))) + (defrecord Node [context tx-log tx-cache kv-store resource-store sync-fn search-param-registry resource-indexer state run? poll-timeout finished] @@ -304,6 +339,21 @@ (flow/submit! publisher changed-handles))))) publisher)) + (-prune-total [_] + (+ (kv/estimate-num-keys kv-store :resource-as-of-index) + (kv/estimate-num-keys kv-store :type-as-of-index) + (kv/estimate-num-keys kv-store :system-as-of-index))) + + (-prune [_ n t start] + (log/trace "prune at most" n "index entries starting at t =" t) + (let [{:keys [delete-entries num-entries-processed next]} + (with-open [snapshot (kv/new-snapshot kv-store)] + (prune snapshot n t start))] + (kv/delete! kv-store delete-entries) + {:num-index-entries-processed num-entries-processed + :num-index-entries-deleted (count delete-entries) + :next next})) + p/Tx (-tx [_ t] (tx-success/tx tx-cache t)) diff --git a/modules/db/src/blaze/db/node/protocols.clj b/modules/db/src/blaze/db/node/protocols.clj index ef3ce0dca..a78136c99 100644 --- a/modules/db/src/blaze/db/node/protocols.clj +++ b/modules/db/src/blaze/db/node/protocols.clj @@ -5,4 +5,6 @@ (-sync [node] [node t]) (-submit-tx [node tx-ops]) (-tx-result [node t]) - (-changed-resources-publisher [node type])) + (-changed-resources-publisher [node type]) + (-prune-total [node]) + (-prune [node n t start])) diff --git a/modules/db/src/blaze/db/spec.clj b/modules/db/src/blaze/db/spec.clj index 1f76f64cd..d9b290ec4 100644 --- a/modules/db/src/blaze/db/spec.clj +++ b/modules/db/src/blaze/db/spec.clj @@ -116,3 +116,13 @@ (s/def :blaze.db/allow-multiple-delete boolean?) + +(s/def :blaze.db.prune/index + #{:resource-as-of-index :type-as-of-index :system-as-of-index}) + +(s/def :blaze.db.prune/start + (s/keys :req-un [(or :blaze.db.prune/index + (and :blaze.db.prune/index + :fhir.resource/type + :blaze.resource/id + :blaze.db/t))])) diff --git a/modules/db/test/blaze/db/api_test.clj b/modules/db/test/blaze/db/api_test.clj index e74a748f9..3cdff5ca9 100644 --- a/modules/db/test/blaze/db/api_test.clj +++ b/modules/db/test/blaze/db/api_test.clj @@ -11,6 +11,8 @@ [blaze.db.api-spec] [blaze.db.impl.db-spec] [blaze.db.impl.index.resource-search-param-value-test-util :as r-sp-v-tu] + [blaze.db.impl.iterators :as i] + [blaze.db.kv :as kv] [blaze.db.kv.mem-spec] [blaze.db.node :as node] [blaze.db.node-spec] @@ -7574,3 +7576,98 @@ "Observation" "09999") :num-resources := 1 :next := nil)))))))) + +(defn- prune-steps [node n t num-steps] + (take num-steps (iterate (fn [{start :next}] (d/prune node n t start)) (d/prune node n t)))) + +(deftest prune-total-test + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]]] + + (is (= 3 (d/prune-total node)))) + + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]]] + + (is (= 6 (d/prune-total node))))) + +(deftest prune-test + (testing "one purged patient in three steps" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]] + [[:patient-purge "0"]]] + + (testing "the next handles go from one index to the next" + (given (prune-steps node 1 2 3) + count := 3 + [0 :num-index-entries-processed] := 1 + [0 :num-index-entries-deleted] := 1 + [0 :next] := {:index :type-as-of-index} + [1 :num-index-entries-processed] := 1 + [1 :num-index-entries-deleted] := 1 + [1 :next] := {:index :system-as-of-index} + [2 :num-index-entries-processed] := 1 + [2 :num-index-entries-deleted] := 1 + [2 :next] := nil)) + + (testing "all index entries are gone" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (is (empty? (vec (i/entries snapshot :resource-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :type-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :system-as-of-index identity)))))))) + + (testing "two purged patients in three steps" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "the next handles go from one index to the next" + (given (prune-steps node 2 3 3) + count := 3 + [0 :num-index-entries-processed] := 2 + [0 :num-index-entries-deleted] := 2 + [0 :next] := {:index :type-as-of-index} + [1 :num-index-entries-processed] := 2 + [1 :num-index-entries-deleted] := 2 + [1 :next] := {:index :system-as-of-index} + [2 :num-index-entries-processed] := 2 + [2 :num-index-entries-deleted] := 2 + [2 :next] := nil)) + + (testing "all index entries are gone" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (is (empty? (vec (i/entries snapshot :resource-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :type-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :system-as-of-index identity)))))))) + + (testing "two purged patients in six steps" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "the next handles go from one index to the next with sub steps" + (given (prune-steps node 1 3 6) + count := 6 + [0 :num-index-entries-processed] := 1 + [0 :next] := {:index :resource-as-of-index :type "Patient" :id "1" :t 1} + [1 :num-index-entries-processed] := 1 + [1 :next] := {:index :type-as-of-index} + [2 :num-index-entries-processed] := 1 + [2 :next] := {:index :type-as-of-index :type "Patient" :id "1" :t 1} + [3 :num-index-entries-processed] := 1 + [3 :next] := {:index :system-as-of-index} + [4 :num-index-entries-processed] := 1 + [4 :next] := {:index :system-as-of-index :type "Patient" :id "1" :t 1} + [5 :num-index-entries-processed] := 1 + [5 :next] := nil)) + + (testing "all index entries are gone" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (is (empty? (vec (i/entries snapshot :resource-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :type-as-of-index identity)))) + (is (empty? (vec (i/entries snapshot :system-as-of-index identity))))))))) diff --git a/modules/db/test/blaze/db/impl/index/resource_as_of_spec.clj b/modules/db/test/blaze/db/impl/index/resource_as_of_spec.clj index 82089ad85..1c91ed68f 100644 --- a/modules/db/test/blaze/db/impl/index/resource_as_of_spec.clj +++ b/modules/db/test/blaze/db/impl/index/resource_as_of_spec.clj @@ -19,13 +19,13 @@ :ret bytes?) (s/fdef rao/type-list - :args (s/cat :context (s/keys :req-un [::kv/snapshot :blaze.db/t]) + :args (s/cat :batch-db :blaze.db.impl/batch-db :tid :blaze.db/tid :start-id (s/? :blaze.db/id-byte-string)) :ret (cs/coll-of :blaze.db/resource-handle)) (s/fdef rao/system-list - :args (s/cat :context (s/keys :req-un [::kv/snapshot :blaze.db/t]) + :args (s/cat :batch-db :blaze.db.impl/batch-db :start (s/? (s/cat :start-tid :blaze.db/tid :start-id :blaze.db/id-byte-string))) :ret (cs/coll-of :blaze.db/resource-handle)) @@ -57,3 +57,12 @@ :id-extractor (s/? fn?) :matcher (s/? fn?)) :ret fn?) + +(s/fdef rao/prune + :args (s/cat :snapshot ::kv/snapshot + :n pos-int? + :t :blaze.db/t + :start (s/? (s/cat :start-tid :blaze.db/tid + :start-id :blaze.db/id-byte-string + :start-t :blaze.db/t))) + :ret (s/coll-of ::kv/delete-entry :kind vector?)) diff --git a/modules/db/test/blaze/db/impl/index/resource_as_of_test.clj b/modules/db/test/blaze/db/impl/index/resource_as_of_test.clj new file mode 100644 index 000000000..950aa6563 --- /dev/null +++ b/modules/db/test/blaze/db/impl/index/resource_as_of_test.clj @@ -0,0 +1,104 @@ +(ns blaze.db.impl.index.resource-as-of-test + (:require + [blaze.db.impl.codec :as codec] + [blaze.db.impl.index.resource-as-of :as rao] + [blaze.db.impl.index.resource-as-of-spec] + [blaze.db.impl.index.resource-as-of-test-util :as rao-tu] + [blaze.db.kv :as kv] + [blaze.db.test-util :refer [config with-system-data]] + [blaze.module.test-util :refer [with-system]] + [blaze.test-util :as tu] + [clojure.spec.test.alpha :as st] + [clojure.test :as test :refer [deftest testing]] + [juxt.iota :refer [given]])) + +(st/instrument) + +(test/use-fixtures :each tu/fixture) + +(deftest prune-test + (testing "empty database" + (with-system [{:blaze.db/keys [node]} config] + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 0) + :delete-entries := [] + :num-entries-processed := 0 + :next := nil)))) + + (testing "one non-purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]]] + + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 1) + :delete-entries := [] + :num-entries-processed := 1 + :next := nil)))) + + (testing "one purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]] + [[:patient-purge "0"]]] + + (testing "returns no delete entry at t=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 1) + [:delete-entries count] := 0 + :num-entries-processed := 1 + :next := nil))) + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "0" :t 1} + :num-entries-processed := 1 + :next := nil))))) + + (testing "two purged patients" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "0" :t 1} + :num-entries-processed := 2 + :next := nil))) + + (testing "returns one delete entry at t=3 and n=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 1 3) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "0" :t 1} + :num-entries-processed := 1 + [:next :tid] := (codec/tid "Patient") + [:next :id] := (codec/id-byte-string "1") + [:next :t] := 1)) + + (testing "it's possible to continue with the next entry" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 2 3 (codec/tid "Patient") (codec/id-byte-string "1") 1) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "1" :t 1} + :num-entries-processed := 1 + :next := nil)))) + + (testing "returns two delete entries at t=3" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (rao/prune snapshot 10 3) + [:delete-entries count] := 2 + [:delete-entries 0 0] := :resource-as-of-index + [:delete-entries 0 1 rao-tu/decode-key] := {:type "Patient" :id "0" :t 1} + [:delete-entries 1 0] := :resource-as-of-index + [:delete-entries 1 1 rao-tu/decode-key] := {:type "Patient" :id "1" :t 1} + :num-entries-processed := 2 + :next := nil)))))) diff --git a/modules/db/test/blaze/db/impl/index/system_as_of_spec.clj b/modules/db/test/blaze/db/impl/index/system_as_of_spec.clj index 7e63b795c..08778fc0c 100644 --- a/modules/db/test/blaze/db/impl/index/system_as_of_spec.clj +++ b/modules/db/test/blaze/db/impl/index/system_as_of_spec.clj @@ -7,6 +7,7 @@ [blaze.db.impl.index.resource-handle-spec] [blaze.db.impl.index.system-as-of :as sao] [blaze.db.impl.iterators-spec] + [blaze.db.kv :as-alias kv] [blaze.db.kv-spec] [blaze.db.kv.spec] [blaze.db.spec] @@ -20,3 +21,12 @@ :start-tid (s/nilable :blaze.db/tid) :start-id (s/nilable :blaze.db/id-byte-string)) :ret (cs/coll-of :blaze.db/resource-handle)) + +(s/fdef sao/prune + :args (s/cat :snapshot ::kv/snapshot + :n pos-int? + :t :blaze.db/t + :start (s/? (s/cat :start-t :blaze.db/t + :start-tid :blaze.db/tid + :start-id :blaze.db/id-byte-string))) + :ret (s/coll-of ::kv/delete-entry :kind vector?)) diff --git a/modules/db/test/blaze/db/impl/index/system_as_of_test.clj b/modules/db/test/blaze/db/impl/index/system_as_of_test.clj new file mode 100644 index 000000000..3bfc5f970 --- /dev/null +++ b/modules/db/test/blaze/db/impl/index/system_as_of_test.clj @@ -0,0 +1,104 @@ +(ns blaze.db.impl.index.system-as-of-test + (:require + [blaze.db.impl.codec :as codec] + [blaze.db.impl.index.system-as-of :as sao] + [blaze.db.impl.index.system-as-of-spec] + [blaze.db.impl.index.system-as-of-test-util :as sao-tu] + [blaze.db.kv :as kv] + [blaze.db.test-util :refer [config with-system-data]] + [blaze.module.test-util :refer [with-system]] + [blaze.test-util :as tu] + [clojure.spec.test.alpha :as st] + [clojure.test :as test :refer [deftest testing]] + [juxt.iota :refer [given]])) + +(st/instrument) + +(test/use-fixtures :each tu/fixture) + +(deftest prune-test + (testing "empty database" + (with-system [{:blaze.db/keys [node]} config] + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 0) + :delete-entries := [] + :num-entries-processed := 0 + :next := nil)))) + + (testing "one non-purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]]] + + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 1) + :delete-entries := [] + :num-entries-processed := 1 + :next := nil)))) + + (testing "one purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]] + [[:patient-purge "0"]]] + + (testing "returns no delete entry at t=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 1) + [:delete-entries count] := 0 + :num-entries-processed := 1 + :next := nil))) + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "0"} + :num-entries-processed := 1 + :next := nil))))) + + (testing "two purged patients" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "0"} + :num-entries-processed := 2 + :next := nil))) + + (testing "returns one delete entry at t=3 and n=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 1 3) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "0"} + :num-entries-processed := 1 + [:next :tid] := (codec/tid "Patient") + [:next :t] := 1 + [:next :id] := (codec/id-byte-string "1"))) + + (testing "it's possible to continue with the next entry" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 1 3 1 (codec/tid "Patient") (codec/id-byte-string "1")) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "1"} + :num-entries-processed := 1 + :next := nil)))) + + (testing "returns two delete entries at t=3" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (sao/prune snapshot 10 3) + [:delete-entries count] := 2 + [:delete-entries 0 0] := :system-as-of-index + [:delete-entries 0 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "0"} + [:delete-entries 1 0] := :system-as-of-index + [:delete-entries 1 1 sao-tu/decode-key] := {:t 1 :type "Patient" :id "1"} + :num-entries-processed := 2 + :next := nil)))))) diff --git a/modules/db/test/blaze/db/impl/index/type_as_of_spec.clj b/modules/db/test/blaze/db/impl/index/type_as_of_spec.clj index 6ac4dc358..ec0c877bd 100644 --- a/modules/db/test/blaze/db/impl/index/type_as_of_spec.clj +++ b/modules/db/test/blaze/db/impl/index/type_as_of_spec.clj @@ -7,6 +7,7 @@ [blaze.db.impl.index.resource-handle-spec] [blaze.db.impl.index.type-as-of :as tao] [blaze.db.impl.iterators-spec] + [blaze.db.kv :as-alias kv] [blaze.db.kv-spec] [blaze.db.kv.spec] [blaze.fhir.spec] @@ -19,3 +20,12 @@ :start-t :blaze.db/t :start-id (s/nilable :blaze.db/id-byte-string)) :ret (cs/coll-of :blaze.db/resource-handle)) + +(s/fdef tao/prune + :args (s/cat :snapshot ::kv/snapshot + :n pos-int? + :t :blaze.db/t + :start (s/? (s/cat :start-tid :blaze.db/tid + :start-t :blaze.db/t + :start-id :blaze.db/id-byte-string))) + :ret (s/coll-of ::kv/delete-entry :kind vector?)) diff --git a/modules/db/test/blaze/db/impl/index/type_as_of_test.clj b/modules/db/test/blaze/db/impl/index/type_as_of_test.clj new file mode 100644 index 000000000..fa8ac5635 --- /dev/null +++ b/modules/db/test/blaze/db/impl/index/type_as_of_test.clj @@ -0,0 +1,104 @@ +(ns blaze.db.impl.index.type-as-of-test + (:require + [blaze.db.impl.codec :as codec] + [blaze.db.impl.index.type-as-of :as tao] + [blaze.db.impl.index.type-as-of-spec] + [blaze.db.impl.index.type-as-of-test-util :as tao-tu] + [blaze.db.kv :as kv] + [blaze.db.test-util :refer [config with-system-data]] + [blaze.module.test-util :refer [with-system]] + [blaze.test-util :as tu] + [clojure.spec.test.alpha :as st] + [clojure.test :as test :refer [deftest testing]] + [juxt.iota :refer [given]])) + +(st/instrument) + +(test/use-fixtures :each tu/fixture) + +(deftest prune-test + (testing "empty database" + (with-system [{:blaze.db/keys [node]} config] + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 0) + :delete-entries := [] + :num-entries-processed := 0 + :next := nil)))) + + (testing "one non-purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]]] + + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 1) + :delete-entries := [] + :num-entries-processed := 1 + :next := nil)))) + + (testing "one purged patient" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}]] + [[:patient-purge "0"]]] + + (testing "returns no delete entry at t=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 1) + [:delete-entries count] := 0 + :num-entries-processed := 1 + :next := nil))) + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "0"} + :num-entries-processed := 1 + :next := nil))))) + + (testing "two purged patients" + (with-system-data [{:blaze.db/keys [node]} config] + [[[:create {:fhir/type :fhir/Patient :id "0"}] + [:create {:fhir/type :fhir/Patient :id "1"}]] + [[:patient-purge "0"]] + [[:patient-purge "1"]]] + + (testing "returns one delete entry at t=2" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 2) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "0"} + :num-entries-processed := 2 + :next := nil))) + + (testing "returns one delete entry at t=3 and n=1" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 1 3) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "0"} + :num-entries-processed := 1 + [:next :tid] := (codec/tid "Patient") + [:next :t] := 1 + [:next :id] := (codec/id-byte-string "1"))) + + (testing "it's possible to continue with the next entry" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 1 3 (codec/tid "Patient") 1 (codec/id-byte-string "1")) + [:delete-entries count] := 1 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "1"} + :num-entries-processed := 1 + :next := nil)))) + + (testing "returns two delete entries at t=3" + (with-open [snapshot (kv/new-snapshot (:kv-store node))] + (given (tao/prune snapshot 10 3) + [:delete-entries count] := 2 + [:delete-entries 0 0] := :type-as-of-index + [:delete-entries 0 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "0"} + [:delete-entries 1 0] := :type-as-of-index + [:delete-entries 1 1 tao-tu/decode-key] := {:type "Patient" :t 1 :id "1"} + :num-entries-processed := 2 + :next := nil)))))) diff --git a/modules/job-prune/.clj-kondo/config.edn b/modules/job-prune/.clj-kondo/config.edn new file mode 100644 index 000000000..10c705f73 --- /dev/null +++ b/modules/job-prune/.clj-kondo/config.edn @@ -0,0 +1,7 @@ +{:config-paths + ["../../../.clj-kondo/root" + "../../anomaly/resources/clj-kondo.exports/blaze/anomaly" + "../../module-test-util/resources/clj-kondo.exports/blaze/module-test-util"] + + :lint-as + {blaze.job.prune-test/with-system-data clojure.core/with-open}} diff --git a/modules/job-prune/.gitignore b/modules/job-prune/.gitignore new file mode 100644 index 000000000..8b4bda0a4 --- /dev/null +++ b/modules/job-prune/.gitignore @@ -0,0 +1 @@ +resources diff --git a/modules/job-prune/Makefile b/modules/job-prune/Makefile new file mode 100644 index 000000000..2f5a9d1e2 --- /dev/null +++ b/modules/job-prune/Makefile @@ -0,0 +1,28 @@ +fmt: + cljfmt check src test build.clj deps.edn tests.edn + +lint: + clj-kondo --lint src test build.clj deps.edn + +build: + clojure -T:build copy-profiles + +prep: build + clojure -X:deps prep + +test: prep + clojure -M:test:kaocha --profile :ci + +test-coverage: prep + clojure -M:test:coverage + +cloc-prod: + cloc src + +cloc-test: + cloc test + +clean: + rm -rf .clj-kondo/.cache .cpcache target resources + +.PHONY: fmt lint build prep test test-coverage cloc-prod cloc-test clean diff --git a/modules/job-prune/build.clj b/modules/job-prune/build.clj new file mode 100644 index 000000000..beeaa0ebb --- /dev/null +++ b/modules/job-prune/build.clj @@ -0,0 +1,12 @@ +(ns build + (:require [clojure.tools.build.api :as b])) + +(defn copy-profiles [_] + (doseq [file ["StructureDefinition-PruneJob" + "CodeSystem-PruneJobParameter" + "CodeSystem-PruneJobOutput" + "CodeSystem-PruneIndices" + "ValueSet-PruneIndices"]] + (b/copy-file + {:src (str "../../job-ig/fsh-generated/resources/" file ".json") + :target (str "resources/blaze/job/prune/" file ".json")}))) diff --git a/modules/job-prune/deps.edn b/modules/job-prune/deps.edn new file mode 100644 index 000000000..54e1b5093 --- /dev/null +++ b/modules/job-prune/deps.edn @@ -0,0 +1,44 @@ +{:paths ["src" "resources"] + + :deps + {blaze/job-scheduler + {:local/root "../job-scheduler"} + + blaze/module-base + {:local/root "../module-base"}} + + :deps/prep-lib + {:alias :build + :fn copy-profiles + :ensure "resources/blaze/job/prune"} + + :aliases + {:build + {:deps + {io.github.clojure/tools.build + {:git/tag "v0.10.5" :git/sha "2a21b7a"}} + :ns-default build} + + :test + {:extra-paths ["test"] + + :extra-deps + {blaze/db-stub + {:local/root "../db-stub"} + + blaze/job-test-util + {:local/root "../job-test-util"}}} + + :kaocha + {:extra-deps + {lambdaisland/kaocha + {:mvn/version "1.91.1392"}} + + :main-opts ["-m" "kaocha.runner"]} + + :coverage + {:extra-deps + {lambdaisland/kaocha-cloverage + {:mvn/version "1.1.89"}} + + :main-opts ["-m" "kaocha.runner" "--profile" "coverage"]}}} diff --git a/modules/job-prune/src/blaze/job/prune.clj b/modules/job-prune/src/blaze/job/prune.clj new file mode 100644 index 000000000..03d508599 --- /dev/null +++ b/modules/job-prune/src/blaze/job/prune.clj @@ -0,0 +1,212 @@ +(ns blaze.job.prune + "The prune job calls d/prune in several steps ensuring that the progress can + be tracked accordingly. Prune jobs can be paused and resumed." + (:require + [blaze.anomaly :as ba] + [blaze.async.comp :as ac] + [blaze.db.api :as d] + [blaze.fhir.spec.type :as type] + [blaze.job-scheduler.protocols :as p] + [blaze.job.prune.spec] + [blaze.job.util :as job-util] + [blaze.module :as m] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(set! *warn-on-reflection* true) + +(def ^:private ^:const ^long default-index-entries-per-step 100000) + +(def ^:private parameter-system + "https://samply.github.io/blaze/fhir/CodeSystem/PruneJobParameter") + +(def ^:private output-system + "https://samply.github.io/blaze/fhir/CodeSystem/PruneJobOutput") + +(def ^:private task-output + (partial job-util/task-output output-system)) + +(def ^:private initial-duration + #fhir/Quantity + {:value #fhir/decimal 0 + :unit #fhir/string"s" + :system #fhir/uri"http://unitsofmeasure.org" + :code #fhir/code"s"}) + +(defn- start-job [job total-index-entries] + (assoc + job + :status #fhir/code"in-progress" + :statusReason job-util/started-status-reason + :output + [(task-output "total-index-entries" (type/unsignedInt total-index-entries)) + (task-output "index-entries-processed" #fhir/unsignedInt 0) + (task-output "index-entries-deleted" #fhir/unsignedInt 0) + (task-output "processing-duration" initial-duration)])) + +(defn- add-output [job code value] + (job-util/add-output job output-system code value)) + +(defn- remove-output [job code] + (job-util/remove-output job output-system code)) + +(defn- set-output [job code value] + (if value + (add-output job code value) + (remove-output job code))) + +(defn- increment-unsigned-int [value x] + (type/unsignedInt (+ (type/value value) x))) + +(defn- increment-index-entries-processed [job num-index-entries] + (job-util/update-output-value job output-system "index-entries-processed" + increment-unsigned-int num-index-entries)) + +(defn- increment-index-entries-deleted [job num-index-entries] + (job-util/update-output-value job output-system "index-entries-deleted" + increment-unsigned-int num-index-entries)) + +(defn- increment-quantity-value [quantity x] + (update quantity :value #(type/decimal (+ (type/value %) x)))) + +(defn- increment-duration [job duration] + (job-util/update-output-value job output-system "processing-duration" + increment-quantity-value duration)) + +(defn- set-next [job {:keys [index type id t]}] + (-> (set-output job "next-index" (some-> index name type/code)) + (set-output "next-type" (some-> type type/code)) + (set-output "next-id" (some-> id type/id)) + (set-output "next-t" (some-> t type/positiveInt)))) + +(defn- increment-on-hold-job + [job + {:keys [num-index-entries-processed num-index-entries-deleted duration next]}] + (-> (increment-index-entries-processed job num-index-entries-processed) + (increment-index-entries-deleted num-index-entries-deleted) + (increment-duration duration) + (set-next next))) + +(defn- increment-job [job result] + (-> (assoc job :statusReason job-util/incremented-status-reason) + (increment-on-hold-job result))) + +(defn- complete-job [job result] + (-> (increment-job job result) + (assoc :status #fhir/code"completed") + (dissoc :statusReason))) + +(defn- t [job] + (some-> (job-util/input-value job parameter-system "t") type/value)) + +(defn- next-index [job] + (some-> (job-util/output-value job output-system "next-index") type/value keyword)) + +(defn- next-type [job] + (some-> (job-util/output-value job output-system "next-type") type/value)) + +(defn- next-id [job] + (some-> (job-util/output-value job output-system "next-id") type/value)) + +(defn- next-t [job] + (some-> (job-util/output-value job output-system "next-t") type/value)) + +(defn- next-handle [job] + (when-let [index (next-index job)] + (let [type (next-type job)] + (cond-> {:index index} + type + (assoc :type type :id (next-id job) :t (next-t job)))))) + +(defn- on-hold? [job] + (= #fhir/code"on-hold" (:status job))) + +(defn- update-job [{:keys [admin-node]} job {:keys [next] :as result}] + (if next + (-> (job-util/update-job admin-node job increment-job result) + (ac/exceptionally-compose + (fn [e] + (if (job-util/job-update-failed? e) + (-> (job-util/pull-job admin-node (:id job)) + (ac/then-compose + (fn [job] + (if (on-hold? job) + (job-util/update-job admin-node job increment-on-hold-job result) + (ac/completed-future job))))) + (ac/completed-future e))))) + (job-util/update-job admin-node job complete-job result))) + +(defn- assoc-duration [start result] + (assoc result :duration (BigDecimal/valueOf (- (System/nanoTime) start) 9))) + +(defn- time-future [future] + (ac/then-apply future (partial assoc-duration (System/nanoTime)))) + +(defn- prune-fn [main-node n t] + (fn + ([] + (-> (ac/supply-async #(d/prune main-node n t)) + (time-future))) + ([next] + (-> (ac/supply-async #(d/prune main-node n t next)) + (time-future))))) + +(defn- continuation + "Returns a function that takes a prune result (or nil) and an anomaly + (or nil), updates the job and continues processing if there is more work to + do." + [{:keys [admin-node] :as context} prune job] + (fn [{:keys [next] :as result} anomaly] + (if anomaly + (job-util/update-job admin-node job job-util/fail-job anomaly) + (cond-> (update-job context job result) + next + (ac/then-compose + (fn [job] + (if (on-hold? job) + (ac/completed-future job) + (-> (prune next) + (ac/handle (continuation context prune job)) + (ac/then-compose identity))))))))) + +(def ^:private missing-t-anom + (ba/incorrect "Missing T.")) + +(defn- on-start + [{:keys [main-node admin-node index-entries-per-step] + :or {index-entries-per-step default-index-entries-per-step} + :as context} job] + (if-let [t (t job)] + (let [total (d/prune-total main-node) + prune (prune-fn main-node index-entries-per-step t)] + (-> (job-util/update-job admin-node job start-job total) + (ac/then-compose + (fn [job] + (-> (prune) + (ac/handle (continuation context prune job)) + (ac/then-compose identity)))))) + (job-util/update-job admin-node job job-util/fail-job missing-t-anom))) + +(defn- on-resume + [{:keys [main-node index-entries-per-step] + :or {index-entries-per-step default-index-entries-per-step} + :as context} job] + (let [prune (prune-fn main-node index-entries-per-step (t job)) + next (next-handle job)] + (-> (if next (prune next) (prune)) + (ac/handle (continuation context prune job)) + (ac/then-compose identity)))) + +(defmethod m/pre-init-spec :blaze.job/prune [_] + (s/keys :req-un [::main-node ::admin-node :blaze/clock] + :opt-un [::index-entries-per-step])) + +(defmethod ig/init-key :blaze.job/prune + [_ config] + (reify p/JobHandler + (-on-start [_ job] + (on-start config job)) + (-on-resume [_ job] + (on-resume config job)))) + +(derive :blaze.job/prune :blaze.job/handler) diff --git a/modules/job-prune/src/blaze/job/prune/spec.clj b/modules/job-prune/src/blaze/job/prune/spec.clj new file mode 100644 index 000000000..8831171cf --- /dev/null +++ b/modules/job-prune/src/blaze/job/prune/spec.clj @@ -0,0 +1,14 @@ +(ns blaze.job.prune.spec + (:require + [blaze.db.spec] + [blaze.job.prune :as-alias prune] + [clojure.spec.alpha :as s])) + +(s/def ::prune/main-node + :blaze.db/node) + +(s/def ::prune/admin-node + :blaze.db/node) + +(s/def ::prune/index-entries-per-step + pos-int?) diff --git a/modules/job-prune/test/blaze/job/prune_test.clj b/modules/job-prune/test/blaze/job/prune_test.clj new file mode 100644 index 000000000..83c2e4920 --- /dev/null +++ b/modules/job-prune/test/blaze/job/prune_test.clj @@ -0,0 +1,755 @@ +(ns blaze.job.prune-test + (:require + [blaze.db.api :as d] + [blaze.db.api-spec] + [blaze.db.kv :as kv] + [blaze.db.kv.mem] + [blaze.db.node :as node :refer [node?]] + [blaze.db.node.protocols :as np] + [blaze.db.resource-store :as rs] + [blaze.db.resource-store.kv :as rs-kv] + [blaze.db.search-param-registry] + [blaze.db.tx-cache] + [blaze.db.tx-log :as tx-log] + [blaze.db.tx-log.local] + [blaze.fhir.test-util :refer [structure-definition-repo]] + [blaze.job-scheduler :as js] + [blaze.job.prune] + [blaze.job.test-util :as jtu] + [blaze.job.util :as job-util] + [blaze.log] + [blaze.luid :as luid] + [blaze.module.test-util :refer [with-system]] + [blaze.test-util :as tu :refer [given-thrown]] + [clojure.spec.alpha :as s] + [clojure.spec.test.alpha :as st] + [clojure.test :as test :refer [deftest testing]] + [integrant.core :as ig] + [java-time.api :as time] + [juxt.iota :refer [given]])) + +(set! *warn-on-reflection* true) +(st/instrument) + +(test/use-fixtures :each tu/fixture) + +(deftest init-test + (testing "nil config" + (given-thrown (ig/init {:blaze.job/prune nil}) + :key := :blaze.job/prune + :reason := ::ig/build-failed-spec + [:cause-data ::s/problems 0 :pred] := `map?)) + + (testing "missing config" + (given-thrown (ig/init {:blaze.job/prune {}}) + :key := :blaze.job/prune + :reason := ::ig/build-failed-spec + [:cause-data ::s/problems 0 :pred] := `(fn ~'[%] (contains? ~'% :main-node)) + [:cause-data ::s/problems 1 :pred] := `(fn ~'[%] (contains? ~'% :admin-node)) + [:cause-data ::s/problems 2 :pred] := `(fn ~'[%] (contains? ~'% :clock)))) + + (testing "invalid main-node" + (given-thrown (ig/init {:blaze.job/prune {:main-node ::invalid}}) + :key := :blaze.job/prune + :reason := ::ig/build-failed-spec + [:cause-data ::s/problems 0 :pred] := `(fn ~'[%] (contains? ~'% :admin-node)) + [:cause-data ::s/problems 1 :pred] := `(fn ~'[%] (contains? ~'% :clock)) + [:cause-data ::s/problems 2 :pred] := `node? + [:cause-data ::s/problems 2 :val] := ::invalid))) + +(derive :blaze.db.main/node :blaze.db/node) +(derive :blaze.db.admin/node :blaze.db/node) + +(def config + {:blaze/job-scheduler + {:node (ig/ref :blaze.db.admin/node) + :handlers {:blaze.job/prune (ig/ref :blaze.job/prune)} + :clock (ig/ref :blaze.test/offset-clock) + :rng-fn (ig/ref :blaze.test/fixed-rng-fn)} + + :blaze.job/prune + {:main-node (ig/ref :blaze.db.main/node) + :admin-node (ig/ref :blaze.db.admin/node) + :clock (ig/ref :blaze.test/offset-clock) + :index-entries-per-step 10} + + :blaze.db.main/node + {:tx-log (ig/ref :blaze.db.main/tx-log) + :tx-cache (ig/ref :blaze.db.main/tx-cache) + :indexer-executor (ig/ref :blaze.db.node.main/indexer-executor) + :resource-store (ig/ref :blaze.db/resource-store) + :kv-store (ig/ref :blaze.db.main/index-kv-store) + :resource-indexer (ig/ref :blaze.db.node.main/resource-indexer) + :search-param-registry (ig/ref :blaze.db/search-param-registry) + :scheduler (ig/ref :blaze/scheduler) + :poll-timeout (time/millis 10)} + + :blaze.db.admin/node + {:tx-log (ig/ref :blaze.db.admin/tx-log) + :tx-cache (ig/ref :blaze.db.admin/tx-cache) + :indexer-executor (ig/ref :blaze.db.node.admin/indexer-executor) + :resource-store (ig/ref :blaze.db/resource-store) + :kv-store (ig/ref :blaze.db.admin/index-kv-store) + :resource-indexer (ig/ref :blaze.db.node.admin/resource-indexer) + :search-param-registry (ig/ref :blaze.db/search-param-registry) + :scheduler (ig/ref :blaze/scheduler) + :poll-timeout (time/millis 10)} + + [::tx-log/local :blaze.db.main/tx-log] + {:kv-store (ig/ref :blaze.db.main/transaction-kv-store) + :clock (ig/ref :blaze.test/fixed-clock)} + + [::tx-log/local :blaze.db.admin/tx-log] + {:kv-store (ig/ref :blaze.db.admin/transaction-kv-store) + :clock (ig/ref :blaze.test/fixed-clock)} + + [::kv/mem :blaze.db.main/transaction-kv-store] + {:column-families {}} + + [::kv/mem :blaze.db.admin/transaction-kv-store] + {:column-families {}} + + [:blaze.db/tx-cache :blaze.db.main/tx-cache] + {:kv-store (ig/ref :blaze.db.main/index-kv-store)} + + [:blaze.db/tx-cache :blaze.db.admin/tx-cache] + {:kv-store (ig/ref :blaze.db.admin/index-kv-store)} + + [::node/indexer-executor :blaze.db.node.main/indexer-executor] + {} + + [::node/indexer-executor :blaze.db.node.admin/indexer-executor] + {} + + [::kv/mem :blaze.db.main/index-kv-store] + {:column-families + {:search-param-value-index nil + :resource-value-index nil + :compartment-search-param-value-index nil + :compartment-resource-type-index nil + :active-search-params nil + :tx-success-index {:reverse-comparator? true} + :tx-error-index nil + :t-by-instant-index {:reverse-comparator? true} + :resource-as-of-index nil + :type-as-of-index nil + :system-as-of-index nil + :type-stats-index nil + :system-stats-index nil}} + + [::kv/mem :blaze.db.admin/index-kv-store] + {:column-families + {:search-param-value-index nil + :resource-value-index nil + :compartment-search-param-value-index nil + :compartment-resource-type-index nil + :active-search-params nil + :tx-success-index {:reverse-comparator? true} + :tx-error-index nil + :t-by-instant-index {:reverse-comparator? true} + :resource-as-of-index nil + :type-as-of-index nil + :system-as-of-index nil + :type-stats-index nil + :system-stats-index nil}} + + [::node/resource-indexer :blaze.db.node.main/resource-indexer] + {:kv-store (ig/ref :blaze.db.main/index-kv-store) + :resource-store (ig/ref :blaze.db/resource-store) + :search-param-registry (ig/ref :blaze.db/search-param-registry) + :executor (ig/ref :blaze.db.node.resource-indexer.main/executor)} + + [::node/resource-indexer :blaze.db.node.admin/resource-indexer] + {:kv-store (ig/ref :blaze.db.admin/index-kv-store) + :resource-store (ig/ref :blaze.db/resource-store) + :search-param-registry (ig/ref :blaze.db/search-param-registry) + :executor (ig/ref :blaze.db.node.resource-indexer.admin/executor)} + + [:blaze.db.node.resource-indexer/executor :blaze.db.node.resource-indexer.main/executor] + {} + + [:blaze.db.node.resource-indexer/executor :blaze.db.node.resource-indexer.admin/executor] + {} + + ::rs/kv + {:kv-store (ig/ref :blaze.db/resource-kv-store) + :executor (ig/ref ::rs-kv/executor)} + + [::kv/mem :blaze.db/resource-kv-store] + {:column-families {}} + + ::rs-kv/executor {} + + :blaze.db/search-param-registry + {:structure-definition-repo structure-definition-repo} + + :blaze/scheduler {} + + :blaze.test/fixed-clock {} + + :blaze.test/offset-clock + {:clock (ig/ref :blaze.test/fixed-clock) + :offset-seconds 11} + + :blaze.test/fixed-rng-fn {}}) + +(defmacro with-system-data + [[binding-form config] txs & body] + `(with-system [system# ~config] + (run! #(deref (d/transact (:blaze.db.main/node system#) %)) ~txs) + (let [~binding-form system#] ~@body))) + +(def job + {:fhir/type :fhir/Task + :meta #fhir/Meta{:profile [#fhir/canonical"https://samply.github.io/blaze/fhir/StructureDefinition/PruneJob"]} + :status #fhir/code"ready" + :intent #fhir/code"order" + :code #fhir/CodeableConcept + {:coding + [#fhir/Coding + {:system #fhir/uri"https://samply.github.io/blaze/fhir/CodeSystem/JobType" + :code #fhir/code"prune" + :display "Prune the Database"}]}}) + +(def job-missing-t + job) + +(def job-42 + (assoc + job + :input + [{:fhir/type :fhir.Task/input + :type #fhir/CodeableConcept + {:coding + [#fhir/Coding + {:system #fhir/uri"https://samply.github.io/blaze/fhir/CodeSystem/PruneJobParameter" + :code #fhir/code"t"}]} + :value #fhir/positiveInt 42}])) + +(defn- output-value [job code] + (job-util/output-value job "https://samply.github.io/blaze/fhir/CodeSystem/PruneJobOutput" code)) + +(defn- total-index-entries [job] + (output-value job "total-index-entries")) + +(defn- index-entries-processed [job] + (output-value job "index-entries-processed")) + +(defn- index-entries-deleted [job] + (output-value job "index-entries-deleted")) + +(defn- processing-duration [job] + (output-value job "processing-duration")) + +(defn- next-index [job] + (output-value job "next-index")) + +(defn- next-type [job] + (output-value job "next-type")) + +(defn- next-id [job] + (output-value job "next-id")) + +(defn- next-t [job] + (output-value job "next-t")) + +(defn- job-id [{{:keys [clock rng-fn]} :context}] + (luid/luid clock (rng-fn))) + +(defn gen-create-patient-tx-data [n] + (mapv + (fn [id] + [:create {:fhir/type :fhir/Patient :id (format "%05d" id)}]) + (range n))) + +(defn gen-patient-purge-tx-data [n] + (mapv + (fn [id] + [:patient-purge (format "%05d" id)]) + (range n))) + +(deftest simple-job-execution-test + (testing "success" + (testing "increment three times, once for each index" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 10) + (gen-patient-purge-tx-data 5)] + + @(js/create-job job-scheduler job-42) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 30 + index-entries-deleted := #fhir/unsignedInt 15 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s" + next-index := nil)) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 5 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :in-progress/incremented + [3 jtu/combined-status] := :in-progress/incremented + [4 jtu/combined-status] := :completed + + [0 total-index-entries] := nil + [1 total-index-entries] := #fhir/unsignedInt 30 + [2 total-index-entries] := #fhir/unsignedInt 30 + [3 total-index-entries] := #fhir/unsignedInt 30 + [4 total-index-entries] := #fhir/unsignedInt 30 + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 10 + [3 index-entries-processed] := #fhir/unsignedInt 20 + [4 index-entries-processed] := #fhir/unsignedInt 30 + + [0 index-entries-deleted] := nil + [1 index-entries-deleted] := #fhir/unsignedInt 0 + [2 index-entries-deleted] := #fhir/unsignedInt 5 + [3 index-entries-deleted] := #fhir/unsignedInt 10 + [4 index-entries-deleted] := #fhir/unsignedInt 15 + + [0 next-index] := nil + [1 next-index] := nil + [2 next-index] := #fhir/code"type-as-of-index" + [3 next-index] := #fhir/code"system-as-of-index" + [4 next-index] := nil + + [0 next-type] := nil + [1 next-type] := nil + [2 next-type] := nil + [3 next-type] := nil + [4 next-type] := nil)))) + + (testing "increment six times, twice for each index" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 20) + (gen-patient-purge-tx-data 10)] + + @(js/create-job job-scheduler job-42) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 60 + index-entries-deleted := #fhir/unsignedInt 30 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s" + next-index := nil)) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 8 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :in-progress/incremented + [3 jtu/combined-status] := :in-progress/incremented + [4 jtu/combined-status] := :in-progress/incremented + [5 jtu/combined-status] := :in-progress/incremented + [6 jtu/combined-status] := :in-progress/incremented + [7 jtu/combined-status] := :completed + + [0 total-index-entries] := nil + [1 total-index-entries] := #fhir/unsignedInt 60 + [2 total-index-entries] := #fhir/unsignedInt 60 + [3 total-index-entries] := #fhir/unsignedInt 60 + [4 total-index-entries] := #fhir/unsignedInt 60 + [5 total-index-entries] := #fhir/unsignedInt 60 + [6 total-index-entries] := #fhir/unsignedInt 60 + [7 total-index-entries] := #fhir/unsignedInt 60 + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 10 + [3 index-entries-processed] := #fhir/unsignedInt 20 + [4 index-entries-processed] := #fhir/unsignedInt 30 + [5 index-entries-processed] := #fhir/unsignedInt 40 + [6 index-entries-processed] := #fhir/unsignedInt 50 + [7 index-entries-processed] := #fhir/unsignedInt 60 + + [0 index-entries-deleted] := nil + [1 index-entries-deleted] := #fhir/unsignedInt 0 + [2 index-entries-deleted] := #fhir/unsignedInt 10 + [3 index-entries-deleted] := #fhir/unsignedInt 10 + [4 index-entries-deleted] := #fhir/unsignedInt 20 + [5 index-entries-deleted] := #fhir/unsignedInt 20 + [6 index-entries-deleted] := #fhir/unsignedInt 30 + [7 index-entries-deleted] := #fhir/unsignedInt 30 + + [0 next-index] := nil + [1 next-index] := nil + [2 next-index] := #fhir/code"resource-as-of-index" + [3 next-index] := #fhir/code"type-as-of-index" + [4 next-index] := #fhir/code"type-as-of-index" + [5 next-index] := #fhir/code"system-as-of-index" + [6 next-index] := #fhir/code"system-as-of-index" + [7 next-index] := nil + + [0 next-type] := nil + [1 next-type] := nil + [2 next-type] := #fhir/code"Patient" + [3 next-type] := nil + [4 next-type] := #fhir/code"Patient" + [5 next-type] := nil + [6 next-type] := #fhir/code"Patient" + [7 next-type] := nil + + [0 next-id] := nil + [1 next-id] := nil + [2 next-id] := #fhir/id"00010" + [3 next-id] := nil + [4 next-id] := #fhir/id"00010" + [5 next-id] := nil + [6 next-id] := #fhir/id"00010" + [7 next-id] := nil + + [0 next-t] := nil + [1 next-t] := nil + [2 next-t] := #fhir/positiveInt 1 + [3 next-t] := nil + [4 next-t] := #fhir/positiveInt 1 + [5 next-t] := nil + [6 next-t] := #fhir/positiveInt 1 + [7 next-t] := nil))))) + + (testing "missing t" + (with-system [{:blaze/keys [job-scheduler] :as system} config] + + @(js/create-job job-scheduler job-missing-t) + + (testing "the job has failed" + (given @(jtu/pull-job system :failed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :failed + job-util/error-msg := "Missing T."))))) + +(defn- delayed-prune + ([node n t] + (Thread/sleep 100) + (np/-prune node n t nil)) + ([node n t start] + (Thread/sleep 100) + (np/-prune node n t start))) + +(deftest job-execution-with-pause-test + (with-redefs [d/prune delayed-prune] + (testing "resume from started state" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 10) + (gen-patient-purge-tx-data 5)] + + @(js/create-job job-scheduler job-42) + + (given @(jtu/pull-job system :in-progress/started) + :fhir/type := :fhir/Task + job-util/job-number := "1" + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 0 + index-entries-deleted := #fhir/unsignedInt 0) + + (given @(js/pause-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :on-hold/paused) + + (Thread/sleep 200) + + (given @(js/resume-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :in-progress/resumed) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 30 + index-entries-deleted := #fhir/unsignedInt 15 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s")) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 7 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :on-hold/paused + [3 jtu/combined-status] := :on-hold/paused + [4 jtu/combined-status] := :in-progress/resumed + [5 jtu/combined-status] := :in-progress/incremented + [6 jtu/combined-status] := :completed + + [0 total-index-entries] := nil + [1 total-index-entries] := #fhir/unsignedInt 30 + [2 total-index-entries] := #fhir/unsignedInt 30 + [3 total-index-entries] := #fhir/unsignedInt 30 + [4 total-index-entries] := #fhir/unsignedInt 30 + [5 total-index-entries] := #fhir/unsignedInt 30 + [6 total-index-entries] := #fhir/unsignedInt 30 + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 0 + [3 index-entries-processed] := #fhir/unsignedInt 10 + [4 index-entries-processed] := #fhir/unsignedInt 10 + [5 index-entries-processed] := #fhir/unsignedInt 20 + [6 index-entries-processed] := #fhir/unsignedInt 30 + + [0 index-entries-deleted] := nil + [1 index-entries-deleted] := #fhir/unsignedInt 0 + [2 index-entries-deleted] := #fhir/unsignedInt 0 + [3 index-entries-deleted] := #fhir/unsignedInt 5 + [4 index-entries-deleted] := #fhir/unsignedInt 5 + [5 index-entries-deleted] := #fhir/unsignedInt 10 + [6 index-entries-deleted] := #fhir/unsignedInt 15))) + + (testing "increment six times, twice for each index" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 20) + (gen-patient-purge-tx-data 10)] + + @(js/create-job job-scheduler job-42) + + (given @(jtu/pull-job system :in-progress/started) + :fhir/type := :fhir/Task + job-util/job-number := "1" + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 0 + index-entries-deleted := #fhir/unsignedInt 0) + + (given @(js/pause-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :on-hold/paused + index-entries-processed := #fhir/unsignedInt 0 + index-entries-deleted := #fhir/unsignedInt 0) + + (Thread/sleep 200) + + (given @(js/resume-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :in-progress/resumed + index-entries-processed := #fhir/unsignedInt 10 + index-entries-deleted := #fhir/unsignedInt 10) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 60 + index-entries-deleted := #fhir/unsignedInt 30 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s")) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 10 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :on-hold/paused + [3 jtu/combined-status] := :on-hold/paused + [4 jtu/combined-status] := :in-progress/resumed + [5 jtu/combined-status] := :in-progress/incremented + [6 jtu/combined-status] := :in-progress/incremented + [7 jtu/combined-status] := :in-progress/incremented + [8 jtu/combined-status] := :in-progress/incremented + [9 jtu/combined-status] := :completed + + [0 total-index-entries] := nil + [1 total-index-entries] := #fhir/unsignedInt 60 + [2 total-index-entries] := #fhir/unsignedInt 60 + [3 total-index-entries] := #fhir/unsignedInt 60 + [4 total-index-entries] := #fhir/unsignedInt 60 + [5 total-index-entries] := #fhir/unsignedInt 60 + [6 total-index-entries] := #fhir/unsignedInt 60 + [7 total-index-entries] := #fhir/unsignedInt 60 + [8 total-index-entries] := #fhir/unsignedInt 60 + [9 total-index-entries] := #fhir/unsignedInt 60 + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 0 + [3 index-entries-processed] := #fhir/unsignedInt 10 + [4 index-entries-processed] := #fhir/unsignedInt 10 + [5 index-entries-processed] := #fhir/unsignedInt 20 + [6 index-entries-processed] := #fhir/unsignedInt 30 + [7 index-entries-processed] := #fhir/unsignedInt 40 + [8 index-entries-processed] := #fhir/unsignedInt 50 + [9 index-entries-processed] := #fhir/unsignedInt 60 + + [0 index-entries-deleted] := nil + [1 index-entries-deleted] := #fhir/unsignedInt 0 + [2 index-entries-deleted] := #fhir/unsignedInt 0 + [3 index-entries-deleted] := #fhir/unsignedInt 10 + [4 index-entries-deleted] := #fhir/unsignedInt 10 + [5 index-entries-deleted] := #fhir/unsignedInt 10 + [6 index-entries-deleted] := #fhir/unsignedInt 20 + [7 index-entries-deleted] := #fhir/unsignedInt 20 + [8 index-entries-deleted] := #fhir/unsignedInt 30 + [9 index-entries-deleted] := #fhir/unsignedInt 30))))) + + (testing "resume from incremented state" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 10) + (gen-patient-purge-tx-data 5)] + + @(js/create-job job-scheduler job-42) + + (given @(jtu/pull-job system :in-progress/incremented) + :fhir/type := :fhir/Task + job-util/job-number := "1" + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 10 + index-entries-deleted := #fhir/unsignedInt 5) + + (given @(js/pause-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :on-hold/paused) + + (Thread/sleep 200) + + (given @(js/resume-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :in-progress/resumed) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 30 + index-entries-processed := #fhir/unsignedInt 30 + index-entries-deleted := #fhir/unsignedInt 15 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s")) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 7 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :in-progress/incremented + [3 jtu/combined-status] := :on-hold/paused + [4 jtu/combined-status] := :on-hold/paused + [5 jtu/combined-status] := :in-progress/resumed + [6 jtu/combined-status] := :completed))) + + (testing "increment six times, twice for each index" + (with-system-data [{:blaze/keys [job-scheduler] :as system} config] + [(gen-create-patient-tx-data 20) + (gen-patient-purge-tx-data 10)] + + @(js/create-job job-scheduler job-42) + + (given @(jtu/pull-job system :in-progress/incremented) + :fhir/type := :fhir/Task + job-util/job-number := "1" + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 10 + index-entries-deleted := #fhir/unsignedInt 10) + + (given @(js/pause-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :on-hold/paused) + + (Thread/sleep 200) + + (given @(js/resume-job job-scheduler (job-id job-scheduler)) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :in-progress/resumed) + + (testing "the job is completed" + (given @(jtu/pull-job system :completed) + :fhir/type := :fhir/Task + job-util/job-number := "1" + jtu/combined-status := :completed + total-index-entries := #fhir/unsignedInt 60 + index-entries-processed := #fhir/unsignedInt 60 + index-entries-deleted := #fhir/unsignedInt 30 + [processing-duration :value] :? pos? + [processing-duration :unit] := #fhir/string"s" + [processing-duration :system] := #fhir/uri"http://unitsofmeasure.org" + [processing-duration :code] := #fhir/code"s")) + + (testing "job history" + (given @(jtu/pull-job-history system) + count := 10 + + [0 jtu/combined-status] := :ready + [1 jtu/combined-status] := :in-progress/started + [2 jtu/combined-status] := :in-progress/incremented + [3 jtu/combined-status] := :on-hold/paused + [4 jtu/combined-status] := :on-hold/paused + [5 jtu/combined-status] := :in-progress/resumed + [6 jtu/combined-status] := :in-progress/incremented + [7 jtu/combined-status] := :in-progress/incremented + [8 jtu/combined-status] := :in-progress/incremented + [9 jtu/combined-status] := :completed + + [0 index-entries-processed] := nil + [1 index-entries-processed] := #fhir/unsignedInt 0 + [2 index-entries-processed] := #fhir/unsignedInt 10 + [3 index-entries-processed] := #fhir/unsignedInt 10 + [4 index-entries-processed] := #fhir/unsignedInt 20 + [5 index-entries-processed] := #fhir/unsignedInt 20 + [6 index-entries-processed] := #fhir/unsignedInt 30 + [7 index-entries-processed] := #fhir/unsignedInt 40 + [8 index-entries-processed] := #fhir/unsignedInt 50 + [9 index-entries-processed] := #fhir/unsignedInt 60 + + [0 next-index] := nil + [1 next-index] := nil + [2 next-index] := #fhir/code"resource-as-of-index" + [3 next-index] := #fhir/code"resource-as-of-index" + [4 next-index] := #fhir/code"type-as-of-index" + [5 next-index] := #fhir/code"type-as-of-index" + [6 next-index] := #fhir/code"type-as-of-index" + [7 next-index] := #fhir/code"system-as-of-index" + [8 next-index] := #fhir/code"system-as-of-index" + [9 next-index] := nil + + [0 next-type] := nil + [1 next-type] := nil + [2 next-type] := #fhir/code"Patient" + [3 next-type] := #fhir/code"Patient" + [4 next-type] := nil + [5 next-type] := nil + [6 next-type] := #fhir/code"Patient" + [7 next-type] := nil + [8 next-type] := #fhir/code"Patient" + [9 next-type] := nil))))))) diff --git a/modules/job-prune/tests.edn b/modules/job-prune/tests.edn new file mode 100644 index 000000000..12727a10d --- /dev/null +++ b/modules/job-prune/tests.edn @@ -0,0 +1,10 @@ +#kaocha/v1 + #merge + [{} + #profile {:ci {:reporter kaocha.report/documentation + :color? false} + :coverage {:plugins [:kaocha.plugin/cloverage] + :cloverage/opts + {:codecov? true} + :reporter kaocha.report/documentation + :color? false}}] diff --git a/modules/job-re-index/src/blaze/job/re_index.clj b/modules/job-re-index/src/blaze/job/re_index.clj index 367babded..e7d5234e3 100644 --- a/modules/job-re-index/src/blaze/job/re_index.clj +++ b/modules/job-re-index/src/blaze/job/re_index.clj @@ -82,7 +82,7 @@ (some-> (job-util/output-value job output-system "next-resource") type/value)) -(defn elapsed [clock job] +(defn- elapsed [clock job] (-> (time/duration (-> job :meta :lastUpdated) (time/instant clock)) (time/as :seconds))) @@ -146,8 +146,7 @@ (defn- on-resume [{:keys [main-node] :as context} job] (let [main-db (d/db main-node) - search-param-url (search-param-url job) - re-index (re-index-fn main-db search-param-url) + re-index (re-index-fn main-db (search-param-url job)) [type id] (some-> (next-resource job) (str/split #"/" 2))] (-> (if type (re-index type id) (re-index)) (ac/handle (continuation context re-index job)) diff --git a/modules/job-scheduler/src/blaze/job_scheduler.clj b/modules/job-scheduler/src/blaze/job_scheduler.clj index c2eb50931..5b7784b01 100644 --- a/modules/job-scheduler/src/blaze/job_scheduler.clj +++ b/modules/job-scheduler/src/blaze/job_scheduler.clj @@ -50,11 +50,10 @@ (swap! running-jobs dissoc id) (if e (if (job-util/job-update-failed? e) - (log/debug "Paused job with id =" id) + (log/debug "The job with id =" id "was unable to update itself. It may have been paused.") (fail-job-on-error node id e)) - (if (= #fhir/code"cancelled" status) - (log/debug "Cancelled job with id =" id) - (log/debug "Completed job with id =" id))))) + (log/debug "The execution of the job with id =" id "ended with status =" + (type/value status))))) (defn- wrap-error [f context job] (try diff --git a/modules/job-test-util/src/blaze/job/test_util.clj b/modules/job-test-util/src/blaze/job/test_util.clj index 7804821a2..089f1b270 100644 --- a/modules/job-test-util/src/blaze/job/test_util.clj +++ b/modules/job-test-util/src/blaze/job/test_util.clj @@ -34,6 +34,7 @@ "Tries to pull the job with `status` from `system`. Waits until `status` is reached or 10 seconds are eclipsed." + {:arglists '([system status] [system job-id status])} ([{:blaze/keys [job-scheduler] :as system} status] (pull-job system (job-id job-scheduler) status)) ([system job-id status] diff --git a/resources/blaze.edn b/resources/blaze.edn index 7ef9772b9..f6cc456ee 100644 --- a/resources/blaze.edn +++ b/resources/blaze.edn @@ -431,6 +431,11 @@ :clock #blaze/ref :blaze/clock :rng-fn #blaze/ref :blaze/rng-fn} + :blaze.job/prune + {:main-node #blaze/ref :blaze.db.main/node + :admin-node #blaze/ref :blaze.db.admin/node + :clock #blaze/ref :blaze/clock} + :blaze.job/re-index {:main-node #blaze/ref :blaze.db.main/node :admin-node #blaze/ref :blaze.db.admin/node