Skip to content

Commit

Permalink
refactor RemoteFSPhantomManager
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Dec 26, 2024
1 parent 905b374 commit 0039a17
Showing 1 changed file with 41 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.common.CustomThreadFactory;

import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FileSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -28,12 +27,12 @@
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* The RemoteFSPhantomManager class is responsible for managing the phantom references
Expand Down Expand Up @@ -62,10 +61,16 @@ public class RemoteFSPhantomManager {
private static final ReferenceQueue<RemoteFileSystem> referenceQueue = new ReferenceQueue<>();

// Map storing the phantom references and their corresponding FileSystem objects
private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap
= new ConcurrentHashMap<>();

private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap =
new ConcurrentHashMap<>();
/**
* Map for tracking reference counts of FileSystem instances.
* Key: FileSystem instance, Value: AtomicInteger representing the reference count for the FileSystem.
* This ensures that the FileSystem is only closed when all associated RemoteFileSystem instances are
* garbage collected.
*/
private static final ConcurrentHashMap<FileSystem, AtomicInteger> fileSystemReferenceCounts =
new ConcurrentHashMap<>();

// Flag indicating whether the cleanup thread has been started
private static final AtomicBoolean isStarted = new AtomicBoolean(false);
Expand All @@ -76,18 +81,26 @@ public class RemoteFSPhantomManager {
*
* @param remoteFileSystem the RemoteFileSystem object to be registered
*/
public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) {
public static synchronized void registerPhantomReference(RemoteFileSystem remoteFileSystem) {
if (!isStarted.get()) {
start();
isStarted.set(true);
}
if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
throw new RuntimeException("FileSystem already exists: " + remoteFileSystem.dfsFileSystem.getUri());
}
FileSystem fileSystem = remoteFileSystem.dfsFileSystem;
fileSystemReferenceCounts.compute(fileSystem, (fs, count) -> {
if (count == null) {
LOG.info("New FileSystem detected: {}", fileSystem.getUri());
return new AtomicInteger(1);
} else {
LOG.info("Incrementing reference count for FileSystem: {}", fileSystem.getUri());
count.incrementAndGet();
return count;
}
});

RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem,
referenceQueue);
referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
fsSet.add(remoteFileSystem.dfsFileSystem);
referenceMap.put(phantomReference, fileSystem);
}

/**
Expand All @@ -108,13 +121,22 @@ public static void start() {

FileSystem fs = referenceMap.remove(phantomRef);
if (fs != null) {
try {
fs.close();
fsSet.remove(fs);
LOG.info("Closed file system: {}", fs.getUri());
} catch (IOException e) {
LOG.warn("Failed to close file system", e);
}
fileSystemReferenceCounts.computeIfPresent(fs, (key, count) -> {
int remaining = count.decrementAndGet();
if (remaining <= 0) {
try {
fs.close();
LOG.info("Closed FileSystem: {}", fs.getUri());
return null; // Remove the FileSystem from the map
} catch (IOException e) {
LOG.warn("Failed to close FileSystem: {}", fs.getUri(), e);
}
} else {
LOG.info("Decrementing reference count for FileSystem: {}, "
+ "remaining: {}", fs.getUri(), remaining);
}
return count;
});
}
}
}, 0, 1, TimeUnit.MINUTES);
Expand Down

0 comments on commit 0039a17

Please sign in to comment.