Skip to content

Commit

Permalink
Move timeout to read task
Browse files Browse the repository at this point in the history
  • Loading branch information
Haoning-Sun committed Jun 9, 2024
1 parent 8f48a25 commit a6e33cb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.runtime.BlockDoesNotExistRuntimeException;
import alluxio.exception.runtime.DeadlineExceededRuntimeException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.UnavailableException;
Expand Down Expand Up @@ -57,7 +56,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -339,8 +337,6 @@ public CompletableFuture<List<BlockStatus>> load(List<Block> blocks, UfsReadOpti
() -> manager.read(buf, block.getOffsetInFile(), blockSize, blockId,
block.getUfsPath(), options),
new ExponentialBackoffRetry(1000, 5000, 5))
// use orTimeout in java 11
.applyToEither(timeoutAfter(LOAD_TIMEOUT, TimeUnit.MILLISECONDS), d -> d)
.thenRunAsync(() -> {
buf.flip();
blockWriter.append(buf);
Expand Down Expand Up @@ -384,13 +380,6 @@ private void handleException(Throwable e, Block block, List<BlockStatus> errors,
}
}

private <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<>();
mDelayer.schedule(() -> result.completeExceptionally(new DeadlineExceededRuntimeException(
format("time out after waiting for %s %s", timeout, unit))), timeout, unit);
return result;
}

@Override
public void close() throws IOException {
mLocalBlockStore.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.runtime.DeadlineExceededRuntimeException;
import alluxio.exception.runtime.OutOfRangeRuntimeException;
import alluxio.exception.runtime.ResourceExhaustedRuntimeException;
import alluxio.grpc.UfsReadOptions;
Expand All @@ -37,19 +38,26 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Control UFS IO.
*/
public class UfsIOManager implements Closeable {
private static final int READ_CAPACITY = 1024;
private static final long READ_TIMEOUT =
Configuration.getMs(PropertyKey.USER_NETWORK_RPC_KEEPALIVE_TIMEOUT);
private final UfsManager.UfsClient mUfsClient;
private final ConcurrentMap<String, Long> mThroughputQuota = new ConcurrentHashMap<>();
private final UfsInputStreamCache mUfsInstreamCache = new UfsInputStreamCache();
Expand All @@ -61,6 +69,8 @@ public class UfsIOManager implements Closeable {
ThreadFactoryUtils.build("UfsIOManager-IO-%d", false));
private final ExecutorService mScheduleExecutor = Executors
.newSingleThreadExecutor(ThreadFactoryUtils.build("UfsIOManager-Scheduler-%d", true));
private final ScheduledExecutorService mDelayer = new ScheduledThreadPoolExecutor(1,
ThreadFactoryUtils.build("UfsIOManager-Read-TimeOut", true));

/**
* @param ufsClient ufs client
Expand All @@ -83,6 +93,7 @@ public void start() {
public void close() {
mScheduleExecutor.shutdownNow();
mUfsIoExecutor.shutdownNow();
mDelayer.shutdownNow();
}

/**
Expand All @@ -99,8 +110,11 @@ private void schedule() {
while (!Thread.currentThread().isInterrupted()) {
try {
ReadTask task = mReadQueue.take();
if (task.isCancelled()) {
continue;
}
if (mThroughputQuota.containsKey(task.mOptions.getTag())
&& mThroughputQuota.get(task.mOptions.getTag()) < getUsedThroughput(task.mMeter)) {
&& mThroughputQuota.get(task.mOptions.getTag()) < getUsedThroughput(task.mMeter)) {
// resubmit to queue
mReadQueue.put(task);
} else {
Expand Down Expand Up @@ -164,8 +178,16 @@ public CompletableFuture<Integer> read(ByteBuffer buf, long offset, long len, lo
MetricsSystem.escape(mUfsClient.getUfsMountPointUri()), MetricInfo.TAG_USER,
options.getTag()));

mReadQueue.add(new ReadTask(buf, ufsPath, IdUtils.fileIdFromBlockId(blockId), offset,
len, options, future, meter));
ReadTask task = new ReadTask(buf, ufsPath, IdUtils.fileIdFromBlockId(blockId), offset,
len, options, future, meter);
mReadQueue.add(task);

ScheduledFuture<?> timeoutFuture = mDelayer.schedule(() -> {
task.cancel();
future.completeExceptionally(new DeadlineExceededRuntimeException(
String.format("time out after waiting for %s %s", READ_TIMEOUT, TimeUnit.MILLISECONDS)));
}, READ_TIMEOUT, TimeUnit.MILLISECONDS);
future.whenComplete((result, ex) -> timeoutFuture.cancel(false));
return future;
}

Expand All @@ -177,7 +199,8 @@ private class ReadTask implements Runnable {
private final UfsReadOptions mOptions;
private final Meter mMeter;
private final long mFileId;
private final ByteBuffer mBuffuer;
private final ByteBuffer mBuffer;
private volatile boolean mCancelled = false;

private ReadTask(ByteBuffer buf, String ufsPath, long fileId, long offset, long length,
UfsReadOptions options, CompletableFuture<Integer> future, Meter meter) {
Expand All @@ -188,18 +211,23 @@ private ReadTask(ByteBuffer buf, String ufsPath, long fileId, long offset, long
mLength = length;
mFuture = future;
mMeter = meter;
mBuffuer = buf;
mBuffer = buf;
}

public void run() {
try {
mFuture.complete(readInternal());
if (!mCancelled) {
mFuture.complete(readInternal());
}
} catch (RuntimeException e) {
mFuture.completeExceptionally(e);
}
}

private int readInternal() {
if (mCancelled) {
throw new CancellationException("Read task was cancelled");
}
int bytesRead = 0;
InputStream inStream = null;
try (CloseableResource<UnderFileSystem> ufsResource = mUfsClient.acquireUfsResource()) {
Expand All @@ -211,8 +239,13 @@ private int readInternal() {
OpenOptions.defaults().setOffset(mOffset)
.setPositionShort(mOptions.getPositionShort()));
while (bytesRead < mLength) {
if (mCancelled) {
throw new CancellationException("Read task was cancelled");
}
int read;
read = Channels.newChannel(inStream).read(mBuffuer);
synchronized (mBuffer) {
read = Channels.newChannel(inStream).read(mBuffer);
}
if (read == -1) {
break;
}
Expand All @@ -228,5 +261,15 @@ private int readInternal() {
mMeter.mark(bytesRead);
return bytesRead;
}

public boolean isCancelled() {
return mCancelled;
}

public void cancel() {
synchronized (mBuffer) {
mCancelled = true;
}
}
}
}

0 comments on commit a6e33cb

Please sign in to comment.