diff --git a/search-client/src/commonMain/kotlin/com/jillesvangurp/ktsearch/reindex-api.kt b/search-client/src/commonMain/kotlin/com/jillesvangurp/ktsearch/reindex-api.kt index ee30b6e4..68ca69c1 100644 --- a/search-client/src/commonMain/kotlin/com/jillesvangurp/ktsearch/reindex-api.kt +++ b/search-client/src/commonMain/kotlin/com/jillesvangurp/ktsearch/reindex-api.kt @@ -1,5 +1,6 @@ package com.jillesvangurp.ktsearch +import com.jillesvangurp.searchdsls.SearchEngineVariant.ES7 import com.jillesvangurp.searchdsls.SearchEngineVariant.ES8 import com.jillesvangurp.searchdsls.VariantRestriction import com.jillesvangurp.searchdsls.querydsl.ReindexDSL @@ -30,15 +31,65 @@ data class ReindexResponse( val requestsPerSecond: Double, @SerialName("throttled_until_millis") val throttledUntilMillis: Int, - val failures: List + val failures: List, ) @Serializable data class ReindexRetries(val bulk: Int, val search: Int) -@VariantRestriction(ES8) +@VariantRestriction(ES7, ES8) @ExperimentalFeature suspend fun SearchClient.reindex( + refresh: Boolean? = null, + timeout: Duration? = null, + waitForActiveShards: String? = null, + requestsPerSecond: Int? = null, + requireAlias: Boolean? = null, + scroll: Duration? = null, + slices: Int? = null, + maxDocs: Int? = null, + block: ReindexDSL.() -> Unit, +): ReindexResponse = reindexGeneric( + refresh, + timeout, + waitForActiveShards, + true, + requestsPerSecond, + requireAlias, + scroll, + slices, + maxDocs, + block +).parse(ReindexResponse.serializer()) + + +@VariantRestriction(ES7, ES8) +@ExperimentalFeature +suspend fun SearchClient.reindexAsync( + refresh: Boolean? = null, + timeout: Duration? = null, + waitForActiveShards: String? = null, + requestsPerSecond: Int? = null, + requireAlias: Boolean? = null, + scroll: Duration? = null, + slices: Int? = null, + maxDocs: Int? = null, + block: ReindexDSL.() -> Unit, +): TaskId = reindexGeneric( + refresh, + timeout, + waitForActiveShards, + false, + requestsPerSecond, + requireAlias, + scroll, + slices, + maxDocs, + block +).parse(TaskResponse.serializer()).toTaskId() + + +private suspend fun SearchClient.reindexGeneric( refresh: Boolean? = null, timeout: Duration? = null, waitForActiveShards: String? = null, @@ -48,8 +99,8 @@ suspend fun SearchClient.reindex( scroll: Duration? = null, slices: Int? = null, maxDocs: Int? = null, - block: ReindexDSL.() -> Unit -): ReindexResponse { + block: ReindexDSL.() -> Unit, +): Result { val reindexDSL = ReindexDSL() block(reindexDSL) @@ -65,10 +116,17 @@ suspend fun SearchClient.reindex( parameter("slices", slices) parameter("max_docs", maxDocs) body = reindexDSL.toString() - }.parse(ReindexResponse.serializer()) + } } @RequiresOptIn(level = WARNING, message = "This API is experimental. It can be incompatibly changed in the future.") @Retention(BINARY) @Target(FUNCTION) -annotation class ExperimentalFeature \ No newline at end of file +annotation class ExperimentalFeature + +@Serializable +data class TaskResponse(val task: String) { + fun toTaskId() = TaskId(task) +} + +data class TaskId(val value: String) \ No newline at end of file diff --git a/search-client/src/commonTest/kotlin/com/jillesvangurp/ktsearch/ReindexTest.kt b/search-client/src/commonTest/kotlin/com/jillesvangurp/ktsearch/ReindexTest.kt index d3b7123b..091af8f5 100644 --- a/search-client/src/commonTest/kotlin/com/jillesvangurp/ktsearch/ReindexTest.kt +++ b/search-client/src/commonTest/kotlin/com/jillesvangurp/ktsearch/ReindexTest.kt @@ -7,6 +7,7 @@ import com.jillesvangurp.searchdsls.querydsl.ReindexVersionType.EXTERNAL import com.jillesvangurp.searchdsls.querydsl.term import io.kotest.matchers.shouldBe import kotlinx.coroutines.test.runTest +import kotlinx.serialization.json.jsonObject import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test @@ -51,7 +52,6 @@ class ReindexTest : SearchTestBase() { refresh = false, timeout = 10.seconds, waitForActiveShards = "1", - waitForCompletion = true, requestsPerSecond = 10, requireAlias = false, scroll = 10.seconds, @@ -93,6 +93,26 @@ class ReindexTest : SearchTestBase() { response.shouldHave(total = 1, created = 1, batches = 1) } + @Test + fun asyncReindex() = runTest { + client.indexDocument(sourceName, TestDocument(name = "t1"), refresh = WaitFor) + + val taskId = client.reindexAsync { + source { + index = sourceName + } + destination { + index = destinationName + } + } + + val taskResponse = client.getTask(taskId.value, waitForCompletion = true) + val jsonResponse = requireNotNull(taskResponse["response"]?.jsonObject) { "response element is missing on $taskResponse"} + val response = jsonResponse.parse(ReindexResponse.serializer()) + + response.shouldHave(total = 1, created = 1, batches = 1) + } + private suspend fun deleteIndices() { client.deleteIndex(sourceName)