Use flink 1.14 API

Jakub Dutkowski committed Oct 12, 2021
1 parent 315aef0 commit 5fe8bb0
Showing 5 changed files with 35 additions and 104 deletions.
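
In short: the bump from Flink 1.13.1 to 1.14.0 renames the planner artifact (flink-table-planner-blink becomes flink-table-planner, the Blink planner being the only planner left in 1.14), renames JdbcOptions to JdbcConnectorOptions, replaces the deprecated TableSchema with ResolvedSchema in the table source, pins the H2 test dependency to a version Ignite can work with, and shrinks the test bootstrap, since StreamTableEnvironmentImpl.create now accepts a batch-mode configuration directly.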
12 changes: 10 additions & 2 deletions pom.xml
@@ -36,7 +36,7 @@ under the License.

<properties>
<java.version>11</java.version>
-<flink.version>1.14.0</flink.version>
+<flink.version>1.14.0</flink.version>
<log4j.version>2.12.1</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
<ignite.version>2.10.0</ignite.version>
@@ -128,7 +128,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
-<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -154,6 +154,14 @@ under the License.
<scope>test</scope>
</dependency>

+<dependency>
+    <groupId>com.h2database</groupId>
+    <artifactId>h2</artifactId>
+    <!-- Ignite needs this version, as per https://stackoverflow.com/a/68774254/3132741 -->
+    <version>1.4.197</version>
+    <scope>test</scope>
+</dependency>
+
</dependencies>

<distributionManagement>
@@ -4,7 +4,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -109,14 +109,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
// validate all options
helper.validate();

-JdbcOptions jdbcOptions = getJdbcOptions(config);
+JdbcConnectorOptions jdbcOptions = getJdbcOptions(config);
JdbcDatePartitionReadOptions readOptions = getJdbcReadOptions(config).orElse(null);

-// get table schema
-TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

-// table source
-return new IgniteDynamicTableSource(jdbcOptions, readOptions, physicalSchema);
+return new IgniteDynamicTableSource(jdbcOptions, readOptions, context.getCatalogTable().getResolvedSchema());

}

@@ -125,9 +122,9 @@ public DynamicTableSink createDynamicTableSink(Context context) {
throw new NotImplementedException("Ignite dynamic sink not implemented yet");
}

-private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
+private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) {
final String url = readableConfig.get(URL);
-final JdbcOptions.Builder builder = JdbcOptions.builder()
+final JdbcConnectorOptions.Builder builder = JdbcConnectorOptions.builder()
.setDriverName(DRIVER_NAME)
.setDBUrl(url)
.setTableName(readableConfig.get(TABLE_NAME))
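
The options builder above is truncated in the diff view. For orientation, this is roughly how a table served by this factory would be declared and read. An illustrative sketch only — the 'ignite' connector identifier, the column list, the URL, and the option keys are assumptions inferred from the URL and TABLE_NAME options the factory reads, not taken from this commit:

    // Hypothetical usage; connector name and option keys are assumed.
    StreamTableEnvironment tableEnv = StreamTableEnvironmentUtil.create();
    tableEnv.executeSql(
            "CREATE TABLE ignite_source (" +
            "  id INT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'ignite'," +
            "  'url' = 'jdbc:ignite:thin://127.0.0.1:10800'," +
            "  'table-name' = 'PERSON'" +
            ")");
    tableEnv.executeSql("SELECT * FROM ignite_source").print();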
@@ -2,28 +2,28 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat;
-import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.time.LocalDate;
import java.time.ZoneId;

public class IgniteDynamicTableSource implements ScanTableSource {

-private final JdbcOptions options;
+private final JdbcConnectorOptions options;
private final JdbcDatePartitionReadOptions readOptions;
-private final TableSchema tableSchema;
+private final ResolvedSchema tableSchema;

-public IgniteDynamicTableSource(JdbcOptions options, JdbcDatePartitionReadOptions readOptions, TableSchema tableSchema) {
+public IgniteDynamicTableSource(JdbcConnectorOptions options, JdbcDatePartitionReadOptions readOptions, ResolvedSchema tableSchema) {
this.options = options;
this.readOptions = readOptions;
this.tableSchema = tableSchema;
@@ -41,11 +41,12 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
final JdbcDialect dialect = options.getDialect();

String query = dialect.getSelectFromStatement(
-        options.getTableName(), tableSchema.getFieldNames(), new String[0]);
+        options.getTableName(), tableSchema.getColumnNames().toArray(new String[0]), new String[0]);

-final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+DataType rowDataType = tableSchema.toPhysicalRowDataType();
+final RowType rowType = (RowType) rowDataType.getLogicalType();

-TypeInformation<RowData> typeInformation = runtimeProviderContext.createTypeInformation(tableSchema.toRowDataType());
+TypeInformation<RowData> typeInformation = runtimeProviderContext.createTypeInformation(rowDataType);
final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
.setDrivername(options.getDriverName())
.setDBUrl(options.getDbURL())
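
The method tail is cut off above. Under the 1.14 flink-connector-jdbc API it would typically finish roughly as below — a sketch, not the commit's code: the setQuery/setRowConverter/setRowDataTypeInfo wiring and the InputFormatProvider return are assumptions based on the 1.14 connector API:

    // Assumed continuation: wire the query, the dialect's row converter and the
    // produced type, then hand the input format to the planner as the provider.
    builder
            .setQuery(query)
            .setRowConverter(dialect.getRowConverter(rowType))
            .setRowDataTypeInfo(typeInformation);
    return InputFormatProvider.of(builder.build());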
10 changes: 5 additions & 5 deletions src/test/java/pl/touk/flink/ignite/FlinkConnectorIgniteIT.java
@@ -1,10 +1,13 @@
package pl.touk.flink.ignite;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.ignite.Ignite;
@@ -43,11 +46,8 @@ public class FlinkConnectorIgniteIT {

@BeforeAll
static void setUp() throws Exception {
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-tableEnv = StreamTableEnvironmentUtil.create(
-        env, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
-);
+tableEnv = StreamTableEnvironmentUtil.create();


ignitePort = PortFinder.getAvailablePort();
File igniteWorkDir = Files.createTempDirectory("igniteSpec").toFile();
85 changes: 5 additions & 80 deletions src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java
@@ -1,98 +1,23 @@
package pl.touk.flink.ignite;

+import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.module.ModuleManager;

-import java.lang.reflect.Method;
-import java.util.Map;

-/*
-   Workaround for StreamTableEnvironmentImpl.create check :
-     if (!settings.isStreamingMode()) {
-        throw new TableException(
-            "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
-     }
-   http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Conversion-of-Table-Blink-batch-to-DataStream-tc34080.html#a34090
-*/
public class StreamTableEnvironmentUtil {

-public static StreamTableEnvironment create(
-        StreamExecutionEnvironment executionEnvironment,
-        EnvironmentSettings settings) {

-    // temporary solution until FLINK-15635 is fixed
-    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

-    ModuleManager moduleManager = new ModuleManager();

-    CatalogManager catalogManager = CatalogManager.newBuilder()
-            .classLoader(classLoader)
-            .config(new Configuration())
-            .defaultCatalog(
-                    settings.getBuiltInCatalogName(),
-                    new GenericInMemoryCatalog(
-                            settings.getBuiltInCatalogName(),
-                            settings.getBuiltInDatabaseName()))
-            .executionConfig(executionEnvironment.getConfig())
-            .build();

+public static StreamTableEnvironment create() {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+    TableConfig tableConfig = new TableConfig();
+    tableConfig.getConfiguration().set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
-    FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);

-    Map<String, String> executorProperties = settings.toExecutorProperties();
-    Executor executor = lookupExecutor(executorProperties, executionEnvironment);

-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);

-    return new StreamTableEnvironmentImpl(
-            catalogManager,
-            moduleManager,
-            functionCatalog,
-            tableConfig,
-            executionEnvironment,
-            planner,
-            executor,
-            settings.isStreamingMode(),
-            classLoader
-    );
-}

-private static Executor lookupExecutor(
-        Map<String, String> executorProperties,
-        StreamExecutionEnvironment executionEnvironment) {
-    try {
-        ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
-        Method createMethod = executorFactory.getClass()
-                .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

-        return (Executor) createMethod.invoke(
-                executorFactory,
-                executorProperties,
-                executionEnvironment);
-    } catch (Exception e) {
-        throw new TableException(
-                "Could not instantiate the executor. Make sure a planner module is on the classpath",
-                e);
-    }
+    return StreamTableEnvironmentImpl.create(env, settings, tableConfig);
}

}
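
Because StreamTableEnvironmentImpl.create accepts a batch-mode environment in 1.14, the reflection-based bootstrap above collapses to one direct call, and callers reduce to a one-liner, as the test change above shows. A minimal usage sketch:

    // Batch-mode table environment without the pre-1.14 workaround machinery.
    StreamTableEnvironment tableEnv = StreamTableEnvironmentUtil.create();
    tableEnv.executeSql("SELECT 1").print();  // runs with RuntimeExecutionMode.BATCH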
