diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index fb4c963c11e0f80..01fc8a09413bff6 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -33,12 +33,14 @@
 #include
 #include
+#include
 #include
 #include
 #include
 #include
 
 #include "common/config.h"
+#include "gutil/endian.h"
 #include "gutil/strings/substitute.h"
 #include "util/defer_op.h"
 #include "util/faststring.h"
 
@@ -1006,6 +1008,88 @@ class GzipBlockCompression final : public ZlibBlockCompression {
     const static int MEM_LEVEL = 8;
 };
 
+class LzoBlockCompression final : public BlockCompressionCodec {
+public:
+    static LzoBlockCompression* instance() {
+        static LzoBlockCompression s_instance;
+        return &s_instance;
+    }
+
+    Status compress(const Slice& input, faststring* output) override {
+        return Status::InvalidArgument("LZO compression is not implemented.");
+    }
+    size_t max_compressed_len(size_t len) override { return 0; }
+    Status decompress(const Slice& input, Slice* output) override {
+        auto* input_ptr = input.data;
+        auto remain_input_size = input.size;
+        auto* output_ptr = output->data;
+        auto remain_output_size = output->size;
+        auto* output_limit = output->data + output->size;
+
+        // The original data is divided into several large data chunks,
+        // and each large data chunk is divided into several small data blocks.
+        // Example:
+        //     OriginData: | large data chunk1 | large data chunk2 | large data chunk3 | ...
+        //     Suppose large data chunk1 is divided into three small blocks:
+        //     large data chunk1: | small block1 | small block2 | small block3 |
+        // CompressData:
+        //
+        //     A  : uncompressed length of the current large data chunk. sizeof(A) = 4 bytes.
+        //          A = length(small block1) + length(small block2) + length(small block3)
+        //     Bx : compressed length of small data block x. sizeof(Bx) = 4 bytes.
+        //          Bx = length(compress(small block x))
+
+        try {
+            while (remain_input_size > 0) {
+                if (remain_input_size < 4) {
+                    return Status::InvalidArgument(
+                            "Need more input buffer to get large_block_uncompressed_len.");
+                }
+
+                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
+                input_ptr += 4;
+                remain_input_size -= 4;
+
+                if (remain_output_size < large_block_uncompressed_len) {
+                    return Status::InvalidArgument(
+                            "Need more output buffer to get uncompressed data.");
+                }
+
+                while (large_block_uncompressed_len > 0) {
+                    if (remain_input_size < 4) {
+                        return Status::InvalidArgument(
+                                "Need more input buffer to get small_block_compressed_len.");
+                    }
+
+                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
+                    input_ptr += 4;
+                    remain_input_size -= 4;
+
+                    if (remain_input_size < small_block_compressed_len) {
+                        return Status::InvalidArgument(
+                                "Need more input buffer to decompress small block.");
+                    }
+
+                    auto small_block_uncompressed_len =
+                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
+                                               output_ptr, output_limit);
+
+                    input_ptr += small_block_compressed_len;
+                    remain_input_size -= small_block_compressed_len;
+
+                    output_ptr += small_block_uncompressed_len;
+                    large_block_uncompressed_len -= small_block_uncompressed_len;
+                    remain_output_size -= small_block_uncompressed_len;
+                }
+            }
+        } catch (const orc::ParseError& e) {
+            // Prevent the BE process from crashing when orc::lzoDecompress throws an exception.
+            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
+        }
+        return Status::OK();
+    }
+};
+
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                    BlockCompressionCodec** codec) {
     switch (type) {
@@ -1056,6 +1140,9 @@ Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_code
     case tparquet::CompressionCodec::GZIP:
         *codec = GzipBlockCompression::instance();
         break;
+    case tparquet::CompressionCodec::LZO:
+        *codec = LzoBlockCompression::instance();
+        break;
     default:
         return Status::InternalError("unknown compression type({})", parquet_codec);
     }
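
Note (not part of the diff): a minimal sketch of how a caller could exercise the new LZO path. decompress_lzo_page is a hypothetical helper; Slice, Status, RETURN_IF_ERROR, get_block_compression_codec, and the gen_cpp/parquet_types.h thrift header are the existing Doris BE facilities referenced by this patch. In the real parquet reader, the uncompressed size would come from the page header.

    #include <string>

    #include "common/status.h"          // Status, RETURN_IF_ERROR
    #include "gen_cpp/parquet_types.h"  // tparquet::CompressionCodec
    #include "util/block_compression.h" // BlockCompressionCodec, get_block_compression_codec

    // Hypothetical helper: decompress one LZO-framed parquet page into `out`.
    Status decompress_lzo_page(const Slice& page, size_t uncompressed_size, std::string* out) {
        BlockCompressionCodec* codec = nullptr;
        // Resolves to LzoBlockCompression::instance() via the new case label above.
        RETURN_IF_ERROR(get_block_compression_codec(tparquet::CompressionCodec::LZO, &codec));

        // The codec expects the caller to pre-size the output buffer; decompress()
        // returns InvalidArgument if it is smaller than the framed lengths claim.
        out->resize(uncompressed_size);
        Slice output(out->data(), out->size());

        // Walks the | A | Bx | compressed small block | framing described in the
        // comment inside LzoBlockCompression::decompress.
        return codec->decompress(page, &output);
    }

This mirrors how the other parquet codecs in the same switch are consumed; the only LZO-specific behavior is that decompress() parses the big-endian length framing itself rather than receiving a bare LZO stream.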