-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Flink: Dynamic Iceberg Sink: Add dynamic writer and committer #13080
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This adds the dynamic version of the writer and committer for the Flink Dynamic Iceberg Sink. Conceptually, they work similar to the IcebergSink, but they support writing to multiple tables. Write results from each table are aggregated from the DynamicWriter in the DynamicWriteResultAggregator, from where they are sent to the DynamicCommitter. Broken out of apache#12424, depends on apache#13032.
import java.util.NavigableMap; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import org.apache.iceberg.io.WriteResult; | ||
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; | ||
import org.apache.iceberg.util.ScanTaskUtil; | ||
|
||
class CommitSummary { | ||
public class CommitSummary { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still an @Internal
class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you suggesting to annotate with @Internal
because of the visibility change?
@@ -23,7 +23,7 @@ | |||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | |||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; | |||
|
|||
class DeltaManifests { | |||
public class DeltaManifests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still @Internal
@@ -148,6 +170,7 @@ public void initialize(int taskId, int attemptId) { | |||
OutputFileFactory.builderFor(table, taskId, attemptId) | |||
.format(format) | |||
.ioSupplier(() -> tableSupplier.get().io()) | |||
.defaultSpec(spec) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a behavioral change in the IcebergSink?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. This field in the builder would normally be set to the current table spec, but for the dynamic sink we can write multiple active specs, for which we will have a dedicated writer each.
} | ||
|
||
@Override | ||
public byte[] serialize(DynamicCommittable committable) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I miss the tests
view.read(manifestBuf); | ||
return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId); | ||
} | ||
throw new IOException("Unrecognized version or corrupt state: " + version); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit newline
view.read(manifestBuf); | ||
return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId); | ||
} | ||
throw new IOException("Unrecognized version or corrupt state: " + version); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit newline
break; | ||
} | ||
} | ||
Long parentSnapshotId = snapshot.parentId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline
} else { | ||
commitDeltaTxn(table, branch, pendingResults, summary, newFlinkJobId, operatorId); | ||
} | ||
continuousEmptyCheckpoints = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline
Could you please provide tests? |
This adds the dynamic version of the writer and committer for the Flink Dynamic Iceberg Sink. Conceptually, they work similar to the IcebergSink, but they support writing to multiple tables. Write results from each table are aggregated from the DynamicWriter in the DynamicWriteResultAggregator, from where they are sent to the DynamicCommitter.
Broken out of #12424.