Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incrementally remove resources from workflow state during deprovisioning #898

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.17...2.x)
### Features
### Enhancements
- Incrementally remove resources from workflow state during deprovisioning ([#898](https://github.com/opensearch-project/flow-framework/pull/898))

### Bug Fixes
### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessageFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.DocWriteRequest.OpType;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -693,6 +695,7 @@ public void addResourceToStateIndex(
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
newResource,
OpType.INDEX,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
Expand All @@ -701,15 +704,41 @@ public void addResourceToStateIndex(
}

/**
* Performs a get and update of a State Index document adding a new resource with strong consistency and retries
* Removes a resource from the state index, including common exception handling
* @param workflowId The workflow document id in the state index
* @param resourceToDelete The resource to delete
* @param listener the ActionListener for this step to handle completing the future after update
*/
public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener<WorkflowData> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to update state for " + workflowId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
resourceToDelete,
OpType.DELETE,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
}
}

/**
* Performs a get and update of a State Index document adding or removing a resource with strong consistency and retries
* @param workflowId The document id to update
* @param newResource The resource to add to the resources created list
* @param resource The resource to add or remove from the resources created list
* @param operation The operation to perform on the resource (INDEX to append to the list or DELETE to remove)
* @param retries The number of retries on update version conflicts
* @param listener The listener to complete on success or failure
*/
private void getAndUpdateResourceInStateDocumentWithRetries(
String workflowId,
ResourceCreated newResource,
ResourceCreated resource,
OpType operation,
int retries,
ActionListener<WorkflowData> listener
) {
Expand All @@ -721,7 +750,11 @@ private void getAndUpdateResourceInStateDocumentWithRetries(
}
WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString());
List<ResourceCreated> resourcesCreated = new ArrayList<>(currentState.resourcesCreated());
resourcesCreated.add(newResource);
if (operation == OpType.DELETE) {
resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap()));
} else {
resourcesCreated.add(resource);
}
XContentBuilder builder = XContentFactory.jsonBuilder();
WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build();
newState.toXContent(builder, null);
Expand All @@ -732,41 +765,54 @@ private void getAndUpdateResourceInStateDocumentWithRetries(
client.update(
updateRequest,
ActionListener.wrap(
r -> handleStateUpdateSuccess(workflowId, newResource, listener),
e -> handleStateUpdateException(workflowId, newResource, retries, listener, e)
r -> handleStateUpdateSuccess(workflowId, resource, operation, listener),
e -> handleStateUpdateException(workflowId, resource, operation, retries, listener, e)
)
);
}, ex -> handleStateUpdateException(workflowId, newResource, 0, listener, ex)));
}, ex -> handleStateUpdateException(workflowId, resource, operation, 0, listener, ex)));
}

private void handleStateUpdateSuccess(String workflowId, ResourceCreated newResource, ActionListener<WorkflowData> listener) {
private void handleStateUpdateSuccess(
String workflowId,
ResourceCreated newResource,
OpType operation,
ActionListener<WorkflowData> listener
) {
String resourceName = newResource.resourceType();
String resourceId = newResource.resourceId();
String nodeId = newResource.workflowStepId();
logger.info("Updated resources created for {} on step {} with {} {}", workflowId, nodeId, resourceName, resourceId);
logger.info(
"Updated resources created for {} on step {} to {} resource {} {}",
workflowId,
nodeId,
operation.equals(OpType.DELETE) ? "delete" : "add",
resourceName,
resourceId
);
listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), workflowId, nodeId));
}

private void handleStateUpdateException(
String workflowId,
ResourceCreated newResource,
OpType operation,
int retries,
ActionListener<WorkflowData> listener,
Exception e
) {
if (e instanceof VersionConflictEngineException && retries > 0) {
// Retry if we haven't exhausted retries
getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, retries - 1, listener);
getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, operation, retries - 1, listener);
return;
}
String errorMessage = "Failed to update workflow state for "
+ workflowId
+ " on step "
+ newResource.workflowStepId()
+ " with "
+ newResource.resourceType()
+ " "
+ newResource.resourceId();
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to update workflow state for {} on step {} to {} resource {} {}",
workflowId,
newResource.workflowStepId(),
operation.equals(OpType.DELETE) ? "delete" : "add",
newResource.resourceType(),
newResource.resourceId()
).getFormattedMessage();
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE;
Expand Down Expand Up @@ -214,19 +215,32 @@
// Repeat attempting to delete resources as long as at least one is successful
int resourceCount = deprovisionProcessSequence.size();
while (resourceCount > 0) {
PlainActionFuture<WorkflowData> stateUpdateFuture;
Iterator<ProcessNode> iter = deprovisionProcessSequence.iterator();
while (iter.hasNext()) {
do {
ProcessNode deprovisionNode = iter.next();
ResourceCreated resource = getResourceFromDeprovisionNode(deprovisionNode, resourcesCreated);
String resourceNameAndId = getResourceNameAndId(resource);
PlainActionFuture<WorkflowData> deprovisionFuture = deprovisionNode.execute();
stateUpdateFuture = PlainActionFuture.newFuture();
try {
deprovisionFuture.get();
logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from state index resource list
flowFrameworkIndicesHandler.deleteResourceFromStateIndex(workflowId, resource, stateUpdateFuture);
try {
// Wait at most 1 second for state index update.
stateUpdateFuture.actionGet(1, TimeUnit.SECONDS);

Check warning on line 233 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L233

Added line #L233 was not covered by tests
} catch (Exception e) {
// Ignore incremental resource removal failures (or timeouts) as we catch up at the end with remainingResources
}

Check warning on line 236 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L236

Added line #L236 was not covered by tests
// Remove from list so we don't try again
iter.remove();
// Pause briefly before next step
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;

Check warning on line 243 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L241-L243

Added lines #L241 - L243 were not covered by tests
} catch (Throwable t) {
// If any deprovision fails due to not found, it's a success
if (t.getCause() instanceof OpenSearchStatusException
Expand All @@ -238,7 +252,7 @@
logger.info("Failed {} for {}", deprovisionNode.id(), resourceNameAndId);
}
}
}
} while (iter.hasNext());
if (deprovisionProcessSequence.size() < resourceCount) {
// If we've deleted something, decrement and try again if not zero
resourceCount = deprovisionProcessSequence.size();
Expand All @@ -259,6 +273,7 @@
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

Check warning on line 276 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L276

Added line #L276 was not covered by tests
break;
}
} else {
Expand All @@ -274,6 +289,7 @@
if (!deleteNotAllowed.isEmpty()) {
logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed);
}
// This is a redundant best-effort backup to the incremental deletion done earlier
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,17 @@ protected List<ResourceCreated> getResourcesCreated(RestClient client, String wo
TimeUnit.SECONDS
);

return getResourcesCreated(client, workflowId);
}

/**
* Helper method retrieve any resources created incrementally without waiting for completion
* @param client the rest client
* @param workflowId the workflow id to retrieve resources from
* @return a list of created resources
* @throws Exception if the request fails
*/
protected List<ResourceCreated> getResourcesCreated(RestClient client, String workflowId) throws Exception {
Response response = getWorkflowStatus(client, workflowId, true);

// Parse workflow state from response and retrieve resources created
Expand Down
Loading
Loading