diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp index 8c677425310138..5d024a418340e8 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp @@ -193,16 +193,7 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = assert_cast(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp index 48a4b2c16785e9..6cc83faf7a5b52 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp @@ -289,17 +289,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = static_cast&>(column).get_data(); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 0) { @@ -311,16 +301,10 @@ Status DataTypeDate64SerDe::write_column_to_orc(const 
std::string& timezone, con REALLOC_MEMORY_FOR_ORC_WRITER() + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; cur_batch->length[row_id] = len; offset += len; } - size_t data_off = 0; - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; - } - } cur_batch->numElements = end - start; return Status::OK(); diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp index f63124f291219f..42260b092605e1 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -188,16 +188,7 @@ Status DataTypeHLLSerDe::write_column_to_orc(const std::string& timezone, const auto& col_data = assert_cast(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp index 643d136c22e0c6..e899de93c90ce0 100644 --- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp @@ -187,38 +187,23 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const std::vector& buffer_list) const { const auto& col_data = assert_cast(column).get_data(); orc::StringVectorBatch* cur_batch = assert_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { 
- return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; - buffer_list.emplace_back(bufferRef); - - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 0) { - continue; - } - std::string ipv6_str = IPv6Value::to_string(col_data[row_id]); - size_t len = ipv6_str.size(); - REALLOC_MEMORY_FOR_ORC_WRITER() + INIT_MEMORY_FOR_ORC_WRITER() - strcpy(const_cast(bufferRef.data) + offset, ipv6_str.c_str()); - offset += len; - cur_batch->length[row_id] = len; - } - size_t data_off = 0; for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; + std::string ipv6_str = IPv6Value::to_string(col_data[row_id]); + size_t len = ipv6_str.size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + strcpy(const_cast(bufferRef.data) + offset, ipv6_str.c_str()); + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } + cur_batch->numElements = end - start; return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp index 75e89e9dcf3d41..adc041f511198e 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp @@ -146,16 +146,7 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons auto* cur_batch = dynamic_cast(orc_col_batch); const auto& string_column = assert_cast(column); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = 
ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index 9b2ad5676f8a58..55c7b2c9505dae 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -342,38 +342,22 @@ Status DataTypeNumberSerDe::write_column_to_orc(const std::string& timezone, if constexpr (std::is_same_v) { // largeint orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 0) { - continue; - } - std::string value_str = fmt::format("{}", col_data[row_id]); - size_t len = value_str.size(); + if (cur_batch->notNull[row_id] == 1) { + std::string value_str = fmt::format("{}", col_data[row_id]); + size_t len = value_str.size(); - REALLOC_MEMORY_FOR_ORC_WRITER() + REALLOC_MEMORY_FOR_ORC_WRITER() - strcpy(const_cast(bufferRef.data) + offset, value_str.c_str()); - offset += len; - cur_batch->length[row_id] = len; - } - size_t data_off = 0; - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; + strcpy(const_cast(bufferRef.data) + offset, value_str.c_str()); + cur_batch->data[row_id] = const_cast(bufferRef.data) + 
offset; + cur_batch->length[row_id] = len; + offset += len; } } + cur_batch->numElements = end - start; } else if constexpr (std::is_same_v || std::is_same_v) { // tinyint/boolean WRITE_INTEGRAL_COLUMN_TO_ORC(orc::ByteVectorBatch) diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index d489739c622a00..fc536d9ef0df7b 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -173,16 +173,7 @@ Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, con const auto* var = check_and_get_column(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h index d64552e46a87d6..d3526ba389925f 100644 --- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -155,16 +155,7 @@ class DataTypeQuantileStateSerDe : public DataTypeSerDe { auto& col_data = assert_cast(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + 
INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index f0e9eb27961439..a9200d1fccf316 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -77,6 +77,19 @@ struct ColumnVectorBatch; ++*num_deserialized; \ } +/* Sets up the scratch buffer used by the write_column_to_orc implementations: mallocs a BUFFER_UNIT_SIZE-byte buffer (returning Status::InternalError from the enclosing function if malloc fails), declares the locals ptr, bufferRef and offset (= 0) in the calling scope, and appends bufferRef to buffer_list, which must already be in scope. Expands to several statements, so use it only at block scope. */ +#define INIT_MEMORY_FOR_ORC_WRITER() \ + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); \ + if (!ptr) { \ + return Status::InternalError( \ + "malloc memory error when write column data to orc file."); \ + } \ + StringRef bufferRef; \ + bufferRef.data = ptr; \ + bufferRef.size = BUFFER_UNIT_SIZE; \ + size_t offset = 0; \ + buffer_list.emplace_back(bufferRef); + #define REALLOC_MEMORY_FOR_ORC_WRITER() \ while (bufferRef.size - BUFFER_RESERVED_SIZE < offset + len) { \ char* new_ptr = (char*)malloc(bufferRef.size + BUFFER_UNIT_SIZE); \