Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
BePPPower committed Nov 25, 2024
1 parent fc20669 commit f50c44b
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 120 deletions.
11 changes: 1 addition & 10 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,7 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, con
auto& col_data = assert_cast<const ColumnBitmap&>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
buffer_list.emplace_back(bufferRef);
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
Expand Down
20 changes: 2 additions & 18 deletions be/src/vec/data_types/serde/data_type_date64_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con
auto& col_data = static_cast<const ColumnVector<Int64>&>(column).get_data();
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
const size_t begin_off = offset;
buffer_list.emplace_back(bufferRef);
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 0) {
Expand All @@ -311,16 +301,10 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con

REALLOC_MEMORY_FOR_ORC_WRITER()

cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
size_t data_off = 0;
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off;
data_off += cur_batch->length[row_id];
}
}

cur_batch->numElements = end - start;
return Status::OK();
Expand Down
11 changes: 1 addition & 10 deletions be/src/vec/data_types/serde/data_type_hll_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,7 @@ Status DataTypeHLLSerDe::write_column_to_orc(const std::string& timezone, const
auto& col_data = assert_cast<const ColumnHLL&>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
buffer_list.emplace_back(bufferRef);
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
Expand Down
37 changes: 11 additions & 26 deletions be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,38 +187,23 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const
std::vector<StringRef>& buffer_list) const {
const auto& col_data = assert_cast<const ColumnIPv6&>(column).get_data();
orc::StringVectorBatch* cur_batch = assert_cast<orc::StringVectorBatch*>(orc_col_batch);
char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
const size_t begin_off = offset;
buffer_list.emplace_back(bufferRef);

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 0) {
continue;
}
std::string ipv6_str = IPv6Value::to_string(col_data[row_id]);
size_t len = ipv6_str.size();

REALLOC_MEMORY_FOR_ORC_WRITER()
INIT_MEMORY_FOR_ORC_WRITER()

strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str());
offset += len;
cur_batch->length[row_id] = len;
}
size_t data_off = 0;
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off;
data_off += cur_batch->length[row_id];
std::string ipv6_str = IPv6Value::to_string(col_data[row_id]);
size_t len = ipv6_str.size();

REALLOC_MEMORY_FOR_ORC_WRITER()

strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str());
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}

cur_batch->numElements = end - start;
return Status::OK();
}
Expand Down
11 changes: 1 addition & 10 deletions be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,7 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons
auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
const auto& string_column = assert_cast<const ColumnString&>(column);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
buffer_list.emplace_back(bufferRef);
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
Expand Down
36 changes: 10 additions & 26 deletions be/src/vec/data_types/serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,38 +342,22 @@ Status DataTypeNumberSerDe<T>::write_column_to_orc(const std::string& timezone,
if constexpr (std::is_same_v<T, Int128>) { // largeint
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
const size_t begin_off = offset;
buffer_list.emplace_back(bufferRef);
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 0) {
continue;
}
std::string value_str = fmt::format("{}", col_data[row_id]);
size_t len = value_str.size();
if (cur_batch->notNull[row_id] == 1) {
std::string value_str = fmt::format("{}", col_data[row_id]);
size_t len = value_str.size();

REALLOC_MEMORY_FOR_ORC_WRITER()
REALLOC_MEMORY_FOR_ORC_WRITER()

strcpy(const_cast<char*>(bufferRef.data) + offset, value_str.c_str());
offset += len;
cur_batch->length[row_id] = len;
}
size_t data_off = 0;
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off;
data_off += cur_batch->length[row_id];
strcpy(const_cast<char*>(bufferRef.data) + offset, value_str.c_str());
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}

cur_batch->numElements = end - start;
} else if constexpr (std::is_same_v<T, Int8> || std::is_same_v<T, UInt8>) { // tinyint/boolean
WRITE_INTEGRAL_COLUMN_TO_ORC(orc::ByteVectorBatch)
Expand Down
11 changes: 1 addition & 10 deletions be/src/vec/data_types/serde/data_type_object_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,7 @@ Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, con
const auto* var = check_and_get_column<ColumnObject>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
buffer_list.emplace_back(bufferRef);
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
Expand Down
11 changes: 1 addition & 10 deletions be/src/vec/data_types/serde/data_type_quantilestate_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,7 @@ class DataTypeQuantileStateSerDe : public DataTypeSerDe {
auto& col_data = assert_cast<const ColumnQuantileState&>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
buffer_list.emplace_back(bufferRef);
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
Expand Down
12 changes: 12 additions & 0 deletions be/src/vec/data_types/serde/data_type_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ struct ColumnVectorBatch;
++*num_deserialized; \
}

#define INIT_MEMORY_FOR_ORC_WRITER() \
char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); \
if (!ptr) { \
return Status::InternalError( \
"malloc memory error when write largeint column data to orc file."); \
} \
StringRef bufferRef; \
bufferRef.data = ptr; \
bufferRef.size = BUFFER_UNIT_SIZE; \
size_t offset = 0; \
buffer_list.emplace_back(bufferRef);

#define REALLOC_MEMORY_FOR_ORC_WRITER() \
while (bufferRef.size - BUFFER_RESERVED_SIZE < offset + len) { \
char* new_ptr = (char*)malloc(bufferRef.size + BUFFER_UNIT_SIZE); \
Expand Down

0 comments on commit f50c44b

Please sign in to comment.