Skip to content

Introduce reroute method on IngestDocument #94000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a66a11e
Introduce redirect method on IngestDocument
felixbarny Feb 21, 2023
0a41d94
Add changelog
felixbarny Feb 22, 2023
47581d5
Skip full pipeline even if invoked via pipeline processor
felixbarny Feb 22, 2023
fc34a28
Encapsulate more state in PipelineIterator
felixbarny Feb 23, 2023
b1c7b26
Only one boolean flag in IngestDocument
felixbarny Feb 23, 2023
d766c62
Reset redirect at the end of the handler
felixbarny Feb 23, 2023
5dd4d27
Apply spotless suggestions
felixbarny Feb 23, 2023
3b64727
Rename method and add javadoc
felixbarny Feb 23, 2023
1bfcf55
Reroute to remain
felixbarny Mar 1, 2023
98d7c94
Add test that final pipeline can't reroute
felixbarny Mar 1, 2023
9e5df7e
Merge branch 'main' into ingest-document-redirect
elasticmachine Mar 7, 2023
8ef09c0
Update test
joegallo Mar 7, 2023
f1a3b3e
Update docs/changelog/94000.yaml
felixbarny Mar 7, 2023
f0d7be0
Merge branch 'main' into ingest-document-redirect
joegallo Mar 7, 2023
117d89c
Move this resetReroute call earlier
joegallo Mar 7, 2023
5bc367f
Pull this block out of the else
joegallo Mar 7, 2023
61e5617
Reorder these blocks
joegallo Mar 7, 2023
35983d5
Add/tweak comments
joegallo Mar 7, 2023
5dfebb6
Add more context to error message
joegallo Mar 7, 2023
5dac7c2
Merge remote-tracking branch 'origin/main' into ingest-document-redirect
felixbarny Mar 8, 2023
2dc42e1
Adjust test to assert that final pipeline is not executed twice
felixbarny Mar 8, 2023
f3a2ad6
Merge branch 'main' into ingest-document-redirect
elasticmachine Mar 21, 2023
25e4a78
Merge branch 'main' into ingest-document-redirect
joegallo Mar 21, 2023
8dbef75
Make PipelineIterator an iterator over a triple
joegallo Mar 21, 2023
53f0b24
Rename this method and add a docstring
joegallo Mar 21, 2023
2db2a19
The final pipeline slot should be isFinal, of course
joegallo Mar 21, 2023
83112f8
Merge branch 'main' into ingest-document-redirect
joegallo Mar 21, 2023
e86579e
Actually, getPipeline is all we need here
joegallo Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/94000.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 94000
summary: Introduce redirect method on IngestDocument
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.ingest.Processor;
Expand All @@ -49,6 +50,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -188,6 +190,70 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
}

/**
 * Indexes a document into {@code index}, whose default pipeline redirects it to {@code target},
 * and checks that the document ends up in {@code target} carrying a {@code final} field.
 */
