Skip to content

Commit

Permalink
Add move request to job factory
Browse files Browse the repository at this point in the history
  • Loading branch information
ssyssy committed Jun 29, 2023
1 parent 713f854 commit dc3fd83
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 1 deletion.
9 changes: 9 additions & 0 deletions common/transport/src/main/proto/grpc/file_system_master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,15 @@ message CopyJobPOptions {
optional bool check_content = 6;
}

message MoveJobPOptions {
optional int64 bandwidth = 1;
optional bool verify = 2;
optional bool partialListing = 3;
optional bool overwrite = 4;
optional WritePType writeType = 5;
optional bool check_content = 6;
}

message StopJobPRequest {
required JobDescription jobDescription = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import javax.annotation.concurrent.ThreadSafe;

/**
* The request of loading files.
* The request of copying files.
*/
@ThreadSafe
public class CopyJobRequest implements JobRequest {
Expand Down
81 changes: 81 additions & 0 deletions dora/core/common/src/main/java/alluxio/job/MoveJobRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.job;

import alluxio.grpc.MoveJobPOptions;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

import javax.annotation.concurrent.ThreadSafe;

/**
* The request of moving files.
*/
@ThreadSafe
public class MoveJobRequest implements JobRequest {
private static final String TYPE = "move";
private static final long serialVersionUID = -8565405317284410500L;
private final String mDst;
private final MoveJobPOptions mOptions;
private final String mSrc;

/**
* @param src the source file path
* @param dst the destination file path
* @param options move job options
**/
public MoveJobRequest(@JsonProperty("src") String src,
@JsonProperty("dst") String dst,
@JsonProperty("copyJobPOptions") MoveJobPOptions options) {
mSrc = Preconditions.checkNotNull(src, "The source path cannot be null");
mDst = Preconditions.checkNotNull(dst, "The destination path cannot be null");
mOptions = Preconditions.checkNotNull(options, "The job options cannot be null");
}

/**
* @return the source file path
*/
public String getSrc() {
return mSrc;
}

/**
* @return the file path
*/
public String getDst() {
return mDst;
}

/**
* @return job options
*/
public MoveJobPOptions getOptions() {
return mOptions;
}

@Override
public String toString() {
return MoreObjects
.toStringHelper(this)
.add("Src", mSrc)
.add("Dst", mDst)
.add("Options", mOptions)
.toString();
}

@Override
public String getType() {
return TYPE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.job.CopyJobRequest;
import alluxio.job.JobRequest;
import alluxio.job.LoadJobRequest;
import alluxio.job.MoveJobRequest;
import alluxio.master.file.DefaultFileSystemMaster;
import alluxio.proto.journal.Journal;
import alluxio.scheduler.job.JobFactory;
Expand All @@ -37,6 +38,9 @@ public static JobFactory create(JobRequest request, DefaultFileSystemMaster fsMa
if (request instanceof CopyJobRequest) {
return new CopyJobFactory((CopyJobRequest) request, fsMaster);
}
if (request instanceof MoveJobRequest) {
return new MoveJobFactory((MoveJobRequest) request, fsMaster);
}
throw new IllegalArgumentException("Unknown job type: " + request.getType());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package alluxio.master.job;

import alluxio.AlluxioURI;
import alluxio.conf.Configuration;
import alluxio.grpc.MoveJobPOptions;
import alluxio.job.MoveJobRequest;
import alluxio.master.file.DefaultFileSystemMaster;
import alluxio.scheduler.job.Job;
import alluxio.scheduler.job.JobFactory;
import alluxio.security.User;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
import alluxio.wire.FileInfo;

import java.util.Optional;
import java.util.OptionalLong;
import java.util.UUID;

/**
* Factory for creating {@link MoveJob}s that get file infos from master.
*/
public class MoveJobFactory implements JobFactory {
private final DefaultFileSystemMaster mFs;
private final MoveJobRequest mRequest;

/**
* Create factory.
* @param request copy job request
* @param fsMaster file system master
*/
public MoveJobFactory(MoveJobRequest request, DefaultFileSystemMaster fsMaster) {
mFs = fsMaster;
mRequest = request;
}

@Override
public Job<?> create() {
MoveJobPOptions options = mRequest.getOptions();
String src = mRequest.getSrc();
OptionalLong bandwidth =
options.hasBandwidth() ? OptionalLong.of(options.getBandwidth()) : OptionalLong.empty();
boolean partialListing = options.hasPartialListing() && options.getPartialListing();
boolean verificationEnabled = options.hasVerify() && options.getVerify();
boolean overwrite = options.hasOverwrite() && options.getOverwrite();
boolean checkContent = options.hasCheckContent() && options.getCheckContent();
UnderFileSystem ufs = mFs.getUfsManager().getOrAdd(new AlluxioURI(src),
UnderFileSystemConfiguration.defaults(Configuration.global()));
Iterable<FileInfo> fileIterator = new UfsFileIterable(ufs, src, Optional
.ofNullable(AuthenticatedClientUser.getOrNull())
.map(User::getName), partialListing, FileInfo::isCompleted);
Optional<String> user = Optional
.ofNullable(AuthenticatedClientUser.getOrNull())
.map(User::getName);
return new MoveJob(src, mRequest.getDst(), overwrite, user, UUID.randomUUID().toString(),
bandwidth, partialListing, verificationEnabled, checkContent, fileIterator,
Optional.empty());
}
}

0 comments on commit dc3fd83

Please sign in to comment.