Skip to content

Flink: Dynamic Iceberg Sink: Add dynamic writer and committer #13080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

mxm
Copy link
Contributor

@mxm mxm commented May 16, 2025

This adds the dynamic version of the writer and committer for the Flink Dynamic Iceberg Sink. Conceptually, they work similar to the IcebergSink, but they support writing to multiple tables. Write results from each table are aggregated from the DynamicWriter in the DynamicWriteResultAggregator, from where they are sent to the DynamicCommitter.

Broken out of #12424.

This adds the dynamic version of the writer and committer for the Flink Dynamic
Iceberg Sink. Conceptually, they work similar to the IcebergSink, but they
support writing to multiple tables. Write results from each table are aggregated
from the DynamicWriter in the DynamicWriteResultAggregator, from where they are
sent to the DynamicCommitter.

Broken out of apache#12424, depends on apache#13032.
@github-actions github-actions bot added the flink label May 16, 2025
@mxm mxm changed the title Flink: Dynamic Sink: Add dynamic writer and committer Flink: Dynamic Iceberg Sink: Add dynamic writer and committer May 16, 2025
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.util.ScanTaskUtil;

class CommitSummary {
public class CommitSummary {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still an @Internal class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting to annotate with @Internal because of the visibility change?

@@ -23,7 +23,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class DeltaManifests {
public class DeltaManifests {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still @Internal

@@ -148,6 +170,7 @@ public void initialize(int taskId, int attemptId) {
OutputFileFactory.builderFor(table, taskId, attemptId)
.format(format)
.ioSupplier(() -> tableSupplier.get().io())
.defaultSpec(spec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a behavioral change in the IcebergSink?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. This field in the builder would normally be set to the current table spec, but for the dynamic sink we can write multiple active specs, for which we will have a dedicated writer each.

}

@Override
public byte[] serialize(DynamicCommittable committable) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I miss the tests

view.read(manifestBuf);
return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId);
}
throw new IOException("Unrecognized version or corrupt state: " + version);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit newline

view.read(manifestBuf);
return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId);
}
throw new IOException("Unrecognized version or corrupt state: " + version);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit newline

break;
}
}
Long parentSnapshotId = snapshot.parentId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline

} else {
commitDeltaTxn(table, branch, pendingResults, summary, newFlinkJobId, operatorId);
}
continuousEmptyCheckpoints = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline

@pvary
Copy link
Contributor

pvary commented May 19, 2025

Could you please provide tests?
Also double check, that every class we made public is marked with @Internal.
Thanks,
Peter

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants