-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Introduce reroute method on IngestDocument #94000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
a66a11e
0a41d94
47581d5
fc34a28
b1c7b26
d766c62
5dd4d27
3b64727
1bfcf55
98d7c94
9e5df7e
8ef09c0
f1a3b3e
f0d7be0
117d89c
5bc367f
61e5617
35983d5
5dfebb6
5dac7c2
2dc42e1
f3a2ad6
25e4a78
8dbef75
53f0b24
2db2a19
83112f8
e86579e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 94000 | ||
summary: Introduce redirect method on IngestDocument | ||
area: Ingest Node | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,8 +62,9 @@ public final class IngestDocument { | |
|
||
// Contains all pipelines that have been executed for this document | ||
private final Set<String> executedPipelines = new LinkedHashSet<>(); | ||
|
||
private boolean doNoSelfReferencesCheck = false; | ||
private boolean invokeDefaultPipelineOfDestination = false; | ||
private boolean skipCurrentPipeline = false; | ||
|
||
public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) { | ||
this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); | ||
|
@@ -80,6 +81,8 @@ public IngestDocument(IngestDocument other) { | |
new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()), | ||
deepCopyMap(other.ingestMetadata) | ||
); | ||
this.invokeDefaultPipelineOfDestination = other.invokeDefaultPipelineOfDestination; | ||
this.skipCurrentPipeline = other.skipCurrentPipeline; | ||
} | ||
|
||
/** | ||
|
@@ -903,6 +906,24 @@ public String toString() { | |
return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}'; | ||
} | ||
|
||
public void redirect(String destIndex) { | ||
getMetadata().setIndex(destIndex); | ||
invokeDefaultPipelineOfDestination = true; | ||
skipCurrentPipeline = true; | ||
} | ||
|
||
boolean isInvokeDefaultPipelineOfDestination() { | ||
return invokeDefaultPipelineOfDestination; | ||
} | ||
|
||
boolean isSkipCurrentPipeline() { | ||
return skipCurrentPipeline; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need multiple boolean flags for this right? If we do, then it's not clear enough why they have to be separated (needs documentation) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've changed it to just be a single boolean and a single getter for it ( |
||
|
||
void resetPipelineSkipping() { | ||
skipCurrentPipeline = false; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why doesn't this also reset There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, it needs to reset both. I've changed it to just be a single boolean. |
||
|
||
public enum Metadata { | ||
INDEX(IndexFieldMapper.NAME), | ||
TYPE("_type"), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,6 +68,7 @@ | |
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.LinkedHashSet; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Locale; | ||
|
@@ -714,21 +715,8 @@ protected void doRun() { | |
continue; | ||
} | ||
|
||
final String pipelineId = indexRequest.getPipeline(); | ||
indexRequest.setPipeline(NOOP_PIPELINE_NAME); | ||
final String finalPipelineId = indexRequest.getFinalPipeline(); | ||
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); | ||
boolean hasFinalPipeline = true; | ||
final List<String> pipelines; | ||
if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false | ||
&& IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { | ||
pipelines = List.of(pipelineId, finalPipelineId); | ||
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { | ||
pipelines = List.of(pipelineId); | ||
hasFinalPipeline = false; | ||
} else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { | ||
pipelines = List.of(finalPipelineId); | ||
} else { | ||
Pipelines pipelines = getPipelines(indexRequest); | ||
if (pipelines.isEmpty()) { | ||
i++; | ||
continue; | ||
} | ||
|
@@ -763,21 +751,78 @@ public void onFailure(Exception e) { | |
}); | ||
|
||
IngestDocument ingestDocument = newIngestDocument(indexRequest); | ||
executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener); | ||
|
||
LinkedHashSet<String> indexRecursionDetection = new LinkedHashSet<>(); | ||
indexRecursionDetection.add(indexRequest.index()); | ||
executePipelines( | ||
pipelines.iterator(), | ||
pipelines.hasFinalPipeline(), | ||
indexRequest, | ||
ingestDocument, | ||
documentListener, | ||
indexRecursionDetection | ||
); | ||
i++; | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
|
||
private Pipelines getPipelines(IndexRequest indexRequest) { | ||
final String pipelineId = indexRequest.getPipeline(); | ||
indexRequest.setPipeline(NOOP_PIPELINE_NAME); | ||
final String finalPipelineId = indexRequest.getFinalPipeline(); | ||
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); | ||
return new Pipelines(pipelineId, finalPipelineId); | ||
} | ||
|
||
private static class Pipelines implements Iterable<String> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a reasonable abstraction, but I don't like that we make it mutable, and that we don't encapsulate enough here. The I also think we don't need to have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I abstracted the The The |
||
private String defaultPipeline; | ||
private String finalPipeline; | ||
|
||
private Pipelines(String defaultPipeline, String finalPipeline) { | ||
if (NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { | ||
this.defaultPipeline = defaultPipeline; | ||
} | ||
if (NOOP_PIPELINE_NAME.equals(finalPipeline) == false) { | ||
this.finalPipeline = finalPipeline; | ||
} | ||
} | ||
|
||
public boolean hasFinalPipeline() { | ||
return finalPipeline != null; | ||
} | ||
|
||
public boolean isEmpty() { | ||
return defaultPipeline == null && finalPipeline == null; | ||
} | ||
|
||
public void withoutDefaultPipeline() { | ||
defaultPipeline = null; | ||
} | ||
|
||
@Override | ||
public Iterator<String> iterator() { | ||
if (defaultPipeline != null && finalPipeline != null) { | ||
return List.of(defaultPipeline, finalPipeline).iterator(); | ||
} | ||
if (finalPipeline != null) { | ||
return List.of(finalPipeline).iterator(); | ||
} | ||
if (defaultPipeline != null) { | ||
return List.of(defaultPipeline).iterator(); | ||
} | ||
return Collections.emptyIterator(); | ||
} | ||
} | ||
|
||
private void executePipelines( | ||
final Iterator<String> pipelineIds, | ||
final boolean hasFinalPipeline, | ||
final IndexRequest indexRequest, | ||
final IngestDocument ingestDocument, | ||
final ActionListener<Boolean> listener | ||
final ActionListener<Boolean> listener, | ||
final Set<String> indexRecursionDetection | ||
) { | ||
assert pipelineIds.hasNext(); | ||
final String pipelineId = pipelineIds.next(); | ||
|
@@ -790,6 +835,7 @@ private void executePipelines( | |
final String originalIndex = indexRequest.indices()[0]; | ||
executePipeline(ingestDocument, pipeline, (keep, e) -> { | ||
assert keep != null; | ||
ingestDocument.resetPipelineSkipping(); | ||
|
||
if (e != null) { | ||
logger.debug( | ||
|
@@ -840,6 +886,14 @@ private void executePipelines( | |
final String newIndex = indexRequest.indices()[0]; | ||
|
||
if (Objects.equals(originalIndex, newIndex) == false) { | ||
if (indexRecursionDetection.add(newIndex) == false) { | ||
List<String> indexRoute = new ArrayList<>(indexRecursionDetection); | ||
indexRoute.add(newIndex); | ||
listener.onFailure( | ||
new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute)) | ||
); | ||
return; // document failed! | ||
} | ||
if (hasFinalPipeline && pipelineIds.hasNext() == false) { | ||
listener.onFailure( | ||
new IllegalStateException( | ||
|
@@ -854,19 +908,21 @@ private void executePipelines( | |
); | ||
return; // document failed! | ||
} else { | ||
// reset request pipeline that is set to _none which would take precedence over the default pipeline | ||
indexRequest.setPipeline(null); | ||
indexRequest.isPipelineResolved(false); | ||
resolvePipelines(null, indexRequest, state.metadata()); | ||
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { | ||
newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); | ||
newHasFinalPipeline = true; | ||
} else { | ||
newPipelineIds = Collections.emptyIterator(); | ||
Pipelines pipelines = getPipelines(indexRequest); | ||
if (ingestDocument.isInvokeDefaultPipelineOfDestination() == false) { | ||
pipelines.withoutDefaultPipeline(); | ||
} | ||
newHasFinalPipeline = pipelines.hasFinalPipeline(); | ||
newPipelineIds = pipelines.iterator(); | ||
} | ||
} | ||
|
||
if (newPipelineIds.hasNext()) { | ||
executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener); | ||
executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener, indexRecursionDetection); | ||
} else { | ||
// update the index request's source and (potentially) cache the timestamp for TSDB | ||
updateIndexRequestSource(indexRequest, ingestDocument); | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@joegallo what do you think bout adding the history of the _index field in an ingest metadata field? This wouldn't be indexed by default but in order to debug, users can use a
set
processor to add this to the documents:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not opposed to adding a mechanism like that in a future PR, but I would like to keep the scope of this PR fixed.
When we do add that mechanism, though, I'd prefer that the list be an immutable reference to the collection we're tracking for index recursion purposes, rather than a new collection. Similarly,
appendFieldValue
is more for processors to use when the first argument is customer-provided -- we can just traverse the data structures ourselves inIngestService
, there's no need for string parsing and evaluation there.