Skip to content

Commit

Permalink
Support HDFS trash
Browse files Browse the repository at this point in the history
  • Loading branch information
humengyu2012 committed Jul 10, 2023
1 parent 73cea83 commit aa68f1e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
11 changes: 11 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,16 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
// Opt-in switch for routing UFS deletes through the HDFS trash instead of
// deleting permanently. Defaults to false; the actual retention behavior is
// governed by the HDFS-side trash settings (e.g. 'fs.trash.interval').
public static final PropertyKey UNDERFS_HDFS_TRASH_ENABLE =
booleanBuilder(Name.UNDERFS_HDFS_TRASH_ENABLE)
.setDefaultValue(false)
.setDescription("Whether to enable the HDFS trash feature, "
+ "it is important to note that after enabling this configuration, "
+ "you need to set the relevant trash configurations (such as 'fs.trash.interval') "
+ "in 'hdfs-site.xml' in order to truly use the HDFS trash functionality.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_IO_THREADS =
intBuilder(Name.UNDERFS_IO_THREADS)
.setDefaultSupplier(() -> Math.max(4, 3 * Runtime.getRuntime().availableProcessors()),
Expand Down Expand Up @@ -7835,6 +7845,7 @@ public static final class Name {
public static final String UNDERFS_HDFS_PREFIXES = "alluxio.underfs.hdfs.prefixes";
public static final String UNDERFS_OZONE_PREFIXES = "alluxio.underfs.ozone.prefixes";
public static final String UNDERFS_HDFS_REMOTE = "alluxio.underfs.hdfs.remote";
// Name of the property that enables moving UFS deletes to the HDFS trash.
public static final String UNDERFS_HDFS_TRASH_ENABLE = "alluxio.underfs.hdfs.trash.enable";
public static final String UNDERFS_IO_THREADS = "alluxio.underfs.io.threads";
public static final String UNDERFS_LOCAL_SKIP_BROKEN_SYMLINKS =
"alluxio.underfs.local.skip.broken.symlinks";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,17 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.Optional;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.security.SecurityUtil;
Expand Down Expand Up @@ -117,6 +120,9 @@ public class HdfsUnderFileSystem extends ConsistentUnderFileSystem
private final LoadingCache<String, FileSystem> mUserFs;
private final HdfsAclProvider mHdfsAclProvider;

/** Whether deletes should go through the HDFS trash (alluxio.underfs.hdfs.trash.enable). */
private final boolean mTrashEnable;
/**
 * Per-FileSystem cache of {@link Trash} handles. The cached value is
 * {@link Optional#empty()} when the file system's 'fs.trash.interval' is 0,
 * i.e. trash is disabled on the HDFS side.
 */
private final LoadingCache<FileSystem, Optional<Trash>> mFsTrash;

private HdfsActiveSyncProvider mHdfsActiveSyncer;

/**
Expand Down Expand Up @@ -237,6 +243,21 @@ public FileSystem load(String userKey) throws Exception {
}
});

mTrashEnable = alluxio.conf.Configuration.getBoolean(
PropertyKey.UNDERFS_HDFS_TRASH_ENABLE);
LOG.info(PropertyKey.UNDERFS_HDFS_TRASH_ENABLE.getName() + " is set to {}", mTrashEnable);
mFsTrash = CacheBuilder.newBuilder().build(new CacheLoader<FileSystem, Optional<Trash>>() {
@Override
public Optional<Trash> load(FileSystem fs) throws Exception {
Configuration configuration = fs.getConf();
long interval = configuration.getLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 0);
if (interval == 0) {
return Optional.empty();
}
return Optional.of(new Trash(fs, configuration));
}
});

// Create the supported HdfsActiveSyncer if possible.
HdfsActiveSyncProvider hdfsActiveSyncProvider = new NoopHdfsActiveSyncProvider();

Expand Down Expand Up @@ -356,12 +377,12 @@ public OutputStream createDirect(String path, CreateOptions options) throws IOEx

@Override
public boolean deleteDirectory(String path, DeleteOptions options) throws IOException {
  // Diff artifact fix: the hunk contained both the old 2-arg and new 3-arg delete
  // calls as consecutive returns (the second unreachable). Keep the new form:
  // the trailing 'true' marks the path as a directory so the trash logic can
  // refuse a non-recursive directory delete.
  return isDirectory(path) && delete(path, options.isRecursive(), true);
}

@Override
public boolean deleteFile(String path) throws IOException {
  // Diff artifact fix: both old and new return lines were present; the second was
  // unreachable. Keep the new 3-arg form: never recursive, and 'false' marks the
  // path as a file (files may always be moved to trash regardless of recursion).
  return isFile(path) && delete(path, false, false);
}

@Override
Expand Down Expand Up @@ -802,15 +823,28 @@ public void stopSync(AlluxioURI ufsUri) {
*
* @param path file or directory path
* @param recursive whether to delete path recursively
* @param isDirectory whether path is a directory
* @return true, if succeed
*/
private boolean delete(String path, boolean recursive) throws IOException {
private boolean delete(String path, boolean recursive, boolean isDirectory) throws IOException {
IOException te = null;
FileSystem hdfs = getFs();
RetryPolicy retryPolicy = new CountingRetry(MAX_TRY);
while (retryPolicy.attempt()) {
try {
return hdfs.delete(new Path(path), recursive);
Path hdfsPath = new Path(path);
if (!mTrashEnable){
return hdfs.delete(hdfsPath, recursive);
}
Optional<Trash> trash = getTrash(hdfs);
if (!trash.isPresent()){
return hdfs.delete(hdfsPath, recursive);
}
// move to trash
if (isDirectory && !recursive) {
return false;
}
return trash.get().moveToTrash(hdfsPath);
} catch (IOException e) {
LOG.warn("Attempt count {} : {}", retryPolicy.getAttemptCount(), e.toString());
te = e;
Expand Down Expand Up @@ -883,4 +917,15 @@ private FileSystem getFs() throws IOException {
throw new IOException("Failed get FileSystem for " + mUri, e.getCause());
}
}

/**
 * Returns the cached {@link Trash} handle for the given file system.
 *
 * @param fs the underlying HDFS file system
 * @return the trash handle, or {@link Optional#empty()} when the trash feature is
 *         disabled in Alluxio or 'fs.trash.interval' is 0 on the HDFS side
 * @throws IOException if loading the trash handle into the cache fails
 */
private Optional<Trash> getTrash(FileSystem fs) throws IOException {
  if (mTrashEnable) {
    try {
      return mFsTrash.get(fs);
    } catch (ExecutionException e) {
      throw new IOException("Failed get Trash for " + fs.getUri(), e.getCause());
    }
  }
  return Optional.empty();
}
}

0 comments on commit aa68f1e

Please sign in to comment.