Skip to content

Commit

Permalink
copy scanContex
Browse files Browse the repository at this point in the history
  • Loading branch information
ConradJam committed Aug 14, 2024
1 parent 4ad440b commit a638b49
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ public class ScanContext implements Serializable {
protected final List<Expression> filters;
protected final long limit;
protected final boolean includeColumnStats;
protected final Collection<String> includeStatsForColumns;
protected final Integer planParallelism;
protected final int maxPlanningSnapshotCount;
protected final int maxAllowedPlanningFailures;
protected final Collection<String> includeStatsForColumns;
protected final String watermarkColumn;
protected final TimeUnit watermarkColumnTimeUnit;

Expand Down Expand Up @@ -262,10 +262,6 @@ void validate() {
startSnapshotId == null,
"Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
}
Preconditions.checkArgument(
branch == null,
String.format(
"Cannot scan table using ref %s configured for streaming reader yet", branch));

Preconditions.checkArgument(
tag == null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@ public static class Builder {

private String endTag = FlinkReadOptions.END_TAG.defaultValue();
private String scanStartupMode;
private Collection<String> includeStatsForColumns;
private String watermarkColumn;
private TimeUnit watermarkColumnTimeUnit;
private Collection<String> includeStatsForColumns = null;
private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();;
private TimeUnit watermarkColumnTimeUnit =
FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();

private boolean batchMode = false;

Expand Down Expand Up @@ -330,16 +331,19 @@ public Builder batchMode(boolean batchMode) {
return this;
}

public Collection<String> includeStatsForColumns() {
return includeStatsForColumns;
public Builder includeColumnStats(Collection<String> newIncludeStatsForColumns) {
this.includeStatsForColumns = newIncludeStatsForColumns;
return this;
}

public String watermarkColumn() {
return watermarkColumn;
public Builder watermarkColumn(String newWatermarkColumn) {
this.watermarkColumn = newWatermarkColumn;
return this;
}

public TimeUnit watermarkColumnTimeUnit() {
return watermarkColumnTimeUnit;
public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
return this;
}

public Builder fromProperties(Map<String, String> properties) {
Expand All @@ -365,8 +369,11 @@ public Builder fromProperties(Map<String, String> properties) {
.nameMapping(properties.get(DEFAULT_NAME_MAPPING))
.scanStartupMode(properties.get(MixedFormatValidator.SCAN_STARTUP_MODE.key()))
.includeColumnStats(config.get(INCLUDE_COLUMN_STATS))
.includeColumnStats(includeStatsForColumns)
.maxPlanningSnapshotCount(config.get(MAX_PLANNING_SNAPSHOT_COUNT))
.maxAllowedPlanningFailures(maxAllowedPlanningFailures);
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit);
}

public MixedFormatScanContext build() {
Expand Down
2 changes: 1 addition & 1 deletion amoro-mixed-format/amoro-mixed-format-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

<properties>
<!-- default flink version -->
<flink.version>1.17.1</flink.version>
<flink.version>1.17.2</flink.version>
<kafka.version>2.4.1</kafka.version>
<gson.version>2.9.0</gson.version>
<jackson.vesion>2.10.2</jackson.vesion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<kafka.version>3.2.3</kafka.version>
<assertj.version>3.21.0</assertj.version>
<testcontainers.version>1.17.2</testcontainers.version>
<flink.version>1.17.1</flink.version>
<flink.version>1.17.2</flink.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<url>https://amoro.apache.org</url>

<properties>
<flink.version>1.17.1</flink.version>
<flink.version>1.17.2</flink.version>
</properties>

<dependencies>
Expand Down

0 comments on commit a638b49

Please sign in to comment.