From 509a96fd52e6d0a7ef823cf7d225d34d3bea7a0e Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 22 Nov 2023 10:35:26 +0800 Subject: [PATCH] [fix](parquet)fix can not read parquet lz4 compress. --- be/src/exec/decompressor.cpp | 16 +++++++++++-- be/src/util/block_compression.cpp | 39 ++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index 964654132a3a29..bc97ef0aa724d0 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -382,10 +382,22 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t remaining_input_size -= 2 * sizeof(uint32_t); // Decompress - int uncompressed_len = LZ4_decompress_safe(reinterpret_cast(src), + auto uncompressed_len = LZ4_decompress_safe(reinterpret_cast(src), reinterpret_cast(output), compressed_len, remaining_output_size); - if (uncompressed_len < 0 || uncompressed_len != uncompressed_block_len) { + + if (uncompressed_len < 0 || uncompressed_len != uncompressed_block_len ) { + static std::mutex mtx; + { + std::unique_lock lck(mtx); + std::cout <<"Input len = " << input_len<<"\n"; + std::cout <<"Output len = " << output_max_len<<"\n"; + std::cout << "compressed_len = " << compressed_len <<"\n"; + std::cout << "remaining_output_size = " << remaining_output_size <<"\n"; + std::cout << "uncompressed_len = " << uncompressed_len <<"\n"; + std::cout <<"is error = " << LZ4F_isError(uncompressed_len) <<"\n"; + std::cout <<"error code = "<< LZ4F_getErrorName(uncompressed_len)<<"\n"; + } return Status::InternalError( "lz4 block decompress failed. uncompressed_len: {}, expected: {}", uncompressed_len, uncompressed_block_len); diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index fb4c963c11e0f8..48b94f442c39ec 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -42,6 +42,8 @@ #include "gutil/strings/substitute.h" #include "util/defer_op.h" #include "util/faststring.h" +#include "util/bit_util.h" +#include "exec/decompressor.h" namespace doris { @@ -183,6 +185,41 @@ class Lz4BlockCompression : public BlockCompressionCodec { static const int32_t ACCELARATION = 1; }; +class HadoopLz4BlockCompression : public Lz4BlockCompression { +public: + static HadoopLz4BlockCompression* instance() { + static HadoopLz4BlockCompression s_instance; + return &s_instance; + } + Status decompress(const Slice& input, Slice* output) override { + RETURN_IF_ERROR(Decompressor::create_decompressor(CompressType::LZ4BLOCK,&_decompressor)); + size_t input_bytes_read = 0; + size_t decompressed_len = 0; + size_t more_input_bytes = 0; + size_t more_output_bytes = 0; + bool stream_end = false; + auto st = _decompressor->decompress( (uint8_t*)input.data,input.size,&input_bytes_read, + (uint8_t*)output->data,output->size, + &decompressed_len,&stream_end,&more_input_bytes,&more_output_bytes); + static std::mutex mtx; + { + std::unique_lock lck(mtx); + if (st != Status::OK()) { + std::cout << st.to_string() << "\n"; + } + + } + +// if (st!=Status::OK()){ +// auto st2 = Lz4BlockCompression::decompress(input,output); +// } + + return Status::OK(); + } +private: + Decompressor* _decompressor; + +}; // Used for LZ4 frame format, decompress speed is two times faster than LZ4. class Lz4fBlockCompression : public BlockCompressionCodec { private: @@ -1048,7 +1085,7 @@ Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_code break; case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW case tparquet::CompressionCodec::LZ4: - *codec = Lz4BlockCompression::instance(); + *codec = HadoopLz4BlockCompression::instance(); break; case tparquet::CompressionCodec::ZSTD: *codec = ZstdBlockCompression::instance();