diff --git a/be/src/exec/plain_text_builder.cpp b/be/src/exec/plain_text_builder.cpp index e7215a8481283..60f6cabbbc35f 100644 --- a/be/src/exec/plain_text_builder.cpp +++ b/be/src/exec/plain_text_builder.cpp @@ -3,6 +3,7 @@ #include "plain_text_builder.h" #include "column/chunk.h" +#include "column/column_helper.h" #include "column/const_column.h" #include "exprs/expr.h" #include "exprs/vectorized/column_ref.h" @@ -31,8 +32,13 @@ Status PlainTextBuilder::init() { _converters.reserve(_output_expr_ctxs.size()); for (auto* ctx : _output_expr_ctxs) { const auto& type = ctx->root()->type(); - auto conv = vectorized::csv::get_converter(type, ctx->root()->is_nullable()); - + // in some cases, the nullable property between the column in the chunk and _output_expr_ctxs + // may not be consistent. + // for example: order by limit + left outer join + // select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1 + // group by t1.k1,t1.k2 order by k33 + // so we use nullable converter, and process whether the column is nullable in the nullable converter. + auto conv = vectorized::csv::get_converter(type, true); if (conv == nullptr) { return Status::InternalError("No CSV converter for type " + type.debug_string()); } @@ -52,15 +58,22 @@ Status PlainTextBuilder::add_chunk(vectorized::Chunk* chunk) { auto err = strings::Substitute("Unmatched number of columns expected=$0 real=$1", _converters.size(), num_cols); return Status::InternalError(err); } - std::vector columns_raw_ptr; - columns_raw_ptr.reserve(num_cols); + + vectorized::Columns columns; + columns.reserve(num_cols); for (int i = 0; i < num_cols; i++) { auto root = _output_expr_ctxs[i]->root(); if (!root->is_slotref()) { return Status::InternalError("Not slot ref column"); } + auto column_ref = ((vectorized::ColumnRef*)root); - columns_raw_ptr.emplace_back(chunk->get_column_by_slot_id(column_ref->slot_id()).get()); + auto col = chunk->get_column_by_slot_id(column_ref->slot_id()); + if (col == nullptr) { + return Status::InternalError(strings::Substitute("Column not found by slot id %0", column_ref->slot_id())); + } + col = vectorized::ColumnHelper::unfold_const_column(column_ref->type(), num_rows, col); + columns.emplace_back(col); } const std::string& row_delimiter = _options.line_terminated_by; @@ -70,7 +83,7 @@ Status PlainTextBuilder::add_chunk(vectorized::Chunk* chunk) { auto* os = _output_stream.get(); for (size_t row = 0; row < num_rows; row++) { for (size_t col = 0; col < num_cols; col++) { - auto col_ptr = columns_raw_ptr[col]; + auto& col_ptr = columns[col]; RETURN_IF_ERROR(_converters[col]->write_string(os, *col_ptr, row, opts)); RETURN_IF_ERROR(os->write((col == num_cols - 1) ? row_delimiter : column_delimiter)); } diff --git a/be/src/formats/csv/nullable_converter.cpp b/be/src/formats/csv/nullable_converter.cpp index 28beba920c31a..8cec9fb8eb1d2 100644 --- a/be/src/formats/csv/nullable_converter.cpp +++ b/be/src/formats/csv/nullable_converter.cpp @@ -10,25 +10,33 @@ namespace starrocks::vectorized::csv { Status NullableConverter::write_string(OutputStream* os, const Column& column, size_t row_num, const Options& options) const { - auto nullable_column = down_cast(&column); - auto data_column = nullable_column->data_column().get(); - auto null_column = nullable_column->null_column().get(); - if (null_column->get_data()[row_num] != 0) { - return os->write("\\N"); + if (column.is_nullable()) { + auto nullable_column = down_cast(&column); + auto data_column = nullable_column->data_column().get(); + auto null_column = nullable_column->null_column().get(); + if (null_column->get_data()[row_num] != 0) { + return os->write("\\N"); + } else { + return _base_converter->write_string(os, *data_column, row_num, options); + } } else { - return _base_converter->write_string(os, *data_column, row_num, options); + return _base_converter->write_string(os, column, row_num, options); } } Status NullableConverter::write_quoted_string(OutputStream* os, const Column& column, size_t row_num, const Options& options) const { - auto nullable_column = down_cast(&column); - auto data_column = nullable_column->data_column().get(); - auto null_column = nullable_column->null_column().get(); - if (null_column->get_data()[row_num] != 0) { - return os->write("null"); + if (column.is_nullable()) { + auto nullable_column = down_cast(&column); + auto data_column = nullable_column->data_column().get(); + auto null_column = nullable_column->null_column().get(); + if (null_column->get_data()[row_num] != 0) { + return os->write("null"); + } else { + return _base_converter->write_quoted_string(os, *data_column, row_num, options); + } } else { - return _base_converter->write_quoted_string(os, *data_column, row_num, options); + return _base_converter->write_quoted_string(os, column, row_num, options); } } diff --git a/be/test/formats/csv/nullable_converter_test.cpp b/be/test/formats/csv/nullable_converter_test.cpp index 64def2a33efa2..ffbbed7aa382c 100644 --- a/be/test/formats/csv/nullable_converter_test.cpp +++ b/be/test/formats/csv/nullable_converter_test.cpp @@ -86,7 +86,7 @@ TEST_F(NullableConverterTest, test_read_string_invalid_value_as_error) { } // NOLINTNEXTLINE -TEST_F(NullableConverterTest, test_write_string) { +TEST_F(NullableConverterTest, test_write_string_nullable_column) { auto conv = csv::get_converter(_type, true); auto col = ColumnHelper::create_column(_type, true); col->append_datum(Datum()); // null @@ -101,4 +101,20 @@ TEST_F(NullableConverterTest, test_write_string) { ASSERT_EQ("\\N10null10", buff.as_string()); } +// NOLINTNEXTLINE +TEST_F(NullableConverterTest, test_write_string_not_nullable_column) { + auto conv = csv::get_converter(_type, true); + auto col = ColumnHelper::create_column(_type, false); + col->append_datum((int32_t)1); + col->append_datum((int32_t)10); + + csv::OutputStreamString buff; + ASSERT_TRUE(conv->write_string(&buff, *col, 0, Converter::Options()).ok()); + ASSERT_TRUE(conv->write_string(&buff, *col, 1, Converter::Options()).ok()); + ASSERT_TRUE(conv->write_quoted_string(&buff, *col, 0, Converter::Options()).ok()); + ASSERT_TRUE(conv->write_quoted_string(&buff, *col, 1, Converter::Options()).ok()); + ASSERT_TRUE(buff.finalize().ok()); + ASSERT_EQ("110110", buff.as_string()); +} + } // namespace starrocks::vectorized::csv diff --git a/test/sql/test_sink/R/test_select_into_outfile_csv_inconsistent_converter_and_column b/test/sql/test_sink/R/test_select_into_outfile_csv_inconsistent_converter_and_column new file mode 100644 index 0000000000000..5fecd7734b854 --- /dev/null +++ b/test/sql/test_sink/R/test_select_into_outfile_csv_inconsistent_converter_and_column @@ -0,0 +1,31 @@ +-- name: test_select_into_outfile_csv_inconsistent_converter_and_column +create table t1 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1; +-- result: +-- !result +insert into t1 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6); +-- result: +-- !result + +create table t2 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1; +-- result: +-- !result +insert into t2 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6); +-- result: +-- !result + +select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1 group by t1.k1,t1.k2 order by k33 into outfile "oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/" format as csv; +-- result: +-- !result +shell: ossutil64 cat oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/0.csv | head -3 +-- result: +0 +1 1 1 +2 2 2 +3 3 3 +-- !result + +shell: ossutil64 rm -rf oss://${oss_bucket}/test_sink/test_files_sink/${uuid0} >/dev/null || echo "exit 0" >/dev/null +-- result: +0 + +-- !result diff --git a/test/sql/test_sink/T/test_select_into_outfile_csv_inconsistent_converter_and_column b/test/sql/test_sink/T/test_select_into_outfile_csv_inconsistent_converter_and_column new file mode 100644 index 0000000000000..3937c8d883917 --- /dev/null +++ b/test/sql/test_sink/T/test_select_into_outfile_csv_inconsistent_converter_and_column @@ -0,0 +1,11 @@ +-- name: test_select_into_outfile_csv_inconsistent_converter_and_column +create table t1 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1; +insert into t1 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6); + +create table t2 (k1 int, k2 int, k3 int) distributed by hash(k1) buckets 1; +insert into t2 values(1,1,1), (2,2,2), (2,2,3), (3,3,4), (3,3,5), (3,3,6); + +select t1.k1, t1.k2, count(distinct t1.k3) as k33 from t1 left join t2 on t1.k1 = t2.k1 group by t1.k1,t1.k2 order by k33 into outfile "oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/" format as csv; +shell: ossutil64 cat oss://${oss_bucket}/test_sink/test_files_sink/${uuid0}/0.csv | head -3 + +shell: ossutil64 rm -rf oss://${oss_bucket}/test_sink/test_files_sink/${uuid0} >/dev/null || echo "exit 0" >/dev/null