Skip to content

Commit

Permalink
[fix](parquet) Fix failure to read LZ4-compressed Parquet files.
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter committed Nov 22, 2023
1 parent c02c009 commit 509a96f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
16 changes: 14 additions & 2 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,22 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
remaining_input_size -= 2 * sizeof(uint32_t);

// Decompress
int uncompressed_len = LZ4_decompress_safe(reinterpret_cast<const char*>(src),
auto uncompressed_len = LZ4_decompress_safe(reinterpret_cast<const char*>(src),
reinterpret_cast<char*>(output), compressed_len,
remaining_output_size);
if (uncompressed_len < 0 || uncompressed_len != uncompressed_block_len) {

if (uncompressed_len < 0 || uncompressed_len != uncompressed_block_len ) {
static std::mutex mtx;
{
std::unique_lock<std::mutex> lck(mtx);
std::cout <<"Input len = " << input_len<<"\n";
std::cout <<"Output len = " << output_max_len<<"\n";
std::cout << "compressed_len = " << compressed_len <<"\n";
std::cout << "remaining_output_size = " << remaining_output_size <<"\n";
std::cout << "uncompressed_len = " << uncompressed_len <<"\n";
std::cout <<"is error = " << LZ4F_isError(uncompressed_len) <<"\n";
std::cout <<"error code = "<< LZ4F_getErrorName(uncompressed_len)<<"\n";
}
return Status::InternalError(
"lz4 block decompress failed. uncompressed_len: {}, expected: {}",
uncompressed_len, uncompressed_block_len);
Expand Down
39 changes: 38 additions & 1 deletion be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#include <memory>

#include "exec/decompressor.h"
#include "gutil/strings/substitute.h"
#include "util/bit_util.h"
#include "util/defer_op.h"
#include "util/faststring.h"

namespace doris {

Expand Down Expand Up @@ -183,6 +185,41 @@ class Lz4BlockCompression : public BlockCompressionCodec {
static const int32_t ACCELARATION = 1;
};

// LZ4 codec used for Parquet pages tagged LZ4 / LZ4_RAW.
//
// decompress() first runs the input through the CompressType::LZ4BLOCK
// decompressor obtained from Decompressor::create_decompressor (presumably the
// Hadoop-framed block layout -- confirm against Lz4BlockDecompressor). If that
// attempt fails or does not reach the end of the stream, the same input is
// retried with Lz4BlockCompression::decompress, and that result is returned.
//
// The class keeps no per-instance state, so the shared instance() object is
// not mutated by decompress().
class HadoopLz4BlockCompression : public Lz4BlockCompression {
public:
    // Returns the single shared instance of this codec.
    static HadoopLz4BlockCompression* instance() {
        static HadoopLz4BlockCompression s_instance;
        return &s_instance;
    }

    // Decompresses `input` into the caller-provided buffer described by
    // `output` (output->data / output->size give the buffer and its capacity).
    //
    // Returns Status::OK() when either decoding attempt succeeds; otherwise
    // returns the error from creating the decompressor or from the fallback
    // Lz4BlockCompression::decompress call.
    //
    // NOTE(review): on the LZ4BLOCK path output->size is left untouched, exactly
    // as before this change -- confirm callers pre-size `output` to the exact
    // decompressed length.
    Status decompress(const Slice& input, Slice* output) override {
        // Own the decompressor locally: storing it in a member of the shared
        // singleton leaked one object per call and raced between threads.
        // NOTE(review): assumes create_decompressor transfers ownership of the
        // returned object to the caller -- confirm.
        Decompressor* raw_decompressor = nullptr;
        RETURN_IF_ERROR(
                Decompressor::create_decompressor(CompressType::LZ4BLOCK, &raw_decompressor));
        const std::unique_ptr<Decompressor> decompressor(raw_decompressor);

        size_t input_bytes_read = 0;
        size_t decompressed_len = 0;
        size_t more_input_bytes = 0;
        size_t more_output_bytes = 0;
        bool stream_end = false;
        const Status st = decompressor->decompress(
                reinterpret_cast<uint8_t*>(input.data), input.size, &input_bytes_read,
                reinterpret_cast<uint8_t*>(output->data), output->size, &decompressed_len,
                &stream_end, &more_input_bytes, &more_output_bytes);

        // NOTE(review): treating !stream_end as "not fully decoded" is an
        // assumption about the Decompressor contract -- confirm.
        if (st.ok() && stream_end) {
            return Status::OK();
        }

        // The block-framed decode did not succeed; the payload may be a plain
        // LZ4 block (e.g. LZ4_RAW), so retry with the base codec and report its
        // status instead of silently returning OK.
        return Lz4BlockCompression::decompress(input, output);
    }
};
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
class Lz4fBlockCompression : public BlockCompressionCodec {
private:
Expand Down Expand Up @@ -1048,7 +1085,7 @@ Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_code
break;
case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW
case tparquet::CompressionCodec::LZ4:
*codec = Lz4BlockCompression::instance();
*codec = HadoopLz4BlockCompression::instance();
break;
case tparquet::CompressionCodec::ZSTD:
*codec = ZstdBlockCompression::instance();
Expand Down

0 comments on commit 509a96f

Please sign in to comment.