[fix](iceberg) Fix HDFS FileSystem Leak Caused by Frequent Refresh of Iceberg Catalog #45956

Closed
HadoopAuthenticator.java
@@ -35,10 +35,6 @@ default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
     }

     static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig config) {
-        if (config instanceof KerberosAuthenticationConfig) {
-            return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config);
-        } else {
-            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
-        }
+        return HadoopAuthenticatorManager.getAuthenticator(config);
     }
 }
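
Note: the factory now delegates to the new HadoopAuthenticatorManager, so repeated calls with equal configs reuse one cached authenticator. Previously every call built a fresh instance, which meant each Iceberg catalog refresh created a new UGI and, downstream, a new HDFS FileSystem that was never closed. A minimal before/after sketch (illustrative only, not part of this PR; it assumes the class sits in the same package as the authentication classes):

```java
package org.apache.doris.common.security.authentication;

import org.apache.hadoop.conf.Configuration;

// Illustrative: repeated catalog refreshes call the factory with the same config.
public class FactoryReuseSketch {
    public static void main(String[] args) {
        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(new Configuration());

        HadoopAuthenticator first = HadoopAuthenticator.getHadoopAuthenticator(config);
        HadoopAuthenticator second = HadoopAuthenticator.getHadoopAuthenticator(config);

        // Before this patch: prints false (a brand-new authenticator per call).
        // After this patch: prints true (both calls hit the manager's cache).
        System.out.println(first == second);
    }
}
```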
HadoopAuthenticatorManager.java
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.security.authentication;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.ConcurrentHashMap;

/**
 * HadoopAuthenticatorManager is a centralized manager responsible for managing and reusing instances
 * of {@link HadoopAuthenticator}. It ensures that authenticators are created and cached based on
 * the given {@link AuthenticationConfig} to minimize redundant authenticator and resource creation.
 *
 * <p>Key Features:</p>
 * <ul>
 *     <li>Uses a {@link ConcurrentHashMap} to cache authenticators and avoid creating duplicates.</li>
 *     <li>Supports different authentication mechanisms, such as Kerberos and Simple Authentication.</li>
 *     <li>Provides a unified entry point to retrieve a {@link HadoopAuthenticator} instance
 *     based on the given {@link AuthenticationConfig}.</li>
 * </ul>
 *
 * <p>How It Works:</p>
 * <ol>
 *     <li>When a new {@link AuthenticationConfig} is passed to {@code getAuthenticator},
 *     it checks if an existing authenticator is cached for the configuration.</li>
 *     <li>If an authenticator exists, it is returned directly.</li>
 *     <li>If not, a new authenticator is created using {@code createAuthenticator} and cached for future use.</li>
 * </ol>
 *
 * <p>This design helps optimize resource usage, particularly when working with file systems
 * that depend on authentication mechanisms (e.g., Hadoop's {@link org.apache.hadoop.security.UserGroupInformation}).
 * </p>
 *
 * <p>Note:</p>
 * <ul>
 *     <li>Ensure that {@link AuthenticationConfig} implementations have properly implemented
 *     {@code equals} and {@code hashCode} to guarantee correct caching behavior.</li>
 *     <li>The class is thread-safe, leveraging the thread-safety guarantees of {@link ConcurrentHashMap}.</li>
 * </ul>
 *
 * @see HadoopAuthenticator
 * @see AuthenticationConfig
 * @see KerberosAuthenticationConfig
 * @see SimpleAuthenticationConfig
 */
public class HadoopAuthenticatorManager {
    private static final Logger LOG = LogManager.getLogger(HadoopAuthenticatorManager.class);

    private static final ConcurrentHashMap<AuthenticationConfig, HadoopAuthenticator> authenticatorMap =
            new ConcurrentHashMap<>();

    public static HadoopAuthenticator getAuthenticator(AuthenticationConfig config) {
        return authenticatorMap.computeIfAbsent(config, HadoopAuthenticatorManager::createAuthenticator);
    }

    private static HadoopAuthenticator createAuthenticator(AuthenticationConfig config) {
        LOG.info("Creating a new authenticator.");
        if (config instanceof KerberosAuthenticationConfig) {
            return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config);
        } else if (config instanceof SimpleAuthenticationConfig) {
            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
        } else {
            throw new IllegalArgumentException("Unsupported AuthenticationConfig type: " + config.getClass().getName());
        }
    }
}

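Because ConcurrentHashMap.computeIfAbsent invokes the mapping function at most once per key, concurrent catalog refreshes that present equal configs still end up sharing a single authenticator. A small concurrency sketch under that assumption (illustrative; the class name and thread count are arbitrary, and it assumes the same package as above):

```java
package org.apache.doris.common.security.authentication;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;

public class ManagerConcurrencySketch {
    public static void main(String[] args) throws InterruptedException {
        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(new Configuration());
        Set<HadoopAuthenticator> seen = ConcurrentHashMap.newKeySet();

        // Eight "catalog refreshes" racing on the same config.
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> seen.add(HadoopAuthenticatorManager.getAuthenticator(config)));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }

        System.out.println(seen.size()); // 1: every thread got the same cached instance
    }
}
```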
KerberosAuthenticationConfig.java
@@ -22,6 +22,8 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;

+import java.util.Objects;
+
 @EqualsAndHashCode(callSuper = true)
 @Data
 public class KerberosAuthenticationConfig extends AuthenticationConfig {
@@ -34,4 +36,23 @@ public class KerberosAuthenticationConfig extends AuthenticationConfig {
     public boolean isValid() {
         return StringUtils.isNotEmpty(kerberosPrincipal) && StringUtils.isNotEmpty(kerberosKeytab);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KerberosAuthenticationConfig that = (KerberosAuthenticationConfig) o;
+        return printDebugLog == that.printDebugLog && Objects.equals(kerberosPrincipal, that.kerberosPrincipal)
+                && Objects.equals(kerberosKeytab, that.kerberosKeytab)
+                && Objects.equals(conf, that.conf);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kerberosPrincipal, kerberosKeytab, conf, printDebugLog);
+    }
 }
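
The explicit equals/hashCode is what makes the manager's cache effective: computeIfAbsent compares keys by equals, so a key type with identity-based equality would miss the cache on every logically-equal lookup and create a duplicate authenticator each time. A self-contained illustration of that failure mode, using a hypothetical IdentityKey class (plain JDK, no Doris classes):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheKeyDemo {
    // Hypothetical key that inherits Object's identity-based equals/hashCode.
    static final class IdentityKey {
        final String principal;

        IdentityKey(String principal) {
            this.principal = principal;
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<IdentityKey, String> cache = new ConcurrentHashMap<>();
        AtomicInteger created = new AtomicInteger();

        // Two logically-equal keys: same field values, different instances.
        cache.computeIfAbsent(new IdentityKey("doris@EXAMPLE.COM"), k -> "auth-" + created.incrementAndGet());
        cache.computeIfAbsent(new IdentityKey("doris@EXAMPLE.COM"), k -> "auth-" + created.incrementAndGet());

        // Both lookups miss, so two values are created: prints 2. With
        // value-based equals/hashCode (as this diff adds), it would print 1.
        System.out.println(created.get());
    }
}
```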
RemoteFSPhantomManager.java
@@ -19,7 +19,6 @@

 import org.apache.doris.common.CustomThreadFactory;

-import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -28,12 +27,12 @@
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  * The RemoteFSPhantomManager class is responsible for managing the phantom references
@@ -62,10 +61,16 @@ public class RemoteFSPhantomManager {
     private static final ReferenceQueue<RemoteFileSystem> referenceQueue = new ReferenceQueue<>();

     // Map storing the phantom references and their corresponding FileSystem objects
-    private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap
-            = new ConcurrentHashMap<>();
-
-    private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
+    private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap =
+            new ConcurrentHashMap<>();
+    /**
+     * Map for tracking reference counts of FileSystem instances.
+     * Key: FileSystem instance, Value: AtomicInteger representing the reference count for the FileSystem.
+     * This ensures that the FileSystem is only closed when all associated RemoteFileSystem instances are
+     * garbage collected.
+     */
+    private static final ConcurrentHashMap<FileSystem, AtomicInteger> fileSystemReferenceCounts =
+            new ConcurrentHashMap<>();

     // Flag indicating whether the cleanup thread has been started
     private static final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -76,18 +81,26 @@ public class RemoteFSPhantomManager {
      *
      * @param remoteFileSystem the RemoteFileSystem object to be registered
      */
-    public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) {
+    public static synchronized void registerPhantomReference(RemoteFileSystem remoteFileSystem) {
         if (!isStarted.get()) {
             start();
             isStarted.set(true);
         }
-        if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
-            throw new RuntimeException("FileSystem already exists: " + remoteFileSystem.dfsFileSystem.getUri());
-        }
+        FileSystem fileSystem = remoteFileSystem.dfsFileSystem;
+        fileSystemReferenceCounts.compute(fileSystem, (fs, count) -> {
+            if (count == null) {
+                LOG.info("New FileSystem detected: {}", fileSystem.getUri());
+                return new AtomicInteger(1);
+            } else {
+                LOG.info("Incrementing reference count for FileSystem: {}", fileSystem.getUri());
+                count.incrementAndGet();
+                return count;
+            }
+        });

         RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem,
                 referenceQueue);
-        referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
-        fsSet.add(remoteFileSystem.dfsFileSystem);
+        referenceMap.put(phantomReference, fileSystem);
     }

/**
@@ -108,13 +121,22 @@ public static void start() {

                     FileSystem fs = referenceMap.remove(phantomRef);
                     if (fs != null) {
-                        try {
-                            fs.close();
-                            fsSet.remove(fs);
-                            LOG.info("Closed file system: {}", fs.getUri());
-                        } catch (IOException e) {
-                            LOG.warn("Failed to close file system", e);
-                        }
+                        fileSystemReferenceCounts.computeIfPresent(fs, (key, count) -> {
+                            int remaining = count.decrementAndGet();
+                            if (remaining <= 0) {
+                                try {
+                                    fs.close();
+                                    LOG.info("Closed FileSystem: {}", fs.getUri());
+                                    return null; // Remove the FileSystem from the map
+                                } catch (IOException e) {
+                                    LOG.warn("Failed to close FileSystem: {}", fs.getUri(), e);
+                                }
+                            } else {
+                                LOG.info("Decrementing reference count for FileSystem: {}, "
+                                        + "remaining: {}", fs.getUri(), remaining);
+                            }
+                            return count;
+                        });
                     }
                 }
             }, 0, 1, TimeUnit.MINUTES);
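
Taken together, the new pattern is: register one phantom reference per RemoteFileSystem wrapper, count wrappers per shared FileSystem, and close the FileSystem only once the last wrapper has been garbage collected. A generic, minimal sketch of the same mechanism (class and method names here are hypothetical; in the PR, RemoteFileSystemPhantomReference plays the wrapper-reference role and the scheduled executor runs the drain logic every minute):

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// W is the wrapper type (RemoteFileSystem in the PR); R is the shared,
// closeable resource (Hadoop's FileSystem in the PR).
class RefCountedCleaner<W, R extends AutoCloseable> {
    private final ReferenceQueue<W> queue = new ReferenceQueue<>();
    private final ConcurrentHashMap<PhantomReference<W>, R> refs = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<R, AtomicInteger> counts = new ConcurrentHashMap<>();

    // Mirrors registerPhantomReference: bump the resource's count and remember
    // which resource the wrapper's phantom reference points to.
    synchronized void register(W wrapper, R resource) {
        counts.compute(resource, (r, c) -> {
            if (c == null) {
                return new AtomicInteger(1);
            }
            c.incrementAndGet();
            return c;
        });
        refs.put(new PhantomReference<>(wrapper, queue), resource);
    }

    // Mirrors the scheduled cleanup task: drain references of GC'd wrappers and
    // close the resource only when its last wrapper is gone.
    void drain() {
        Reference<? extends W> ref;
        while ((ref = queue.poll()) != null) {
            R resource = refs.remove(ref);
            if (resource == null) {
                continue;
            }
            counts.computeIfPresent(resource, (r, c) -> {
                if (c.decrementAndGet() <= 0) {
                    try {
                        resource.close();
                    } catch (Exception e) {
                        // Log and continue, as the manager does on close failure.
                    }
                    return null; // drop the entry once the resource is closed
                }
                return c; // other wrappers still alive; keep counting down
            });
        }
    }
}
```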
AuthenticationTest.java
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.security.authentication;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.junit.Assert;
import org.junit.Test;

public class AuthenticationTest {

    @Test
    public void testAuthConf() {
        Configuration conf = new Configuration();
        AuthenticationConfig conf1 = AuthenticationConfig.getKerberosConfig(conf);
        Assert.assertEquals(SimpleAuthenticationConfig.class, conf1.getClass());

        conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");

        AuthenticationConfig conf2 = AuthenticationConfig.getKerberosConfig(conf);
        Assert.assertEquals(SimpleAuthenticationConfig.class, conf2.getClass());

        conf.set(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, "principal");
        conf.set(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB, "keytab");

        AuthenticationConfig conf3 = AuthenticationConfig.getKerberosConfig(conf);
        Assert.assertEquals(KerberosAuthenticationConfig.class, conf3.getClass());
    }

    @Test
    public void testAuthConf2() {
        Configuration conf1 = new Configuration();
        conf1.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
        conf1.set(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, "principal");
        conf1.set(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB, "keytab");
        AuthenticationConfig authenticationConfig1 = AuthenticationConfig.getKerberosConfig(conf1);
        AuthenticationConfig authenticationConfig2 = AuthenticationConfig.getKerberosConfig(conf1);
        Assert.assertTrue(authenticationConfig1.equals(authenticationConfig2));
        Assert.assertEquals(authenticationConfig1, authenticationConfig2);
        HadoopAuthenticator authenticator1 = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig1);
        HadoopAuthenticator authenticator2 = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig2);
        Assert.assertEquals(authenticator1, authenticator2);
        Configuration conf2 = new Configuration();
        conf2.set(AuthenticationConfig.HADOOP_USER_NAME, "hadoop");
        AuthenticationConfig authenticationConfig3 = AuthenticationConfig.getSimpleAuthenticationConfig(conf2);
        AuthenticationConfig authenticationConfig4 = AuthenticationConfig.getSimpleAuthenticationConfig(conf2);
        Assert.assertEquals(authenticationConfig3.getClass(), authenticationConfig4.getClass());
        HadoopAuthenticator authenticator3 = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig3);
        HadoopAuthenticator authenticator4 = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig4);
        Assert.assertEquals(authenticator3, authenticator4);
    }
}