-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Optimize serializer decompress buffer for BufferInputStream #11836
base: main
Are you sure you want to change the base?
feat: Optimize serializer decompress buffer for BufferInputStream #11836
Conversation
✅ Deploy Preview for meta-velox canceled.
|
0a803d5
to
7cfade1
Compare
7e78159
to
5dfe257
Compare
Can you help review again? Thanks! @Yuhta |
auto newBuf = folly::IOBuf::wrapBuffer( | ||
current_->buffer + current_->position, readBytes); | ||
if (result) { | ||
result->prev()->appendChain(std::move(newBuf)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The order of buffers here is incorrect. Just do result->appendToChain(std::move(newBuf))
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The order is same with IOBufOutputStream::getIOBuf(), appendToChain is also OK. I will change as you say.
common::CompressionKind compressionKind, | ||
memory::MemoryPool& pool) { | ||
const auto codec = common::compressionKindToCodec(compressionKind); | ||
if (dynamic_cast<BufferInputStream*>(source)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just do the dynamic_cast
once: if (const auto* bufferSource = dynamic_cast<BufferInputStream*>(source))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I implement the readBytes return IOBuf in FileInputStream, so we don't need to do the cast, and other compression code can also benefit from this.
auto byteStream = createStream(byteRanges); | ||
auto bufferStream = dynamic_cast<BufferInputStream*>(byteStream.get()); | ||
for (int offset = 0; offset < streamSize;) { | ||
auto iobuf = bufferStream->readBytes(streamSize / 8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not testing the case one IOBuf chain cross 2 buffers. Let's cover that as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added.
Failed by this issue #11857 |
Can you help review again? Thanks! @Yuhta |
current_->position += size; | ||
return iobuf; | ||
} | ||
auto iobuf = folly::IOBuf::create(size); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This allocation should be avoided. Can we make a chain that takeOwnership()
of the buffers instead?
We would also need to put pins on the buffers so that they are not overwritten while we have IOBuf
on top of them. If this feels too complicated we can leave this out for next PR and just cast to BufferInputStream
for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is complicated, and it breaks the FileInputStream should only reserve memory as QueryConfig::readBufferSize. Alternatively, we could add a new argument MemoryPool* pool = nullptr
, allocated the Buffer from pool, and wrap the buffer as IOBuf. Which way do you think is better? This maybe in a follow up PR.
ce58686
to
38fe41c
Compare
I move to the cast to BufferInputStream as you say, and compress Row has merged, so I update it in same way. Can you help review again? Thanks~ @Yuhta |
Mac13 failed by arrow download, velox release uts failed by existing issue hdfs test, other tests passed. |
Can you help review again? Thanks! @Yuhta |
Read the BufferInputStream by new added function readBytes to avoid copying from the buffer cache. Extract the uncompress process to a new function can help release the compressed buffer in time.
FileInputStream reuse the read buffer, so we can get the IOBuf without copy only if the required bytes is in one buffer, which is not the common case, so I don't do the optimization for it.