public void testDefaultPipelineOfRedirectDestinationIsInvoked() {
// source index: its default pipeline performs the redirect
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

// destination index with its own default pipeline
settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
createIndex("target", settings);

// "redirect" with an empty config falls back to the test processor's default destination, "target"
BytesReference defaultPipelineBody = new BytesArray("""
{"processors": [{"redirect": {}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();

// presumably the "final" test processor adds a "final" field to the document -- defined outside this view
BytesReference targetPipeline = new BytesArray("""
{"processors": [{"final": {}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
.actionGet();

// index into the source index and refresh immediately so the search below can see the document
IndexResponse indexResponse = client().prepareIndex("index")
.setId("1")
.setSource(Map.of("field", "value"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertEquals(RestStatus.CREATED, indexResponse.status());
// the document must be found in the destination index and contain the "final" key
SearchResponse target = client().prepareSearch("target").get();
assertEquals(1, target.getHits().getTotalHits().value);
assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
}

/**
 * Sets up two indices whose default pipelines redirect to each other and checks that indexing
 * a document fails with an {@link IllegalStateException} reporting the index cycle.
 */
public void testAvoidIndexingLoop() {
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
createIndex("target", settings);

// default pipeline of "index" redirects to "target" ...
BytesReference defaultPipelineBody = new BytesArray("""
{"processors": [{"redirect": {"dest": "target"}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();

// ... and default pipeline of "target" redirects back to "index", closing the loop
BytesReference targetPipeline = new BytesArray("""
{"processors": [{"redirect": {"dest": "index"}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
.actionGet();

// the index request itself is expected to throw
IllegalStateException exception = expectThrows(
IllegalStateException.class,
() -> client().prepareIndex("index")
.setId("1")
.setSource(Map.of("dest", "index"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get()
);
// the message lists the visited indices in order, ending with the repeated one
assertThat(exception.getMessage(), containsString("index cycle detected while processing pipelines: [index, target, index]"));
}

public void testFinalPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
createIndex("index", settings);
Expand Down Expand Up @@ -394,6 +460,26 @@ public String getType() {
return "changing_dest";
}

},
"redirect",
// Factory for the test-only "redirect" processor: reads an optional "dest" setting (defaulting to
// "target") and produces a processor that redirects every document to that index.
(processorFactories, tag, description, config) -> {
// NOTE(review): `description` is passed as the first argument here -- confirm that is what
// readOptionalStringProperty expects in that position.
final String dest = Objects.requireNonNullElse(
ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"),
"target"
);
return new AbstractProcessor(tag, description) {
// Redirects the document to the configured destination and hands the same instance back.
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.redirect(dest);
return ingestDocument;
}

@Override
public String getType() {
return "redirect";
}

};
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex

void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
assert currentProcessor <= processorsWithMetrics.size();
if (currentProcessor == processorsWithMetrics.size()) {
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) {
handler.accept(ingestDocument, null);
return;
}
Expand All @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
Processor processor;
IngestMetric metric;
// iteratively execute any sync processors
while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) {
while (currentProcessor < processorsWithMetrics.size()
&& processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
&& ingestDocument.isSkipCurrentPipeline() == false) {
processorWithMetric = processorsWithMetrics.get(currentProcessor);
processor = processorWithMetric.v1();
metric = processorWithMetric.v2();
Expand All @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
}

assert currentProcessor <= processorsWithMetrics.size();
if (currentProcessor == processorsWithMetrics.size()) {
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) {
handler.accept(ingestDocument, null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ public final class IngestDocument {

// Contains all pipelines that have been executed for this document
private final Set<String> executedPipelines = new LinkedHashSet<>();

private boolean doNoSelfReferencesCheck = false;
private boolean invokeDefaultPipelineOfDestination = false;
private boolean skipCurrentPipeline = false;

public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
Expand All @@ -80,6 +81,8 @@ public IngestDocument(IngestDocument other) {
new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
this.invokeDefaultPipelineOfDestination = other.invokeDefaultPipelineOfDestination;
this.skipCurrentPipeline = other.skipCurrentPipeline;
}

/**
Expand Down Expand Up @@ -903,6 +906,24 @@ public String toString() {
return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}';
}

public void redirect(String destIndex) {
getMetadata().setIndex(destIndex);
Copy link
Member Author

@felixbarny felixbarny Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joegallo what do you think about adding the history of the _index field in an ingest metadata field? This wouldn't be indexed by default but in order to debug, users can use a set processor to add this to the documents:

{
  "set": {
    "field": "reroute_history",
    "copy_from": "_ingest.reroute_history"
  }
}
Suggested change
getMetadata().setIndex(destIndex);
getMetadata().setIndex(destIndex);
appendFieldValue("_ingest.reroute_history", getMetadata().getIndex());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not opposed to adding a mechanism like that in a future PR, but I would like to keep the scope of this PR fixed.

When we do add that mechanism, though, I'd prefer that the list be an immutable reference to the collection we're tracking for index recursion purposes, rather than a new collection. Similarly, appendFieldValue is more for processors to use when the first argument is customer-provided -- we can just traverse the data structures ourselves in IngestService, there's no need for string parsing and evaluation there.

invokeDefaultPipelineOfDestination = true;
skipCurrentPipeline = true;
}

/**
 * Returns the flag that {@code redirect(String)} sets to {@code true}. It is also copied by the
 * copy constructor.
 *
 * @return {@code true} once this document has been redirected
 */
boolean isInvokeDefaultPipelineOfDestination() {
return invokeDefaultPipelineOfDestination;
}

/**
 * Returns the flag that {@code redirect(String)} sets to {@code true} and that
 * {@code resetPipelineSkipping()} clears again.
 *
 * @return {@code true} if the current value of the skip flag is set
 */
boolean isSkipCurrentPipeline() {
return skipCurrentPipeline;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need multiple boolean flags for this right? If we do, then it's not clear enough why they have to be separated (needs documentation)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed it to just be a single boolean and a single getter for it (isRedirect()). The downside is that isRedirect() is less expressive than isSkipCurrentPipeline() in the context of determining whether to skip the pipeline.
We could keep isSkipCurrentPipeline() and just return the redirect flag. But that feels a bit off as well.


/**
 * Clears the skip flag so that {@code isSkipCurrentPipeline()} returns {@code false} again.
 * <p>
 * NOTE(review): only {@code skipCurrentPipeline} is reset here;
 * {@code invokeDefaultPipelineOfDestination} keeps whatever value it had -- confirm this is intended.
 */
void resetPipelineSkipping() {
skipCurrentPipeline = false;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't this also reset invokeDefaultPipelineOfDestination? Also, it's unclear from the code why we need this method to be invoked.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, it needs to reset both. I've changed it to just be a single boolean.


public enum Metadata {
INDEX(IndexFieldMapper.NAME),
TYPE("_type"),
Expand Down
104 changes: 80 additions & 24 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -714,21 +715,8 @@ protected void doRun() {
continue;
}

final String pipelineId = indexRequest.getPipeline();
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
final String finalPipelineId = indexRequest.getFinalPipeline();
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
boolean hasFinalPipeline = true;
final List<String> pipelines;
if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
&& IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
pipelines = List.of(pipelineId, finalPipelineId);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
pipelines = List.of(pipelineId);
hasFinalPipeline = false;
} else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
pipelines = List.of(finalPipelineId);
} else {
Pipelines pipelines = getPipelines(indexRequest);
if (pipelines.isEmpty()) {
i++;
continue;
}
Expand Down Expand Up @@ -763,21 +751,78 @@ public void onFailure(Exception e) {
});

IngestDocument ingestDocument = newIngestDocument(indexRequest);
executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener);

LinkedHashSet<String> indexRecursionDetection = new LinkedHashSet<>();
indexRecursionDetection.add(indexRequest.index());
executePipelines(
pipelines.iterator(),
pipelines.hasFinalPipeline(),
indexRequest,
ingestDocument,
documentListener,
indexRecursionDetection
);
i++;
}
}
}
});
}

/**
 * Captures the pipeline and final pipeline currently set on the request and then resets both on
 * the request to {@code NOOP_PIPELINE_NAME}.
 *
 * @param indexRequest the request to read the pipeline ids from; both ids are overwritten
 * @return a {@link Pipelines} holding the ids the request had before they were reset
 */
private Pipelines getPipelines(IndexRequest indexRequest) {
    // read each id before overwriting it on the request
    final String requestedPipeline = indexRequest.getPipeline();
    indexRequest.setPipeline(NOOP_PIPELINE_NAME);

    final String requestedFinalPipeline = indexRequest.getFinalPipeline();
    indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);

    return new Pipelines(requestedPipeline, requestedFinalPipeline);
}

private static class Pipelines implements Iterable<String> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a reasonable abstraction, but I don't like that we make it mutable, and that we don't encapsulate enough here.

The withoutDefaultPipeline method makes me uncomfortable as its name makes it sound like it would return a new object rather than making a mutable change.

I also think we don't need to have executePipelines(...) take a boolean, we're treating this internally as though we'll always have a list of pipelines, but we could probably get away with passing a proper object instead of Iterator<String>. I think it'd be much clearer that way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I abstracted the Iterator<String> and the boolean flag into a PipelineIterator, however it's not immutable. As that's the nature of iterators, I think that's fine.

The executePipelines needs to know about three properties of the current pipeline: the name (to add the name in the exception in case the pipeline itself can't be resolved), the pipeline itself, and whether the current pipeline is the final pipeline (if true, it's disallowed to override _index).

The PipelineIterator encapsulates that state and I think that makes it cleaner than before. Thanks for the suggestion 👍

private String defaultPipeline;
private String finalPipeline;

/**
 * Stores the given pipeline ids, treating {@code NOOP_PIPELINE_NAME} as "no pipeline".
 *
 * @param defaultPipeline id of the default pipeline; kept as {@code null} when it is the no-op name
 * @param finalPipeline   id of the final pipeline; kept as {@code null} when it is the no-op name
 */
private Pipelines(String defaultPipeline, String finalPipeline) {
    // a no-op id is represented as null so the other members only need null checks
    this.defaultPipeline = NOOP_PIPELINE_NAME.equals(defaultPipeline) ? null : defaultPipeline;
    this.finalPipeline = NOOP_PIPELINE_NAME.equals(finalPipeline) ? null : finalPipeline;
}

/**
 * @return {@code true} if a final pipeline id is held, i.e. the {@code finalPipeline} field is not {@code null}
 */
public boolean hasFinalPipeline() {
return finalPipeline != null;
}

/**
 * @return {@code true} if neither a default nor a final pipeline id is held
 */
public boolean isEmpty() {
return defaultPipeline == null && finalPipeline == null;
}

/**
 * Drops the default pipeline from this instance. Despite the name this mutates the object in
 * place and returns nothing; subsequent calls to {@code iterator()} no longer include it.
 */
public void withoutDefaultPipeline() {
defaultPipeline = null;
}

/**
 * Iterates over the held pipeline ids: the default pipeline first (if any), then the final
 * pipeline (if any). Yields nothing when neither is set.
 */
@Override
public Iterator<String> iterator() {
    final boolean defaultPresent = defaultPipeline != null;
    final boolean finalPresent = finalPipeline != null;

    if (defaultPresent == false && finalPresent == false) {
        return Collections.emptyIterator();
    }
    if (defaultPresent == false) {
        return List.of(finalPipeline).iterator();
    }
    if (finalPresent == false) {
        return List.of(defaultPipeline).iterator();
    }
    // both present: the default pipeline always comes before the final pipeline
    return List.of(defaultPipeline, finalPipeline).iterator();
}
}

private void executePipelines(
final Iterator<String> pipelineIds,
final boolean hasFinalPipeline,
final IndexRequest indexRequest,
final IngestDocument ingestDocument,
final ActionListener<Boolean> listener
final ActionListener<Boolean> listener,
final Set<String> indexRecursionDetection
) {
assert pipelineIds.hasNext();
final String pipelineId = pipelineIds.next();
Expand All @@ -790,6 +835,7 @@ private void executePipelines(
final String originalIndex = indexRequest.indices()[0];
executePipeline(ingestDocument, pipeline, (keep, e) -> {
assert keep != null;
ingestDocument.resetPipelineSkipping();

if (e != null) {
logger.debug(
Expand Down Expand Up @@ -840,6 +886,14 @@ private void executePipelines(
final String newIndex = indexRequest.indices()[0];

if (Objects.equals(originalIndex, newIndex) == false) {
if (indexRecursionDetection.add(newIndex) == false) {
List<String> indexRoute = new ArrayList<>(indexRecursionDetection);
indexRoute.add(newIndex);
listener.onFailure(
new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute))
);
return; // document failed!
}
if (hasFinalPipeline && pipelineIds.hasNext() == false) {
listener.onFailure(
new IllegalStateException(
Expand All @@ -854,19 +908,21 @@ private void executePipelines(
);
return; // document failed!
} else {
// reset request pipeline that is set to _none which would take precedence over the default pipeline
indexRequest.setPipeline(null);
indexRequest.isPipelineResolved(false);
resolvePipelines(null, indexRequest, state.metadata());
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
newHasFinalPipeline = true;
} else {
newPipelineIds = Collections.emptyIterator();
Pipelines pipelines = getPipelines(indexRequest);
if (ingestDocument.isInvokeDefaultPipelineOfDestination() == false) {
pipelines.withoutDefaultPipeline();
}
newHasFinalPipeline = pipelines.hasFinalPipeline();
newPipelineIds = pipelines.iterator();
}
}

if (newPipelineIds.hasNext()) {
executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener);
executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener, indexRecursionDetection);
} else {
// update the index request's source and (potentially) cache the timestamp for TSDB
updateIndexRequestSource(indexRequest, ingestDocument);
Expand Down
Loading