Skip to content

Introduce reroute method on IngestDocument #94000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a66a11e
Introduce redirect method on IngestDocument
felixbarny Feb 21, 2023
0a41d94
Add changelog
felixbarny Feb 22, 2023
47581d5
Skipp full pipeline even if invoked via pipeline processor
felixbarny Feb 22, 2023
fc34a28
Encapsulate more state in PipelineIterator
felixbarny Feb 23, 2023
b1c7b26
Only one boolean flag in IngestDocument
felixbarny Feb 23, 2023
d766c62
Reset redirect at the end of the handler
felixbarny Feb 23, 2023
5dd4d27
Apply spotless suggestions
felixbarny Feb 23, 2023
3b64727
Rename method and add javadoc
felixbarny Feb 23, 2023
1bfcf55
Reroute to remain
felixbarny Mar 1, 2023
98d7c94
Add test that final pipeline can't reroute
felixbarny Mar 1, 2023
9e5df7e
Merge branch 'main' into ingest-document-redirect
elasticmachine Mar 7, 2023
8ef09c0
Update test
joegallo Mar 7, 2023
f1a3b3e
Update docs/changelog/94000.yaml
felixbarny Mar 7, 2023
f0d7be0
Merge branch 'main' into ingest-document-redirect
joegallo Mar 7, 2023
117d89c
Move this resetReroute call earlier
joegallo Mar 7, 2023
5bc367f
Pull this block out of the else
joegallo Mar 7, 2023
61e5617
Reorder these blocks
joegallo Mar 7, 2023
35983d5
Add/tweak comments
joegallo Mar 7, 2023
5dfebb6
Add more context to error message
joegallo Mar 7, 2023
5dac7c2
Merge remote-tracking branch 'origin/main' into ingest-document-redirect
felixbarny Mar 8, 2023
2dc42e1
Adjust test to assert that final pipeline is not executed twice
felixbarny Mar 8, 2023
f3a2ad6
Merge branch 'main' into ingest-document-redirect
elasticmachine Mar 21, 2023
25e4a78
Merge branch 'main' into ingest-document-redirect
joegallo Mar 21, 2023
8dbef75
Make PipelineIterator an iterator over a triple
joegallo Mar 21, 2023
53f0b24
Rename this method and add a docstring
joegallo Mar 21, 2023
2db2a19
The final pipeline slot should be isFinal, of course
joegallo Mar 21, 2023
83112f8
Merge branch 'main' into ingest-document-redirect
joegallo Mar 21, 2023
e86579e
Actually, getPipeline is all we need here
joegallo Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/94000.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 94000
summary: Introduce redirect method on IngestDocument
area: Ingest Node
type: enhancement
issues:
- 83653
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,7 @@ teardown:
index: foo
id: "1"
- match: { _source.a: true }
# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances
# (See issue https://github.com/elastic/elasticsearch/issues/83653).
# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior
#- match: { _source.accumulator: [ "non-repeated-value" ] }
- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] }
- match: { _source.accumulator: [ "non-repeated-value" ] }

# only the foo index
- do:
Expand Down Expand Up @@ -150,11 +146,7 @@ teardown:
index: foo
id: "1"
- match: { _source.a: true }
# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances
# (See issue https://github.com/elastic/elasticsearch/issues/83653).
# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior
#- match: { _source.accumulator: [ "non-repeated-value" ] }
- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] }
- match: { _source.accumulator: [ "non-repeated-value" ] }

# only the foo index
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.ingest.Processor;
Expand All @@ -48,6 +49,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -97,6 +99,26 @@ public void testFinalPipelineCantChangeDestination() {
);
}

public void testFinalPipelineCantRerouteDestination() {
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
createIndex("index", settings);

final BytesReference finalPipelineBody = new BytesArray("""
{"processors": [{"reroute": {}}]}""");
client().admin().cluster().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet();

final IllegalStateException e = expectThrows(
IllegalStateException.class,
() -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()
);
assertThat(
e,
hasToString(
endsWith("final pipeline [final_pipeline] can't change the target index (from [index] to [target]) for document [1]")
)
);
}

public void testFinalPipelineOfOldDestinationIsNotInvoked() {
Settings settings = Settings.builder()
.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
Expand Down Expand Up @@ -187,6 +209,73 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
}

