
[#3515] feat(flink-connector): Support flink iceberg catalog #5914

Open

sunxiaojian wants to merge 1 commit into main from support-flink-iceberg-catalog

Conversation

sunxiaojian (Contributor)

What changes were proposed in this pull request?

Support Flink Iceberg catalog.

Why are the changes needed?

Fix: #3515

Does this PR introduce any user-facing change?

No.

How was this patch tested?

- FlinkIcebergCatalogIT
- FlinkIcebergHiveCatalogIT

sunxiaojian changed the title from "[#3515]feat(flink-connector)Support flink iceberg catalog" to "[#3515] feat(flink-connector): Support flink iceberg catalog" on Dec 19, 2024
sunxiaojian force-pushed the support-flink-iceberg-catalog branch 3 times, most recently from e5cb05a to 24cd6b8 on December 19, 2024 07:34
sunxiaojian force-pushed the support-flink-iceberg-catalog branch from 24cd6b8 to ef294ba on December 19, 2024 07:37
sunxiaojian (Contributor, Author):

@FANNG1 @coolderli PTAL

FANNG1 (Contributor) commented Dec 19, 2024:

Cool! I'll review this PR, but I may need some time. :)

sunxiaojian (Contributor, Author) replied:

> Cool! I'll review this PR, but I may need some time. :)

OK, thanks.

Comment on lines +8 to +9
The Apache Gravitino Flink connector offers the capability to read and write Iceberg tables, with the metadata managed by the Gravitino server. To enable the use of the Iceberg catalog within the Flink connector, you must download the Iceberg Flink runtime JAR to the Flink classpath.

Contributor (suggested change):
The Apache Gravitino Flink connector can be used to read and write Iceberg tables, with the metadata managed by the Gravitino server.
To enable the Flink connector, you must download the Iceberg Flink runtime JAR and place it in the Flink classpath.
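
For readers skimming the thread, a minimal usage sketch of what the enabled catalog looks like from Flink's Table API; the catalog, database, and table names below are illustrative placeholders, not names from this PR:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergCatalogUsageSketch {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // "iceberg_catalog" is a placeholder for a catalog managed by the Gravitino server.
    tableEnv.useCatalog("iceberg_catalog");
    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS demo_db");
    tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS demo_db.demo_table (id INT, name STRING)");
    tableEnv.executeSql("INSERT INTO demo_db.demo_table VALUES (1, 'a')");
    tableEnv.executeSql("SELECT * FROM demo_db.demo_table").print();
  }
}
```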

- `INSERT INTO & OVERWRITE`
- `SELECT`

#### Not supported operations:
Contributor (suggested change):
#### Operations Not Supported:


## Catalog properties

Gravitino Flink connector will transform the following property names defined in catalog properties to Flink Iceberg connector configuration.
Contributor (suggested change):
The Gravitino Flink connector transforms the following properties in a catalog to Flink connector configuration.
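
To make the transformation concrete, here is a minimal sketch of the kind of key translation being described, using the `catalog-backend` to `catalog-type` rename that appears in a diff hunk further down; the class and method names are illustrative, not the connector's actual API:

```java
import java.util.HashMap;
import java.util.Map;

public final class IcebergOptionsSketch {
  /** Translates Gravitino catalog properties into Flink Iceberg options (illustrative). */
  public static Map<String, String> toFlinkOptions(Map<String, String> catalogProperties) {
    Map<String, String> flinkOptions = new HashMap<>(catalogProperties);
    // Gravitino exposes the backend as "catalog-backend"; the Flink Iceberg
    // connector expects "catalog-type" (see the diff hunk later in this thread).
    String backend = flinkOptions.remove("catalog-backend");
    if (backend != null) {
      flinkOptions.put("catalog-type", backend);
    }
    return flinkOptions;
  }

  private IcebergOptionsSketch() {}
}
```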


### S3

You need to add s3 secret to the Flink configuration using `s3.access-key-id` and `s3.secret-access-key`. Additionally, download the [Iceberg AWS bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle) and place it in the classpath of Flink.
Contributor (suggested change):
You need to add an S3 secret to the Flink configuration using `s3.access-key-id` and `s3.secret-access-key`.
Additionally, you need to download the [Iceberg AWS bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle)
and place it in the Flink classpath.
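
For illustration, one way these keys might be supplied when the catalog is created from Flink SQL; only `s3.access-key-id` and `s3.secret-access-key` come from the text above, while the catalog name and the `type` value are placeholder assumptions:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class S3CatalogSketch {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // Placeholder catalog name and 'type' value; the two s3.* keys are from the doc.
    tableEnv.executeSql(
        "CREATE CATALOG s3_demo WITH ("
            + " 'type' = 'gravitino',"
            + " 's3.access-key-id' = '<access-key-id>',"
            + " 's3.secret-access-key' = '<secret-access-key>'"
            + ")");
  }
}
```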


### OSS

You need to add OSS secret key to the Flink configuration using `client.access-key-id` and `client.access-key-secret`. Additionally, download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip) and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` in the classpath of Flink.
Contributor (suggested change):
You need to add an OSS secret key to the Flink configuration using `client.access-key-id` and `client.access-key-secret`.
Additionally, you need to download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip),
and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` to the Flink classpath.


### GCS

No extra configuration is needed. Please make sure the credential file is accessible by Flink, like using `export GOOGLE_APPLICATION_CREDENTIALS=/xx/application_default_credentials.json`, and download [Iceberg GCP bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-gcp-bundle) and place it to the classpath of Flink.
Contributor (suggested change):
No extra configuration is needed. Please make sure the credential file is accessible by Flink.
For example, `export GOOGLE_APPLICATION_CREDENTIALS=/xx/application_default_credentials.json`.
You need to download the [Iceberg GCP bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-gcp-bundle) and place it in the Flink classpath.
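
As a small aside, a pre-flight check for the credential file can save a confusing failure later; the environment variable name comes from the text above, everything else is an illustrative sketch:

```java
public class GcsCredentialsCheck {
  public static void main(String[] args) {
    String credentials = System.getenv("GOOGLE_APPLICATION_CREDENTIALS");
    if (credentials == null || credentials.isEmpty()) {
      throw new IllegalStateException(
          "GOOGLE_APPLICATION_CREDENTIALS is not set; Flink cannot reach GCS");
    }
    System.out.println("Using GCS credentials from: " + credentials);
  }
}
```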

FactoryUtils.createCatalogFactoryHelper(this, context);
return new GravitinoIcebergCatalog(
context.getName(),
helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE),
Contributor:
Do we have to use such a long name?
Maybe CatalogFactoryOptions or even FactoryOptions is enough in such a well-defined package hierarchy?

*/
@Override
public String gravitinoCatalogProvider() {
return "lakehouse-iceberg";
Contributor:
Is there a constant for this string?

Contributor (Author):

There is no constant; this value comes from the catalog provider, and there is no unified definition of it as a constant.
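
For what it's worth, the constant the reviewer is asking about could look like the following; the class and field names are hypothetical, since, as noted, the codebase has no unified definition today:

```java
// Hypothetical: not defined anywhere in the codebase today.
public final class CatalogProviders {
  public static final String LAKEHOUSE_ICEBERG = "lakehouse-iceberg";

  private CatalogProviders() {}
}
```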


String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";

@VisibleForTesting String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
Contributor:
Most, if not all fields in this interface are constants, right?

Contributor (Author):

Yes, they are all constants.
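
This follows from the Java language rules: fields declared in an interface are implicitly `public static final`, so every field here is a constant even without explicit modifiers. A minimal example:

```java
public interface BackendNames {
  // Implicitly public static final, i.e. a constant.
  String HIVE = "hive";
}
```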

all.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
// Map "catalog-backend" to "catalog-type".
String catalogBackend = all.remove(IcebergConstants.CATALOG_BACKEND);
Contributor:
We can maintain a mapping list.
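
A sketch of what such a mapping list might look like; only the `catalog-backend` to `catalog-type` pair comes from this diff, and the class and method names are illustrative:

```java
import java.util.Map;

public final class PropertyKeyRenames {
  // Table of Gravitino property keys and their Flink Iceberg equivalents.
  private static final Map<String, String> RENAMES =
      Map.of("catalog-backend", "catalog-type");

  /** Applies every rename in place on the given options map. */
  public static void apply(Map<String, String> options) {
    RENAMES.forEach(
        (from, to) -> {
          String value = options.remove(from);
          if (value != null) {
            options.put(to, value);
          }
        });
  }

  private PropertyKeyRenames() {}
}
```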


## Storage

### S3
Contributor:
Have you tested S3, GCS, and OSS storage?

FANNG1 (Contributor) commented Dec 26, 2024:

Hi @sunxiaojian, sorry for the delay. I'm working on the issues for the 0.8 release and may not have enough time to review this PR these days.


Successfully merging this pull request may close these issues:

- [Subtask] [flink-connector] Support iceberg catalog