Skip to content

Commit

Permalink
[BugFix] Fix csv converter and chunk column inconsistent in select into outfile (backport #48052) (#48441)
Browse files Browse the repository at this point in the history

Co-authored-by: wyb <[email protected]>
  • Loading branch information
mergify[bot] and wyb committed Jul 16, 2024
1 parent 80b7ae8 commit 746ae5e
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 19 deletions.
25 changes: 19 additions & 6 deletions be/src/exec/plain_text_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "plain_text_builder.h"

#include "column/chunk.h"
#include "column/column_helper.h"
#include "column/const_column.h"
#include "exprs/expr.h"
#include "exprs/vectorized/column_ref.h"
Expand Down Expand Up @@ -31,8 +32,13 @@ Status PlainTextBuilder::init() {
_converters.reserve(_output_expr_ctxs.size());
for (auto* ctx : _output_expr_ctxs) {
const auto& type = ctx->root()->type();
auto conv = vectorized::csv::get_converter(type, ctx->root()->is_nullable());

// in some cases, the nullable property between the column in the chunk and _output_expr_ctxs
// may not be consistent.
// for example: order by limit + left outer join
// select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1
// group by t1.k1,t1.k2 order by k33
// so we use nullable converter, and process whether the column is nullable in the nullable converter.
auto conv = vectorized::csv::get_converter(type, true);
if (conv == nullptr) {
return Status::InternalError("No CSV converter for type " + type.debug_string());
}
Expand All @@ -52,15 +58,22 @@ Status PlainTextBuilder::add_chunk(vectorized::Chunk* chunk) {
auto err = strings::Substitute("Unmatched number of columns expected=$0 real=$1", _converters.size(), num_cols);
return Status::InternalError(err);
}
std::vector<const vectorized::Column*> columns_raw_ptr;
columns_raw_ptr.reserve(num_cols);

vectorized::Columns columns;
columns.reserve(num_cols);
for (int i = 0; i < num_cols; i++) {
auto root = _output_expr_ctxs[i]->root();
if (!root->is_slotref()) {
return Status::InternalError("Not slot ref column");
}

auto column_ref = ((vectorized::ColumnRef*)root);
columns_raw_ptr.emplace_back(chunk->get_column_by_slot_id(column_ref->slot_id()).get());
auto col = chunk->get_column_by_slot_id(column_ref->slot_id());
if (col == nullptr) {
return Status::InternalError(strings::Substitute("Column not found by slot id %0", column_ref->slot_id()));
}
col = vectorized::ColumnHelper::unfold_const_column(column_ref->type(), num_rows, col);
columns.emplace_back(col);
}

const std::string& row_delimiter = _options.line_terminated_by;
Expand All @@ -70,7 +83,7 @@ Status PlainTextBuilder::add_chunk(vectorized::Chunk* chunk) {
auto* os = _output_stream.get();
for (size_t row = 0; row < num_rows; row++) {
for (size_t col = 0; col < num_cols; col++) {
auto col_ptr = columns_raw_ptr[col];
auto& col_ptr = columns[col];
RETURN_IF_ERROR(_converters[col]->write_string(os, *col_ptr, row, opts));
RETURN_IF_ERROR(os->write((col == num_cols - 1) ? row_delimiter : column_delimiter));
}
Expand Down
32 changes: 20 additions & 12 deletions be/src/formats/csv/nullable_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,33 @@ namespace starrocks::vectorized::csv {

// Writes the value at `row_num` of `column` to `os` in plain CSV form.
// NOTE: the chunk column may be non-nullable even though this converter was
// created for a nullable slot (e.g. "order by limit + left outer join" plans),
// so the nullable property is checked at runtime instead of being assumed.
// Null values are emitted as "\N"; non-null values are delegated to the
// underlying base converter.
Status NullableConverter::write_string(OutputStream* os, const Column& column, size_t row_num,
                                       const Options& options) const {
    if (column.is_nullable()) {
        auto nullable_column = down_cast<const NullableColumn*>(&column);
        auto data_column = nullable_column->data_column().get();
        auto null_column = nullable_column->null_column().get();
        if (null_column->get_data()[row_num] != 0) {
            return os->write("\\N");
        } else {
            return _base_converter->write_string(os, *data_column, row_num, options);
        }
    } else {
        // Plain (non-nullable) column: every row has a value.
        return _base_converter->write_string(os, column, row_num, options);
    }
}

// Quoted-CSV variant of write_string(): nulls are emitted as the literal
// "null" instead of "\N"; non-null values are delegated to the base
// converter's quoted writer. Like write_string(), this tolerates a
// non-nullable chunk column even when the converter was built for a
// nullable slot, checking column.is_nullable() at runtime.
Status NullableConverter::write_quoted_string(OutputStream* os, const Column& column, size_t row_num,
                                              const Options& options) const {
    if (column.is_nullable()) {
        auto nullable_column = down_cast<const NullableColumn*>(&column);
        auto data_column = nullable_column->data_column().get();
        auto null_column = nullable_column->null_column().get();
        if (null_column->get_data()[row_num] != 0) {
            return os->write("null");
        } else {
            return _base_converter->write_quoted_string(os, *data_column, row_num, options);
        }
    } else {
        // Plain (non-nullable) column: every row has a value.
        return _base_converter->write_quoted_string(os, column, row_num, options);
    }
}

Expand Down
18 changes: 17 additions & 1 deletion be/test/formats/csv/nullable_converter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ TEST_F(NullableConverterTest, test_read_string_invalid_value_as_error) {
}

// NOLINTNEXTLINE
TEST_F(NullableConverterTest, test_write_string) {
TEST_F(NullableConverterTest, test_write_string_nullable_column) {
auto conv = csv::get_converter(_type, true);
auto col = ColumnHelper::create_column(_type, true);
col->append_datum(Datum()); // null
Expand All @@ -101,4 +101,20 @@ TEST_F(NullableConverterTest, test_write_string) {
ASSERT_EQ("\\N10null10", buff.as_string());
}

// NOLINTNEXTLINE
TEST_F(NullableConverterTest, test_write_string_not_nullable_column) {
    // A nullable converter must also accept a plain (non-nullable) column:
    // the chunk column's nullability can differ from the output expression's.
    auto converter = csv::get_converter(_type, true);
    auto column = ColumnHelper::create_column(_type, false);
    column->append_datum((int32_t)1);
    column->append_datum((int32_t)10);

    csv::OutputStreamString out;
    for (size_t row = 0; row < 2; row++) {
        ASSERT_TRUE(converter->write_string(&out, *column, row, Converter::Options()).ok());
    }
    for (size_t row = 0; row < 2; row++) {
        ASSERT_TRUE(converter->write_quoted_string(&out, *column, row, Converter::Options()).ok());
    }
    ASSERT_TRUE(out.finalize().ok());
    // "1" + "10" written twice (plain then quoted int needs no quoting).
    ASSERT_EQ("110110", out.as_string());
}

} // namespace starrocks::vectorized::csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- name: test_select_into_outfile_csv_inconsistent_converter_and_column
create table t1 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1;
-- result:
-- !result
insert into t1 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6);
-- result:
-- !result

create table t2 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1;
-- result:
-- !result
insert into t2 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6);
-- result:
-- !result

select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1 group by t1.k1,t1.k2 order by k33 into outfile "oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/" format as csv;
-- result:
-- !result
shell: ossutil64 cat oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/0.csv | head -3
-- result:
0
1 1 1
2 2 2
3 3 3
-- !result

shell: ossutil64 rm -rf oss://${oss_bucket}/test_sink/test_files_sink/${uuid0} >/dev/null || echo "exit 0" >/dev/null
-- result:
0

-- !result
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- name: test_select_into_outfile_csv_inconsistent_converter_and_column
create table t1 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1;
insert into t1 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6);

create table t2 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1;
insert into t2 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6);

select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1 group by t1.k1,t1.k2 order by k33 into outfile "oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/" format as csv;
shell: ossutil64 cat oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/0.csv | head -3

shell: ossutil64 rm -rf oss://${oss_bucket}/test_sink/test_files_sink/${uuid0} >/dev/null || echo "exit 0" >/dev/null

0 comments on commit 746ae5e

Please sign in to comment.