Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ucx support (prototype) #18631

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions common/transport/src/main/proto/proto/dataserver/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@ enum RequestType {
UFS_FILE = 1;
}

// The RMA (remote memory access) read request. It carries the same block-read fields as a
// regular read request plus the remote memory address and rkey buffer declared at the end.
// next available id: 10
message ReadRequestRMA {
// Id of the block to read.
optional int64 block_id = 1;
// Offset to start reading from. Presumably relative to the start of the block - TODO confirm.
optional int64 offset = 2;
// Number of bytes requested.
optional int64 length = 3;
// If set, this request is to cancel the reading request for the id.
optional bool cancel = 4;
// Whether the block should be promoted before reading
optional bool promote = 7;

// If set, the server should send packets in the specified packet size.
optional int64 chunk_size = 5;

// This is only set for UFS block read.
optional OpenUfsBlockOptions open_ufs_block_options = 6;

// NOTE(review): presumably the address of the requester's registered memory region that the
// server targets with its RMA operation - confirm against the UCX client code.
// NOTE(review): proto2 `required` fields cannot be relaxed later without breaking old readers;
// consider whether `optional` plus explicit validation is preferable.
required int64 remote_mem_addr = 8;
// NOTE(review): presumably the packed UCX remote key for the memory region above - confirm.
required bytes rkey_buf = 9;
}

// The read request.
// next available id: 8
message ReadRequest {
Expand Down Expand Up @@ -97,6 +118,11 @@ message ReadResponse {
optional Type type = 1;
}

// The response to a ReadRequestRMA.
message ReadResponseRMA {
// NOTE(review): presumably the number of bytes actually transferred to the requester - confirm.
required int64 read_length = 1;
// NOTE(review): presumably only set when the read failed - confirm against the server code.
optional string error_msg = 2;
}

// A heartbeat
message Heartbeat {
// Empty message
Expand Down
17 changes: 17 additions & 0 deletions conf/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,23 @@ log4j.appender.FUSE_LOGGER.MaxBackupIndex=10
log4j.appender.FUSE_LOGGER.layout=org.apache.log4j.PatternLayout
log4j.appender.FUSE_LOGGER.layout.ConversionPattern=%d{ISO8601} %-5p [%t](%F:%L) - %m%n

# UCP server appender: rolling file log written to ${alluxio.logs.dir}/ucpserver.log
log4j.appender.UCPSERVER_LOGGER=org.apache.log4j.RollingFileAppender
log4j.appender.UCPSERVER_LOGGER.File=${alluxio.logs.dir}/ucpserver.log
log4j.appender.UCPSERVER_LOGGER.MaxFileSize=10MB
log4j.appender.UCPSERVER_LOGGER.MaxBackupIndex=100
log4j.appender.UCPSERVER_LOGGER.layout=org.apache.log4j.PatternLayout
log4j.appender.UCPSERVER_LOGGER.layout.ConversionPattern=%d{ISO8601} %-5p [%t](%F:%L) - %m%n

# UCP client test appender: rolling file log written to ${alluxio.logs.dir}/ucpclienttest.log
log4j.appender.UCPCLIENTTEST_LOGGER=org.apache.log4j.RollingFileAppender
log4j.appender.UCPCLIENTTEST_LOGGER.File=${alluxio.logs.dir}/ucpclienttest.log
log4j.appender.UCPCLIENTTEST_LOGGER.MaxFileSize=10MB
log4j.appender.UCPCLIENTTEST_LOGGER.MaxBackupIndex=100
log4j.appender.UCPCLIENTTEST_LOGGER.layout=org.apache.log4j.PatternLayout
log4j.appender.UCPCLIENTTEST_LOGGER.layout.ConversionPattern=%d{ISO8601} %-5p [%t](%F:%L) - %m%n


# Disable noisy DEBUG logs
log4j.logger.com.amazonaws.util.EC2MetadataUtils=OFF
log4j.logger.io.grpc.netty.NettyServerHandler=OFF
Expand Down
3 changes: 3 additions & 0 deletions conf/start-ucp-server.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
#
# Starts the prototype UCP server (alluxio.worker.ucx.UcpServer) in the background with a
# JDWP debug agent listening on port 60003. Both stdout and stderr of the JVM are written to
# ${ALLUXIO_HOME}/logs/ucpserver.out.
#
# ALLUXIO_HOME may be overridden from the environment; it defaults to the checkout location
# this script was originally written for.

ALLUXIO_HOME="${ALLUXIO_HOME:-/root/github/alluxio}"

# NOTE: the redirection order matters. stdout is pointed at the log file first and stderr is
# then duplicated onto it ("> file 2>&1"). The previous "2&>1 > file" form passed a stray "2"
# argument to the server and sent the output to a file literally named "1".
java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=60003 \
  -cp "${ALLUXIO_HOME}/conf/::${ALLUXIO_HOME}/assembly/server/target/alluxio-assembly-server-305-SNAPSHOT-jar-with-dependencies.jar" \
  -Dalluxio.logger.type=UCPSERVER_LOGGER \
  -Dalluxio.home="${ALLUXIO_HOME}" \
  -Dalluxio.conf.dir="${ALLUXIO_HOME}/conf" \
  -Dalluxio.logs.dir="${ALLUXIO_HOME}/logs" \
  -Dalluxio.user.logs.dir="${ALLUXIO_HOME}/logs/user" \
  -Dlog4j.configuration="file:${ALLUXIO_HOME}/conf/log4j.properties" \
  -Dorg.apache.jasper.compiler.disablejsr199=true \
  -Djava.net.preferIPv4Stack=true \
  -Xmx4g -XX:MaxDirectMemorySize=4g \
  alluxio.worker.ucx.UcpServer 10 /root/testfolder 1 \
  > "${ALLUXIO_HOME}/logs/ucpserver.out" 2>&1 &
4 changes: 4 additions & 0 deletions dora/core/client/fs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@
<artifactId>alluxio-core-transport</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.openucx</groupId>
<artifactId>jucx</artifactId>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import alluxio.resource.LockResource;

import com.codahale.metrics.Counter;
import org.openucx.jucx.ucp.UcpMemory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +36,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -89,12 +91,24 @@ class Factory {
* case creation takes a long time by other threads.
*/
public static CacheManager get(AlluxioConfiguration conf) throws IOException {
  // Derive the cache options from the configuration, build the page meta store from those
  // options, and hand everything to the three-argument overload.
  final CacheManagerOptions cacheOptions = CacheManagerOptions.create(conf);
  final PageMetaStore metaStore = PageMetaStore.create(cacheOptions);
  return get(conf, cacheOptions, metaStore);
}

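/**
* @param conf the Alluxio configuration
* @param options the options for local cache manager
* @param pageMetaStore the page meta store passed on when a new cache manager is created
* @return current CacheManager handle, creating a new one if it doesn't yet exist or null in
* case creation takes a long time by other threads.
*/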
public static CacheManager get(AlluxioConfiguration conf,
CacheManagerOptions options,
PageMetaStore pageMetaStore) throws IOException {
// TODO(feng): support multiple cache managers
if (CACHE_MANAGER.get() == null) {
try (LockResource lockResource = new LockResource(CACHE_INIT_LOCK)) {
if (CACHE_MANAGER.get() == null) {
CACHE_MANAGER.set(
create(conf));
create(conf, options, pageMetaStore));
}
} catch (IOException e) {
Metrics.CREATE_ERRORS.inc();
Expand All @@ -104,15 +118,6 @@ public static CacheManager get(AlluxioConfiguration conf) throws IOException {
return CACHE_MANAGER.get();
}

/**
 * Creates a cache manager using options and a page meta store that are both derived from the
 * given configuration.
 *
 * @param conf the Alluxio configuration
 * @return an instance of {@link CacheManager}
 */
public static CacheManager create(AlluxioConfiguration conf) throws IOException {
  final CacheManagerOptions cacheOptions = CacheManagerOptions.create(conf);
  final PageMetaStore metaStore = PageMetaStore.create(cacheOptions);
  return create(conf, cacheOptions, metaStore);
}

/**
* @param conf the Alluxio configuration
* @param options the options for local cache manager
Expand Down Expand Up @@ -306,6 +311,10 @@ int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext, Supplier<byte[]> externalDataSupplier);

/**
 * Caches a page whose content is produced by the given supplier.
 * This default implementation does not support the operation and always throws.
 *
 * @param pageId page identifier
 * @param cacheContext cache related context
 * @param externalDataSupplier supplier of the page bytes
 * @return an implementation-defined byte count
 * @throws UnsupportedOperationException always, unless overridden
 */
default int cache(
    PageId pageId,
    CacheContext cacheContext,
    Supplier<byte[]> externalDataSupplier) {
  throw new UnsupportedOperationException();
}

/**
* Get page ids by the given file id.
* @param fileId file identifier
Expand Down Expand Up @@ -381,4 +390,9 @@ default void invalidate(Predicate<PageInfo> predicate) {
Optional<DataFileChannel> getDataFileChannel(
PageId pageId, int pageOffset, int bytesToRead, CacheContext cacheContext)
throws PageNotFoundException;

/**
 * Looks up a {@link UcpMemory} handle for a range of a cached page.
 * This default implementation does not support the operation and always throws.
 *
 * @param pageId page identifier
 * @param pageOffset offset into the page
 * @param bytesToRead number of bytes requested
 * @return the memory handle, if one is available
 * @throws UnsupportedOperationException always, unless overridden
 */
default Optional<UcpMemory> getUcpMemory(
    PageId pageId,
    int pageOffset,
    int bytesToRead) throws PageNotFoundException, IOException {
  throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;
import org.openucx.jucx.ucp.UcpMemory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Supplier;
Expand Down Expand Up @@ -61,13 +63,24 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
return mCacheManager.put(pageId, page, cacheContext);
}

@Override
public int cache(
    PageId pageId,
    CacheContext cacheContext,
    Supplier<byte[]> externalDataSupplier) {
  // Pure delegation to the wrapped cache manager.
  final int result = mCacheManager.cache(pageId, cacheContext, externalDataSupplier);
  return result;
}

@Override
public int get(
    PageId pageId,
    int pageOffset,
    int bytesToRead,
    ReadTargetBuffer target,
    CacheContext cacheContext) {
  // The shadow cache is consulted/updated first, then the read is served by the wrapped
  // cache manager.
  getOrUpdateShadowCache(pageId, bytesToRead, cacheContext);
  final int bytesRead =
      mCacheManager.get(pageId, pageOffset, bytesToRead, target, cacheContext);
  return bytesRead;
}

@Override
public Optional<UcpMemory> getUcpMemory(
    PageId pageId,
    int pageOffset,
    int bytesToRead) throws PageNotFoundException, IOException {
  // Pure delegation to the wrapped cache manager.
  final Optional<UcpMemory> memory =
      mCacheManager.getUcpMemory(pageId, pageOffset, bytesToRead);
  return memory;
}

@Override
public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext, Supplier<byte[]> externalDataSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.LocalPageStore;
import alluxio.client.file.cache.store.PageStoreDir;
import alluxio.client.quota.CacheQuota;
import alluxio.client.quota.CacheScope;
Expand All @@ -38,6 +39,7 @@
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.openucx.jucx.ucp.UcpMemory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -582,6 +584,26 @@ public int get(PageId pageId, int pageOffset, ReadTargetBuffer buffer,
return get(pageId, pageOffset, (int) pageSize, buffer, cacheContext);
}

/**
 * Returns a {@link UcpMemory} handle for a range of a cached page.
 *
 * <p>An empty result is returned when the cache is not in use, or when the page lives in a
 * page store that is not a {@link LocalPageStore} (only that store can provide the handle).
 *
 * @param pageId page identifier
 * @param pageOffset offset into the page
 * @param bytesToRead number of bytes requested
 * @return the memory handle, or empty if it cannot be provided
 * @throws PageNotFoundException if the page is not present in the page meta store
 * @throws IOException if the page store fails to produce the handle
 */
@Override
public Optional<UcpMemory> getUcpMemory(PageId pageId, int pageOffset, int bytesToRead)
    throws PageNotFoundException, IOException {
  if (mState.get() == NOT_IN_USE) {
    Metrics.GET_NOT_READY_ERRORS.inc();
    Metrics.GET_ERRORS.inc();
    return Optional.empty();
  }
  PageInfo pageInfo;
  try (LockResource metaLock = new LockResource(mPageMetaStore.getLock().readLock())) {
    pageInfo = mPageMetaStore.getPageInfo(pageId); //check if page exists and refresh LRU items
  } catch (PageNotFoundException e) {
    LOG.debug("get({},pageOffset={}) fails due to page not found", pageId, pageOffset);
    throw e;
  }
  // Guard the downcast: a blind cast would surface as a ClassCastException for any page
  // store other than the local one.
  PageStore pageStore = pageInfo.getLocalCacheDir().getPageStore();
  if (!(pageStore instanceof LocalPageStore)) {
    LOG.debug("getUcpMemory({}) is not supported by page store {}",
        pageId, pageStore.getClass().getName());
    return Optional.empty();
  }
  // NOTE(review): unlike the meta store lookup above, this read is not done under any lock
  // visible in this method - confirm the page cannot be evicted concurrently.
  UcpMemory ucpMemory = ((LocalPageStore) pageStore)
      .get(pageId, false, pageOffset, bytesToRead);
  return Optional.of(ucpMemory);
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
CacheContext cacheContext) {
Expand Down Expand Up @@ -632,6 +654,24 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
}
}

/**
 * Loads a page from the external data supplier and puts it into the cache.
 *
 * <p>The external-read metrics are updated with the size of the loaded page and the time the
 * supplier took, regardless of whether the subsequent put succeeds.
 *
 * @param pageId page identifier
 * @param cacheContext cache related context
 * @param externalDataSupplier supplier of the page bytes
 * @return the number of bytes put into the cache, or 0 if the put did not succeed
 */
@Override
public int cache(PageId pageId, CacheContext cacheContext,
    Supplier<byte[]> externalDataSupplier) {
  long startTime = System.nanoTime();
  byte[] page = externalDataSupplier.get();
  long timeElapse = System.nanoTime() - startTime;
  // The byte count must come from the loaded page; it was previously left at 0, so the
  // metrics below never recorded any bytes and the method always returned 0.
  int bytesLoaded = page.length;
  MetricsSystem.meter(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL.getName())
      .mark(bytesLoaded);
  MetricsSystem.counter(MetricKey.CLIENT_CACHE_EXTERNAL_REQUESTS.getName()).inc();
  cacheContext.incrementCounter(
      MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL.getMetricName(), BYTE,
      bytesLoaded);
  cacheContext.incrementCounter(
      MetricKey.CLIENT_CACHE_PAGE_READ_EXTERNAL_TIME_NS.getMetricName(), NANO,
      timeElapse);
  boolean cached = put(pageId, page, cacheContext);
  return cached ? bytesLoaded : 0;
}

@Override
public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
CacheContext cacheContext, Supplier<byte[]> externalDataSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import alluxio.network.protocol.databuffer.DataFileChannel;

import com.codahale.metrics.Counter;
import org.openucx.jucx.ucp.UcpMemory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -73,6 +75,11 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
}
}

@Override
public int cache(
    PageId pageId,
    CacheContext cacheContext,
    Supplier<byte[]> externalDataSupplier) {
  // Pure delegation to the wrapped cache manager.
  // NOTE(review): no exception handling is applied here - confirm that is intended for
  // this wrapper.
  final int result = mCacheManager.cache(pageId, cacheContext, externalDataSupplier);
  return result;
}

@Override
public int get(PageId pageId, int pageOffset, ReadTargetBuffer buffer,
CacheContext cacheContext) {
Expand Down Expand Up @@ -143,6 +150,12 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
}
}

@Override
public Optional<UcpMemory> getUcpMemory(
    PageId pageId,
    int pageOffset,
    int bytesToRead) throws PageNotFoundException, IOException {
  // Pure delegation to the wrapped cache manager; checked exceptions propagate to the caller.
  final Optional<UcpMemory> memory =
      mCacheManager.getUcpMemory(pageId, pageOffset, bytesToRead);
  return memory;
}

@Override
public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.client.file.cache.store;

import static alluxio.client.file.cache.store.PageStoreDir.getFileBucket;
import static java.nio.file.StandardOpenOption.READ;

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
Expand All @@ -23,13 +24,20 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.openucx.jucx.UcxUtils;
import org.openucx.jucx.ucp.UcpContext;
import org.openucx.jucx.ucp.UcpMemMapParams;
import org.openucx.jucx.ucp.UcpMemory;
import org.openucx.jucx.ucp.UcpParams;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
Expand Down Expand Up @@ -199,6 +207,37 @@ public Path getPagePath(PageId pageId, boolean isTemporary) {
return filePath.resolve(Long.toString(pageId.getPageIndex()));
}

/**
 * Process-wide UCX context, created when this class is initialized, with the stream, tag and
 * wakeup features requested. It is used by this store to register memory-mapped page regions.
 *
 * NOTE(review): the context is never closed in the code visible here - confirm its lifecycle.
 * NOTE(review): the RMA feature is not requested although the registered memory is presumably
 * the target of RMA reads - confirm this is sufficient.
 */
public static final UcpContext sGlobalContext = new UcpContext(new UcpParams()
.requestStreamFeature()
.requestTagFeature()
.requestWakeupFeature());
/**
 * Memory-maps a range of a page file read-only and registers the mapping with the global
 * UCX context.
 *
 * <p>If the requested range runs past the end of the page file, it is truncated to the bytes
 * available from {@code pageOffset}.
 *
 * @param pageId page identifier
 * @param isTemporary whether to resolve the page under the temporary page location
 * @param pageOffset offset into the page file, must be within {@code [0, file length]}
 * @param bytesToRead maximum number of bytes to map, must be non-negative
 * @return the UCX memory handle covering the mapped range
 * @throws IOException if the page file cannot be opened or mapped
 * @throws PageNotFoundException if the page file does not exist
 * @throws IllegalArgumentException if the offset or length is out of range
 */
public UcpMemory get(PageId pageId, boolean isTemporary,
    int pageOffset, int bytesToRead)
    throws IOException, PageNotFoundException {
  Preconditions.checkArgument(pageOffset >= 0,
      "page offset should be non-negative");
  Preconditions.checkArgument(bytesToRead >= 0,
      "bytes to read should be non-negative");
  Path pagePath = getPagePath(pageId, isTemporary);
  File pageFile = pagePath.toFile();
  if (!pageFile.exists()) {
    throw new PageNotFoundException(pagePath.toString());
  }
  long fileLength = pageFile.length();
  // Without this check an offset past the end of the file produces a negative map length.
  Preconditions.checkArgument(pageOffset <= fileLength,
      "page offset %s exceeds page file length %s", pageOffset, fileLength);
  // Computed in long so that pageOffset + bytesToRead cannot overflow int.
  long mapLength = Math.min((long) bytesToRead, fileLength - pageOffset);
  LOG.debug("open fc for:{}:pageOffset:{}:bytesToRead:{}",
      pagePath, pageOffset, bytesToRead);
  // TODO set mem pool here
  MappedByteBuffer buf;
  // A mapping stays valid after its channel is closed, so the channel is released right
  // away instead of leaking one file descriptor per call.
  try (FileChannel fileChannel = FileChannel.open(pagePath, READ)) {
    buf = fileChannel.map(FileChannel.MapMode.READ_ONLY, pageOffset, mapLength);
  }
  // NOTE(review): nothing visible here keeps `buf` reachable for the lifetime of the returned
  // UcpMemory; if the buffer is garbage collected the mapping may be released while still
  // registered - confirm.
  return sGlobalContext.memoryMap(new UcpMemMapParams()
      .setAddress(UcxUtils.getAddress(buf))
      .setLength(mapLength).nonBlocking());
}


@Override
public DataFileChannel getDataFileChannel(
PageId pageId, int pageOffset, int bytesToRead, boolean isTemporary)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import alluxio.conf.PropertyKey;
import alluxio.util.FormatUtils;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

import java.nio.file.Path;
Expand Down Expand Up @@ -296,4 +297,15 @@ public PageStoreOptions setStoreType(PageStoreType storeType) {
mStoreType = storeType;
return this;
}

/**
 * @return a string listing the root dir (as an absolute path), index, Alluxio version,
 *         page size and cache size of these options
 */
@Override
public String toString() {
  final MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
  final String absoluteRootDir = mRootDir.toAbsolutePath().toString();
  helper.add("rootdir", absoluteRootDir);
  helper.add("index", mIndex);
  helper.add("alluxioversion", mAlluxioVersion);
  helper.add("pagesize", mPageSize);
  helper.add("cachesize", mCacheSize);
  return helper.toString();
}
}
Loading