Skip to content

Commit

Permalink
add support for async reindex
Browse files Browse the repository at this point in the history
  • Loading branch information
barbulescu committed Jan 12, 2024
1 parent 67f9343 commit 53231a4
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.jillesvangurp.ktsearch

import com.jillesvangurp.searchdsls.SearchEngineVariant.ES7
import com.jillesvangurp.searchdsls.SearchEngineVariant.ES8
import com.jillesvangurp.searchdsls.VariantRestriction
import com.jillesvangurp.searchdsls.querydsl.ReindexDSL
Expand Down Expand Up @@ -30,15 +31,65 @@ data class ReindexResponse(
val requestsPerSecond: Double,
@SerialName("throttled_until_millis")
val throttledUntilMillis: Int,
val failures: List<String>
val failures: List<String>,
)

@Serializable
data class ReindexRetries(val bulk: Int, val search: Int)

@VariantRestriction(ES8)
@VariantRestriction(ES7, ES8)
@ExperimentalFeature
suspend fun SearchClient.reindex(
refresh: Boolean? = null,
timeout: Duration? = null,
waitForActiveShards: String? = null,
requestsPerSecond: Int? = null,
requireAlias: Boolean? = null,
scroll: Duration? = null,
slices: Int? = null,
maxDocs: Int? = null,
block: ReindexDSL.() -> Unit,
): ReindexResponse = reindexGeneric(
refresh,
timeout,
waitForActiveShards,
true,
requestsPerSecond,
requireAlias,
scroll,
slices,
maxDocs,
block
).parse(ReindexResponse.serializer())


@VariantRestriction(ES7, ES8)
@ExperimentalFeature
suspend fun SearchClient.reindexAsync(
refresh: Boolean? = null,
timeout: Duration? = null,
waitForActiveShards: String? = null,
requestsPerSecond: Int? = null,
requireAlias: Boolean? = null,
scroll: Duration? = null,
slices: Int? = null,
maxDocs: Int? = null,
block: ReindexDSL.() -> Unit,
): TaskId = reindexGeneric(
refresh,
timeout,
waitForActiveShards,
false,
requestsPerSecond,
requireAlias,
scroll,
slices,
maxDocs,
block
).parse(TaskResponse.serializer()).toTaskId()


private suspend fun SearchClient.reindexGeneric(
refresh: Boolean? = null,
timeout: Duration? = null,
waitForActiveShards: String? = null,
Expand All @@ -48,8 +99,8 @@ suspend fun SearchClient.reindex(
scroll: Duration? = null,
slices: Int? = null,
maxDocs: Int? = null,
block: ReindexDSL.() -> Unit
): ReindexResponse {
block: ReindexDSL.() -> Unit,
): Result<RestResponse.Status2XX> {
val reindexDSL = ReindexDSL()
block(reindexDSL)

Expand All @@ -65,10 +116,17 @@ suspend fun SearchClient.reindex(
parameter("slices", slices)
parameter("max_docs", maxDocs)
body = reindexDSL.toString()
}.parse(ReindexResponse.serializer())
}
}

@RequiresOptIn(level = WARNING, message = "This API is experimental. It can be incompatibly changed in the future.")
@Retention(BINARY)
@Target(FUNCTION)
annotation class ExperimentalFeature
annotation class ExperimentalFeature

@Serializable
data class TaskResponse(val task: String) {
fun toTaskId() = TaskId(task)
}

data class TaskId(val value: String)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.jillesvangurp.searchdsls.querydsl.ReindexVersionType.EXTERNAL
import com.jillesvangurp.searchdsls.querydsl.term
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.json.jsonObject
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
Expand Down Expand Up @@ -51,7 +52,6 @@ class ReindexTest : SearchTestBase() {
refresh = false,
timeout = 10.seconds,
waitForActiveShards = "1",
waitForCompletion = true,
requestsPerSecond = 10,
requireAlias = false,
scroll = 10.seconds,
Expand Down Expand Up @@ -93,6 +93,26 @@ class ReindexTest : SearchTestBase() {
response.shouldHave(total = 1, created = 1, batches = 1)
}

@Test
fun asyncReindex() = runTest {
client.indexDocument(sourceName, TestDocument(name = "t1"), refresh = WaitFor)

val taskId = client.reindexAsync {
source {
index = sourceName
}
destination {
index = destinationName
}
}

val taskResponse = client.getTask(taskId.value, waitForCompletion = true)
val jsonResponse = requireNotNull(taskResponse["response"]?.jsonObject) { "response element is missing on $taskResponse"}
val response = jsonResponse.parse(ReindexResponse.serializer())

response.shouldHave(total = 1, created = 1, batches = 1)
}


private suspend fun deleteIndices() {
client.deleteIndex(sourceName)
Expand Down

0 comments on commit 53231a4

Please sign in to comment.