From ef294ba26a7e0fdf8e1c900541266636d82d479f Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Thu, 12 Dec 2024 20:05:55 +0800
Subject: [PATCH] Support flink iceberg catalog #3515

---
 docs/flink-connector/flink-catalog-iceberg.md |  83 +++
 flink-connector/flink/build.gradle.kts        |   6 +
 .../catalog/GravitinoCatalogManager.java      |  11 -
 .../iceberg/GravitinoIcebergCatalog.java      |  67 +++
 .../GravitinoIcebergCatalogFactory.java       |  95 ++++
 ...GravitinoIcebergCatalogFactoryOptions.java |  33 ++
 .../iceberg/IcebergPropertiesConstants.java   |  46 ++
 .../iceberg/IcebergPropertiesConverter.java   |  63 +++
 .../org.apache.flink.table.factories.Factory  |   3 +-
 .../TestIcebergPropertiesConverter.java       |  82 +++
 .../integration/test/FlinkCommonIT.java       |  57 +-
 .../integration/test/FlinkEnvIT.java          |  30 +-
 .../test/hive/FlinkHiveCatalogIT.java         |  16 +-
 .../test/iceberg/FlinkIcebergCatalogIT.java   | 496 ++++++++++++++++++
 .../iceberg/FlinkIcebergHiveCatalogIT.java    |  46 ++
 .../integration/test/utils/TestUtils.java     |   3 +-
 16 files changed, 1096 insertions(+), 41 deletions(-)
 create mode 100644 docs/flink-connector/flink-catalog-iceberg.md
 create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
 create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
 create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java
 create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java
 create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
 create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
 create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
 create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java

diff --git a/docs/flink-connector/flink-catalog-iceberg.md b/docs/flink-connector/flink-catalog-iceberg.md
new file mode 100644
index 00000000000..a7112d1fe4d
--- /dev/null
+++ b/docs/flink-connector/flink-catalog-iceberg.md
@@ -0,0 +1,83 @@
+---
+title: "Flink connector Iceberg catalog"
+slug: /flink-connector/flink-catalog-iceberg
+keyword: flink connector iceberg catalog
+license: "This software is licensed under the Apache License version 2."
+---
+
+The Apache Gravitino Flink connector offers the capability to read and write Iceberg tables, with the metadata managed by the Gravitino server. To enable the Iceberg catalog in the Flink connector, you must download the Iceberg Flink runtime JAR and place it on the Flink classpath.
+
+## Capabilities
+
+#### Supported DML and DDL operations:
+
+- `CREATE CATALOG`
+- `CREATE DATABASE`
+- `CREATE TABLE`
+- `DROP TABLE`
+- `ALTER TABLE`
+- `INSERT INTO & OVERWRITE`
+- `SELECT`
+
+#### Unsupported operations:
+
+- Partition operations
+- View operations
+- Metadata tables, like:
+  - `{iceberg_catalog}.{iceberg_database}.{iceberg_table}&snapshots`
+- Querying UDFs
+- `UPDATE` clause
+- `DELETE` clause
+- `CREATE TABLE LIKE` clause
+
+## SQL example
+
+```sql
+-- Suppose iceberg_a is the Iceberg catalog name managed by Gravitino
+USE iceberg_a;
+
+CREATE DATABASE IF NOT EXISTS mydatabase;
+USE mydatabase;
+
+CREATE TABLE sample (
+    id BIGINT COMMENT 'unique id',
+    data STRING NOT NULL
+) PARTITIONED BY (data)
+WITH ('format-version'='2');
+
+INSERT INTO sample
+VALUES (1, 'A'), (2, 'B');
+
+SELECT * FROM sample WHERE data = 'B';
+```
+
+## Catalog properties
+
+The Gravitino Flink connector transforms the following property names defined in the catalog properties into the corresponding Flink Iceberg connector configuration.
+
+| Gravitino catalog property name | Flink Iceberg connector configuration | Description | Since Version |
+|---------------------------------|---------------------------------------|-------------|---------------|
+| `catalog-backend` | `catalog-type` | Catalog backend type. | 0.8.0-incubating |
+| `uri` | `uri` | Catalog backend URI. | 0.8.0-incubating |
+| `warehouse` | `warehouse` | Catalog backend warehouse. | 0.8.0-incubating |
+| `io-impl` | `io-impl` | The IO implementation for `FileIO` in Iceberg. | 0.8.0-incubating |
+| `s3-endpoint` | `s3.endpoint` | An alternative endpoint of the S3 service. This can be used with S3FileIO for any S3-compatible object storage service that has a different endpoint, or to access a private S3 endpoint in a virtual private cloud. | 0.8.0-incubating |
+| `s3-region` | `client.region` | The region of the S3 service, like `us-west-2`. | 0.8.0-incubating |
+| `oss-endpoint` | `oss.endpoint` | The endpoint of the Aliyun OSS service. | 0.8.0-incubating |
+
+## Storage
+
+### S3
+
+You need to add the S3 secret key to the Flink configuration using `s3.access-key-id` and `s3.secret-access-key`. Additionally, download the [Iceberg AWS bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle) and place it on the Flink classpath.
+
+### OSS
+
+You need to add the OSS secret key to the Flink configuration using `client.access-key-id` and `client.access-key-secret`. Additionally, download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip) and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, and `jdom2-2.0.6.jar` onto the Flink classpath.
+
+### GCS
+
+No extra configuration is needed. Make sure the credential file is accessible to Flink, for example via `export GOOGLE_APPLICATION_CREDENTIALS=/xx/application_default_credentials.json`, then download the [Iceberg GCP bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-gcp-bundle) and place it on the Flink classpath.
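+
+## Catalog creation example
+
+Putting the pieces together, the following is a minimal sketch of registering an Iceberg catalog with a Hive backend through this connector. The catalog name, metastore URI, and warehouse path are illustrative placeholders; `gravitino-iceberg` is the catalog `type` identifier registered by the connector.
+
+```sql
+-- Illustrative values: replace the URI and warehouse with your own deployment.
+CREATE CATALOG iceberg_a WITH (
+  'type' = 'gravitino-iceberg',
+  'catalog-backend' = 'hive',
+  'uri' = 'thrift://hive-metastore:9083',
+  'warehouse' = 'hdfs://namenode:8020/user/iceberg/warehouse'
+);
+```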
\ No newline at end of file diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index 9e2a48c036c..b52d85fffa6 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -29,6 +29,8 @@ repositories { val flinkVersion: String = libs.versions.flink.get() val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".") +val icebergVersion: String = libs.versions.iceberg.get() + // The Flink only support scala 2.12, and all scala api will be removed in a future version. // You can find more detail at the following issues: // https://issues.apache.org/jira/browse/FLINK-23986, @@ -43,6 +45,8 @@ dependencies { compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) + compileOnly("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion") + compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") compileOnly("org.apache.flink:flink-table-common:$flinkVersion") compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion") @@ -86,7 +90,9 @@ dependencies { testImplementation(libs.testcontainers) testImplementation(libs.testcontainers.junit.jupiter) testImplementation(libs.testcontainers.mysql) + testImplementation(libs.metrics.core) + testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion") testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") testImplementation("org.apache.flink:flink-table-common:$flinkVersion") testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion") diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java index 7693e5d4c9a..0b0b89f3a5a 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java @@ -35,12 +35,10 @@ public class GravitinoCatalogManager { private static GravitinoCatalogManager gravitinoCatalogManager; private volatile boolean isClosed = false; - private final String metalakeName; private final GravitinoMetalake metalake; private final GravitinoAdminClient gravitinoClient; private GravitinoCatalogManager(String gravitinoUri, String metalakeName) { - this.metalakeName = metalakeName; this.gravitinoClient = GravitinoAdminClient.builder(gravitinoUri).build(); this.metalake = gravitinoClient.loadMetalake(metalakeName); } @@ -99,15 +97,6 @@ public Catalog getGravitinoCatalogInfo(String name) { return catalog; } - /** - * Get the metalake. - * - * @return the metalake name. - */ - public String getMetalakeName() { - return metalakeName; - } - /** * Create catalog in Gravitino. * diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java new file mode 100644 index 00000000000..30fac96bbc8 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.iceberg;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.factories.Factory;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
+import org.apache.iceberg.flink.FlinkCatalog;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+
+/** Gravitino Iceberg catalog, which delegates catalog operations to Iceberg's {@link FlinkCatalog}. */
+public class GravitinoIcebergCatalog extends BaseCatalog {
+
+  private final FlinkCatalog icebergCatalog;
+
+  protected GravitinoIcebergCatalog(
+      String catalogName,
+      String defaultDatabase,
+      PropertiesConverter propertiesConverter,
+      PartitionConverter partitionConverter,
+      Map<String, String> properties) {
+    super(catalogName, defaultDatabase, propertiesConverter, partitionConverter);
+    FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
+    this.icebergCatalog = (FlinkCatalog) flinkCatalogFactory.createCatalog(catalogName, properties);
+  }
+
+  @Override
+  public void open() throws CatalogException {
+    icebergCatalog.open();
+  }
+
+  @Override
+  public void close() throws CatalogException {
+    icebergCatalog.close();
+  }
+
+  @Override
+  public Optional<Factory> getFactory() {
+    return icebergCatalog.getFactory();
+  }
+
+  @Override
+  protected AbstractCatalog realCatalog() {
+    return icebergCatalog;
+  }
+}
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
new file mode 100644
index 00000000000..ad0363d9867
--- /dev/null
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.gravitino.flink.connector.iceberg; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.gravitino.flink.connector.utils.FactoryUtils; + +public class GravitinoIcebergCatalogFactory implements BaseCatalogFactory { + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtils.createCatalogFactoryHelper(this, context); + return new GravitinoIcebergCatalog( + context.getName(), + helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE), + propertiesConverter(), + partitionConverter(), + context.getOptions()); + } + + @Override + public String factoryIdentifier() { + return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + /** + * Define gravitino catalog provider. + * + * @return + */ + @Override + public String gravitinoCatalogProvider() { + return "lakehouse-iceberg"; + } + + /** + * Define gravitino catalog type. + * + * @return + */ + @Override + public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { + return org.apache.gravitino.Catalog.Type.RELATIONAL; + } + + /** + * Define properties converter. + * + * @return + */ + @Override + public PropertiesConverter propertiesConverter() { + return IcebergPropertiesConverter.INSTANCE; + } + + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java new file mode 100644 index 00000000000..95e1a21de85 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.iceberg; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +public class GravitinoIcebergCatalogFactoryOptions { + + public static final String IDENTIFIER = "gravitino-iceberg"; + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(FlinkCatalogFactory.DEFAULT_DATABASE) + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME); +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java new file mode 100644 index 00000000000..8f86ae5c50b --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.iceberg; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +public interface IcebergPropertiesConstants { + @VisibleForTesting String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND; + + String ICEBERG_CATALOG_TYPE = FlinkCatalogFactory.ICEBERG_CATALOG_TYPE; + + String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE; + + String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION; + + String GRAVITINO_ICEBERG_CATALOG_URI = IcebergConstants.URI; + + String ICEBERG_CATALOG_URI = CatalogProperties.URI; + + @VisibleForTesting String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE; + + String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; + + @VisibleForTesting String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST; +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java new file mode 100644 index 00000000000..f30593e6bc3 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+
+/** Converts Iceberg catalog properties between Gravitino and Flink. */
+public class IcebergPropertiesConverter implements PropertiesConverter {
+
+  public static final IcebergPropertiesConverter INSTANCE = new IcebergPropertiesConverter();
+
+  private IcebergPropertiesConverter() {}
+
+  @Override
+  public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
+    Preconditions.checkArgument(
+        gravitinoProperties != null, "Iceberg Catalog properties should not be null.");
+
+    Map<String, String> all =
+        IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
+    all.put(
+        CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
+    // Map "catalog-backend" to "catalog-type".
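+    // Flink's Iceberg FlinkCatalogFactory reads the backend from "catalog-type", while
+    // Gravitino stores it under "catalog-backend", so the key is renamed here.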
+    String catalogBackend = all.remove(IcebergConstants.CATALOG_BACKEND);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogBackend),
+        String.format("%s should not be empty", IcebergConstants.CATALOG_BACKEND));
+    all.put(IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, catalogBackend);
+    return all;
+  }
+
+  @Override
+  public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
+    return new HashMap<>(properties);
+  }
+
+  @Override
+  public Map<String, String> toFlinkTableProperties(Map<String, String> properties) {
+    return new HashMap<>(properties);
+  }
+}
diff --git a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index c9d9c92b5ef..8a785565f15 100644
--- a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -18,4 +18,5 @@
 #
 org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory
-org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory
\ No newline at end of file
+org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory
+org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactory
\ No newline at end of file
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
new file mode 100644
index 00000000000..d6de522f396
--- /dev/null
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.gravitino.flink.connector.iceberg; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestIcebergPropertiesConverter { + private static final IcebergPropertiesConverter CONVERTER = IcebergPropertiesConverter.INSTANCE; + + @Test + void testCatalogPropertiesWithHiveBackend() { + Map properties = + CONVERTER.toFlinkCatalogProperties( + ImmutableMap.of( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, + "hive-uri", + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, + "hive-warehouse", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + CommonCatalogOptions.CATALOG_TYPE.key(), + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE, + IcebergPropertiesConstants.ICEBERG_CATALOG_URI, + "hive-uri", + IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, + "hive-warehouse"), + properties); + } + + @Test + void testCatalogPropertiesWithRestBackend() { + Map properties = + CONVERTER.toFlinkCatalogProperties( + ImmutableMap.of( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, + "rest-uri", + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, + "rest-warehouse", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + CommonCatalogOptions.CATALOG_TYPE.key(), + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST, + IcebergPropertiesConstants.ICEBERG_CATALOG_URI, + "rest-uri", + IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, + "rest-warehouse"), + properties); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 2d022b4a8a4..8f56f6a7ad9 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -58,6 +58,10 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { protected abstract Catalog currentCatalog(); + protected abstract String getProvider(); + + protected abstract boolean dropCascade(); + @Test public void testCreateSchema() { doWithCatalog( @@ -69,7 +73,7 @@ public void testCreateSchema() { TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS); catalog.asSchemas().schemaExists(schema); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, dropCascade()); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); } }); @@ -98,12 +102,11 @@ public void testGetSchema() { Schema loadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, loadedSchema.name()); Assertions.assertEquals(comment, loadedSchema.comment()); - Assertions.assertEquals(2, loadedSchema.properties().size()); 
Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); Assertions.assertEquals( location, loadedSchema.properties().get(HiveConstants.LOCATION)); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, dropCascade()); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); } }); @@ -141,9 +144,9 @@ public void testListSchema() { Assertions.assertEquals(schema2, schemas[2]); Assertions.assertEquals(schema3, schemas[3]); } finally { - catalog.asSchemas().dropSchema(schema, true); - catalog.asSchemas().dropSchema(schema2, true); - catalog.asSchemas().dropSchema(schema3, true); + catalog.asSchemas().dropSchema(schema, dropCascade()); + catalog.asSchemas().dropSchema(schema2, dropCascade()); + catalog.asSchemas().dropSchema(schema3, dropCascade()); Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); } }); @@ -167,7 +170,6 @@ public void testAlterSchema() { Schema loadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, loadedSchema.name()); Assertions.assertEquals("test comment", loadedSchema.comment()); - Assertions.assertEquals(3, loadedSchema.properties().size()); Assertions.assertEquals("value1", loadedSchema.properties().get("key1")); Assertions.assertEquals("value2", loadedSchema.properties().get("key2")); Assertions.assertNotNull(loadedSchema.properties().get("location")); @@ -178,11 +180,10 @@ public void testAlterSchema() { Schema reloadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, reloadedSchema.name()); Assertions.assertEquals("test comment", reloadedSchema.comment()); - Assertions.assertEquals(4, reloadedSchema.properties().size()); Assertions.assertEquals("new-value", reloadedSchema.properties().get("key1")); Assertions.assertEquals("value3", reloadedSchema.properties().get("key3")); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, dropCascade()); } }); } @@ -232,7 +233,8 @@ public void testCreateSimpleTable() { Row.of("A", 1.0), Row.of("B", 2.0)); }, - true); + true, + dropCascade()); } @Test @@ -264,7 +266,8 @@ public void testListTables() { Row.of("test_table1"), Row.of("test_table2")); }, - true); + true, + dropCascade()); } @Test @@ -280,12 +283,11 @@ public void testDropTable() { NameIdentifier identifier = NameIdentifier.of(databaseName, tableName); catalog.asTableCatalog().createTable(identifier, columns, "comment1", ImmutableMap.of()); Assertions.assertTrue(catalog.asTableCatalog().tableExists(identifier)); - - TableResult result = sql("DROP TABLE %s", tableName); - TestUtils.assertTableResult(result, ResultKind.SUCCESS); + sql("DROP TABLE IF EXISTS %s", tableName); Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier)); }, - true); + true, + dropCascade()); } @Test @@ -338,7 +340,8 @@ public void testGetSimpleTable() { fail(e); } }, - true); + true, + dropCascade()); } @Test @@ -373,7 +376,8 @@ public void testRenameColumn() { }; assertColumns(expected, actual); }, - true); + true, + dropCascade()); } @Test @@ -423,6 +427,7 @@ public void testAlterTableComment() { .asTableCatalog() .loadTable(NameIdentifier.of(databaseName, tableName)); Assertions.assertEquals(newComment, gravitinoTable.comment()); + } catch (DatabaseNotExistException | TableAlreadyExistException | TableNotExistException e) { @@ -432,7 +437,8 @@ public void testAlterTableComment() { fail("Catalog doesn't exist"); } }, - true); + true, + dropCascade()); } @Test @@ -467,7 
+473,8 @@ public void testAlterTableAddColumn() { }; assertColumns(expected, actual); }, - true); + true, + dropCascade()); } @Test @@ -497,7 +504,8 @@ public void testAlterTableDropColumn() { new Column[] {Column.of("order_amount", Types.IntegerType.get(), "ORDER_AMOUNT")}; assertColumns(expected, actual); }, - true); + true, + dropCascade()); } @Test @@ -538,7 +546,8 @@ public void testAlterColumnTypeAndChangeOrder() { }; assertColumns(expected, actual); }, - true); + true, + dropCascade()); } @Test @@ -565,7 +574,8 @@ public void testRenameTable() { Assertions.assertTrue( catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, newTableName))); }, - true); + true, + dropCascade()); } @Test @@ -607,6 +617,7 @@ public void testAlterTableProperties() { Assertions.assertEquals("value1", properties.get("key")); Assertions.assertNull(properties.get("key2")); }, - true); + true, + dropCascade()); } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 5ae8847c6c1..8d9d1ccc89b 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -20,19 +20,24 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.function.Consumer; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.types.Row; import org.apache.gravitino.Catalog; import org.apache.gravitino.client.GravitinoMetalake; import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions; import org.apache.gravitino.integration.test.container.ContainerSuite; import org.apache.gravitino.integration.test.container.HiveContainer; @@ -155,12 +160,21 @@ private static void stopFlinkEnv() { } @FormatMethod - protected TableResult sql(@FormatString String sql, Object... args) { + protected static TableResult sql(@FormatString String sql, Object... 
args) { return tableEnv.executeSql(String.format(sql, args)); } protected static void doWithSchema( Catalog catalog, String schemaName, Consumer action, boolean dropSchema) { + doWithSchema(catalog, schemaName, action, dropSchema, true); + } + + protected static void doWithSchema( + Catalog catalog, + String schemaName, + Consumer action, + boolean dropSchema, + boolean cascade) { Preconditions.checkNotNull(catalog); Preconditions.checkNotNull(schemaName); try { @@ -175,7 +189,8 @@ protected static void doWithSchema( action.accept(catalog); } finally { if (dropSchema) { - catalog.asSchemas().dropSchema(schemaName, true); + clearTableInSchema(); + catalog.asSchemas().dropSchema(schemaName, cascade); } } } @@ -185,4 +200,15 @@ protected static void doWithCatalog(Catalog catalog, Consumer action) { tableEnv.useCatalog(catalog.name()); action.accept(catalog); } + + /** Iceberg requires deleting the table first, then deleting the schema. */ + protected static void clearTableInSchema() { + TableResult result = sql("SHOW TABLES"); + List rows = Lists.newArrayList(result.collect()); + for (Row row : rows) { + String tableName = row.getField(0).toString(); + TableResult deleteResult = sql("DROP TABLE IF EXISTS %s", tableName); + TestUtils.assertTableResult(deleteResult, ResultKind.SUCCESS); + } + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 333aa83f0b6..42cf6bb5326 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -72,7 +72,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog hiveCatalog; @BeforeAll - static void hiveStartUp() { + void hiveStartUp() { initDefaultHiveCatalog(); } @@ -82,13 +82,13 @@ static void hiveStop() { metalake.dropCatalog(DEFAULT_HIVE_CATALOG, true); } - protected static void initDefaultHiveCatalog() { + protected void initDefaultHiveCatalog() { Preconditions.checkNotNull(metalake); hiveCatalog = metalake.createCatalog( DEFAULT_HIVE_CATALOG, org.apache.gravitino.Catalog.Type.RELATIONAL, - "hive", + getProvider(), null, ImmutableMap.of("metastore.uris", hiveMetastoreUri)); } @@ -586,4 +586,14 @@ public void testGetHiveTable() { protected org.apache.gravitino.Catalog currentCatalog() { return hiveCatalog; } + + @Override + protected String getProvider() { + return "hive"; + } + + @Override + protected boolean dropCascade() { + return true; + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java new file mode 100644 index 00000000000..74553fa00e5 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.integration.test.iceberg; + +import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns; +import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn; +import static org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.DefaultCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.types.Row; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalog; +import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions; +import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; +import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.types.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT { + + private static final String DEFAULT_ICEBERG_CATALOG = "flink_iceberg_catalog"; + + private static org.apache.gravitino.Catalog icebergCatalog; + + @BeforeAll + public void before() { + Preconditions.checkNotNull(metalake); + icebergCatalog = + metalake.createCatalog( + DEFAULT_ICEBERG_CATALOG, + org.apache.gravitino.Catalog.Type.RELATIONAL, + getProvider(), + null, + getCatalogConfigs()); + } + + protected abstract Map getCatalogConfigs(); + + @Test + public void testCreateGravitinoIcebergCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + + // Create a new catalog. 
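+    // Creating the catalog through the TableEnvironment goes through the Gravitino catalog
+    // store, so the catalog is persisted in Gravitino as well as registered in Flink.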
+ String catalogName = "gravitino_iceberg_catalog"; + Configuration configuration = Configuration.fromMap(getCatalogConfigs()); + configuration.set( + CommonCatalogOptions.CATALOG_TYPE, GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); + + CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, configuration); + tableEnv.createCatalog(catalogName, catalogDescriptor); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + // Check the catalog properties. + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Map properties = gravitinoCatalog.properties(); + Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI)); + + // Get the created catalog. + Optional catalog = tableEnv.getCatalog(catalogName); + Assertions.assertTrue(catalog.isPresent()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get()); + + // List catalogs. + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should create the correct catalog."); + + Assertions.assertEquals( + DEFAULT_CATALOG, + tableEnv.getCurrentCatalog(), + "Current catalog should be default_catalog in flink"); + + // Change the current catalog to the new created catalog. + tableEnv.useCatalog(catalogName); + Assertions.assertEquals( + catalogName, + tableEnv.getCurrentCatalog(), + "Current catalog should be the one that is created just now."); + + // Drop the catalog. Only support drop catalog by SQL. + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("drop catalog " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + + Optional droppedCatalog = + tableEnv.getCatalog(catalogName); + Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); + } + + @Test + public void testCreateGravitinoIcebergUsingSQL() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + + // Create a new catalog. + String catalogName = "gravitino_iceberg_using_sql"; + tableEnv.executeSql( + String.format( + "create catalog %s with (" + + "'type'='%s', " + + "'catalog-backend'='%s'," + + "'uri'='%s'," + + "'warehouse'='%s'" + + ")", + catalogName, + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + getCatalogBackend(), + hiveMetastoreUri, + warehouse)); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + // Check the properties of the created catalog. + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Map properties = gravitinoCatalog.properties(); + Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI)); + + Assertions.assertEquals( + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + properties.get(CommonCatalogOptions.CATALOG_TYPE.key())); + + // Get the created catalog. + Optional catalog = tableEnv.getCatalog(catalogName); + Assertions.assertTrue(catalog.isPresent()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get()); + + // List catalogs. + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should create the correct catalog."); + + // Use SQL to list catalogs. 
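+    // SHOW CATALOGS should return the default catalog plus the newly created one.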
+ TableResult result = tableEnv.executeSql("show catalogs"); + Assertions.assertEquals( + numCatalogs + 1, Lists.newArrayList(result.collect()).size(), "Should have 2 catalogs"); + + Assertions.assertEquals( + DEFAULT_CATALOG, + tableEnv.getCurrentCatalog(), + "Current catalog should be default_catalog in flink"); + + // Change the current catalog to the new created catalog. + tableEnv.useCatalog(catalogName); + Assertions.assertEquals( + catalogName, + tableEnv.getCurrentCatalog(), + "Current catalog should be the one that is created just now."); + + // Drop the catalog. Only support using SQL to drop catalog. + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("drop catalog " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + + Optional droppedCatalog = + tableEnv.getCatalog(catalogName); + Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); + } + + @Test + public void testGetCatalogFromGravitino() { + // list catalogs. + int numCatalogs = tableEnv.listCatalogs().length; + + // create a new catalog. + String catalogName = "iceberg_catalog_in_gravitino"; + org.apache.gravitino.Catalog gravitinoCatalog = + metalake.createCatalog( + catalogName, + org.apache.gravitino.Catalog.Type.RELATIONAL, + getProvider(), + null, + getCatalogConfigs()); + Assertions.assertNotNull(gravitinoCatalog); + Assertions.assertEquals(catalogName, gravitinoCatalog.name()); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + Assertions.assertEquals( + numCatalogs + 1, tableEnv.listCatalogs().length, "Should create a new catalog"); + + // get the catalog from Gravitino. + Optional flinkIcebergCatalog = + tableEnv.getCatalog(catalogName); + Assertions.assertTrue(flinkIcebergCatalog.isPresent()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, flinkIcebergCatalog.get()); + + // drop the catalog. + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("drop catalog " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + Assertions.assertEquals( + numCatalogs, tableEnv.listCatalogs().length, "The created catalog should be dropped."); + } + + @Test + public void testIcebergTableWithPartition() { + String databaseName = "test_iceberg_table_partition"; + String tableName = "iceberg_table_with_partition"; + String key = "test key"; + String value = "test value"; + + doWithSchema( + icebergCatalog, + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s (" + + " id BIGINT COMMENT 'unique id'," + + " data STRING NOT NULL" + + " ) PARTITIONED BY (data)" + + " WITH (" + + "'%s' = '%s')", + tableName, key, value); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + Table table = + catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); + Assertions.assertNotNull(table); + Assertions.assertEquals(value, table.properties().get(key)); + Column[] columns = + new Column[] { + Column.of("id", Types.LongType.get(), "unique id", true, false, null), + Column.of("data", Types.StringType.get(), null, false, false, null) + }; + assertColumns(columns, table.columns()); + Transform[] partitions = new Transform[] {Transforms.identity("data")}; + Assertions.assertArrayEquals(partitions, table.partitioning()); + + // load flink catalog + try { + org.apache.flink.table.catalog.Catalog flinkCatalog = + tableEnv.getCatalog(currentCatalog().name()).get(); + CatalogBaseTable flinkTable = + flinkCatalog.getTable(ObjectPath.fromString(databaseName + "." 
+ tableName)); + DefaultCatalogTable catalogTable = (DefaultCatalogTable) flinkTable; + Assertions.assertTrue(catalogTable.isPartitioned()); + Assertions.assertArrayEquals( + new String[] {"data"}, catalogTable.getPartitionKeys().toArray()); + } catch (Exception e) { + Assertions.fail("Table should be exist", e); + } + + // write and read. + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES (1, 'A'), (2, 'B')", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1L)); + TestUtils.assertTableResult( + sql("SELECT * FROM %s ORDER BY data", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1, "A"), + Row.of(2, "B")); + }, + true, + dropCascade()); + } + + @Test + public void testCreateIcebergTable() { + String databaseName = "test_create_iceberg_table"; + String tableName = "iceberg_table"; + String comment = "test table comment"; + String key = "test key"; + String value = "test value"; + + doWithSchema( + metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s (" + + " string_type STRING COMMENT 'string_type', " + + " double_type DOUBLE COMMENT 'double_type'," + + " int_type INT COMMENT 'int_type'," + + " varchar_type VARCHAR COMMENT 'varchar_type'," + + " boolean_type BOOLEAN COMMENT 'boolean_type'," + + " binary_type VARBINARY(10) COMMENT 'binary_type'," + + " decimal_type DECIMAL(10, 2) COMMENT 'decimal_type'," + + " bigint_type BIGINT COMMENT 'bigint_type'," + + " float_type FLOAT COMMENT 'float_type'," + + " date_type DATE COMMENT 'date_type'," + + " timestamp_type TIMESTAMP COMMENT 'timestamp_type'," + + " array_type ARRAY COMMENT 'array_type'," + + " map_type MAP COMMENT 'map_type'," + + " struct_type ROW" + + " ) COMMENT '%s' WITH (" + + "'%s' = '%s')", + tableName, comment, key, value); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + Table table = + catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); + Assertions.assertNotNull(table); + Assertions.assertEquals(comment, table.comment()); + Assertions.assertEquals(value, table.properties().get(key)); + Column[] columns = + new Column[] { + Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), + Column.of("double_type", Types.DoubleType.get(), "double_type"), + Column.of("int_type", Types.IntegerType.get(), "int_type"), + Column.of("varchar_type", Types.StringType.get(), "varchar_type"), + Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"), + Column.of("binary_type", Types.BinaryType.get(), "binary_type"), + Column.of("decimal_type", Types.DecimalType.of(10, 2), "decimal_type"), + Column.of("bigint_type", Types.LongType.get(), "bigint_type"), + Column.of("float_type", Types.FloatType.get(), "float_type"), + Column.of("date_type", Types.DateType.get(), "date_type"), + Column.of( + "timestamp_type", Types.TimestampType.withoutTimeZone(), "timestamp_type"), + Column.of( + "array_type", Types.ListType.of(Types.IntegerType.get(), true), "array_type"), + Column.of( + "map_type", + Types.MapType.of(Types.IntegerType.get(), Types.StringType.get(), true), + "map_type"), + Column.of( + "struct_type", + Types.StructType.of( + Types.StructType.Field.nullableField("k1", Types.IntegerType.get()), + Types.StructType.Field.nullableField("k2", Types.StringType.get())), + null) + }; + assertColumns(columns, table.columns()); + Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning()); + }, + true, + dropCascade()); + } + + @Test + public void testGetIcebergTable() { 
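+    // Gravitino column definitions for the test table; the test checks that they are
+    // exposed to Flink as the equivalent physical columns.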
+ Column[] columns = + new Column[] { + Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), + Column.of("double_type", Types.DoubleType.get(), "double_type"), + Column.of("int_type", Types.IntegerType.get(), "int_type"), + Column.of("varchar_type", Types.StringType.get(), "varchar_type"), + Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"), + Column.of("binary_type", Types.BinaryType.get(), "binary_type"), + Column.of("decimal_type", Types.DecimalType.of(10, 2), "decimal_type"), + Column.of("bigint_type", Types.LongType.get(), "bigint_type"), + Column.of("float_type", Types.FloatType.get(), "float_type"), + Column.of("date_type", Types.DateType.get(), "date_type"), + Column.of("timestamp_type", Types.TimestampType.withoutTimeZone(), "timestamp_type"), + Column.of("array_type", Types.ListType.of(Types.IntegerType.get(), true), "array_type"), + Column.of( + "map_type", + Types.MapType.of(Types.IntegerType.get(), Types.StringType.get(), true), + "map_type"), + Column.of( + "struct_type", + Types.StructType.of( + Types.StructType.Field.nullableField("k1", Types.IntegerType.get()), + Types.StructType.Field.nullableField("k2", Types.StringType.get())), + null) + }; + + String databaseName = "test_get_iceberg_table"; + doWithSchema( + metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG), + databaseName, + catalog -> { + String tableName = "test_desc_table"; + String comment = "comment1"; + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of(databaseName, "test_desc_table"), + columns, + comment, + ImmutableMap.of("k1", "v1")); + + Optional flinkCatalog = + tableEnv.getCatalog(DEFAULT_ICEBERG_CATALOG); + Assertions.assertTrue(flinkCatalog.isPresent()); + try { + CatalogBaseTable table = + flinkCatalog.get().getTable(new ObjectPath(databaseName, tableName)); + Assertions.assertNotNull(table); + Assertions.assertEquals(CatalogBaseTable.TableKind.TABLE, table.getTableKind()); + Assertions.assertEquals(comment, table.getComment()); + + org.apache.flink.table.catalog.Column[] expected = + new org.apache.flink.table.catalog.Column[] { + org.apache.flink.table.catalog.Column.physical("string_type", DataTypes.STRING()) + .withComment("string_type"), + org.apache.flink.table.catalog.Column.physical("double_type", DataTypes.DOUBLE()) + .withComment("double_type"), + org.apache.flink.table.catalog.Column.physical("int_type", DataTypes.INT()) + .withComment("int_type"), + org.apache.flink.table.catalog.Column.physical( + "varchar_type", DataTypes.VARCHAR(Integer.MAX_VALUE)) + .withComment("varchar_type"), + org.apache.flink.table.catalog.Column.physical( + "boolean_type", DataTypes.BOOLEAN()) + .withComment("boolean_type"), + org.apache.flink.table.catalog.Column.physical("binary_type", DataTypes.BYTES()) + .withComment("binary_type"), + org.apache.flink.table.catalog.Column.physical( + "decimal_type", DataTypes.DECIMAL(10, 2)) + .withComment("decimal_type"), + org.apache.flink.table.catalog.Column.physical("bigint_type", DataTypes.BIGINT()) + .withComment("bigint_type"), + org.apache.flink.table.catalog.Column.physical("float_type", DataTypes.FLOAT()) + .withComment("float_type"), + org.apache.flink.table.catalog.Column.physical("date_type", DataTypes.DATE()) + .withComment("date_type"), + org.apache.flink.table.catalog.Column.physical( + "timestamp_type", DataTypes.TIMESTAMP()) + .withComment("timestamp_type"), + org.apache.flink.table.catalog.Column.physical( + "array_type", DataTypes.ARRAY(DataTypes.INT())) + .withComment("array_type"), + 
org.apache.flink.table.catalog.Column.physical( + "map_type", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())) + .withComment("map_type"), + org.apache.flink.table.catalog.Column.physical( + "struct_type", + DataTypes.ROW( + DataTypes.FIELD("k1", DataTypes.INT()), + DataTypes.FIELD("k2", DataTypes.STRING()))) + }; + org.apache.flink.table.catalog.Column[] actual = + toFlinkPhysicalColumn(table.getUnresolvedSchema().getColumns()); + Assertions.assertArrayEquals(expected, actual); + + CatalogTable catalogTable = (CatalogTable) table; + Assertions.assertFalse(catalogTable.isPartitioned()); + } catch (TableNotExistException e) { + Assertions.fail(e); + } + }, + true, + dropCascade()); + } + + @Override + protected Catalog currentCatalog() { + return icebergCatalog; + } + + @Override + protected String getProvider() { + return "lakehouse-iceberg"; + } + + @Override + protected boolean dropCascade() { + return false; + } + + protected abstract String getCatalogBackend(); +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java new file mode 100644 index 00000000000..fc21ce2c247 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.integration.test.iceberg; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants; +import org.junit.jupiter.api.Tag; + +@Tag("gravitino-docker-test") +public class FlinkIcebergHiveCatalogIT extends FlinkIcebergCatalogIT { + + @Override + protected Map getCatalogConfigs() { + Map catalogProperties = Maps.newHashMap(); + catalogProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE); + catalogProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse); + catalogProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, hiveMetastoreUri); + return catalogProperties; + } + + protected String getCatalogBackend() { + return "hive"; + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java index ba16a9c07b1..02710bcfb35 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java @@ -42,7 +42,8 @@ public static void assertTableResult( Row expectedRow = expected[i]; Row actualRow = actualRows.get(i); Assertions.assertEquals(expectedRow.getKind(), actualRow.getKind()); - Assertions.assertEquals(expectedRow, actualRow); + // Only compare string value. + Assertions.assertEquals(expectedRow.toString(), actualRow.toString()); } } }