Skip to content

Commit

Permalink
[flink] Initialize Flink 1.18 to support Call Procedure (#1992)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Sep 12, 2023
1 parent a51c109 commit c73bac3
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/utitcase-flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build and Test
timeout-minutes: 80
timeout-minutes: 60
run: |
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
<td><h5>scan.push-down</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If true, flink will push down projection, filters, limit to the source. The cost is that it is difficult to reuse the source in a job.</td>
<td>If true, flink will push down projection, filters, limit to the source. The cost is that it is difficult to reuse the source in a job. With flink 1.18 or higher version, it is possible to reuse the source even with projection push down.</td>
</tr>
<tr>
<td><h5>scan.remove-normalize</h5></td>
Expand Down
2 changes: 1 addition & 1 deletion paimon-flink/paimon-flink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : Common</name>

<properties>
<flink.version>1.17.1</flink.version>
<flink.version>1.18-SNAPSHOT</flink.version>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -62,13 +63,15 @@
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

Expand Down Expand Up @@ -111,6 +114,7 @@ public class FlinkCatalog extends AbstractCatalog {
private final ClassLoader classLoader;

private final Catalog catalog;
private final String name;
private final boolean logStoreAutoRegister;

private final Duration logStoreAutoRegisterTimeout;
Expand All @@ -125,6 +129,7 @@ public FlinkCatalog(
Options options) {
super(name, defaultDatabase);
this.catalog = catalog;
this.name = name;
this.classLoader = classLoader;
this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
Expand Down Expand Up @@ -866,4 +871,25 @@ public final void alterPartitionColumnStatistics(
throws CatalogException {
throw new UnsupportedOperationException();
}

/**
 * Lists the names of all built-in procedures available in the given database.
 *
 * <p>Intentionally NOT annotated with {@code @Override}: the method only exists on
 * {@code Catalog} from Flink 1.18 onward, and this class must still compile against
 * Flink 1.17 and earlier.
 *
 * @param dbName database to list procedures for; must exist
 * @return names of all system procedures (same set for every database)
 * @throws DatabaseNotExistException if {@code dbName} is unknown to this catalog
 */
public List<String> listProcedures(String dbName)
        throws DatabaseNotExistException, CatalogException {
    if (databaseExists(dbName)) {
        return ProcedureUtil.listProcedures();
    }
    throw new DatabaseNotExistException(name, dbName);
}

/**
 * Resolves a built-in procedure by its object path.
 *
 * <p>Intentionally NOT annotated with {@code @Override}: the method only exists on
 * {@code Catalog} from Flink 1.18 onward, and this class must still compile against
 * Flink 1.17 and earlier.
 *
 * @param procedurePath path whose object name identifies the procedure
 * @return the matching procedure instance
 * @throws ProcedureNotExistException if no procedure with that name is registered
 */
public Procedure getProcedure(ObjectPath procedurePath)
        throws ProcedureNotExistException, CatalogException {
    String procedureName = procedurePath.getObjectName();
    return ProcedureUtil.getProcedure(procedureName)
            .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ public class FlinkConnectorOptions {
.booleanType()
.defaultValue(true)
.withDescription(
"If true, flink will push down projection, filters, limit to the source. "
+ "The cost is that it is difficult to reuse the source in a job.");
"If true, flink will push down projection, filters, limit to the source. The cost is that it "
+ "is difficult to reuse the source in a job. With flink 1.18 or higher version, it "
+ "is possible to reuse the source even with projection push down.");

public static final ConfigOption<Boolean> SOURCE_CHECKPOINT_ALIGN_ENABLED =
ConfigOptions.key("source.checkpoint-align.enabled")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.flink.table.procedures.Procedure;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Static registry and lookup helpers for built-in {@link Procedure} implementations. */
public class ProcedureUtil {

    /** Names of all registered system procedures, in registration order. */
    private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();

    /** Lookup table from procedure name to its implementation. */
    private static final Map<String, Procedure> SYSTEM_PROCEDURES_MAP = new HashMap<>();

    private ProcedureUtil() {
        // static utility holder; never instantiated
    }

    /**
     * Returns an unmodifiable view of all registered procedure names.
     *
     * @return read-only list of procedure names
     */
    public static List<String> listProcedures() {
        return Collections.unmodifiableList(SYSTEM_PROCEDURES);
    }

    /**
     * Looks up a procedure by name.
     *
     * @param procedureName name of the procedure to resolve
     * @return the procedure, or {@link Optional#empty()} if no such procedure is registered
     */
    public static Optional<Procedure> getProcedure(String procedureName) {
        return Optional.ofNullable(SYSTEM_PROCEDURES_MAP.get(procedureName));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected List<String> ddl() {
}

@TestTemplate
public void testSourceReuse() {
public void testSourceReuseWithoutScanPushDown() {
sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')");
sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')");

Expand All @@ -86,8 +86,45 @@ public void testSourceReuse() {
assertThat(statementSet.compilePlan().explain()).contains("Reused");

statementSet = sEnv.createStatementSet();
statementSet.addInsertSql(
"INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache'");
statementSet.addInsertSql(
"INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon'");
assertThat(statementSet.compilePlan().explain()).contains("Reused");

statementSet = sEnv.createStatementSet();
statementSet.addInsertSql(
"INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache' LIMIT 5");
statementSet.addInsertSql(
"INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon' LIMIT 10");
assertThat(statementSet.compilePlan().explain()).contains("Reused");
}

@TestTemplate
public void testSourceReuseWithScanPushDown() {
// With scan.push-down left at its default (true), projection push-down no longer
// blocks source reuse on Flink 1.18+: two sinks selecting different columns of T1
// should still share one source scan.
// source can be reused with projection applied
sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')");
sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')");

StatementSet statementSet = sEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO print1 SELECT a FROM T1");
statementSet.addInsertSql("INSERT INTO print2 SELECT b FROM T1");
// "Reused" in the compiled plan's explain output marks a shared exec node.
assertThat(statementSet.compilePlan().explain()).contains("Reused");

// source cannot be reused with filter or limit applied
sEnv.executeSql(
"CREATE TEMPORARY TABLE new_print1 (a STRING, b STRING, c STRING) WITH ('connector'='print')");
sEnv.executeSql(
"CREATE TEMPORARY TABLE new_print2 (a STRING, b STRING, c STRING) WITH ('connector'='print')");

// A filter pushed into only one of the two scans makes the sources differ,
// so the planner must not merge them.
statementSet = sEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1 WHERE a = 'Apache'");
statementSet.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
assertThat(statementSet.compilePlan().explain()).doesNotContain("Reused");

// Same reasoning for a pushed-down LIMIT: only one scan carries it.
statementSet = sEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1 LIMIT 5");
statementSet.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
assertThat(statementSet.compilePlan().explain()).doesNotContain("Reused");
}

Expand Down

0 comments on commit c73bac3

Please sign in to comment.