Implement Netty based s3 API in worker #17661
Conversation
@lucyge2022 @JiamingMai @beinan please take a look at this PR. I set up the HTTP protocol pipeline on the existing NettyDataServer to process S3 requests. Currently it supports headObject and getObject (including both zero-copy and non-zero-copy paths). I'm working on listObject, putObject, and the other operations.
dora/core/server/worker/src/main/java/alluxio/worker/s3/S3HttpHandler.java
Thanks for the great work! Left some comments, mostly on style and convention.
Resolved review threads (outdated):
dora/core/server/common/src/main/java/alluxio/s3/NettyRestUtils.java
dora/core/server/worker/src/main/java/alluxio/worker/s3/S3HttpHandler.java
dora/core/server/worker/src/main/java/alluxio/worker/s3/S3HttpPipelineHandler.java
dora/core/server/worker/src/main/java/alluxio/worker/s3/S3NettyBaseTask.java
…_api
Conflicts:
dora/tests/src/test/java/alluxio/client/rest/CreateBucketTest.java
pagedFileReader.getMultipleDataFileChannel(mHandler.getContext().channel(), length);
}
if (packet != null) {
  mHandler.processTransferResponse(packet);
If we read from the page store, the response is a list of DefaultFileRegions opened over the multiple pages that make up the full-length object, and the buffer is read through the page files one by one as you process them. Will that race with other reads going through the Netty data reader? That is, could we end up reading a page that no longer exists by the time the read happens?
Don't worry, that's exactly what we do in the RPC read path. I follow the same page-reading process here, and I have confirmed it with Jiaming and Bowen. It's okay.
dora/core/server/worker/src/main/java/alluxio/worker/s3/S3NettyObjectTask.java
ByteBuf buf = mContext.channel().alloc().buffer(packetSize, packetSize);
try {
  while (buf.writableBytes() > 0 && blockReader.transferTo(buf) != -1) {
    mContext.write(new DefaultHttpContent(buf));
Why not write buf directly?
We need to send it packet by packet here, so each buffer is wrapped in an HttpContent.
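For illustration, the packet-by-packet flow can be sketched with plain java.io streams. PacketWriter, writeInPackets, and PACKET_SIZE are hypothetical names; the real handler wraps each Netty ByteBuf in a DefaultHttpContent before writing it:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch: stream a body in fixed-size packets, the way the handler writes
// one DefaultHttpContent per filled buffer. Here a plain byte[] can be
// reused because write() is synchronous; with Netty, each write hands
// ownership of the ByteBuf to the channel, so a fresh buffer is allocated
// per packet (see the reuse discussion below).
public class PacketWriter {
  static final int PACKET_SIZE = 4; // tiny for illustration

  // Copies src to dst one packet at a time; each write models one
  // HttpContent. Returns the number of packets written.
  public static int writeInPackets(InputStream src, OutputStream dst)
      throws IOException {
    byte[] buf = new byte[PACKET_SIZE];
    int packets = 0;
    int n;
    while ((n = src.read(buf)) != -1) {
      dst.write(buf, 0, n); // in Netty: ctx.write(new DefaultHttpContent(buf))
      packets++;
    }
    return packets;
  }
}
```

A 10-byte body with a 4-byte packet size goes out as three packets (4 + 4 + 2 bytes).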
try {
  while (buf.writableBytes() > 0 && blockReader.transferTo(buf) != -1) {
    mContext.write(new DefaultHttpContent(buf));
    buf = mContext.channel().alloc().buffer(packetSize, packetSize);
Can we reuse a single buf here instead of allocating one every time?
Also, is processMappedResponse ever going to be used, now that all BlockReaders are actually PagedFileReaders?
I tried to reuse the buf, but I can't control when the channel releases the buffer after I write it to the channel. So I allocate it from the channel's allocator and let the channel release it after use.
processMappedResponse is for the MAPPED transfer type. This is indeed a limitation of FileRegion: it uses zero-copy and never brings the data into user space, but TLS needs to rewrite the data on the fly before sending it, so the two are incompatible.
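For context, the zero-copy path that Netty's FileRegion provides corresponds to java.nio's FileChannel.transferTo: the kernel moves the bytes directly to the destination channel without surfacing them in user space, which is exactly why a TLS layer (which must transform the bytes in user space) cannot sit on top of it. A minimal stdlib sketch (ZeroCopyDemo is a hypothetical name):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Zero-copy transfer: FileChannel.transferTo is the mechanism underlying
// DefaultFileRegion. The data never enters user space, so nothing (such as
// a TLS handler) can rewrite it in flight.
public class ZeroCopyDemo {
  public static long transfer(Path src, WritableByteChannel dst) throws IOException {
    try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ)) {
      long size = in.size();
      long written = 0;
      // transferTo may move fewer bytes than requested, so loop to completion.
      while (written < size) {
        written += in.transferTo(written, size - written, dst);
      }
      return written;
    }
  }
}
```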
dora/core/server/worker/src/main/java/alluxio/worker/s3/S3NettyObjectTask.java
// the encoding type.
boolean isChunkedEncoding = decodedLengthHeader != null;
long toRead;
ByteBuf buf = mHandler.getRequestContent();
So the FullHttpRequest contains the entire HTTP body? The HttpObjectAggregator aggregates the whole body into a FullHttpRequest and then invokes channelRead? Have you tried uploading an object larger than 512 KB? And what about uploading a 5 GB object: would this buf end up 5 GB in size?
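As a side note on the snippet above, the encoding check can be sketched in plain Java. ChunkedUploadCheck and bytesToRead are hypothetical stand-ins, assuming the decoded-length header in the real code is S3's standard x-amz-decoded-content-length:

```java
import java.util.Map;

// Sketch of the decision in the snippet above: an S3 aws-chunked upload
// advertises the real payload size in x-amz-decoded-content-length, while a
// plain upload only carries Content-Length. The mere presence of the decoded
// length header is what makes isChunkedEncoding true.
public class ChunkedUploadCheck {
  public static long bytesToRead(Map<String, String> headers) {
    String decoded = headers.get("x-amz-decoded-content-length");
    boolean isChunkedEncoding = decoded != null;
    return isChunkedEncoding
        ? Long.parseLong(decoded)  // payload size after de-chunking
        : Long.parseLong(headers.getOrDefault("Content-Length", "0"));
  }
}
```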
dora/core/server/worker/src/main/java/alluxio/worker/s3/S3HttpHandler.java
@apc999 @lucyge2022 Based on the review comments, I have modified the relevant code. Could you take another look and see whether any other lines need changes? Currently only large-file uploads are unsupported; since that may be a big and significant change, can I add large-file upload support in the next PR?
@apc999 @lucyge2022 Any other comments or change suggestions for this PR? It is a foundational PR; some functions are not supported yet, and I will continue to fill them in and improve them in subsequent PRs.
…_api
Conflicts:
dora/minicluster/src/main/java/alluxio/multi/process/MultiProcessCluster.java
dora/tests/src/test/java/alluxio/client/rest/RestApiTest.java
LGTM
Thanks! It mostly looks good; leaving another batch of comments.
Resolved review threads (outdated):
dora/core/common/src/main/java/alluxio/util/network/NetworkAddressUtils.java
dora/core/server/common/src/main/java/alluxio/s3/NettyRestUtils.java
requestHeaders, responseHeaders);
  LOG.debug(accessLog + " " + moreInfoStr);
} else {
  LOG.info(accessLog);
Do we log every access?
Yes, we log some basic info for every access request.
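The logging pattern in the snippet above (a one-line entry for every access, with header detail appended only when debug logging is on) can be sketched with java.util.logging; AccessLog and format are hypothetical names standing in for the SLF4J code in the diff:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of the access-log pattern: every request produces a one-line INFO
// entry; the expensive header dump is only built and appended when the finer
// (debug) level is actually enabled, mirroring LOG.debug vs LOG.info above.
public class AccessLog {
  static final Logger LOG = Logger.getLogger("s3.access");

  public static String format(String accessLog, String moreInfo) {
    // Equivalent of checking LOG.isDebugEnabled() before concatenating.
    return LOG.isLoggable(Level.FINE) ? accessLog + " " + moreInfo : accessLog;
  }
}
```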
Resolved review threads:
dora/core/server/worker/src/main/java/alluxio/worker/s3/S3NettyHandler.java
Minor comments left. LGTM!
thanks for the contribution
dora/core/common/src/main/java/alluxio/util/network/NetworkAddressUtils.java
} else if (e instanceof IOException) {
  return createNettyErrorResponse((IOException) e, resource);
} else {
  ByteBuf contentBuffer =
Can we move the allocation of contentBuffer (the Unpooled.copiedBuffer call) into generateS3ErrorResponse as well, so that generateS3ErrorResponse takes a message String as an argument instead of a ByteBuf? For example:

else {
  return generateS3ErrorResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage(),
      HttpHeaderValues.TEXT_PLAIN);
}

The reason is that Netty buffer allocation can be very tricky, so we want to reduce the number of places where we allocate a Netty ByteBuf (like Unpooled.copiedBuffer). Typically a Netty ByteBuf requires calling release to reclaim the resource, though Unpooled.copiedBuffer is an exception. So let's consolidate these allocations; it will be easier to manage in the future.
Good catch, I will change it.
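The suggested consolidation can be sketched in plain Java. ErrorResponses and the byte[] return type are hypothetical stand-ins for the Netty code, where the helper would call Unpooled.copiedBuffer internally and build a FullHttpResponse:

```java
import java.nio.charset.StandardCharsets;

// Sketch of the suggested refactor: callers hand generateS3ErrorResponse a
// plain message String, and the single place that turns it into a byte
// buffer (Unpooled.copiedBuffer in the real code) lives inside the helper.
// One allocation site is far easier to audit for release obligations than
// one per call site.
public class ErrorResponses {
  public static final int INTERNAL_SERVER_ERROR = 500;

  public static byte[] generateS3ErrorResponse(int status, String message) {
    // Sole allocation point for the error body.
    return (status + ": " + message).getBytes(StandardCharsets.UTF_8);
  }
}
```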
alluxio-bot, merge this please
What changes are proposed in this pull request?
Implement Netty based s3 API in worker
Why are the changes needed?
Does this PR introduce any user facing changes?