From aa68f1e04f2323045669ec11695c23da8bcc0039 Mon Sep 17 00:00:00 2001 From: humengyu Date: Mon, 10 Jul 2023 18:46:09 +0800 Subject: [PATCH] Support hdfs trash --- .../main/java/alluxio/conf/PropertyKey.java | 11 ++++ .../underfs/hdfs/HdfsUnderFileSystem.java | 53 +++++++++++++++++-- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index e52e2c41affd..2ee232b877a6 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -1184,6 +1184,16 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.SERVER) .build(); + public static final PropertyKey UNDERFS_HDFS_TRASH_ENABLE = + booleanBuilder(Name.UNDERFS_HDFS_TRASH_ENABLE) + .setDefaultValue(false) + .setDescription("Whether to enable the HDFS trash feature, " + + "it is important to note that after enabling this configuration, " + + "you need to set the relevant trash configurations (such as 'fs.trash.interval') " + + "in 'hdfs-site.xml' in order to truly use the HDFS trash functionality.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.SERVER) + .build(); public static final PropertyKey UNDERFS_IO_THREADS = intBuilder(Name.UNDERFS_IO_THREADS) .setDefaultSupplier(() -> Math.max(4, 3 * Runtime.getRuntime().availableProcessors()), @@ -7835,6 +7845,7 @@ public static final class Name { public static final String UNDERFS_HDFS_PREFIXES = "alluxio.underfs.hdfs.prefixes"; public static final String UNDERFS_OZONE_PREFIXES = "alluxio.underfs.ozone.prefixes"; public static final String UNDERFS_HDFS_REMOTE = "alluxio.underfs.hdfs.remote"; + public static final String UNDERFS_HDFS_TRASH_ENABLE = "alluxio.underfs.hdfs.trash.enable"; public static final String UNDERFS_IO_THREADS = "alluxio.underfs.io.threads"; public static final String UNDERFS_LOCAL_SKIP_BROKEN_SYMLINKS = "alluxio.underfs.local.skip.broken.symlinks"; diff --git a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java index f5c435d2f470..a391f3b73013 100755 --- a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java +++ b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java @@ -44,14 +44,17 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import java.util.Optional; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.security.SecurityUtil; @@ -117,6 +120,9 @@ public class HdfsUnderFileSystem extends ConsistentUnderFileSystem private final LoadingCache mUserFs; private final HdfsAclProvider mHdfsAclProvider; + private final boolean mTrashEnable; + private final LoadingCache> mFsTrash; + private HdfsActiveSyncProvider mHdfsActiveSyncer; /** @@ -237,6 +243,21 @@ public FileSystem load(String userKey) throws Exception { } }); + mTrashEnable = alluxio.conf.Configuration.getBoolean( + PropertyKey.UNDERFS_HDFS_TRASH_ENABLE); + LOG.info(PropertyKey.UNDERFS_HDFS_TRASH_ENABLE.getName() + " is set to {}", mTrashEnable); + mFsTrash = CacheBuilder.newBuilder().build(new CacheLoader>() { + @Override + public Optional load(FileSystem fs) throws Exception { + Configuration configuration = fs.getConf(); + long interval = configuration.getLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 0); + if (interval == 0) { + return Optional.empty(); + } + return Optional.of(new Trash(fs, configuration)); + } + }); + // Create the supported HdfsActiveSyncer if possible. HdfsActiveSyncProvider hdfsActiveSyncProvider = new NoopHdfsActiveSyncProvider(); @@ -356,12 +377,12 @@ public OutputStream createDirect(String path, CreateOptions options) throws IOEx @Override public boolean deleteDirectory(String path, DeleteOptions options) throws IOException { - return isDirectory(path) && delete(path, options.isRecursive()); + return isDirectory(path) && delete(path, options.isRecursive(), true); } @Override public boolean deleteFile(String path) throws IOException { - return isFile(path) && delete(path, false); + return isFile(path) && delete(path, false, false); } @Override @@ -802,15 +823,28 @@ public void stopSync(AlluxioURI ufsUri) { * * @param path file or directory path * @param recursive whether to delete path recursively + * @param isDirectory whether path is a directory * @return true, if succeed */ - private boolean delete(String path, boolean recursive) throws IOException { + private boolean delete(String path, boolean recursive, boolean isDirectory) throws IOException { IOException te = null; FileSystem hdfs = getFs(); RetryPolicy retryPolicy = new CountingRetry(MAX_TRY); while (retryPolicy.attempt()) { try { - return hdfs.delete(new Path(path), recursive); + Path hdfsPath = new Path(path); + if (!mTrashEnable){ + return hdfs.delete(hdfsPath, recursive); + } + Optional trash = getTrash(hdfs); + if (!trash.isPresent()){ + return hdfs.delete(hdfsPath, recursive); + } + // move to trash + if (isDirectory && !recursive) { + return false; + } + return trash.get().moveToTrash(hdfsPath); } catch (IOException e) { LOG.warn("Attempt count {} : {}", retryPolicy.getAttemptCount(), e.toString()); te = e; @@ -883,4 +917,15 @@ private FileSystem getFs() throws IOException { throw new IOException("Failed get FileSystem for " + mUri, e.getCause()); } } + + private Optional getTrash(FileSystem fs) throws IOException { + if (!mTrashEnable) { + return Optional.empty(); + } + try { + return mFsTrash.get(fs); + } catch (ExecutionException e) { + throw new IOException("Failed get Trash for " + fs.getUri(), e.getCause()); + } + } }