Skip to content

Commit

Permalink
[Transform] Add skip_dest_index_creation setting
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Feb 13, 2024
1 parent 0951b32 commit 44dad2d
Show file tree
Hide file tree
Showing 14 changed files with 330 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class TransformField {
public static final ParseField DEDUCE_MAPPINGS = new ParseField("deduce_mappings");
public static final ParseField NUM_FAILURE_RETRIES = new ParseField("num_failure_retries");
public static final ParseField UNATTENDED = new ParseField("unattended");
public static final ParseField SKIP_DEST_INDEX_CREATION = new ParseField("skip_dest_index_creation");

public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class SettingsConfig implements Writeable, ToXContentObject {

public static final SettingsConfig EMPTY = new SettingsConfig(
null,
null,
(Integer) null,
(Integer) null,
(Integer) null,
(Integer) null,
(Integer) null,
(Integer) null,
(Integer) null
);

public static final ConstructingObjectParser<SettingsConfig, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<SettingsConfig, Void> LENIENT_PARSER = createParser(true);

Expand All @@ -44,6 +57,7 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private static final int DEFAULT_DEDUCE_MAPPINGS = -1;
private static final int DEFAULT_NUM_FAILURE_RETRIES = -2;
private static final int DEFAULT_UNATTENDED = -1;
private static final int DEFAULT_SKIP_DEST_INDEX_CREATION = -1;

private static ConstructingObjectParser<SettingsConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<SettingsConfig, Void> parser = new ConstructingObjectParser<>(
Expand All @@ -57,7 +71,8 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
(Integer) args[4],
(Integer) args[5],
(Integer) args[6],
(Integer) args[7]
(Integer) args[7],
(Integer) args[8]
)
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE);
Expand Down Expand Up @@ -98,6 +113,13 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
TransformField.UNATTENDED,
ValueType.BOOLEAN_OR_NULL
);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
parser.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_SKIP_DEST_INDEX_CREATION : p.booleanValue() ? 1 : 0,
TransformField.SKIP_DEST_INDEX_CREATION,
ValueType.BOOLEAN_OR_NULL
);
return parser;
}

Expand All @@ -109,10 +131,7 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
private final Integer deduceMappings;
private final Integer numFailureRetries;
private final Integer unattended;

public SettingsConfig() {
this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null, (Integer) null, (Integer) null);
}
private final Integer skipDestIndexCreation;

