Skip to content

Commit

Permalink
branch-3.0: [fix](iceberg)Different catalogs should use different cli…
Browse files Browse the repository at this point in the history
…ent pools apache#46694 (apache#46755)

Cherry-picked from apache#46694

Co-authored-by: wuwenchi <[email protected]>
  • Loading branch information
github-actions[bot] and wuwenchi authored Jan 10, 2025
1 parent 6a189b3 commit c095b83
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog impleme
protected Configuration conf;
protected ClientPool<IMetaStoreClient, TException> clients;
protected FileIO fileIO;
protected String uid;
protected String catalogName;

public void initialize(String name, FileIO fileIO,
ClientPool<IMetaStoreClient, TException> clients) {
this.uid = name;
this.catalogName = name;
this.fileIO = fileIO;
this.clients = clients;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;

import com.aliyun.datalake.metastore.common.DataLakeConfig;

import java.util.Map;

public class IcebergDLFExternalCatalog extends IcebergExternalCatalog {
Expand All @@ -43,8 +41,8 @@ protected void initCatalog() {
dlfCatalog.setConf(getConfiguration());
// initialize catalog
Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
String dlfUid = catalogProperties.get(DataLakeConfig.CATALOG_USER_ID);
dlfCatalog.initialize(dlfUid, catalogProperties);
String catalogName = getName();
dlfCatalog.initialize(catalogName, catalogProperties);
catalog = dlfCatalog;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;

import java.util.List;
import java.util.Map;

public class IcebergGlueExternalCatalog extends IcebergExternalCatalog {
Expand Down Expand Up @@ -61,9 +60,4 @@ protected void initCatalog() {
glueCatalog.initialize(getName(), catalogProperties);
catalog = glueCatalog;
}

@Override
protected List<String> listDatabaseNames() {
return metadataOps.listDatabaseNames();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void initialize(String name, Map<String, String> properties) {
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
String dbName = tableIdentifier.namespace().level(0);
String tableName = tableIdentifier.name();
return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName);
return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.catalogName, dbName, tableName);
}

protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@

public class DLFCachedClientPool implements ClientPool<IMetaStoreClient, TException> {

private static volatile Cache<String, DLFClientPool> clientPoolCache;
private static final Object clientPoolCacheLock = new Object();
private Cache<String, DLFClientPool> clientPoolCache;
private final Configuration conf;
private final String endpoint;
private final int clientPoolSize;
private final long evictionInterval;

// This cached client pool should belong to the catalog level,
// each catalog has its own pool
public DLFCachedClientPool(Configuration conf, Map<String, String> properties) {
this.conf = conf;
this.endpoint = conf.get("", "");
Expand All @@ -63,16 +64,10 @@ private long getEvictionInterval(Map<String, String> properties) {
}

private void initializeClientPoolCache() {
if (clientPoolCache == null) {
synchronized (clientPoolCacheLock) {
if (clientPoolCache == null) {
clientPoolCache = Caffeine.newBuilder()
.expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
.removalListener((key, value, cause) -> ((DLFClientPool) value).close())
.build();
}
}
}
clientPoolCache = Caffeine.newBuilder()
.expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
.removalListener((key, value, cause) -> ((DLFClientPool) value).close())
.build();
}

protected DLFClientPool clientPool() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.dlf.client;

import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;

public class IcebergDLFExternalCatalogTest {
@Test
public void testDatabaseList() {
HashMap<String, String> props = new HashMap<>();
Configuration conf = new Configuration();

DLFCachedClientPool cachedClientPool1 = new DLFCachedClientPool(conf, props);
DLFCachedClientPool cachedClientPool2 = new DLFCachedClientPool(conf, props);
DLFClientPool dlfClientPool1 = cachedClientPool1.clientPool();
DLFClientPool dlfClientPool2 = cachedClientPool2.clientPool();
// This cache should belong to the catalog level,
// so the object addresses of clients in different pools must be different
Assert.assertNotSame(dlfClientPool1, dlfClientPool2);

}
}

0 comments on commit c095b83

Please sign in to comment.