diff --git a/couchbase/build.gradle.kts b/couchbase/build.gradle.kts index f1c07f6..b1f680a 100644 --- a/couchbase/build.gradle.kts +++ b/couchbase/build.gradle.kts @@ -1,5 +1,6 @@ plugins { id("com.coditory.integration-test") version "1.4.4" + `idea` } dependencies { @@ -9,10 +10,10 @@ dependencies { implementation("io.micrometer:micrometer-core:1.9.5") api(group = "com.couchbase.client", name = "java-client", version = "3.2.5") api(group = "com.couchbase.client", name = "metrics-micrometer", version = "0.1.0") - api(group = "com.fasterxml.jackson.module", name = "jackson-module-kotlin", version = "2.13.1") + api(group = "com.fasterxml.jackson.module", name = "jackson-module-kotlin", version = "2.13.4") testImplementation(group = "io.kotest", name = "kotest-runner-junit5", version = "5.1.0") - testImplementation(group = "io.kotest.extensions", name = "kotest-extensions-testcontainers", version = "1.2.1") + testImplementation(group = "io.kotest.extensions", name = "kotest-extensions-testcontainers", version = "1.3.4") testImplementation(group = "org.testcontainers", name = "couchbase", version = "1.17.5") testImplementation(group = "org.mockito.kotlin", name = "mockito-kotlin", version = "4.0.0") } diff --git a/couchbase/src/integration/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseRepositorySpec.kt b/couchbase/src/integration/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseRepositorySpec.kt index b6502c1..1a55c4d 100644 --- a/couchbase/src/integration/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseRepositorySpec.kt +++ b/couchbase/src/integration/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseRepositorySpec.kt @@ -18,22 +18,25 @@ import io.kotest.matchers.collections.shouldContainExactly import io.kotest.matchers.collections.shouldHaveLowerBound import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldHaveUpperBound +import io.kotest.matchers.date.shouldBeAfter +import io.kotest.matchers.date.shouldBeBefore import io.kotest.matchers.shouldBe import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.mockito.kotlin.any import org.mockito.kotlin.doReturn import org.mockito.kotlin.mock +import org.testcontainers.couchbase.BucketDefinition +import org.testcontainers.couchbase.CouchbaseContainer import reactor.core.publisher.Mono import java.time.Duration import java.time.Instant -import org.testcontainers.couchbase.BucketDefinition -import org.testcontainers.couchbase.CouchbaseContainer class CouchbaseRepositorySpec : StringSpec() { private lateinit var reactiveCollection: ReactiveCollection private lateinit var collection: CouchbaseCollection private val prefix = "prefix" + private val dictionaryTtl = Duration.ofSeconds(100) override fun beforeSpec(spec: Spec) { val bucketName = "mybucket" @@ -45,7 +48,8 @@ class CouchbaseRepositorySpec : StringSpec() { val cluster = ReactiveCluster.connect( container.connectionString, - ClusterOptions.clusterOptions(container.username, container.password) + ClusterOptions + .clusterOptions(container.username, container.password) .environment( ClusterEnvironment.builder() .jsonSerializer(JacksonJsonSerializer.create(jacksonObjectMapper())) @@ -61,7 +65,7 @@ class CouchbaseRepositorySpec : StringSpec() { "should save object in cache" { //GIVEN val repository = TypedCouchbaseRepository(collection, DTO::class.java, SimpleMeterRegistry()) - val dto = DTO("key", "body") + val dto = DTO("partialId", "partialBody") //WHEN repository.put(dto.id, dto).block() @@ -77,7 +81,7 @@ class CouchbaseRepositorySpec : StringSpec() { "should save object in cache using hashed key when provided ts longer than 250 characters" { //GIVEN val repository = TypedCouchbaseRepository(collection, DTO::class.java, SimpleMeterRegistry()) - val dto = DTO("id".repeat(250), "body") + val dto = DTO("id".repeat(250), "partialBody") //WHEN repository.put(dto.id, dto).block() @@ -102,7 +106,7 @@ class CouchbaseRepositorySpec : StringSpec() { val writeActualCount = writeTimer.count() val readActualCount = readTimer.count() - val dto = DTO("key", "body") + val dto = DTO("somePartial", "partialBody") //WHEN repository.put(dto.id, dto).block() @@ -129,7 +133,7 @@ class CouchbaseRepositorySpec : StringSpec() { val readActualCount = readErrors.count() //WHEN - repository.put("id", DTO("key", "body")).onErrorResume { _ -> Mono.empty() }.block() + repository.put("id", DTO("somePartial", "partialBody")).onErrorResume { _ -> Mono.empty() }.block() repository.get("id").onErrorResume { _ -> Mono.empty() }.block() //THEN @@ -144,7 +148,7 @@ class CouchbaseRepositorySpec : StringSpec() { val writeDictionaryTimer = meterRegistry.timer("cache.couchbase.set.write.dictionary") val mutateDictionaryTimer = meterRegistry.timer("cache.couchbase.set.mutate.dictionary") val writeTimer = meterRegistry.timer("cache.couchbase.set.write") - val repository = CouchbaseSetRepository(collection, DTO::class.java, meterRegistry) + val repository = CouchbaseSetRepository(collection, DTO::class.java, dictionaryTtl, meterRegistry) val longKey = "dtos1".repeat(250) //WHEN @@ -152,6 +156,7 @@ class CouchbaseRepositorySpec : StringSpec() { val dtos = (1..30).map { DTO("id_$it", "value_$it") }.toSet() repository.add(longKey, dtos, ttl = ttl).block() val expiryTime = Instant.now().plus(ttl) + val dictionaryExpiry = Instant.now().plus(dictionaryTtl) //EXPECT writeItemTimer.count() shouldBe 30 @@ -174,9 +179,11 @@ class CouchbaseRepositorySpec : StringSpec() { mutateDictionaryTimer.count() shouldBe 4 writeTimer.count() shouldBe 2 - val raw = reactiveCollection.get("prefix_$longKey".sha256()).block()!! - val rawObject = raw.contentAs(Map::class.java) val getOptions = GetOptions.getOptions().withExpiry(true) + val raw = reactiveCollection.get("prefix_$longKey".sha256(), getOptions).block()!! + raw.expiryTime().get() shouldBeAfter dictionaryExpiry.minusSeconds(2) + raw.expiryTime().get() shouldBeBefore dictionaryExpiry.plusSeconds(2) + val rawObject = raw.contentAs(Map::class.java) val setItems = rawObject.keys .map { it as String } .map { reactiveCollection.get("prefix_${longKey}_$it".sha256(), getOptions).block() } @@ -187,7 +194,7 @@ class CouchbaseRepositorySpec : StringSpec() { "should remove evicted values from dictionary" { //GIVEN - val repository = CouchbaseSetRepository(collection, DTO::class.java, SimpleMeterRegistry()) + val repository = CouchbaseSetRepository(collection, DTO::class.java, dictionaryTtl, SimpleMeterRegistry()) repository.add("dtos2", DTO("id_1", "value_1"), ttl = Duration.ofSeconds(20)).block() repository.add("dtos2", DTO("id_2", "value_2"), ttl = Duration.ofSeconds(10)).block() @@ -215,7 +222,7 @@ class CouchbaseRepositorySpec : StringSpec() { "should remove more than 16 items from set" { //GIVEN - val repository = CouchbaseSetRepository(collection, DTO::class.java, SimpleMeterRegistry()) + val repository = CouchbaseSetRepository(collection, DTO::class.java, dictionaryTtl, SimpleMeterRegistry()) val ttl = Duration.ofSeconds(60) val dtos = (1..30).map { DTO("id_$it", "value_$it") }.toSet() repository.add("dtosA", dtos, ttl = ttl).block() @@ -235,7 +242,7 @@ class CouchbaseRepositorySpec : StringSpec() { "should remove same value from single set only" { //GIVEN - val repository = CouchbaseSetRepository(collection, DTO::class.java, SimpleMeterRegistry()) + val repository = CouchbaseSetRepository(collection, DTO::class.java, dictionaryTtl, SimpleMeterRegistry()) val ttl = Duration.ofSeconds(60) val dtos = (1..10).map { DTO("id_$it", "value_$it") }.toSet() diff --git a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseCollection.kt b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseCollection.kt index bba1901..23ca78f 100644 --- a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseCollection.kt +++ b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseCollection.kt @@ -1,6 +1,8 @@ package pl.allegro.tech.couchbasecommons import com.couchbase.client.java.kv.GetResult +import com.couchbase.client.java.kv.InsertOptions +import com.couchbase.client.java.kv.MutateInOptions import com.couchbase.client.java.kv.MutateInResult import com.couchbase.client.java.kv.MutateInSpec import com.couchbase.client.java.kv.MutationResult @@ -10,8 +12,8 @@ import reactor.core.publisher.Mono interface CouchbaseCollection { fun get(key: String): Mono fun upsert(key: String, value: Any): Mono - fun upsert(key: String, value: Any, options: UpsertOptions): Mono - fun mutateIn(key: String, specs: List): Mono - fun insert(key: String, value: Any): Mono + fun upsert(key: String, value: Any, options: UpsertOptions = UpsertOptions.upsertOptions()): Mono + fun mutateIn(key: String, specs: List, options: MutateInOptions = MutateInOptions.mutateInOptions()): Mono + fun insert(key: String, value: Any, options: InsertOptions = InsertOptions.insertOptions()): Mono fun remove(key: String): Mono } diff --git a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseRepository.kt b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseRepository.kt index 381da1f..aca13ba 100644 --- a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseRepository.kt +++ b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/CouchbaseRepository.kt @@ -4,6 +4,8 @@ import com.couchbase.client.core.error.DocumentNotFoundException import com.couchbase.client.java.codec.TypeRef import com.couchbase.client.java.json.JsonObject import com.couchbase.client.java.kv.GetResult +import com.couchbase.client.java.kv.InsertOptions +import com.couchbase.client.java.kv.MutateInOptions import com.couchbase.client.java.kv.MutateInSpec import com.couchbase.client.java.kv.MutationResult import com.couchbase.client.java.kv.UpsertOptions @@ -56,7 +58,8 @@ private const val MAX_MUTATE_IN_OPERATIONS = 16 class CouchbaseSetRepository( private val collection: CouchbaseCollection, private val type: Class, - private val meterRegistry: MeterRegistry + private val setDictionaryTtl: Duration, + private val meterRegistry: MeterRegistry, ) { fun add(key: String, value: T, ttl: Duration): Mono = add(key, setOf(value), ttl) @@ -83,12 +86,14 @@ class CouchbaseSetRepository( private fun insertKeysToIndex(collectionKey: String, keys: List): Mono { return collection.mutateIn( collectionKey, - keys.map { singleKey -> MutateInSpec.upsert(singleKey, 1).createPath() }) + keys.map { singleKey -> MutateInSpec.upsert(singleKey, 1).createPath() }, + dictionaryMutateInOptions() + ) .measure(meterRegistry, "set.mutate.dictionary") .cast(MutationResult::class.java) .onErrorResume { throwable -> if (throwable is DocumentNotFoundException) - collection.insert(collectionKey, keys.map { it to 1 }.toMap()) + collection.insert(collectionKey, keys.map { it to 1 }.toMap(), dictionaryInsertOptions()) .measure(meterRegistry, "set.write.dictionary") else Mono.error(throwable) @@ -139,6 +144,11 @@ class CouchbaseSetRepository( collection.mutateIn(key, it) }.collectList() + private fun dictionaryMutateInOptions() + = MutateInOptions.mutateInOptions().expiry(setDictionaryTtl) + + private fun dictionaryInsertOptions() + = InsertOptions.insertOptions().expiry(setDictionaryTtl) } private class StringKeyedMapTypeRef(valueType: Class) : TypeRef>() { diff --git a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/KeyPrefixingCollection.kt b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/KeyPrefixingCollection.kt index 7c19381..6da2e5e 100644 --- a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/KeyPrefixingCollection.kt +++ b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/KeyPrefixingCollection.kt @@ -2,6 +2,8 @@ package pl.allegro.tech.couchbasecommons import com.couchbase.client.core.error.DocumentNotFoundException import com.couchbase.client.java.kv.GetResult +import com.couchbase.client.java.kv.InsertOptions +import com.couchbase.client.java.kv.MutateInOptions import com.couchbase.client.java.kv.MutateInResult import com.couchbase.client.java.kv.MutateInSpec import com.couchbase.client.java.kv.MutationResult @@ -28,11 +30,11 @@ class KeyPrefixingCollection( override fun upsert(key: String, value: Any, options: UpsertOptions): Mono = collection.upsert(prefix(key), value, options) - override fun mutateIn(key: String, specs: List): Mono - = collection.mutateIn(prefix(key), specs) + override fun mutateIn(key: String, specs: List, options: MutateInOptions): Mono + = collection.mutateIn(prefix(key), specs, options) - override fun insert(key: String, value: Any): Mono - = collection.insert(prefix(key), value) + override fun insert(key: String, value: Any, options: InsertOptions): Mono + = collection.insert(prefix(key), value, options) override fun remove(key: String): Mono = collection.remove(prefix(key)) diff --git a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/PublishOnSchedulerCouchbaseCollection.kt b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/PublishOnSchedulerCouchbaseCollection.kt index 9684310..2836e2c 100644 --- a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/PublishOnSchedulerCouchbaseCollection.kt +++ b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/PublishOnSchedulerCouchbaseCollection.kt @@ -1,6 +1,8 @@ package pl.allegro.tech.couchbasecommons import com.couchbase.client.java.kv.GetResult +import com.couchbase.client.java.kv.InsertOptions +import com.couchbase.client.java.kv.MutateInOptions import com.couchbase.client.java.kv.MutateInResult import com.couchbase.client.java.kv.MutateInSpec import com.couchbase.client.java.kv.MutationResult @@ -21,11 +23,11 @@ class PublishOnSchedulerCouchbaseCollection( override fun upsert(key: String, value: Any, options: UpsertOptions): Mono = collection.upsert(key, value, options).publishOn(scheduler) - override fun mutateIn(key: String, specs: List): Mono - = collection.mutateIn(key, specs).publishOn(scheduler) + override fun mutateIn(key: String, specs: List, options: MutateInOptions): Mono + = collection.mutateIn(key, specs, options).publishOn(scheduler) - override fun insert(key: String, value: Any): Mono - = collection.insert(key, value).publishOn(scheduler) + override fun insert(key: String, value: Any, options: InsertOptions): Mono + = collection.insert(key, value, options).publishOn(scheduler) override fun remove(key: String): Mono = collection.remove(key).publishOn(scheduler) diff --git a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/ReactiveCouchbaseCollection.kt b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/ReactiveCouchbaseCollection.kt index 7514f3e..029e3cc 100644 --- a/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/ReactiveCouchbaseCollection.kt +++ b/couchbase/src/main/kotlin/pl/allegro/tech/couchbasecommons/ReactiveCouchbaseCollection.kt @@ -2,6 +2,8 @@ package pl.allegro.tech.couchbasecommons import com.couchbase.client.java.ReactiveCollection import com.couchbase.client.java.kv.GetResult +import com.couchbase.client.java.kv.InsertOptions +import com.couchbase.client.java.kv.MutateInOptions import com.couchbase.client.java.kv.MutateInResult import com.couchbase.client.java.kv.MutateInSpec import com.couchbase.client.java.kv.MutationResult @@ -22,11 +24,11 @@ class ReactiveCouchbaseCollection( override fun upsert(key: String, value: Any, options: UpsertOptions): Mono = collection.upsert(prepareKey(key), value, options) - override fun mutateIn(key: String, specs: List): Mono - = collection.mutateIn(prepareKey(key), specs) + override fun mutateIn(key: String, specs: List, options: MutateInOptions): Mono + = collection.mutateIn(prepareKey(key), specs, options) - override fun insert(key: String, value: Any): Mono - = collection.insert(prepareKey(key), value) + override fun insert(key: String, value: Any, options: InsertOptions): Mono + = collection.insert(prepareKey(key), value, options) override fun remove(key: String): Mono = collection.remove(prepareKey(key))