diff --git a/amoro-mixed-format/amoro-mixed-format-flink/amoro-mixed-format-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/amoro-mixed-format/amoro-mixed-format-flink/amoro-mixed-format-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 5d9c736c2e..7354ee9dfb 100644 --- a/amoro-mixed-format/amoro-mixed-format-flink/amoro-mixed-format-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/amoro-mixed-format/amoro-mixed-format-flink/amoro-mixed-format-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -175,10 +175,10 @@ public class ScanContext implements Serializable { protected final List filters; protected final long limit; protected final boolean includeColumnStats; + protected final Collection includeStatsForColumns; protected final Integer planParallelism; protected final int maxPlanningSnapshotCount; protected final int maxAllowedPlanningFailures; - protected final Collection includeStatsForColumns; protected final String watermarkColumn; protected final TimeUnit watermarkColumnTimeUnit; @@ -262,10 +262,6 @@ void validate() { startSnapshotId == null, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); } - Preconditions.checkArgument( - branch == null, - String.format( - "Cannot scan table using ref %s configured for streaming reader yet", branch)); Preconditions.checkArgument( tag == null, diff --git a/amoro-mixed-format/amoro-mixed-format-flink/amoro-mixed-format-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java b/amoro-mixed-format/amoro-mixed-format-flink/amoro-mixed-format-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java index 32483b74ee..5f73577705 100644 --- a/amoro-mixed-format/amoro-mixed-format-flink/amoro-mixed-format-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java +++ b/amoro-mixed-format/amoro-mixed-format-flink/amoro-mixed-format-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java @@ -187,9 +187,10 @@ public static class Builder { private String endTag = FlinkReadOptions.END_TAG.defaultValue(); private String scanStartupMode; - private Collection includeStatsForColumns; - private String watermarkColumn; - private TimeUnit watermarkColumnTimeUnit; + private Collection includeStatsForColumns = null; + private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();; + private TimeUnit watermarkColumnTimeUnit = + FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue(); private boolean batchMode = false; @@ -330,16 +331,19 @@ public Builder batchMode(boolean batchMode) { return this; } - public Collection includeStatsForColumns() { - return includeStatsForColumns; + public Builder includeColumnStats(Collection newIncludeStatsForColumns) { + this.includeStatsForColumns = newIncludeStatsForColumns; + return this; } - public String watermarkColumn() { - return watermarkColumn; + public Builder watermarkColumn(String newWatermarkColumn) { + this.watermarkColumn = newWatermarkColumn; + return this; } - public TimeUnit watermarkColumnTimeUnit() { - return watermarkColumnTimeUnit; + public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) { + this.watermarkColumnTimeUnit = newWatermarkTimeUnit; + return this; } public Builder fromProperties(Map properties) { @@ -365,8 +369,11 @@ public Builder fromProperties(Map properties) { .nameMapping(properties.get(DEFAULT_NAME_MAPPING)) .scanStartupMode(properties.get(MixedFormatValidator.SCAN_STARTUP_MODE.key())) .includeColumnStats(config.get(INCLUDE_COLUMN_STATS)) + .includeColumnStats(includeStatsForColumns) .maxPlanningSnapshotCount(config.get(MAX_PLANNING_SNAPSHOT_COUNT)) - .maxAllowedPlanningFailures(maxAllowedPlanningFailures); + .maxAllowedPlanningFailures(maxAllowedPlanningFailures) + .watermarkColumn(watermarkColumn) + .watermarkColumnTimeUnit(watermarkColumnTimeUnit); } public MixedFormatScanContext build() { diff --git a/amoro-mixed-format/amoro-mixed-format-flink/pom.xml b/amoro-mixed-format/amoro-mixed-format-flink/pom.xml index 18d7cb1532..550b1f243e 100644 --- a/amoro-mixed-format/amoro-mixed-format-flink/pom.xml +++ b/amoro-mixed-format/amoro-mixed-format-flink/pom.xml @@ -43,7 +43,7 @@ - 1.17.1 + 1.17.2 2.4.1 2.9.0 2.10.2 diff --git a/amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-1.17/pom.xml b/amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-1.17/pom.xml index 8f52deb559..59241596eb 100644 --- a/amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-1.17/pom.xml +++ b/amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-1.17/pom.xml @@ -37,7 +37,7 @@ 3.2.3 3.21.0 1.17.2 - 1.17.1 + 1.17.2 diff --git a/amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-runtime-1.17/pom.xml b/amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-runtime-1.17/pom.xml index 04804ffa13..1f422a75ec 100644 --- a/amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-runtime-1.17/pom.xml +++ b/amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-runtime-1.17/pom.xml @@ -32,7 +32,7 @@ https://amoro.apache.org - 1.17.1 + 1.17.2