diff --git a/common/transport/src/main/proto/grpc/file_system_master.proto b/common/transport/src/main/proto/grpc/file_system_master.proto index 5c6644009c7a..96cea1f7394e 100644 --- a/common/transport/src/main/proto/grpc/file_system_master.proto +++ b/common/transport/src/main/proto/grpc/file_system_master.proto @@ -643,6 +643,15 @@ message CopyJobPOptions { optional bool check_content = 6; } +message MoveJobPOptions { + optional int64 bandwidth = 1; + optional bool verify = 2; + optional bool partialListing = 3; + optional bool overwrite = 4; + optional WritePType writeType = 5; + optional bool check_content = 6; +} + message StopJobPRequest { required JobDescription jobDescription = 1; } diff --git a/dora/core/common/src/main/java/alluxio/job/CopyJobRequest.java b/dora/core/common/src/main/java/alluxio/job/CopyJobRequest.java index b9fba0652671..6392fcc25c05 100644 --- a/dora/core/common/src/main/java/alluxio/job/CopyJobRequest.java +++ b/dora/core/common/src/main/java/alluxio/job/CopyJobRequest.java @@ -20,7 +20,7 @@ import javax.annotation.concurrent.ThreadSafe; /** - * The request of loading files. + * The request of copying files. */ @ThreadSafe public class CopyJobRequest implements JobRequest { diff --git a/dora/core/common/src/main/java/alluxio/job/MoveJobRequest.java b/dora/core/common/src/main/java/alluxio/job/MoveJobRequest.java new file mode 100644 index 000000000000..54f7e626f2be --- /dev/null +++ b/dora/core/common/src/main/java/alluxio/job/MoveJobRequest.java @@ -0,0 +1,81 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.job; + +import alluxio.grpc.MoveJobPOptions; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * The request of moving files. + */ +@ThreadSafe +public class MoveJobRequest implements JobRequest { + private static final String TYPE = "move"; + private static final long serialVersionUID = -8565405317284410500L; + private final String mDst; + private final MoveJobPOptions mOptions; + private final String mSrc; + + /** + * @param src the source file path + * @param dst the destination file path + * @param options move job options + **/ + public MoveJobRequest(@JsonProperty("src") String src, + @JsonProperty("dst") String dst, + @JsonProperty("copyJobPOptions") MoveJobPOptions options) { + mSrc = Preconditions.checkNotNull(src, "The source path cannot be null"); + mDst = Preconditions.checkNotNull(dst, "The destination path cannot be null"); + mOptions = Preconditions.checkNotNull(options, "The job options cannot be null"); + } + + /** + * @return the source file path + */ + public String getSrc() { + return mSrc; + } + + /** + * @return the file path + */ + public String getDst() { + return mDst; + } + + /** + * @return job options + */ + public MoveJobPOptions getOptions() { + return mOptions; + } + + @Override + public String toString() { + return MoreObjects + .toStringHelper(this) + .add("Src", mSrc) + .add("Dst", mDst) + .add("Options", mOptions) + .toString(); + } + + @Override + public String getType() { + return TYPE; + } +} diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/JobFactoryProducer.java b/dora/core/server/master/src/main/java/alluxio/master/job/JobFactoryProducer.java index f46eab706c87..f31bce13eb56 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/JobFactoryProducer.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/JobFactoryProducer.java @@ -14,6 +14,7 @@ import alluxio.job.CopyJobRequest; import alluxio.job.JobRequest; import alluxio.job.LoadJobRequest; +import alluxio.job.MoveJobRequest; import alluxio.master.file.DefaultFileSystemMaster; import alluxio.proto.journal.Journal; import alluxio.scheduler.job.JobFactory; @@ -37,6 +38,9 @@ public static JobFactory create(JobRequest request, DefaultFileSystemMaster fsMa if (request instanceof CopyJobRequest) { return new CopyJobFactory((CopyJobRequest) request, fsMaster); } + if (request instanceof MoveJobRequest) { + return new MoveJobFactory((MoveJobRequest) request, fsMaster); + } throw new IllegalArgumentException("Unknown job type: " + request.getType()); } diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/MoveJobFactory.java b/dora/core/server/master/src/main/java/alluxio/master/job/MoveJobFactory.java new file mode 100644 index 000000000000..bc9e364804c1 --- /dev/null +++ b/dora/core/server/master/src/main/java/alluxio/master/job/MoveJobFactory.java @@ -0,0 +1,59 @@ +package alluxio.master.job; + +import alluxio.AlluxioURI; +import alluxio.conf.Configuration; +import alluxio.grpc.MoveJobPOptions; +import alluxio.job.MoveJobRequest; +import alluxio.master.file.DefaultFileSystemMaster; +import alluxio.scheduler.job.Job; +import alluxio.scheduler.job.JobFactory; +import alluxio.security.User; +import alluxio.security.authentication.AuthenticatedClientUser; +import alluxio.underfs.UnderFileSystem; +import alluxio.underfs.UnderFileSystemConfiguration; +import alluxio.wire.FileInfo; + +import java.util.Optional; +import java.util.OptionalLong; +import java.util.UUID; + +/** + * Factory for creating {@link MoveJob}s that get file infos from master. + */ +public class MoveJobFactory implements JobFactory { + private final DefaultFileSystemMaster mFs; + private final MoveJobRequest mRequest; + + /** + * Create factory. + * @param request copy job request + * @param fsMaster file system master + */ + public MoveJobFactory(MoveJobRequest request, DefaultFileSystemMaster fsMaster) { + mFs = fsMaster; + mRequest = request; + } + + @Override + public Job create() { + MoveJobPOptions options = mRequest.getOptions(); + String src = mRequest.getSrc(); + OptionalLong bandwidth = + options.hasBandwidth() ? OptionalLong.of(options.getBandwidth()) : OptionalLong.empty(); + boolean partialListing = options.hasPartialListing() && options.getPartialListing(); + boolean verificationEnabled = options.hasVerify() && options.getVerify(); + boolean overwrite = options.hasOverwrite() && options.getOverwrite(); + boolean checkContent = options.hasCheckContent() && options.getCheckContent(); + UnderFileSystem ufs = mFs.getUfsManager().getOrAdd(new AlluxioURI(src), + UnderFileSystemConfiguration.defaults(Configuration.global())); + Iterable fileIterator = new UfsFileIterable(ufs, src, Optional + .ofNullable(AuthenticatedClientUser.getOrNull()) + .map(User::getName), partialListing, FileInfo::isCompleted); + Optional user = Optional + .ofNullable(AuthenticatedClientUser.getOrNull()) + .map(User::getName); + return new MoveJob(src, mRequest.getDst(), overwrite, user, UUID.randomUUID().toString(), + bandwidth, partialListing, verificationEnabled, checkContent, fileIterator, + Optional.empty()); + } +}