Skip to content

Commit

Permalink
[feature](parquet)support read parquet lzo compress
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter committed Nov 28, 2023
1 parent c02c009 commit 68f84bb
Showing 1 changed file with 87 additions and 0 deletions.
87 changes: 87 additions & 0 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
#include <zstd_errors.h>

#include <algorithm>
#include <apache-orc/c++/src/LzoDecompressor.hh>
#include <limits>
#include <mutex>
#include <new>
#include <ostream>

#include "common/config.h"
#include "gutil/endian.h"
#include "gutil/strings/substitute.h"
#include "util/defer_op.h"
#include "util/faststring.h"
Expand Down Expand Up @@ -1006,6 +1008,88 @@ class GzipBlockCompression final : public ZlibBlockCompression {
const static int MEM_LEVEL = 8;
};

// Decompress-only codec for LZO data stored in a two-level length-prefixed
// framing (described in decompress()). Compression is not implemented.
class LzoBlockCompression final : public BlockCompressionCodec {
public:
    // Returns the process-wide shared instance.
    static LzoBlockCompression* instance() {
        static LzoBlockCompression s_instance;
        return &s_instance;
    }

    // LZO compression is not supported; always returns InvalidArgument.
    Status compress(const Slice& input, faststring* output) override {
        return Status::InvalidArgument("not impl lzo compress.");
    }

    // Compression is not implemented, so no bound is provided (always 0).
    size_t max_compressed_len(size_t len) override { return 0; }

    // Decompresses `input` into the buffer described by `output`.
    //
    // `output->data` / `output->size` describe the destination capacity; the
    // decoded bytes are written contiguously from `output->data`.
    // NOTE(review): `output->size` is not updated to the number of bytes
    // actually produced -- confirm callers do not rely on that.
    //
    // Returns InvalidArgument when the input is truncated, the output buffer
    // is too small, or a small block yields more bytes than its enclosing
    // large chunk declared. Returns InternalError when orc::lzoDecompress
    // throws orc::ParseError.
    Status decompress(const Slice& input, Slice* output) override {
        auto* input_ptr = input.data;
        auto remain_input_size = input.size;
        auto* output_ptr = output->data;
        auto remain_output_size = output->size;
        auto* output_limit = output->data + output->size;

        // The original data is divided into several large data chunks.
        // Each large data chunk is divided into several small data blocks.
        // Example:
        // OriginData : large data chunk1 | large data chunk2 | large data chunk3 | ....
        // Suppose a large data chunk is divided into three small blocks:
        // large data chunk1: | small block1 | small block2 | small block3 |
        // CompressData: <A [B1 compress(small block1)] [B2 compress(small block2)] [B3 compress(small block3)]>
        //
        // A : original (uncompressed) length of the current large data chunk. sizeof(A) = 4 bytes.
        // A = length(small block1) + length(small block2) + length(small block3)
        // Bx : compressed length of small block x. sizeof(Bx) = 4 bytes.
        // Bx = length(compress(small blockx))
        // Both A and Bx are stored big-endian.

        try {
            while (remain_input_size > 0) {
                if (remain_input_size < 4) {
                    return Status::InvalidArgument(
                            "Need more input buffer to get large_block_uncompressed_len.");
                }

                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
                input_ptr += 4;
                remain_input_size -= 4;

                if (remain_output_size < large_block_uncompressed_len) {
                    return Status::InvalidArgument(
                            "Need more output buffer to get uncompressed data.");
                }

                while (large_block_uncompressed_len > 0) {
                    if (remain_input_size < 4) {
                        return Status::InvalidArgument(
                                "Need more input buffer to get small_block_compressed_len.");
                    }

                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
                    input_ptr += 4;
                    remain_input_size -= 4;

                    if (remain_input_size < small_block_compressed_len) {
                        return Status::InvalidArgument(
                                "Need more input buffer to decompress small block.");
                    }

                    auto small_block_uncompressed_len =
                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
                                               output_ptr, output_limit);

                    // A corrupt stream may decode to more bytes than the chunk
                    // header declared. Without this check the unsigned
                    // subtractions below would wrap around and the loop would
                    // keep running with a bogus remaining length.
                    if (small_block_uncompressed_len > large_block_uncompressed_len) {
                        return Status::InvalidArgument(
                                "Small block uncompressed length exceeds the remaining "
                                "large block length.");
                    }

                    input_ptr += small_block_compressed_len;
                    remain_input_size -= small_block_compressed_len;

                    output_ptr += small_block_uncompressed_len;
                    large_block_uncompressed_len -=
                            static_cast<uint32_t>(small_block_uncompressed_len);
                    remain_output_size -= small_block_uncompressed_len;
                }
            }
        } catch (const orc::ParseError& e) {
            // Prevent BE from hanging when orc::lzoDecompress throws.
            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
        }
        return Status::OK();
    }
};

Status get_block_compression_codec(segment_v2::CompressionTypePB type,
BlockCompressionCodec** codec) {
switch (type) {
Expand Down Expand Up @@ -1056,6 +1140,9 @@ Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_code
case tparquet::CompressionCodec::GZIP:
*codec = GzipBlockCompression::instance();
break;
case tparquet::CompressionCodec::LZO:
*codec = LzoBlockCompression::instance();
break;
default:
return Status::InternalError("unknown compression type({})", parquet_codec);
}
Expand Down

0 comments on commit 68f84bb

Please sign in to comment.