public SettingsConfig(
Integer maxPageSearchSize,
Expand All @@ -122,7 +141,8 @@ public SettingsConfig(
Boolean usePit,
Boolean deduceMappings,
Integer numFailureRetries,
Boolean unattended
Boolean unattended,
Boolean skipDestIndexCreation
) {
this(
maxPageSearchSize,
Expand All @@ -132,19 +152,21 @@ public SettingsConfig(
usePit == null ? null : usePit ? 1 : 0,
deduceMappings == null ? null : deduceMappings ? 1 : 0,
numFailureRetries,
unattended == null ? null : unattended ? 1 : 0
unattended == null ? null : unattended ? 1 : 0,
skipDestIndexCreation == null ? null : skipDestIndexCreation ? 1 : 0
);
}

SettingsConfig(
private SettingsConfig(
Integer maxPageSearchSize,
Float docsPerSecond,
Integer datesAsEpochMillis,
Integer alignCheckpoints,
Integer usePit,
Integer deduceMappings,
Integer numFailureRetries,
Integer unattended
Integer unattended,
Integer skipDestIndexCreation
) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
Expand All @@ -154,6 +176,7 @@ public SettingsConfig(
this.deduceMappings = deduceMappings;
this.numFailureRetries = numFailureRetries;
this.unattended = unattended;
this.skipDestIndexCreation = skipDestIndexCreation;
}

public SettingsConfig(final StreamInput in) throws IOException {
Expand All @@ -178,6 +201,11 @@ public SettingsConfig(final StreamInput in) throws IOException {
} else {
unattended = DEFAULT_UNATTENDED;
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { // TODO
skipDestIndexCreation = in.readOptionalInt();
} else {
skipDestIndexCreation = DEFAULT_SKIP_DEST_INDEX_CREATION;
}
}

public Integer getMaxPageSearchSize() {
Expand All @@ -192,50 +220,60 @@ public Boolean getDatesAsEpochMillis() {
return datesAsEpochMillis != null ? datesAsEpochMillis > 0 : null;
}

public Integer getDatesAsEpochMillisForUpdate() {
Integer getDatesAsEpochMillisForUpdate() {
return datesAsEpochMillis;
}

public Boolean getAlignCheckpoints() {
return alignCheckpoints != null ? (alignCheckpoints > 0) || (alignCheckpoints == DEFAULT_ALIGN_CHECKPOINTS) : null;
}

public Integer getAlignCheckpointsForUpdate() {
Integer getAlignCheckpointsForUpdate() {
return alignCheckpoints;
}

public Boolean getUsePit() {
return usePit != null ? (usePit > 0) || (usePit == DEFAULT_USE_PIT) : null;
}

public Integer getUsePitForUpdate() {
Integer getUsePitForUpdate() {
return usePit;
}

public Boolean getDeduceMappings() {
return deduceMappings != null ? (deduceMappings > 0) || (deduceMappings == DEFAULT_DEDUCE_MAPPINGS) : null;
}

public Integer getDeduceMappingsForUpdate() {
Integer getDeduceMappingsForUpdate() {
return deduceMappings;
}

public Integer getNumFailureRetries() {
return numFailureRetries != null ? (numFailureRetries == DEFAULT_NUM_FAILURE_RETRIES ? null : numFailureRetries) : null;
}

public Integer getNumFailureRetriesForUpdate() {
Integer getNumFailureRetriesForUpdate() {
return numFailureRetries;
}

public Boolean getUnattended() {
return unattended != null ? (unattended == DEFAULT_UNATTENDED) ? null : (unattended > 0) : null;
}

public Integer getUnattendedForUpdate() {
Integer getUnattendedForUpdate() {
return unattended;
}

public Boolean getSkipDestIndexCreation() {
return skipDestIndexCreation != null
? (skipDestIndexCreation == DEFAULT_SKIP_DEST_INDEX_CREATION) ? null : (skipDestIndexCreation > 0)
: null;
}

Integer getSkipDestIndexCreationForUpdate() {
return skipDestIndexCreation;
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) {
validationException = addValidationError(
Expand Down Expand Up @@ -288,6 +326,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_5_0)) {
out.writeOptionalInt(unattended);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeOptionalInt(skipDestIndexCreation);
}
}

@Override
Expand Down Expand Up @@ -318,6 +359,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (unattended != null && (unattended.equals(DEFAULT_UNATTENDED) == false)) {
builder.field(TransformField.UNATTENDED.getPreferredName(), unattended > 0 ? true : false);
}
if (skipDestIndexCreation != null && (skipDestIndexCreation.equals(DEFAULT_SKIP_DEST_INDEX_CREATION) == false)) {
builder.field(TransformField.SKIP_DEST_INDEX_CREATION.getPreferredName(), skipDestIndexCreation > 0 ? true : false);
}
builder.endObject();
return builder;
}
Expand All @@ -339,7 +383,8 @@ public boolean equals(Object other) {
&& Objects.equals(usePit, that.usePit)
&& Objects.equals(deduceMappings, that.deduceMappings)
&& Objects.equals(numFailureRetries, that.numFailureRetries)
&& Objects.equals(unattended, that.unattended);
&& Objects.equals(unattended, that.unattended)
&& Objects.equals(skipDestIndexCreation, that.skipDestIndexCreation);
}

@Override
Expand All @@ -352,7 +397,8 @@ public int hashCode() {
usePit,
deduceMappings,
numFailureRetries,
unattended
unattended,
skipDestIndexCreation
);
}

Expand All @@ -374,6 +420,7 @@ public static class Builder {
private Integer deduceMappings;
private Integer numFailureRetries;
private Integer unattended;
private Integer skipDestIndexCreation;

/**
* Default builder
Expand All @@ -394,6 +441,7 @@ public Builder(SettingsConfig base) {
this.deduceMappings = base.deduceMappings;
this.numFailureRetries = base.numFailureRetries;
this.unattended = base.unattended;
this.skipDestIndexCreation = base.skipDestIndexCreation;
}

/**
Expand Down Expand Up @@ -495,13 +543,26 @@ public Builder setNumFailureRetries(Integer numFailureRetries) {
* An explicit `null` resets to default.
*
* @param unattended true if this is a unattended transform.
* @return the {@link Builder} with usePit set.
* @return the {@link Builder} with unattended set.
*/
public Builder setUnattended(Boolean unattended) {
this.unattended = unattended == null ? DEFAULT_UNATTENDED : unattended ? 1 : 0;
return this;
}

/**
* Whether to skip destination index creation for the transform.
*
* An explicit `null` resets to default.
*
* @param skipDestIndexCreation true if destination index creation should be skipped.
* @return the {@link Builder} with skipDestIndexCreation set.
*/
public Builder setSkipDestIndexCreation(Boolean skipDestIndexCreation) {
this.skipDestIndexCreation = skipDestIndexCreation == null ? DEFAULT_SKIP_DEST_INDEX_CREATION : skipDestIndexCreation ? 1 : 0;
return this;
}

/**
* Update settings according to given settings config.
*
Expand Down Expand Up @@ -545,6 +606,11 @@ public Builder update(SettingsConfig update) {
if (update.getUnattendedForUpdate() != null) {
this.unattended = update.getUnattendedForUpdate().equals(DEFAULT_UNATTENDED) ? null : update.getUnattendedForUpdate();
}
if (update.getSkipDestIndexCreationForUpdate() != null) {
this.skipDestIndexCreation = update.getSkipDestIndexCreationForUpdate().equals(DEFAULT_SKIP_DEST_INDEX_CREATION)
? null
: update.getSkipDestIndexCreationForUpdate();
}

return this;
}
Expand All @@ -558,7 +624,8 @@ public SettingsConfig build() {
usePit,
deduceMappings,
numFailureRetries,
unattended
unattended,
skipDestIndexCreation
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public TransformConfig(
this.pivotConfig = pivotConfig;
this.latestConfig = latestConfig;
this.description = description;
this.settings = settings == null ? new SettingsConfig() : settings;
this.settings = settings == null ? SettingsConfig.EMPTY : settings;
this.metadata = metadata;
this.retentionPolicyConfig = retentionPolicyConfig;
if (this.description != null && this.description.length() > MAX_DESCRIPTION_LENGTH) {
Expand Down Expand Up @@ -600,7 +600,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries(),
builder.getSettings().getUnattended()
builder.getSettings().getUnattended(),
builder.getSettings().getSkipDestIndexCreation()
)
);
}
Expand All @@ -616,7 +617,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries(),
builder.getSettings().getUnattended()
builder.getSettings().getUnattended(),
builder.getSettings().getSkipDestIndexCreation()
)
);
}
Expand All @@ -632,7 +634,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries(),
builder.getSettings().getUnattended()
builder.getSettings().getUnattended(),
builder.getSettings().getSkipDestIndexCreation()
)
);
}
Expand Down
Loading

0 comments on commit 44dad2d

Please sign in to comment.