public void testDefaultPipelineOfRerouteDestinationIsInvoked() {
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
createIndex("target", settings);

BytesReference defaultPipelineBody = new BytesArray("""
{"processors": [{"reroute": {}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();

BytesReference targetPipeline = new BytesArray("""
{"processors": [{"final": {}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
.actionGet();

IndexResponse indexResponse = client().prepareIndex("index")
.setId("1")
.setSource(Map.of("field", "value"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertEquals(RestStatus.CREATED, indexResponse.status());
SearchResponse target = client().prepareSearch("target").get();
assertEquals(1, target.getHits().getTotalHits().value);
assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
}

public void testAvoidIndexingLoop() {
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
createIndex("target", settings);

BytesReference defaultPipelineBody = new BytesArray("""
{"processors": [{"reroute": {"dest": "target"}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();

BytesReference targetPipeline = new BytesArray("""
{"processors": [{"reroute": {"dest": "index"}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
.actionGet();

IllegalStateException exception = expectThrows(
IllegalStateException.class,
() -> client().prepareIndex("index")
.setId("1")
.setSource(Map.of("dest", "index"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get()
);
assertThat(
exception.getMessage(),
equalTo("index cycle detected while processing pipeline [target_default_pipeline] for document [1]: [index, target, index]")
);
}

public void testFinalPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
createIndex("index", settings);
Expand Down Expand Up @@ -393,6 +482,26 @@ public String getType() {
return "changing_dest";
}

},
"reroute",
(processorFactories, tag, description, config) -> {
final String dest = Objects.requireNonNullElse(
ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"),
"target"
);
return new AbstractProcessor(tag, description) {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.reroute(dest);
return ingestDocument;
}

@Override
public String getType() {
return "reroute";
}

};
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex

void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
assert currentProcessor <= processorsWithMetrics.size();
if (currentProcessor == processorsWithMetrics.size()) {
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
handler.accept(ingestDocument, null);
return;
}
Expand All @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
Processor processor;
IngestMetric metric;
// iteratively execute any sync processors
while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) {
while (currentProcessor < processorsWithMetrics.size()
&& processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
&& ingestDocument.isReroute() == false) {
processorWithMetric = processorsWithMetrics.get(currentProcessor);
processor = processorWithMetric.v1();
metric = processorWithMetric.v2();
Expand All @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
}

assert currentProcessor <= processorsWithMetrics.size();
if (currentProcessor == processorsWithMetrics.size()) {
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
handler.accept(ingestDocument, null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public final class IngestDocument {

// Contains all pipelines that have been executed for this document
private final Set<String> executedPipelines = new LinkedHashSet<>();

private boolean doNoSelfReferencesCheck = false;
private boolean reroute = false;

public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
Expand All @@ -80,6 +80,7 @@ public IngestDocument(IngestDocument other) {
new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
this.reroute = other.reroute;
}

/**
Expand Down Expand Up @@ -903,6 +904,29 @@ public String toString() {
return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}';
}

public void reroute(String destIndex) {
getMetadata().setIndex(destIndex);
Copy link
Member Author

@felixbarny felixbarny Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joegallo what do you think bout adding the history of the _index field in an ingest metadata field? This wouldn't be indexed by default but in order to debug, users can use a set processor to add this to the documents:

{
  "set": {
    "field": "reroute_history",
    "copy_from": "_ingest.reroute_history"
  }
}
Suggested change
getMetadata().setIndex(destIndex);
getMetadata().setIndex(destIndex);
appendFieldValue("_ingest.reroute_history", getMetadata().getIndex());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not opposed to adding a mechanism like that in a future PR, but I would like to keep the scope of this PR fixed.

When we do add that mechanism, though, I'd prefer that the list be an immutable reference to the collection we're tracking for index recursion purposes, rather than a new collection. Similarly, appendFieldValue is more for processors to use when the first argument is customer-provided -- we can just traverse the data structures ourselves in IngestService, there's no need for string parsing and evaluation there.

reroute = true;
}

/**
* The document is redirected to another target.
* This implies that we'll skip the current pipeline and invoke the default pipeline of the new target
*
* @return whether the document is redirected to another target
*/
boolean isReroute() {
return reroute;
}

/**
* Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless
* {@link #reroute(String)} is called.
*/
void resetReroute() {
reroute = false;
}

public enum Metadata {
INDEX(IndexFieldMapper.NAME),
TYPE("_type"),
Expand Down
Loading