Skip to content

Commit

Permalink
Merge pull request #9 from allegro/ttl
Browse files Browse the repository at this point in the history
Ttl for set dictionaries
  • Loading branch information
krzysiekbielicki authored Oct 18, 2022
2 parents 3d9ef01 + b8bb20c commit 5d139d3
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 33 deletions.
5 changes: 3 additions & 2 deletions couchbase/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
id("com.coditory.integration-test") version "1.4.4"
`idea`
}

dependencies {
Expand All @@ -9,10 +10,10 @@ dependencies {
implementation("io.micrometer:micrometer-core:1.9.5")
api(group = "com.couchbase.client", name = "java-client", version = "3.2.5")
api(group = "com.couchbase.client", name = "metrics-micrometer", version = "0.1.0")
api(group = "com.fasterxml.jackson.module", name = "jackson-module-kotlin", version = "2.13.1")
api(group = "com.fasterxml.jackson.module", name = "jackson-module-kotlin", version = "2.13.4")

testImplementation(group = "io.kotest", name = "kotest-runner-junit5", version = "5.1.0")
testImplementation(group = "io.kotest.extensions", name = "kotest-extensions-testcontainers", version = "1.2.1")
testImplementation(group = "io.kotest.extensions", name = "kotest-extensions-testcontainers", version = "1.3.4")
testImplementation(group = "org.testcontainers", name = "couchbase", version = "1.17.5")
testImplementation(group = "org.mockito.kotlin", name = "mockito-kotlin", version = "4.0.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@ import io.kotest.matchers.collections.shouldContainExactly
import io.kotest.matchers.collections.shouldHaveLowerBound
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.collections.shouldHaveUpperBound
import io.kotest.matchers.date.shouldBeAfter
import io.kotest.matchers.date.shouldBeBefore
import io.kotest.matchers.shouldBe
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.mockito.kotlin.any
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.testcontainers.couchbase.BucketDefinition
import org.testcontainers.couchbase.CouchbaseContainer
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.Instant
import org.testcontainers.couchbase.BucketDefinition
import org.testcontainers.couchbase.CouchbaseContainer

class CouchbaseRepositorySpec : StringSpec() {

private lateinit var reactiveCollection: ReactiveCollection
private lateinit var collection: CouchbaseCollection
private val prefix = "prefix"
private val dictionaryTtl = Duration.ofSeconds(100)

override fun beforeSpec(spec: Spec) {
val bucketName = "mybucket"
Expand All @@ -45,7 +48,8 @@ class CouchbaseRepositorySpec : StringSpec() {

val cluster = ReactiveCluster.connect(
container.connectionString,
ClusterOptions.clusterOptions(container.username, container.password)
ClusterOptions
.clusterOptions(container.username, container.password)
.environment(
ClusterEnvironment.builder()
.jsonSerializer(JacksonJsonSerializer.create(jacksonObjectMapper()))
Expand All @@ -61,7 +65,7 @@ class CouchbaseRepositorySpec : StringSpec() {
"should save object in cache" {
//GIVEN
val repository = TypedCouchbaseRepository(collection, DTO::class.java, SimpleMeterRegistry())
val dto = DTO("key", "body")
val dto = DTO("partialId", "partialBody")

//WHEN
repository.put(dto.id, dto).block()
Expand All @@ -77,7 +81,7 @@ class CouchbaseRepositorySpec : StringSpec() {
"should save object in cache using hashed key when provided ts longer than 250 characters" {
//GIVEN
val repository = TypedCouchbaseRepository(collection, DTO::class.java, SimpleMeterRegistry())
val dto = DTO("id".repeat(250), "body")
val dto = DTO("id".repeat(250), "partialBody")

//WHEN
repository.put(dto.id, dto).block()
Expand All @@ -102,7 +106,7 @@ class CouchbaseRepositorySpec : StringSpec() {
val writeActualCount = writeTimer.count()
val readActualCount = readTimer.count()

val dto = DTO("key", "body")
val dto = DTO("somePartial", "partialBody")

//WHEN
repository.put(dto.id, dto).block()
Expand All @@ -129,7 +133,7 @@ class CouchbaseRepositorySpec : StringSpec() {
val readActualCount = readErrors.count()

//WHEN
repository.put("id", DTO("key", "body")).onErrorResume { _ -> Mono.empty() }.block()
repository.put("id", DTO("somePartial", "partialBody")).onErrorResume { _ -> Mono.empty() }.block()
repository.get("id").onErrorResume { _ -> Mono.empty() }.block()

//THEN
Expand All @@ -144,14 +148,15 @@ class CouchbaseRepositorySpec : StringSpec() {
val writeDictionaryTimer = meterRegistry.timer("cache.couchbase.set.write.dictionary")
val mutateDictionaryTimer = meterRegistry.timer("cache.couchbase.set.mutate.dictionary")
val writeTimer = meterRegistry.timer("cache.couchbase.set.write")
val repository = CouchbaseSetRepository(collection, DTO::class.java, meterRegistry)
val repository = CouchbaseSetRepository(collection, DTO::class.java, dictionaryTtl, meterRegistry)
val longKey = "dtos1".repeat(250)

//WHEN
val ttl = Duration.ofSeconds(60)
val dtos = (1..30).map { DTO("id_$it", "value_$it") }.toSet()
repository.add(longKey, dtos, ttl = ttl).block()
val expiryTime = Instant.now().plus(ttl)
val dictionaryExpiry = Instant.now().plus(dictionaryTtl)

//EXPECT
writeItemTimer.count() shouldBe 30
Expand All @@ -174,9 +179,11 @@ class CouchbaseRepositorySpec : StringSpec() {
mutateDictionaryTimer.count() shouldBe 4
writeTimer.count() shouldBe 2

val raw = reactiveCollection.get("prefix_$longKey".sha256()).block()!!
val rawObject = raw.contentAs(Map::class.java)
val getOptions = GetOptions.getOptions().withExpiry(true)
val raw = reactiveCollection.get("prefix_$longKey".sha256(), getOptions).block()!!
raw.expiryTime().get() shouldBeAfter dictionaryExpiry.minusSeconds(2)
raw.expiryTime().get() shouldBeBefore dictionaryExpiry.plusSeconds(2)
val rawObject = raw.contentAs(Map::class.java)
val setItems = rawObject.keys
.map { it as String }
.map { reactiveCollection.get("prefix_${longKey}_$it".sha256(), getOptions).block() }
Expand All @@ -187,7 +194,7 @@ class CouchbaseRepositorySpec : StringSpec() {

"should remove evicted values from dictionary" {
//GIVEN
val repository = CouchbaseSetRepository(collection, DTO::class.java, SimpleMeterRegistry())
val repository = CouchbaseSetRepository(collection, DTO::class.java, dictionaryTtl, SimpleMeterRegistry())

repository.add("dtos2", DTO("id_1", "value_1"), ttl = Duration.ofSeconds(20)).block()
repository.add("dtos2", DTO("id_2", "value_2"), ttl = Duration.ofSeconds(10)).block()
Expand Down Expand Up @@ -215,7 +222,7 @@ class CouchbaseRepositorySpec : StringSpec() {

"should remove more than 16 items from set" {
//GIVEN
val repository = CouchbaseSetRepository(collection, DTO::class.java, SimpleMeterRegistry())
val repository = CouchbaseSetRepository(collection, DTO::class.java, dictionaryTtl, SimpleMeterRegistry())
val ttl = Duration.ofSeconds(60)
val dtos = (1..30).map { DTO("id_$it", "value_$it") }.toSet()
repository.add("dtosA", dtos, ttl = ttl).block()
Expand All @@ -235,7 +242,7 @@ class CouchbaseRepositorySpec : StringSpec() {

"should remove same value from single set only" {
//GIVEN
val repository = CouchbaseSetRepository(collection, DTO::class.java, SimpleMeterRegistry())
val repository = CouchbaseSetRepository(collection, DTO::class.java, dictionaryTtl, SimpleMeterRegistry())

val ttl = Duration.ofSeconds(60)
val dtos = (1..10).map { DTO("id_$it", "value_$it") }.toSet()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pl.allegro.tech.couchbasecommons

import com.couchbase.client.java.kv.GetResult
import com.couchbase.client.java.kv.InsertOptions
import com.couchbase.client.java.kv.MutateInOptions
import com.couchbase.client.java.kv.MutateInResult
import com.couchbase.client.java.kv.MutateInSpec
import com.couchbase.client.java.kv.MutationResult
Expand All @@ -10,8 +12,8 @@ import reactor.core.publisher.Mono
interface CouchbaseCollection {
fun get(key: String): Mono<GetResult>
fun upsert(key: String, value: Any): Mono<MutationResult>
fun upsert(key: String, value: Any, options: UpsertOptions): Mono<MutationResult>
fun mutateIn(key: String, specs: List<MutateInSpec>): Mono<MutateInResult>
fun insert(key: String, value: Any): Mono<MutationResult>
fun upsert(key: String, value: Any, options: UpsertOptions = UpsertOptions.upsertOptions()): Mono<MutationResult>
fun mutateIn(key: String, specs: List<MutateInSpec>, options: MutateInOptions = MutateInOptions.mutateInOptions()): Mono<MutateInResult>
fun insert(key: String, value: Any, options: InsertOptions = InsertOptions.insertOptions()): Mono<MutationResult>
fun remove(key: String): Mono<MutationResult>
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import com.couchbase.client.core.error.DocumentNotFoundException
import com.couchbase.client.java.codec.TypeRef
import com.couchbase.client.java.json.JsonObject
import com.couchbase.client.java.kv.GetResult
import com.couchbase.client.java.kv.InsertOptions
import com.couchbase.client.java.kv.MutateInOptions
import com.couchbase.client.java.kv.MutateInSpec
import com.couchbase.client.java.kv.MutationResult
import com.couchbase.client.java.kv.UpsertOptions
Expand Down Expand Up @@ -56,7 +58,8 @@ private const val MAX_MUTATE_IN_OPERATIONS = 16
class CouchbaseSetRepository<T: CouchbaseSetEntry>(
private val collection: CouchbaseCollection,
private val type: Class<T>,
private val meterRegistry: MeterRegistry
private val setDictionaryTtl: Duration,
private val meterRegistry: MeterRegistry,
) {

fun add(key: String, value: T, ttl: Duration): Mono<Unit> = add(key, setOf(value), ttl)
Expand All @@ -83,12 +86,14 @@ class CouchbaseSetRepository<T: CouchbaseSetEntry>(
private fun insertKeysToIndex(collectionKey: String, keys: List<String>): Mono<MutationResult> {
return collection.mutateIn(
collectionKey,
keys.map { singleKey -> MutateInSpec.upsert(singleKey, 1).createPath() })
keys.map { singleKey -> MutateInSpec.upsert(singleKey, 1).createPath() },
dictionaryMutateInOptions()
)
.measure(meterRegistry, "set.mutate.dictionary")
.cast(MutationResult::class.java)
.onErrorResume { throwable ->
if (throwable is DocumentNotFoundException)
collection.insert(collectionKey, keys.map { it to 1 }.toMap())
collection.insert(collectionKey, keys.map { it to 1 }.toMap(), dictionaryInsertOptions())
.measure(meterRegistry, "set.write.dictionary")
else
Mono.error(throwable)
Expand Down Expand Up @@ -139,6 +144,11 @@ class CouchbaseSetRepository<T: CouchbaseSetEntry>(
collection.mutateIn(key, it)
}.collectList()

private fun dictionaryMutateInOptions()
= MutateInOptions.mutateInOptions().expiry(setDictionaryTtl)

private fun dictionaryInsertOptions()
= InsertOptions.insertOptions().expiry(setDictionaryTtl)
}

private class StringKeyedMapTypeRef<T>(valueType: Class<T>) : TypeRef<Map<String, T>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package pl.allegro.tech.couchbasecommons

import com.couchbase.client.core.error.DocumentNotFoundException
import com.couchbase.client.java.kv.GetResult
import com.couchbase.client.java.kv.InsertOptions
import com.couchbase.client.java.kv.MutateInOptions
import com.couchbase.client.java.kv.MutateInResult
import com.couchbase.client.java.kv.MutateInSpec
import com.couchbase.client.java.kv.MutationResult
Expand All @@ -28,11 +30,11 @@ class KeyPrefixingCollection(
override fun upsert(key: String, value: Any, options: UpsertOptions): Mono<MutationResult>
= collection.upsert(prefix(key), value, options)

override fun mutateIn(key: String, specs: List<MutateInSpec>): Mono<MutateInResult>
= collection.mutateIn(prefix(key), specs)
override fun mutateIn(key: String, specs: List<MutateInSpec>, options: MutateInOptions): Mono<MutateInResult>
= collection.mutateIn(prefix(key), specs, options)

override fun insert(key: String, value: Any): Mono<MutationResult>
= collection.insert(prefix(key), value)
override fun insert(key: String, value: Any, options: InsertOptions): Mono<MutationResult>
= collection.insert(prefix(key), value, options)

override fun remove(key: String): Mono<MutationResult>
= collection.remove(prefix(key))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pl.allegro.tech.couchbasecommons

import com.couchbase.client.java.kv.GetResult
import com.couchbase.client.java.kv.InsertOptions
import com.couchbase.client.java.kv.MutateInOptions
import com.couchbase.client.java.kv.MutateInResult
import com.couchbase.client.java.kv.MutateInSpec
import com.couchbase.client.java.kv.MutationResult
Expand All @@ -21,11 +23,11 @@ class PublishOnSchedulerCouchbaseCollection(
override fun upsert(key: String, value: Any, options: UpsertOptions): Mono<MutationResult>
= collection.upsert(key, value, options).publishOn(scheduler)

override fun mutateIn(key: String, specs: List<MutateInSpec>): Mono<MutateInResult>
= collection.mutateIn(key, specs).publishOn(scheduler)
override fun mutateIn(key: String, specs: List<MutateInSpec>, options: MutateInOptions): Mono<MutateInResult>
= collection.mutateIn(key, specs, options).publishOn(scheduler)

override fun insert(key: String, value: Any): Mono<MutationResult>
= collection.insert(key, value).publishOn(scheduler)
override fun insert(key: String, value: Any, options: InsertOptions): Mono<MutationResult>
= collection.insert(key, value, options).publishOn(scheduler)

override fun remove(key: String): Mono<MutationResult>
= collection.remove(key).publishOn(scheduler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package pl.allegro.tech.couchbasecommons

import com.couchbase.client.java.ReactiveCollection
import com.couchbase.client.java.kv.GetResult
import com.couchbase.client.java.kv.InsertOptions
import com.couchbase.client.java.kv.MutateInOptions
import com.couchbase.client.java.kv.MutateInResult
import com.couchbase.client.java.kv.MutateInSpec
import com.couchbase.client.java.kv.MutationResult
Expand All @@ -22,11 +24,11 @@ class ReactiveCouchbaseCollection(
override fun upsert(key: String, value: Any, options: UpsertOptions): Mono<MutationResult>
= collection.upsert(prepareKey(key), value, options)

override fun mutateIn(key: String, specs: List<MutateInSpec>): Mono<MutateInResult>
= collection.mutateIn(prepareKey(key), specs)
override fun mutateIn(key: String, specs: List<MutateInSpec>, options: MutateInOptions): Mono<MutateInResult>
= collection.mutateIn(prepareKey(key), specs, options)

override fun insert(key: String, value: Any): Mono<MutationResult>
= collection.insert(prepareKey(key), value)
override fun insert(key: String, value: Any, options: InsertOptions): Mono<MutationResult>
= collection.insert(prepareKey(key), value, options)

override fun remove(key: String): Mono<MutationResult>
= collection.remove(prepareKey(key))
Expand Down

0 comments on commit 5d139d3

Please sign in to comment.