From 0039a170b56cdc7d083ec3b2109b583afed2b392 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 26 Dec 2024 10:31:59 +0800 Subject: [PATCH] refactor RemoteFSPhantomManager --- .../fs/remote/RemoteFSPhantomManager.java | 60 +++++++++++++------ 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java index c0e48a1346651a..6a636fd2a25ec5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java @@ -19,7 +19,6 @@ import org.apache.doris.common.CustomThreadFactory; -import com.google.common.collect.Sets; import org.apache.hadoop.fs.FileSystem; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,12 +27,12 @@ import java.lang.ref.PhantomReference; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * The RemoteFSPhantomManager class is responsible for managing the phantom references @@ -62,10 +61,16 @@ public class RemoteFSPhantomManager { private static final ReferenceQueue referenceQueue = new ReferenceQueue<>(); // Map storing the phantom references and their corresponding FileSystem objects - private static final ConcurrentHashMap, FileSystem> referenceMap - = new ConcurrentHashMap<>(); - - private static final Set fsSet = Sets.newConcurrentHashSet(); + private static final ConcurrentHashMap, FileSystem> referenceMap = + new ConcurrentHashMap<>(); + /** + * Map for tracking reference counts of FileSystem instances. + * Key: FileSystem instance, Value: AtomicInteger representing the reference count for the FileSystem. + * This ensures that the FileSystem is only closed when all associated RemoteFileSystem instances are + * garbage collected. + */ + private static final ConcurrentHashMap fileSystemReferenceCounts = + new ConcurrentHashMap<>(); // Flag indicating whether the cleanup thread has been started private static final AtomicBoolean isStarted = new AtomicBoolean(false); @@ -76,18 +81,26 @@ public class RemoteFSPhantomManager { * * @param remoteFileSystem the RemoteFileSystem object to be registered */ - public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) { + public static synchronized void registerPhantomReference(RemoteFileSystem remoteFileSystem) { if (!isStarted.get()) { start(); isStarted.set(true); } - if (fsSet.contains(remoteFileSystem.dfsFileSystem)) { - throw new RuntimeException("FileSystem already exists: " + remoteFileSystem.dfsFileSystem.getUri()); - } + FileSystem fileSystem = remoteFileSystem.dfsFileSystem; + fileSystemReferenceCounts.compute(fileSystem, (fs, count) -> { + if (count == null) { + LOG.info("New FileSystem detected: {}", fileSystem.getUri()); + return new AtomicInteger(1); + } else { + LOG.info("Incrementing reference count for FileSystem: {}", fileSystem.getUri()); + count.incrementAndGet(); + return count; + } + }); + RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem, referenceQueue); - referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem); - fsSet.add(remoteFileSystem.dfsFileSystem); + referenceMap.put(phantomReference, fileSystem); } /** @@ -108,13 +121,22 @@ public static void start() { FileSystem fs = referenceMap.remove(phantomRef); if (fs != null) { - try { - fs.close(); - fsSet.remove(fs); - LOG.info("Closed file system: {}", fs.getUri()); - } catch (IOException e) { - LOG.warn("Failed to close file system", e); - } + fileSystemReferenceCounts.computeIfPresent(fs, (key, count) -> { + int remaining = count.decrementAndGet(); + if (remaining <= 0) { + try { + fs.close(); + LOG.info("Closed FileSystem: {}", fs.getUri()); + return null; // Remove the FileSystem from the map + } catch (IOException e) { + LOG.warn("Failed to close FileSystem: {}", fs.getUri(), e); + } + } else { + LOG.info("Decrementing reference count for FileSystem: {}, " + + "remaining: {}", fs.getUri(), remaining); + } + return count; + }); } } }, 0, 1, TimeUnit.MINUTES);