Skip to content

Commit

Permalink
[UNOMI-853] Adapt migration job to use asynchronous mode avoiding tim…
Browse files Browse the repository at this point in the history
…eout and connection lost (#697)

* UNOMI-853: Adapt migration to handle long time reindex tasks with asynchronous mode.

* UNOMI-853: Fix small typo and update documentation pointing to broken karaf link

* UNOMI-853: Finally log a task waiting info every 15s.
  • Loading branch information
jayblanc authored Oct 9, 2024
1 parent 9323346 commit 315c4bf
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
2 changes: 1 addition & 1 deletion manual/src/main/asciidoc/configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ highly recommended that you design your client applications to use the HTTPS por
The user accounts to access the REST API are actually routed through Karaf's JAAS support, which you may find the
documentation for here :

* http://karaf.apache.org/manual/latest/users-guide/security.html[http://karaf.apache.org/manual/latest/users-guide/security.html]
* https://karaf.apache.org/manual/latest/#_security_2[https://karaf.apache.org/manual/latest/#_security_2]

The default username/password is

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.json.JSONObject;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.URL;
Expand All @@ -42,6 +44,8 @@
*/
public class MigrationUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(MigrationUtils.class);

public static JSONObject queryWithScroll(CloseableHttpClient httpClient, String url) throws IOException {
url += "?scroll=1m";

Expand Down Expand Up @@ -129,7 +133,7 @@ public static String extractMappingFromBundles(BundleContext bundleContext, Stri
if (predefinedMappings == null) {
continue;
}
while (predefinedMappings.hasMoreElements()) {
if (predefinedMappings.hasMoreElements()) {
URL predefinedMappingURL = predefinedMappings.nextElement();
return IOUtils.toString(predefinedMappingURL);
}
Expand Down Expand Up @@ -176,7 +180,10 @@ public static String buildRolloverPolicyCreationRequest(String baseRequest, Migr
public static void moveToIndex(CloseableHttpClient httpClient, BundleContext bundleContext, String esAddress, String sourceIndexName, String targetIndexName, String painlessScript) throws Exception {
String reIndexRequest = resourceAsString(bundleContext, "requestBody/2.2.0/base_reindex_request.json").replace("#source", sourceIndexName).replace("#dest", targetIndexName).replace("#painless", StringUtils.isNotEmpty(painlessScript) ? getScriptPart(painlessScript) : "");

HttpUtils.executePostRequest(httpClient, esAddress + "/_reindex", reIndexRequest, null);
// Reindex
JSONObject task = new JSONObject(HttpUtils.executePostRequest(httpClient, esAddress + "/_reindex?wait_for_completion=false", reIndexRequest, null));
//Wait for the reindex task to finish
waitForTaskToFinish(httpClient, esAddress, task.getString("task"), null);
}

public static void deleteIndex(CloseableHttpClient httpClient, String esAddress, String indexName) throws Exception {
Expand Down Expand Up @@ -216,7 +223,9 @@ public static void reIndex(CloseableHttpClient httpClient, BundleContext bundleC
// Recreate the original index with new mappings
HttpUtils.executePutRequest(httpClient, esAddress + "/" + indexName, newIndexSettings, null);
// Reindex data from clone
HttpUtils.executePostRequest(httpClient, esAddress + "/_reindex", reIndexRequest, null);
JSONObject task = new JSONObject(HttpUtils.executePostRequest(httpClient, esAddress + "/_reindex?wait_for_completion=false", reIndexRequest, null));
//Wait for the reindex task to finish
waitForTaskToFinish(httpClient, esAddress, task.getString("task"), migrationContext);
});

migrationContext.performMigrationStep("Reindex step for: " + indexName + " (delete clone)", () -> {
Expand Down Expand Up @@ -249,7 +258,7 @@ public static void scrollQuery(CloseableHttpClient httpClient, String esAddress,
}

// no more results, delete scroll
if (hits.length() == 0) {
if (hits.isEmpty()) {
if (scrollId != null) {
HttpUtils.executeDeleteRequest(httpClient, esAddress + "/_search/scroll/" + scrollId, null);
}
Expand Down Expand Up @@ -281,6 +290,31 @@ public static void waitForYellowStatus(CloseableHttpClient httpClient, String es

}

public static void waitForTaskToFinish(CloseableHttpClient httpClient, String esAddress, String taskId, MigrationContext migrationContext) throws IOException {
while (true) {
final JSONObject status = new JSONObject(
HttpUtils.executeGetRequest(httpClient, esAddress + "/_tasks/" + taskId + "?wait_for_completion=true&timeout=15s",
null));
if (status.has("completed") && status.getBoolean("completed")) {
if (migrationContext != null) {
migrationContext.printMessage("Task is completed");
} else {
LOGGER.info("Task is completed");
}
break;
}
if (status.has("error")) {
final JSONObject error = status.getJSONObject("error");
throw new IOException("Task error: " + error.getString("type") + " - " + error.getString("reason"));
}
if (migrationContext != null) {
migrationContext.printMessage("Waiting for Task " + taskId + " to complete");
} else {
LOGGER.info("Waiting for Task {} to complete", taskId);
}
}
}

public interface ScrollCallback {
void execute(String hits);
}
Expand Down

0 comments on commit 315c4bf

Please sign in to comment.