[BugFix] Fix csv converter and chunk column inconsistent in select into outfile (backport #48052) #48441

Merged (2 commits, Jul 16, 2024). Changes from all commits.
be/src/exec/plain_text_builder.cpp (25 changes: 19 additions & 6 deletions)

@@ -3,6 +3,7 @@
 #include "plain_text_builder.h"

 #include "column/chunk.h"
+#include "column/column_helper.h"
 #include "column/const_column.h"
 #include "exprs/expr.h"
 #include "exprs/vectorized/column_ref.h"
@@ -31,8 +32,13 @@ Status PlainTextBuilder::init() {
     _converters.reserve(_output_expr_ctxs.size());
     for (auto* ctx : _output_expr_ctxs) {
         const auto& type = ctx->root()->type();
-        auto conv = vectorized::csv::get_converter(type, ctx->root()->is_nullable());
-
+        // In some cases the nullable property of the column in the chunk and that of
+        // _output_expr_ctxs may be inconsistent, for example with order by limit + left outer join:
+        //   select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1
+        //   group by t1.k1, t1.k2 order by k33
+        // So we always use the nullable converter and let it decide at write time
+        // whether the column is actually nullable.
+        auto conv = vectorized::csv::get_converter(type, true);
         if (conv == nullptr) {
             return Status::InternalError("No CSV converter for type " + type.debug_string());
         }
@@ -52,15 +58,22 @@ Status PlainTextBuilder::add_chunk(vectorized::Chunk* chunk) {
         auto err = strings::Substitute("Unmatched number of columns expected=$0 real=$1", _converters.size(), num_cols);
         return Status::InternalError(err);
     }
-    std::vector<const vectorized::Column*> columns_raw_ptr;
-    columns_raw_ptr.reserve(num_cols);
-
+    vectorized::Columns columns;
+    columns.reserve(num_cols);
     for (int i = 0; i < num_cols; i++) {
         auto root = _output_expr_ctxs[i]->root();
         if (!root->is_slotref()) {
             return Status::InternalError("Not slot ref column");
         }
-
         auto column_ref = ((vectorized::ColumnRef*)root);
-        columns_raw_ptr.emplace_back(chunk->get_column_by_slot_id(column_ref->slot_id()).get());
+        auto col = chunk->get_column_by_slot_id(column_ref->slot_id());
+        if (col == nullptr) {
+            return Status::InternalError(strings::Substitute("Column not found by slot id $0", column_ref->slot_id()));
+        }
+        col = vectorized::ColumnHelper::unfold_const_column(column_ref->type(), num_rows, col);
+        columns.emplace_back(col);
     }

     const std::string& row_delimiter = _options.line_terminated_by;
@@ -70,7 +83,7 @@ Status PlainTextBuilder::add_chunk(vectorized::Chunk* chunk) {
     auto* os = _output_stream.get();
     for (size_t row = 0; row < num_rows; row++) {
         for (size_t col = 0; col < num_cols; col++) {
-            auto col_ptr = columns_raw_ptr[col];
+            auto& col_ptr = columns[col];
             RETURN_IF_ERROR(_converters[col]->write_string(os, *col_ptr, row, opts));
             RETURN_IF_ERROR(os->write((col == num_cols - 1) ? row_delimiter : column_delimiter));
         }
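A note on the ColumnHelper::unfold_const_column call above: a chunk column can arrive as a const column, i.e. a single value logically repeated for every row, while the per-row CSV converters index the column row by row. Unfolding materializes the repeated value first. Below is a minimal standalone sketch of that idea; the Column/ConstColumn types here are simplified stand-ins for illustration, not the real StarRocks classes:

    #include <cstddef>
    #include <cstdio>
    #include <memory>
    #include <vector>

    // Simplified stand-ins for illustration; the real column classes live in
    // be/src/column/ and have a much richer interface.
    struct Column {
        virtual ~Column() = default;
        virtual int get(size_t row) const = 0;
        virtual bool is_constant() const { return false; }
    };

    struct Int32Column : Column {
        std::vector<int> data;
        int get(size_t row) const override { return data[row]; }
    };

    // A const column stores one value regardless of the logical row count.
    struct ConstColumn : Column {
        int value = 0;
        int get(size_t /*row*/) const override { return value; }
        bool is_constant() const override { return true; }
    };

    // The "unfold" step: materialize the repeated value so that downstream
    // per-row code can index every row uniformly.
    std::shared_ptr<Column> unfold_const_column(std::shared_ptr<Column> col, size_t num_rows) {
        if (!col->is_constant()) return col;
        auto out = std::make_shared<Int32Column>();
        out->data.assign(num_rows, col->get(0));
        return out;
    }

    int main() {
        auto c = std::make_shared<ConstColumn>();
        c->value = 7;
        auto unfolded = unfold_const_column(c, 3);
        for (size_t row = 0; row < 3; ++row) std::printf("%d ", unfolded->get(row)); // 7 7 7
        std::printf("\n");
    }

Holding the unfolded columns as shared pointers in columns (instead of the previous raw pointers) also keeps any freshly materialized column alive for the whole row loop.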
be/src/formats/csv/nullable_converter.cpp (32 changes: 20 additions & 12 deletions)

@@ -10,25 +10,33 @@ namespace starrocks::vectorized::csv {

 Status NullableConverter::write_string(OutputStream* os, const Column& column, size_t row_num,
                                        const Options& options) const {
-    auto nullable_column = down_cast<const NullableColumn*>(&column);
-    auto data_column = nullable_column->data_column().get();
-    auto null_column = nullable_column->null_column().get();
-    if (null_column->get_data()[row_num] != 0) {
-        return os->write("\\N");
+    if (column.is_nullable()) {
+        auto nullable_column = down_cast<const NullableColumn*>(&column);
+        auto data_column = nullable_column->data_column().get();
+        auto null_column = nullable_column->null_column().get();
+        if (null_column->get_data()[row_num] != 0) {
+            return os->write("\\N");
+        } else {
+            return _base_converter->write_string(os, *data_column, row_num, options);
+        }
     } else {
-        return _base_converter->write_string(os, *data_column, row_num, options);
+        return _base_converter->write_string(os, column, row_num, options);
     }
 }

 Status NullableConverter::write_quoted_string(OutputStream* os, const Column& column, size_t row_num,
                                               const Options& options) const {
-    auto nullable_column = down_cast<const NullableColumn*>(&column);
-    auto data_column = nullable_column->data_column().get();
-    auto null_column = nullable_column->null_column().get();
-    if (null_column->get_data()[row_num] != 0) {
-        return os->write("null");
+    if (column.is_nullable()) {
+        auto nullable_column = down_cast<const NullableColumn*>(&column);
+        auto data_column = nullable_column->data_column().get();
+        auto null_column = nullable_column->null_column().get();
+        if (null_column->get_data()[row_num] != 0) {
+            return os->write("null");
+        } else {
+            return _base_converter->write_quoted_string(os, *data_column, row_num, options);
+        }
     } else {
-        return _base_converter->write_quoted_string(os, *data_column, row_num, options);
+        return _base_converter->write_quoted_string(os, column, row_num, options);
     }
 }
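The shape of the fix in both methods is the same runtime dispatch: a converter created with nullable=true no longer assumes the column is physically a NullableColumn, but checks is_nullable() at write time and only then reads the null flags. Before the change, the down_cast ran unconditionally, so a non-nullable chunk column (as produced by plans like the left-join example in plain_text_builder.cpp) was misread as a NullableColumn. A minimal sketch of the pattern with a simplified, hypothetical column model:

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    // Hypothetical simplified column: a null-flag vector that is simply empty
    // when the column is not nullable.
    struct PlainColumn {
        std::vector<int> data;
        std::vector<uint8_t> nulls; // one flag per row, or empty
        bool is_nullable() const { return !nulls.empty(); }
    };

    std::string write_string(const PlainColumn& col, size_t row) {
        if (col.is_nullable()) {
            // Nullable path: consult the null flag before touching the data.
            if (col.nulls[row] != 0) return "\\N";
            return std::to_string(col.data[row]);
        }
        // Non-nullable path: write the value directly; there are no null flags.
        return std::to_string(col.data[row]);
    }

    int main() {
        PlainColumn nullable{{0, 10}, {1, 0}}; // row 0 is null
        PlainColumn plain{{1, 10}, {}};        // not nullable
        std::cout << write_string(nullable, 0) << write_string(nullable, 1)
                  << write_string(plain, 0) << write_string(plain, 1) << "\n";
        // Prints: \N10110
    }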
be/test/formats/csv/nullable_converter_test.cpp (18 changes: 17 additions & 1 deletion)

@@ -86,7 +86,7 @@ TEST_F(NullableConverterTest, test_read_string_invalid_value_as_error) {
 }

 // NOLINTNEXTLINE
-TEST_F(NullableConverterTest, test_write_string) {
+TEST_F(NullableConverterTest, test_write_string_nullable_column) {
     auto conv = csv::get_converter(_type, true);
     auto col = ColumnHelper::create_column(_type, true);
     col->append_datum(Datum()); // null
@@ -101,4 +101,20 @@ TEST_F(NullableConverterTest, test_write_string) {
     ASSERT_EQ("\\N10null10", buff.as_string());
 }

+// NOLINTNEXTLINE
+TEST_F(NullableConverterTest, test_write_string_not_nullable_column) {
+    auto conv = csv::get_converter(_type, true);
+    auto col = ColumnHelper::create_column(_type, false);
+    col->append_datum((int32_t)1);
+    col->append_datum((int32_t)10);
+
+    csv::OutputStreamString buff;
+    ASSERT_TRUE(conv->write_string(&buff, *col, 0, Converter::Options()).ok());
+    ASSERT_TRUE(conv->write_string(&buff, *col, 1, Converter::Options()).ok());
+    ASSERT_TRUE(conv->write_quoted_string(&buff, *col, 0, Converter::Options()).ok());
+    ASSERT_TRUE(conv->write_quoted_string(&buff, *col, 1, Converter::Options()).ok());
+    ASSERT_TRUE(buff.finalize().ok());
+    ASSERT_EQ("110110", buff.as_string());
+}
+
 } // namespace starrocks::vectorized::csv
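Reading aid for the two expected strings ("\N10null10" above and "110110" here): the four writes append to one buffer with no separators, and integer values come out unquoted even from write_quoted_string; only null is rendered differently ("\N" unquoted versus "null" quoted). A trivial standalone sketch of the concatenation for the non-nullable case:

    #include <iostream>
    #include <string>

    int main() {
        std::string buff;
        buff += "1";   // write_string, row 0
        buff += "10";  // write_string, row 1
        buff += "1";   // write_quoted_string, row 0 (ints are not quoted)
        buff += "10";  // write_quoted_string, row 1
        std::cout << (buff == "110110") << "\n"; // prints 1
    }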
New file (31 additions): SQL test expected-result file (path not captured in this view)

@@ -0,0 +1,31 @@
+-- name: test_select_into_outfile_csv_inconsistent_converter_and_column
+create table t1 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1;
+-- result:
+-- !result
+insert into t1 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6);
+-- result:
+-- !result
+
+create table t2 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1;
+-- result:
+-- !result
+insert into t2 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6);
+-- result:
+-- !result
+
+select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1 group by t1.k1,t1.k2 order by k33 into outfile "oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/" format as csv;
+-- result:
+-- !result
+shell: ossutil64 cat oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/0.csv | head -3
+-- result:
+0
+1	1	1
+2	2	2
+3	3	3
+-- !result
+
+shell: ossutil64 rm -rf oss://${oss_bucket}/test_sink/test_files_sink/${uuid0} >/dev/null || echo "exit 0" >/dev/null
+-- result:
+0
+
+-- !result
New file (11 additions): SQL test case (path not captured in this view)

@@ -0,0 +1,11 @@
+-- name: test_select_into_outfile_csv_inconsistent_converter_and_column
+create table t1 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1;
+insert into t1 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6);
+
+create table t2 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1;
+insert into t2 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6);
+
+select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1 group by t1.k1,t1.k2 order by k33 into outfile "oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/" format as csv;
+shell: ossutil64 cat oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/0.csv | head -3
+
+shell: ossutil64 rm -rf oss://${oss_bucket}/test_sink/test_files_sink/${uuid0} >/dev/null || echo "exit 0" >/dev/null