From d6f598cf839eed9b2b702047f6312f7528a935a9 Mon Sep 17 00:00:00 2001 From: yang Date: Mon, 25 Nov 2024 14:06:16 +0800 Subject: [PATCH 1/3] =?UTF-8?q?update:=20=E6=B7=BB=E5=8A=A0doris=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../classloader/DatasourceLoadConfig.java | 6 +- .../datasource-all/pom.xml | 5 + .../datasource-doris/pom.xml | 56 +++++ .../plugin/doris/DorisDataSourceChannel.java | 202 ++++++++++++++++++ .../plugin/doris/DorisDataSourceConfig.java | 50 +++++ .../plugin/doris/DorisDataSourceFactory.java | 47 ++++ .../plugin/doris/DorisOptionRule.java | 95 ++++++++ .../src/test/java/DorisConnection.java | 48 +++++ .../seatunnel-datasource-plugins/pom.xml | 1 + .../app/bean/engine/EngineDataType.java | 1 + .../impl/DorisDataSourceConfigSwitcher.java | 111 ++++++++++ .../apache/seatunnel/app/utils/JobUtils.java | 8 + .../connector-datasource-mapper.yaml | 16 ++ 13 files changed, 645 insertions(+), 1 deletion(-) create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/pom.xml create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceChannel.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceConfig.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceFactory.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisOptionRule.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/test/java/DorisConnection.java create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/DorisDataSourceConfigSwitcher.java diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java index af8399e91..c827da06a 100644 --- a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java +++ b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java @@ -106,6 +106,8 @@ public class DatasourceLoadConfig { classLoaderFactoryName.put( "CONSOLE", "org.apache.seatunnel.datasource.plugin.console.ConsoleDataSourceFactory"); + classLoaderFactoryName.put( + "DORIS", "org.apache.seatunnel.datasource.plugin.doris.DorisDataSourceFactory"); classLoaderJarName.put("JDBC-ORACLE", "datasource-jdbc-oracle-"); classLoaderJarName.put("JDBC-CLICKHOUSE", "datasource-jdbc-clickhouse-"); @@ -130,6 +132,7 @@ public class DatasourceLoadConfig { classLoaderJarName.put("JDBC-HIVE", "datasource-jdbc-hive-"); classLoaderJarName.put("FAKESOURCE", "datasource-fakesource-"); classLoaderJarName.put("CONSOLE", "datasource-console-"); + classLoaderJarName.put("DORIS", "datasource-doris-"); } public static final Set pluginSet = @@ -151,7 +154,8 @@ public class DatasourceLoadConfig { "MongoDB", "JDBC-Db2", "FakeSource", - "Console"); + "Console", + "DORIS"); public static Map datasourceClassLoaders = new HashMap<>(); diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml index d5dee4503..6d980b473 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml @@ -102,6 +102,11 @@ datasource-jdbc-db2 ${project.version} + + org.apache.seatunnel + datasource-doris + ${project.version} + diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/pom.xml new file mode 100644 index 000000000..dbe74e8d2 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/pom.xml @@ -0,0 +1,56 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-datasource-plugins + ${revision} + + + datasource-doris + + + org.apache.seatunnel + datasource-plugins-api + ${project.version} + provided + + + org.apache.commons + commons-lang3 + + + + com.google.auto.service + auto-service + + + org.apache.seatunnel + seatunnel-api + provided + + + + + mysql + mysql-connector-java + ${mysql-connector.version} + provided + + + diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceChannel.java new file mode 100644 index 000000000..a6dbc34f9 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceChannel.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.datasource.plugin.doris; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; +import org.apache.seatunnel.datasource.plugin.api.model.TableField; +import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils; + +import org.apache.commons.lang3.StringUtils; + +import lombok.NonNull; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class DorisDataSourceChannel implements DataSourceChannel { + + @Override + public OptionRule getDataSourceOptions(@NonNull String pluginName) { + return DorisDataSourceConfig.OPTION_RULE; + } + + @Override + public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) { + return DorisDataSourceConfig.METADATA_RULE; + } + + @Override + public List getTables( + @NonNull String pluginName, + Map requestParams, + String database, + Map options) { + List tableNames = new ArrayList<>(); + String filterName = options.get("filterName"); + String size = options.get("size"); + boolean isSize = StringUtils.isNotEmpty(size); + if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) { + filterName = "%" + filterName + "%"; + } else if (StringUtils.equals(filterName, "")) { + filterName = null; + } + try (Connection connection = getConnection(requestParams); + ResultSet resultSet = + connection + .getMetaData() + .getTables(database, null, filterName, new String[] {"TABLE"})) { + while (resultSet.next()) { + String tableName = resultSet.getString("TABLE_NAME"); + if (StringUtils.isNotBlank(tableName)) { + tableNames.add(tableName); + if (isSize && tableNames.size() >= Integer.parseInt(size)) { + break; + } + } + } + return tableNames; + } catch (ClassNotFoundException | SQLException e) { + throw new DataSourcePluginException("get table names failed", e); + } + } + + private Connection getConnection(Map requestParams) + throws SQLException, ClassNotFoundException { + return getConnection(requestParams, null); + } + + private Connection getConnection(Map requestParams, String databaseName) + throws SQLException, ClassNotFoundException { + checkNotNull(requestParams.get(DorisOptionRule.DRIVER.key())); + checkNotNull(requestParams.get(DorisOptionRule.URL.key()), "doris url cannot be null"); + String url = + JdbcUtils.replaceDatabase( + requestParams.get(DorisOptionRule.URL.key()), databaseName); + + Properties info = new java.util.Properties(); + info.put("autoDeserialize", "false"); + info.put("allowLoadLocalInfile", "false"); + info.put("allowLoadLocalInfileInPath", ""); + if (requestParams.containsKey(DorisOptionRule.USERNAME.key())) { + info.put("user", requestParams.get(DorisOptionRule.USERNAME.key())); + info.put("password", requestParams.get(DorisOptionRule.PASSWORD.key())); + } + return DriverManager.getConnection(url, info); + } + + @Override + public List getDatabases( + @NonNull String pluginName, @NonNull Map requestParams) { + List dbNames = new ArrayList<>(); + try (Connection connection = getConnection(requestParams); + PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;"); + ResultSet re = statement.executeQuery()) { + // filter system databases + while (re.next()) { + String dbName = re.getString("database"); + if (StringUtils.isNotBlank(dbName) + && !DorisDataSourceConfig.DORIS_SYSTEM_DATABASES.contains(dbName)) { + dbNames.add(dbName); + } + } + return dbNames; + } catch (SQLException | ClassNotFoundException e) { + throw new DataSourcePluginException("Get databases failed", e); + } + } + + @Override + public boolean checkDataSourceConnectivity( + @NonNull String pluginName, @NonNull Map requestParams) { + try (Connection ignored = getConnection(requestParams)) { + return true; + } catch (Exception e) { + throw new DataSourcePluginException( + "check doris connectivity failed; " + e.getMessage()); + } + } + + @Override + public List getTableFields( + @NonNull String pluginName, + @NonNull Map requestParams, + @NonNull String database, + @NonNull String table) { + List tableFields = new ArrayList<>(); + try (Connection connection = getConnection(requestParams, database)) { + DatabaseMetaData metaData = connection.getMetaData(); + String primaryKey = getPrimaryKey(metaData, database, table); + try (ResultSet resultSet = metaData.getColumns(database, null, table, null)) { + while (resultSet.next()) { + TableField tableField = new TableField(); + String columnName = resultSet.getString("COLUMN_NAME"); + tableField.setPrimaryKey(false); + if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) { + tableField.setPrimaryKey(true); + } + tableField.setName(columnName); + tableField.setType(resultSet.getString("TYPE_NAME")); + tableField.setComment(resultSet.getString("REMARKS")); + Object nullable = resultSet.getObject("IS_NULLABLE"); + tableField.setNullable(Boolean.TRUE.toString().equals(nullable.toString())); + tableFields.add(tableField); + } + } + } catch (ClassNotFoundException | SQLException e) { + throw new DataSourcePluginException("get table fields failed", e); + } + return tableFields; + } + + @Override + public Map> getTableFields( + @NonNull String pluginName, + @NonNull Map requestParams, + @NonNull String database, + @NonNull List tables) { + return tables.parallelStream() + .collect( + Collectors.toMap( + Function.identity(), + table -> + getTableFields( + pluginName, requestParams, database, table))); + } + + private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName) + throws SQLException { + ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName); + while (primaryKeysInfo.next()) { + return primaryKeysInfo.getString("COLUMN_NAME"); + } + return null; + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceConfig.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceConfig.java new file mode 100644 index 000000000..cfd8c2e11 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.datasource.plugin.doris; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; +import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum; + +import com.google.common.collect.Sets; + +import java.util.Set; + +public class DorisDataSourceConfig { + + public static final String PLUGIN_NAME = "Doris"; + + public static final DataSourcePluginInfo DORIS_DATASOURCE_PLUGIN_INFO = + DataSourcePluginInfo.builder() + .name(PLUGIN_NAME) + .icon(PLUGIN_NAME) + .version("1.0.0") + .type(DatasourcePluginTypeEnum.DATABASE.getCode()) + .build(); + + public static final OptionRule OPTION_RULE = + OptionRule.builder() + .required(DorisOptionRule.URL, DorisOptionRule.FE_NODES, DorisOptionRule.DRIVER) + .optional(DorisOptionRule.QUERY_PORT) + .required(DorisOptionRule.USERNAME, DorisOptionRule.PASSWORD) + .build(); + + public static final OptionRule METADATA_RULE = OptionRule.builder().build(); + + public static final Set DORIS_SYSTEM_DATABASES = + Sets.newHashSet("__internal_schema", "information_schema", "mysql"); +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceFactory.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceFactory.java new file mode 100644 index 000000000..e22161224 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisDataSourceFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.datasource.plugin.doris; + +import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; +import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; + +import com.google.auto.service.AutoService; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; + +import java.util.Set; + +@Slf4j +@AutoService(DataSourceFactory.class) +public class DorisDataSourceFactory implements DataSourceFactory { + + @Override + public String factoryIdentifier() { + return DorisDataSourceConfig.PLUGIN_NAME; + } + + @Override + public Set supportedDataSources() { + return Sets.newHashSet(DorisDataSourceConfig.DORIS_DATASOURCE_PLUGIN_INFO); + } + + @Override + public DataSourceChannel createChannel() { + return new DorisDataSourceChannel(); + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisOptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisOptionRule.java new file mode 100644 index 000000000..853934c4e --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/main/java/org/apache/seatunnel/datasource/plugin/doris/DorisOptionRule.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.datasource.plugin.doris; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class DorisOptionRule { + + public static final Option FE_NODES = + Options.key("fenodes") + .stringType() + .noDefaultValue() + .withDescription( + "Doris cluster fenodes address, the format is fe_ip:fe_http_port, ..."); + + public static final Option URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription( + "jdbc url, eg:" + + " jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8"); + + public static final Option USERNAME = + Options.key("username") + .stringType() + .noDefaultValue() + .withDescription("Doris user username"); + + public static final Option PASSWORD = + Options.key("password") + .stringType() + .noDefaultValue() + .withDescription("Doris user password"); + + public static final Option QUERY_PORT = + Options.key("query-port") + .stringType() + .noDefaultValue() + .withDescription("Doris Fenodes query_port"); + + public static final Option DATABASE = + Options.key("database") + .stringType() + .noDefaultValue() + .withDescription( + "The database name of Doris table, use ${database_name} to represent the upstream table name"); + + public static final Option TABLE = + Options.key("table") + .stringType() + .noDefaultValue() + .withDescription( + "The table name of Doris table, use ${table_name} to represent the upstream table name"); + + public static final Option DRIVER = + Options.key("driver") + .enumType(DriverType.class) + .defaultValue(DriverType.MYSQL) + .withDescription("driver"); + + public enum DriverType { + MYSQL("com.mysql.cj.jdbc.Driver"), + ; + private final String driverClassName; + + DriverType(String driverClassName) { + this.driverClassName = driverClassName; + } + + public String getDriverClassName() { + return driverClassName; + } + + @Override + public String toString() { + return driverClassName; + } + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/test/java/DorisConnection.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/test/java/DorisConnection.java new file mode 100644 index 000000000..945644569 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-doris/src/test/java/DorisConnection.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +@Slf4j +public class DorisConnection { + public static void main(String[] args) { + String user = "root"; + String password = "root"; + String newUrl = + "jdbc:mysql://127.0.0.1:9030/test?useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true"; + try { + Connection myCon = DriverManager.getConnection(newUrl, user, password); + Statement stmt = myCon.createStatement(); + ResultSet result = stmt.executeQuery("show databases"); + ResultSetMetaData metaData = result.getMetaData(); + int columnCount = metaData.getColumnCount(); + while (result.next()) { + for (int i = 1; i <= columnCount; i++) { + System.out.println(result.getObject(i)); + } + } + } catch (SQLException e) { + log.error("get JDBC connection exception.", e); + } + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml index 64dda5d2d..409310dcd 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml +++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml @@ -48,6 +48,7 @@ datasource-jdbc-db2 datasource-fakesource datasource-console + datasource-doris 3.1.3 diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java index 8d56224ff..d87808d69 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java @@ -59,6 +59,7 @@ public enum DataType { T_LOCAL_DATE("date", LocalTimeType.LOCAL_DATE_TYPE), T_LOCAL_TIME("time", LocalTimeType.LOCAL_TIME_TYPE), T_LOCAL_DATE_TIME("timestamp", LocalTimeType.LOCAL_DATE_TIME_TYPE), + T_DATE_TIME("datetime", LocalTimeType.LOCAL_DATE_TIME_TYPE), T_PRIMITIVE_BYTE_ARRAY("bytes", PrimitiveByteArrayType.INSTANCE), diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/DorisDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/DorisDataSourceConfigSwitcher.java new file mode 100644 index 000000000..b2f7fee1e --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/DorisDataSourceConfigSwitcher.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.thirdparty.datasource.impl; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.configuration.util.RequiredOption; +import org.apache.seatunnel.app.domain.request.connector.BusinessMode; +import org.apache.seatunnel.app.domain.request.job.DataSourceOption; +import org.apache.seatunnel.app.domain.request.job.SelectTableFields; +import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes; +import org.apache.seatunnel.app.dynamicforms.FormStructure; +import org.apache.seatunnel.app.thirdparty.datasource.AbstractDataSourceConfigSwitcher; +import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcher; +import org.apache.seatunnel.common.constants.PluginType; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +@AutoService(DataSourceConfigSwitcher.class) +public class DorisDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher { + private static final String DATABASE = "database"; + + private static final String TABLE = "table"; + + @Override + public String getDataSourceName() { + return "Doris"; + } + + @Override + public FormStructure filterOptionRule( + String connectorName, + OptionRule dataSourceOptionRule, + OptionRule virtualTableOptionRule, + BusinessMode businessMode, + PluginType pluginType, + OptionRule connectorOptionRule, + List addRequiredOptions, + List> addOptionalOptions, + List excludedKeys) { + + excludedKeys.add(DATABASE); + excludedKeys.add(TABLE); + return super.filterOptionRule( + connectorName, + dataSourceOptionRule, + virtualTableOptionRule, + businessMode, + pluginType, + connectorOptionRule, + addRequiredOptions, + addOptionalOptions, + excludedKeys); + } + + @Override + public Config mergeDatasourceConfig( + Config dataSourceInstanceConfig, + VirtualTableDetailRes virtualTableDetail, + DataSourceOption dataSourceOption, + SelectTableFields selectTableFields, + BusinessMode businessMode, + PluginType pluginType, + Config connectorConfig) { + + String databaseName = dataSourceOption.getDatabases().get(0); + + String tableName = dataSourceOption.getTables().get(0); + + // 填充table 和 database + if (pluginType.equals(PluginType.SINK)) { + connectorConfig = + connectorConfig.withValue( + DATABASE, ConfigValueFactory.fromAnyRef(databaseName)); + connectorConfig = + connectorConfig.withValue(TABLE, ConfigValueFactory.fromAnyRef(tableName)); + + } else { + throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType); + } + return super.mergeDatasourceConfig( + dataSourceInstanceConfig, + virtualTableDetail, + dataSourceOption, + selectTableFields, + businessMode, + pluginType, + connectorConfig); + } +} diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java index bf0f36d33..7bc234c1a 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java @@ -22,6 +22,8 @@ import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.apache.seatunnel.server.common.SeatunnelException; +import com.google.common.collect.Lists; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -30,6 +32,8 @@ public class JobUtils { + private static final List SKIP_MATCH_KEY = + Lists.newArrayList("TABLE_NAME", "DATABASE_NAME"); // The maximum length of the job execution error message, 4KB private static final int ERROR_MESSAGE_MAX_LENGTH = 4096; private static final Pattern placeholderPattern = @@ -83,6 +87,10 @@ public static String replaceJobConfigPlaceholders( String escapeCharacter = matcher.group(1); String placeholderName = matcher.group(2); + if (SKIP_MATCH_KEY.contains(placeholderName.toUpperCase())) { + continue; + } + if (escapeCharacter != null && !escapeCharacter.isEmpty()) { String withoutEscape = matcher.group().replace("\\\\${", "${").replace("\\${", "${"); diff --git a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml index 392bb2bf8..8c4276692 100644 --- a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml +++ b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml @@ -17,6 +17,10 @@ connector-datasource-mapper: connectorDatasourceMappers: + Doris: + dataSources: + - Doris + Jdbc: dataSources: - JDBC-Mysql @@ -73,6 +77,11 @@ connector-datasource-mapper: - Hive sourceDatasourceFeatures: + Doris: + businessMode: + - DATA_INTEGRATION + sceneMode: + - SINGLE_TABLE JDBC-Mysql: businessMode: - DATA_INTEGRATION @@ -173,6 +182,13 @@ connector-datasource-mapper: - SINGLE_TABLE - MULTIPLE_TABLE sinkDatasourceFeatures: + Doris: + businessMode: + - DATA_INTEGRATION + - DATA_REPLICA + sceneMode: + - SINGLE_TABLE + - MULTIPLE_TABLE JDBC-Mysql: businessMode: - DATA_INTEGRATION From 511a169be17693651c70c109cfd9a76ab5ea16e8 Mon Sep 17 00:00:00 2001 From: yang Date: Mon, 9 Dec 2024 20:10:18 +0800 Subject: [PATCH 2/3] [Fix][datasource-doris] package datasource not have doris plugin --- seatunnel-web-dist/pom.xml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/seatunnel-web-dist/pom.xml b/seatunnel-web-dist/pom.xml index 531e31f0d..98d33e516 100644 --- a/seatunnel-web-dist/pom.xml +++ b/seatunnel-web-dist/pom.xml @@ -203,6 +203,18 @@ + + org.apache.seatunnel + datasource-doris + ${project.version} + provided + + + * + * + + + org.apache.seatunnel datasource-jdbc-mysql From bf816cc17d5b239687a6444322008b7dffe909bd Mon Sep 17 00:00:00 2001 From: yang Date: Fri, 10 Jan 2025 17:15:39 +0800 Subject: [PATCH 3/3] [Fix] As per the 2.3.8 documentation Doris does not support stream --- .../src/main/resources/connector-datasource-mapper.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml index 8c4276692..c532aeb94 100644 --- a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml +++ b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml @@ -185,7 +185,6 @@ connector-datasource-mapper: Doris: businessMode: - DATA_INTEGRATION - - DATA_REPLICA sceneMode: - SINGLE_TABLE - MULTIPLE_TABLE