From 98d2e2a9e51a63d7cc18c6d1e95b955cd90030c8 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Tue, 5 Nov 2024 01:08:14 +0800 Subject: [PATCH] [improvement](jdbc catalog) Optimize JdbcCatalog case mapping stability --- .../datasource/jdbc/JdbcExternalCatalog.java | 89 ++++- .../datasource/jdbc/JdbcExternalTable.java | 29 +- .../jdbc/JdbcIdentifierMapping.java | 45 --- .../datasource/jdbc/client/JdbcClient.java | 47 +-- .../jdbc/client/JdbcGbaseClient.java | 4 +- .../jdbc/client/JdbcMySQLClient.java | 4 +- .../jdbc/client/JdbcOracleClient.java | 4 +- .../datasource/mapping/IdentifierMapping.java | 311 +----------------- .../mapping/JdbcIdentifierMapping.java | 215 ++++++++++++ 9 files changed, 335 insertions(+), 413 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/JdbcIdentifierMapping.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index 80cc0f554f637e9..6eb9f9085f23d70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -31,6 +31,8 @@ import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.jdbc.client.JdbcClientException; +import org.apache.doris.datasource.mapping.IdentifierMapping; +import org.apache.doris.datasource.mapping.JdbcIdentifierMapping; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest; import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult; @@ -56,6 +58,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.stream.Collectors; @Getter public class JdbcExternalCatalog extends ExternalCatalog { @@ -70,6 +73,7 @@ public class JdbcExternalCatalog extends ExternalCatalog { // Must add "transient" for Gson to ignore this field, // or Gson will throw exception with HikariCP private transient JdbcClient jdbcClient; + private transient IdentifierMapping identifierMapping; public JdbcExternalCatalog(long catalogId, String name, String resource, Map props, String comment) @@ -118,12 +122,17 @@ public void onRefresh(boolean invalidCache) { super.onRefresh(invalidCache); if (jdbcClient != null) { jdbcClient.closeClient(); + jdbcClient = null; } + identifierMapping = null; } @Override public void onRefreshCache(boolean invalidCache) { - onRefresh(invalidCache); + super.onRefreshCache(invalidCache); + if (identifierMapping != null) { + ((JdbcIdentifierMapping) identifierMapping).clear(); + } } @Override @@ -131,7 +140,9 @@ public void onClose() { super.onClose(); if (jdbcClient != null) { jdbcClient.closeClient(); + jdbcClient = null; } + identifierMapping = null; } protected Map processCompatibleProperties(Map props) @@ -231,8 +242,6 @@ protected void initLocalObjectsImpl() { .setDriverUrl(getDriverUrl()) .setDriverClass(getDriverClass()) .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase()) - .setIsLowerCaseMetaNames(getLowerCaseMetaNames()) - .setMetaNamesMapping(getMetaNamesMapping()) .setIncludeDatabaseMap(getIncludeDatabaseMap()) .setExcludeDatabaseMap(getExcludeDatabaseMap()) .setConnectionPoolMinSize(getConnectionPoolMinSize()) @@ -242,22 +251,88 @@ protected void initLocalObjectsImpl() { .setConnectionPoolKeepAlive(isConnectionPoolKeepAlive()); jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig); + identifierMapping = new JdbcIdentifierMapping(this); + } + + @Override + public List listDatabaseNames() { + return jdbcClient.getDatabaseNameList().stream() + .map(identifierMapping::fromRemoteDatabaseName) + .collect(Collectors.toList()); + } + + public void loadRemoteDatabase() { + makeSureInitialized(); + jdbcClient.getDatabaseNameList().stream() + .map(identifierMapping::fromRemoteDatabaseName) + .collect(Collectors.toList()); } - protected List listDatabaseNames() { - return jdbcClient.getDatabaseNameList(); + protected String getRemoteDatabaseName(String dbName) { + return identifierMapping.toRemoteDatabaseName(dbName); } @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - return jdbcClient.getTablesNameList(dbName); + String remoteDbName = getRemoteDatabaseName(dbName); + return jdbcClient.getTablesNameList(remoteDbName).stream() + .map(tblName -> identifierMapping.fromRemoteTableName(remoteDbName, tblName)) + .collect(Collectors.toList()); + } + + public void loadRemoteTable(String remoteDbName) { + makeSureInitialized(); + jdbcClient.getTablesNameList(remoteDbName).stream() + .map(tblName -> identifierMapping.fromRemoteTableName(remoteDbName, tblName)) + .collect(Collectors.toList()); + } + + protected String getRemoteTableName(String dbName, String tblName) { + return identifierMapping.toRemoteTableName(getRemoteDatabaseName(dbName), tblName); } @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - return jdbcClient.isTableExist(dbName, tblName); + String remoteDbName = getRemoteDatabaseName(dbName); + String remoteTblName = getRemoteTableName(dbName, tblName); + return jdbcClient.isTableExist(remoteDbName, remoteTblName); + } + + public List listColumns(String dbName, String tblName) { + makeSureInitialized(); + String remoteDbName = getRemoteDatabaseName(dbName); + String remoteTblName = getRemoteTableName(dbName, tblName); + + List remoteColumns = jdbcClient.getColumnsFromJdbc(remoteDbName, remoteTblName); + + List remoteColumnNames = remoteColumns.stream() + .map(Column::getName) + .collect(Collectors.toList()); + + List localColumnNames = remoteColumnNames.stream() + .map(remoteColumnName -> identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName, + remoteColumnName)) + .collect(Collectors.toList()); + + for (int i = 0; i < remoteColumns.size(); i++) { + remoteColumns.get(i).setName(localColumnNames.get(i)); + } + + return remoteColumns; + } + + public void loadRemoteColumns(String remoteDbName, String remoteTblName) { + makeSureInitialized(); + jdbcClient.getColumnsFromJdbc(remoteDbName, remoteTblName).stream() + .map(column -> identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName, column.getName())) + .collect(Collectors.toList()); + } + + protected String getRemoteColumnNames(String dbName, String tblName, String localColumnName) { + return identifierMapping.toRemoteColumnName(getRemoteDatabaseName(dbName), getRemoteTableName(dbName, tblName), + localColumnName); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index 20520d7c542833c..8a41a498327a104 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -32,6 +32,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.TTableDescriptor; +import com.google.common.collect.Maps; import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -86,21 +87,33 @@ public TTableDescriptor toThrift() { @Override public Optional initSchema() { - return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient() - .getColumnsFromJdbc(dbName, name))); + return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).listColumns(dbName, name))); } private JdbcTable toJdbcTable() { List schema = getFullSchema(); JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog; - String fullDbName = this.dbName + "." + this.name; - JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE); - jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName); + String fullTableName = this.dbName + "." + this.name; + JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema, TableType.JDBC_EXTERNAL_TABLE); + jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName); // Set remote properties - jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName)); - jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName, this.name)); - jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName, this.name)); + jdbcTable.setRemoteDatabaseName(jdbcCatalog.getRemoteDatabaseName(this.dbName)); + jdbcTable.setRemoteTableName(jdbcCatalog.getRemoteTableName(this.dbName, this.name)); + Map remoteColumnNames = Maps.newHashMap(); + for (Column column : schema) { + String remoteColumnName = jdbcCatalog.getRemoteColumnNames(this.dbName, this.name, column.getName()); + remoteColumnNames.put(column.getName(), remoteColumnName); + } + if (!remoteColumnNames.isEmpty()) { + jdbcTable.setRemoteColumnNames(remoteColumnNames); + } else { + remoteColumnNames = Maps.newHashMap(); + for (Column column : getFullSchema()) { + remoteColumnNames.put(column.getName(), column.getName()); + } + jdbcTable.setRemoteColumnNames(remoteColumnNames); + } return jdbcTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java deleted file mode 100644 index 20a74724b3e4965..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.jdbc; - -import org.apache.doris.datasource.jdbc.client.JdbcClient; -import org.apache.doris.datasource.mapping.IdentifierMapping; - -public class JdbcIdentifierMapping extends IdentifierMapping { - private final JdbcClient jdbcClient; - - public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping, JdbcClient jdbcClient) { - super(isLowerCaseMetaNames, metaNamesMapping); - this.jdbcClient = jdbcClient; - } - - @Override - protected void loadDatabaseNames() { - jdbcClient.getDatabaseNameList(); - } - - @Override - protected void loadTableNames(String localDbName) { - jdbcClient.getTablesNameList(localDbName); - } - - @Override - protected void loadColumnNames(String localDbName, String localTableName) { - jdbcClient.getColumnsFromJdbc(localDbName, localTableName); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index 54f15f7404efe96..dbadcbabe2e2bf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -23,7 +23,6 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping; import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema; import com.google.common.collect.ImmutableSet; @@ -62,11 +61,8 @@ public abstract class JdbcClient { protected ClassLoader classLoader = null; protected HikariDataSource dataSource = null; protected boolean isOnlySpecifiedDatabase; - protected boolean isLowerCaseMetaNames; - protected String metaNamesMapping; protected Map includeDatabaseMap; protected Map excludeDatabaseMap; - protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching; public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) { String dbType = parseDbType(jdbcClientConfig.getJdbcUrl()); @@ -103,8 +99,6 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) { this.catalogName = jdbcClientConfig.getCatalog(); this.jdbcUser = jdbcClientConfig.getUser(); this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase()); - this.isLowerCaseMetaNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames()); - this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping(); this.includeDatabaseMap = Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap()); this.excludeDatabaseMap = @@ -113,7 +107,6 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) { this.dbType = parseDbType(jdbcUrl); initializeClassLoader(jdbcClientConfig); initializeDataSource(jdbcClientConfig); - this.jdbcLowerCaseMetaMatching = new JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this); } // Initialize DataSource @@ -169,6 +162,7 @@ public static String parseDbType(String jdbcUrl) { public void closeClient() { dataSource.close(); + dataSource = null; } public Connection getConnection() throws JdbcClientException { @@ -307,10 +301,9 @@ public List getDatabaseNameList() { /** * get all tables of one database */ - public List getTablesNameList(String localDbName) { + public List getTablesNameList(String remoteDbName) { List remoteTablesNames = Lists.newArrayList(); String[] tableTypes = getTableTypes(); - String remoteDbName = getRemoteDatabaseName(localDbName); processTable(remoteDbName, null, tableTypes, (rs) -> { try { while (rs.next()) { @@ -320,14 +313,12 @@ public List getTablesNameList(String localDbName) { throw new JdbcClientException("failed to get all tables for remote database: `%s`", e, remoteDbName); } }); - return filterTableNames(remoteDbName, remoteTablesNames); + return remoteTablesNames; } - public boolean isTableExist(String localDbName, String localTableName) { + public boolean isTableExist(String remoteDbName, String remoteTableName) { final boolean[] isExist = {false}; String[] tableTypes = getTableTypes(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> { try { if (rs.next()) { @@ -344,12 +335,10 @@ public boolean isTableExist(String localDbName, String localTableName) { /** * get all columns of one table */ - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = null; ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { conn = getConnection(); DatabaseMetaData databaseMetaData = conn.getMetaData(); @@ -376,21 +365,7 @@ public List getColumnsFromJdbc(String localDbName, String localTableName field.isAllowNull(), field.getRemarks(), true, -1)); } - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); - return filterColumnName(remoteDbName, remoteTableName, dorisTableSchema); - } - - public String getRemoteDatabaseName(String localDbname) { - return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname); - } - - public String getRemoteTableName(String localDbName, String localTableName) { - return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, localTableName); - } - - public Map getRemoteColumnNames(String localDbName, String localTableName) { - return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, localTableName); + return dorisTableSchema; } // protected methods, for subclass to override @@ -449,7 +424,7 @@ protected List filterDatabaseNames(List remoteDbNames) { } filteredDatabaseNames.add(databaseName); } - return jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames); + return filteredDatabaseNames; } protected Set getFilterInternalDatabases() { @@ -460,14 +435,6 @@ protected Set getFilterInternalDatabases() { .build(); } - protected List filterTableNames(String remoteDbName, List remoteTableNames) { - return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, remoteTableNames); - } - - protected List filterColumnName(String remoteDbName, String remoteTableName, List remoteColumns) { - return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, remoteTableName, remoteColumns); - } - protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema); protected Type createDecimalOrStringType(int precision, int scale) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcGbaseClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcGbaseClient.java index 7ba393e0d0aae63..086a8a5f393614b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcGbaseClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcGbaseClient.java @@ -87,12 +87,10 @@ protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String c } @Override - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = null; ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { conn = getConnection(); DatabaseMetaData databaseMetaData = conn.getMetaData(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java index a8263f1621a3a88..9a0f6011e166e7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java @@ -130,12 +130,10 @@ protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String c * get all columns of one table */ @Override - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = null; ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { conn = getConnection(); DatabaseMetaData databaseMetaData = conn.getMetaData(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java index 9968de79ab3a7de..adffd06c244e54f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java @@ -49,12 +49,10 @@ public String getTestQuery() { } @Override - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = null; ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { conn = getConnection(); DatabaseMetaData databaseMetaData = conn.getMetaData(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java index 363ef351152a39d..1c2941ea38bea3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java @@ -17,314 +17,17 @@ package org.apache.doris.datasource.mapping; -import org.apache.doris.catalog.Column; -import org.apache.doris.qe.GlobalVariable; +public interface IdentifierMapping { -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; + String fromRemoteDatabaseName(String remoteDatabaseName); -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; + String fromRemoteTableName(String remoteDatabaseName, String remoteTableName); -public abstract class IdentifierMapping { - private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class); + String fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, String remoteColumnNames); - private final ObjectMapper mapper = new ObjectMapper(); - private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> localTableToRemoteTable - = new ConcurrentHashMap<>(); - private final ConcurrentHashMap>> - localColumnToRemoteColumn = new ConcurrentHashMap<>(); + String toRemoteDatabaseName(String localDatabaseName); - private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false); - private final ConcurrentHashMap tableNamesLoadedMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> columnNamesLoadedMap - = new ConcurrentHashMap<>(); + String toRemoteTableName(String remoteDatabaseName, String localTableName); - private final boolean isLowerCaseMetaNames; - private final String metaNamesMapping; - - public IdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) { - this.isLowerCaseMetaNames = isLowerCaseMetaNames; - this.metaNamesMapping = metaNamesMapping; - } - - public List setDatabaseNameMapping(List remoteDatabaseNames) { - JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases"); - - Map databaseNameMapping = Maps.newTreeMap(); - if (databasesNode.isArray()) { - for (JsonNode node : databasesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - String mapping = node.path("mapping").asText(); - databaseNameMapping.put(remoteDatabase, mapping); - } - } - - Map> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB, - databaseNameMapping, isLowerCaseMetaNames); - List localDatabaseNames = result.get("localNames"); - List conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the names."); - } - return localDatabaseNames; - } - - public List setTableNameMapping(String remoteDbName, List remoteTableNames) { - JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables"); - - Map tableNameMapping = Maps.newTreeMap(); - if (tablesNode.isArray()) { - for (JsonNode node : tablesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - if (remoteDbName.equals(remoteDatabase)) { - String remoteTable = node.path("remoteTable").asText(); - String mapping = node.path("mapping").asText(); - tableNameMapping.put(remoteTable, mapping); - } - } - } - - localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); - - List localTableNames; - List conflictNames; - - if (GlobalVariable.lowerCaseTableNames == 1) { - Map> result = nameListToMapping(remoteTableNames, - localTableToRemoteTable.get(remoteDbName), - tableNameMapping, true); - localTableNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict table names found in remote database/schema: " + remoteDbName - + " when lower_case_table_names is 1: " + conflictNames - + ". Please use meta_name_mapping to specify the names."); - } - } else { - Map> result = nameListToMapping(remoteTableNames, - localTableToRemoteTable.get(remoteDbName), - tableNameMapping, isLowerCaseMetaNames); - localTableNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict table names found in remote database/schema: " + remoteDbName - + "when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the table names."); - } - } - return localTableNames; - } - - public List setColumnNameMapping(String remoteDbName, String remoteTableName, List remoteColumns) { - JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns"); - - Map columnNameMapping = Maps.newTreeMap(); - if (tablesNode.isArray()) { - for (JsonNode node : tablesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - String remoteTable = node.path("remoteTable").asText(); - if (remoteDbName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) { - String remoteColumn = node.path("remoteColumn").asText(); - String mapping = node.path("mapping").asText(); - columnNameMapping.put(remoteColumn, mapping); - } - } - } - localColumnToRemoteColumn.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); - localColumnToRemoteColumn.get(remoteDbName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>()); - - List localColumnNames; - List conflictNames; - - // Get the name from localColumns and save it to List - List remoteColumnNames = Lists.newArrayList(); - for (Column remoteColumn : remoteColumns) { - remoteColumnNames.add(remoteColumn.getName()); - } - - Map> result = nameListToMapping(remoteColumnNames, - localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName), - columnNameMapping, isLowerCaseMetaNames); - localColumnNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict column names found in remote database/schema: " + remoteDbName - + " in remote table: " + remoteTableName - + " when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the column names."); - } - // Replace the name in remoteColumns with localColumnNames - for (int i = 0; i < remoteColumns.size(); i++) { - remoteColumns.get(i).setName(localColumnNames.get(i)); - } - return remoteColumns; - } - - public String getRemoteDatabaseName(String localDbName) { - return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded, - localDbName); - } - - public String getRemoteTableName(String localDbName, String localTableName) { - String remoteDbName = getRemoteDatabaseName(localDbName); - Map tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName, - k -> new ConcurrentHashMap<>()); - return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName), - localTableName); - } - - public Map getRemoteColumnNames(String localDbName, String localTableName) { - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); - ConcurrentHashMap> tableColumnMap - = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>()); - Map columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); - if (columnMap.isEmpty()) { - LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}", - localDbName, localTableName); - loadColumnNamesIfNeeded(localDbName, localTableName); - columnMap = tableColumnMap.get(remoteTableName); - } - if (columnMap.isEmpty()) { - LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName); - throw new RuntimeException( - "No remote column found for localTableName: " + localTableName + ". Please refresh this catalog."); - } - return columnMap; - } - - - private void loadDatabaseNamesIfNeeded() { - if (dbNamesLoaded.compareAndSet(false, true)) { - try { - loadDatabaseNames(); - } catch (Exception e) { - dbNamesLoaded.set(false); // Reset on failure - LOG.warn("Error loading database names", e); - } - } - } - - private void loadTableNamesIfNeeded(String localDbName) { - AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false)); - if (isLoaded.compareAndSet(false, true)) { - try { - loadTableNames(localDbName); - } catch (Exception e) { - tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure - LOG.warn("Error loading table names for localDbName: {}", localDbName, e); - } - } - } - - private void loadColumnNamesIfNeeded(String localDbName, String localTableName) { - columnNamesLoadedMap.putIfAbsent(localDbName, new ConcurrentHashMap<>()); - AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName) - .computeIfAbsent(localTableName, k -> new AtomicBoolean(false)); - if (isLoaded.compareAndSet(false, true)) { - try { - loadColumnNames(localDbName, localTableName); - } catch (Exception e) { - columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure - LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName, - localTableName, e); - } - } - } - - private V getRequiredMapping(Map map, K key, String typeName, Runnable loadIfNeeded, - String entityName) { - if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) { - LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName); - loadIfNeeded.run(); - } - V value = map.get(key); - if (value == null) { - LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); - throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName - + ". Please refresh this catalog."); - } - return value; - } - - // Load the database name from the data source. - // In the corresponding getDatabaseNameList(), setDatabaseNameMapping() must be used to update the mapping. - protected abstract void loadDatabaseNames(); - - // Load the table names for the specified database from the data source. - // In the corresponding getTableNameList(), setTableNameMapping() must be used to update the mapping. - protected abstract void loadTableNames(String localDbName); - - // Load the column names for a specified table in a database from the data source. - // In the corresponding getColumnNameList(), setColumnNameMapping() must be used to update the mapping. - protected abstract void loadColumnNames(String localDbName, String localTableName); - - private JsonNode readAndParseJson(String jsonPath, String nodeName) { - JsonNode rootNode; - try { - rootNode = mapper.readTree(jsonPath); - return rootNode.path(nodeName); - } catch (JsonProcessingException e) { - throw new RuntimeException("parse meta_names_mapping property error", e); - } - } - - private Map> nameListToMapping(List remoteNames, - ConcurrentHashMap localNameToRemoteName, - Map nameMapping, boolean isLowerCaseMetaNames) { - List filteredDatabaseNames = Lists.newArrayList(); - Set lowerCaseNames = Sets.newHashSet(); - Map> nameMap = Maps.newHashMap(); - List conflictNames = Lists.newArrayList(); - - for (String name : remoteNames) { - String mappedName = nameMapping.getOrDefault(name, name); - String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName; - - // Use computeIfAbsent to ensure atomicity - localNameToRemoteName.computeIfAbsent(localName, k -> name); - - if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) { - if (nameMap.containsKey(localName)) { - nameMap.get(localName).add(mappedName); - } - } else { - nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName))); - } - - filteredDatabaseNames.add(localName); - } - - for (List conflictNameList : nameMap.values()) { - if (conflictNameList.size() > 1) { - conflictNames.addAll(conflictNameList); - } - } - - Map> result = Maps.newConcurrentMap(); - result.put("localNames", filteredDatabaseNames); - result.put("conflictNames", conflictNames); - return result; - } + String toRemoteColumnName(String remoteDatabaseName, String remoteTableName, String localColumnNames); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/JdbcIdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/JdbcIdentifierMapping.java new file mode 100644 index 000000000000000..102d7ffe6167494 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/JdbcIdentifierMapping.java @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mapping; + +import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class JdbcIdentifierMapping implements IdentifierMapping { + private static final Logger LOG = LogManager.getLogger(JdbcIdentifierMapping.class); + + private final ObjectMapper mapper = new ObjectMapper(); + private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> localTableToRemoteTable + = new ConcurrentHashMap<>(); + private final ConcurrentHashMap>> + localColumnToRemoteColumn = new ConcurrentHashMap<>(); + + private final boolean isLowerCaseMetaNames; + private final String metaNamesMapping; + + private final JdbcExternalCatalog jdbcExternalCatalog; + + public JdbcIdentifierMapping(JdbcExternalCatalog jdbcExternalCatalog) { + this.jdbcExternalCatalog = jdbcExternalCatalog; + this.isLowerCaseMetaNames = Boolean.parseBoolean(jdbcExternalCatalog.getLowerCaseMetaNames()); + this.metaNamesMapping = jdbcExternalCatalog.getMetaNamesMapping(); + } + + public void clear() { + localDBToRemoteDB.clear(); + localTableToRemoteTable.clear(); + localColumnToRemoteColumn.clear(); + } + + private boolean isMappingInvalid() { + return metaNamesMapping == null || metaNamesMapping.isEmpty(); + } + + @Override + public String fromRemoteDatabaseName(String remoteDatabaseName) { + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteDatabaseName; + } + JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases"); + + Map databaseNameMapping = Maps.newTreeMap(); + if (databasesNode.isArray()) { + for (JsonNode node : databasesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + String mapping = node.path("mapping").asText(); + databaseNameMapping.put(remoteDatabase, mapping); + } + } + + return getMappedName(remoteDatabaseName, localDBToRemoteDB, databaseNameMapping); + } + + @Override + public String fromRemoteTableName(String remoteDatabaseName, String remoteTableName) { + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteTableName; + } + JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables"); + + Map tableNameMapping = Maps.newTreeMap(); + if (tablesNode.isArray()) { + for (JsonNode node : tablesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + if (remoteDatabaseName.equals(remoteDatabase)) { + String remoteTable = node.path("remoteTable").asText(); + String mapping = node.path("mapping").asText(); + tableNameMapping.put(remoteTable, mapping); + } + } + } + + localTableToRemoteTable.putIfAbsent(remoteDatabaseName, new ConcurrentHashMap<>()); + return getMappedName(remoteTableName, localTableToRemoteTable.get(remoteDatabaseName), tableNameMapping); + } + + @Override + public String fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, String remoteColumnName) { + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteColumnName; + } + + JsonNode columnsNode = readAndParseJson(metaNamesMapping, "columns"); + + Map columnNameMapping = Maps.newTreeMap(); + if (columnsNode.isArray()) { + for (JsonNode node : columnsNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + String remoteTable = node.path("remoteTable").asText(); + if (remoteDatabaseName.equals(remoteDatabase) && remoteTableName.equals(remoteTable)) { + String remoteColumn = node.path("remoteColumn").asText(); + String mapping = node.path("mapping").asText(); + columnNameMapping.put(remoteColumn, mapping); + } + } + } + + localColumnToRemoteColumn.putIfAbsent(remoteDatabaseName, new ConcurrentHashMap<>()); + localColumnToRemoteColumn.get(remoteDatabaseName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>()); + + return getMappedName(remoteColumnName, localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName), + columnNameMapping); + } + + @Override + public String toRemoteDatabaseName(String localDatabaseName) { + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return localDatabaseName; + } + + String remoteDbName = localDBToRemoteDB.get(localDatabaseName); + if (remoteDbName != null) { + return remoteDbName; + } + + jdbcExternalCatalog.loadRemoteDatabase(); + remoteDbName = localDBToRemoteDB.get(localDatabaseName); + if (remoteDbName != null) { + return remoteDbName; + } + + throw new RuntimeException("No database found for: " + localDatabaseName); + } + + @Override + public String toRemoteTableName(String remoteDatabaseName, String localTableName) { + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return localTableName; + } + + Map tableMap = localTableToRemoteTable.get(remoteDatabaseName); + String remoteTableName = (tableMap != null) ? tableMap.get(localTableName) : null; + if (remoteTableName != null) { + return remoteTableName; + } + + jdbcExternalCatalog.loadRemoteTable(remoteDatabaseName); + tableMap = localTableToRemoteTable.get(remoteDatabaseName); + remoteTableName = (tableMap != null) ? tableMap.get(localTableName) : null; + if (remoteTableName != null) { + return remoteTableName; + } + + throw new RuntimeException( + "No table found for: " + localTableName + " in remote database: " + remoteDatabaseName); + } + + @Override + public String toRemoteColumnName(String remoteDatabaseName, String remoteTableName, String localColumnNames) { + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return localColumnNames; + } + Map columnMap = localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName); + String remoteColumnName = (columnMap != null) ? columnMap.get(localColumnNames) : null; + if (remoteColumnName != null) { + return remoteColumnName; + } + + jdbcExternalCatalog.loadRemoteColumns(remoteDatabaseName, remoteTableName); + columnMap = localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName); + remoteColumnName = (columnMap != null) ? columnMap.get(localColumnNames) : null; + if (remoteColumnName != null) { + return remoteColumnName; + } + + throw new RuntimeException("No column found for: " + localColumnNames + " in remote table: " + remoteTableName + + " in remote database: " + remoteDatabaseName); + } + + private String getMappedName(String name, ConcurrentHashMap localToRemoteMap, + Map nameMapping) { + String mappedName = nameMapping.getOrDefault(name, name); + String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName; + localToRemoteMap.putIfAbsent(localName, name); + return localName; + } + + private JsonNode readAndParseJson(String jsonPath, String nodeName) { + JsonNode rootNode; + try { + rootNode = mapper.readTree(jsonPath); + return rootNode.path(nodeName); + } catch (JsonProcessingException e) { + throw new RuntimeException("parse meta_names_mapping property error", e); + } + } +}