From 39db476ca19eb1607bf419ecf39568f42b8955ca Mon Sep 17 00:00:00 2001
From: Oleg Doronin
Date: Fri, 26 Jan 2024 16:49:16 +0300
Subject: [PATCH] s3 arrow converters have been added (#1249)

---
 ydb/library/yql/providers/s3/actors/ya.make   |   1 +
 .../s3/actors/yql_arrow_column_converters.cpp | 211 ++++++++++++++++++
 .../s3/actors/yql_arrow_column_converters.h   |  15 ++
 .../providers/s3/actors/yql_s3_read_actor.cpp |  75 +------
 ydb/tests/fq/s3/test_format_setting.py        |  16 +-
 5 files changed, 241 insertions(+), 77 deletions(-)
 create mode 100644 ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
 create mode 100644 ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.h

diff --git a/ydb/library/yql/providers/s3/actors/ya.make b/ydb/library/yql/providers/s3/actors/ya.make
index 86db149454a0..1cb74e6f809c 100644
--- a/ydb/library/yql/providers/s3/actors/ya.make
+++ b/ydb/library/yql/providers/s3/actors/ya.make
@@ -9,6 +9,7 @@ ADDINCL(
 YQL_LAST_ABI_VERSION()
 
 SRCS(
+    yql_arrow_column_converters.cpp
     yql_s3_actors_util.cpp
     yql_s3_applicator_actor.cpp
     yql_s3_sink_factory.cpp
diff --git a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
new file mode 100644
index 000000000000..32f647a06ef5
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
@@ -0,0 +1,211 @@
+#include "yql_arrow_column_converters.h"
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#ifdef THROW
+#undef THROW
+#endif
+
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunused-parameter"
+
+#include
+
+#pragma clang diagnostic pop
+
+namespace {
+
+#define THROW_ARROW_NOT_OK(status)                         \
+    do                                                     \
+    {                                                      \
+        if (::arrow::Status _s = (status); !_s.ok())       \
+            throw yexception() << _s.ToString();           \
+    } while (false)
+
+using namespace NYql;
+using namespace NKikimr::NMiniKQL;
+
+template <bool isOptional>
+std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
+    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
+    for (i64 i = 0; i < value->length(); ++i) {
+        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
+        if constexpr (isOptional) {
+            if (!item) {
+                builder.Add(item);
+                continue;
+            }
+        } else if (!item) {
+            throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
+        }
+
+        const i32 v = item.As<i32>();
+        if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
+            throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
+        }
+        builder.Add(NUdf::TBlockItem(static_cast<ui16>(v)));
+    }
+    return builder.Build(true).make_array();
+}
+
+TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
+    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowDate32AsYqlDate<true>(targetType, value)
+            : ArrowDate32AsYqlDate<false>(targetType, value);
+    };
+}
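+
+// The factory above resolves optionality once, so each per-row loop is
+// instantiated without a runtime branch. Arrow's date32 stores days since
+// the Unix epoch as i32; YQL's Date is a ui16 day count, hence the
+// [0, MAX_DATE] range check before narrowing.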
+
+template <bool isOptional>
+std::shared_ptr<arrow::Array> ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
+    ::NYql::NUdf::TFixedSizeArrayBuilder<ui32, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TStringBlockReader<arrow::BinaryType, isOptional> reader;
+    for (i64 i = 0; i < value->length(); ++i) {
+        NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
+
+        if constexpr (isOptional) {
+            if (!item) {
+                builder.Add(item);
+                continue;
+            }
+        } else if (!item) {
+            throw parquet::ParquetException(TStringBuilder() << "null value for datetime could not be represented in non-optional type");
+        }
+
+        auto ref = item.AsStringRef();
+        NDB::ReadBufferFromMemory rb{ref.Data(), ref.Size()};
+        uint32_t result = 0;
+        parseImpl(result, rb, nullptr, formatSettings);
+        builder.Add(NUdf::TBlockItem(static_cast<ui32>(result)));
+    }
+    return builder.Build(true).make_array();
+}
+
+TColumnConverter ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
+    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowStringAsYqlDateTime<true>(targetType, value, formatSettings)
+            : ArrowStringAsYqlDateTime<false>(targetType, value, formatSettings);
+    };
+}
+
+template <bool isOptional>
+std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
+    ::NYql::NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TStringBlockReader<arrow::BinaryType, isOptional> reader;
+    for (i64 i = 0; i < value->length(); ++i) {
+        NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
+
+        if constexpr (isOptional) {
+            if (!item) {
+                builder.Add(item);
+                continue;
+            }
+        } else if (!item) {
+            throw parquet::ParquetException(TStringBuilder() << "null value for timestamp could not be represented in non-optional type");
+        }
+
+        auto ref = item.AsStringRef();
+        NDB::ReadBufferFromMemory rb{ref.Data(), ref.Size()};
+        NDB::DateTime64 result = 0;
+        readTextTimestamp64(result, 0, rb, DateLUT::instance(), formatSettings);
+        builder.Add(NUdf::TBlockItem(static_cast<ui64>(result)));
+    }
+    return builder.Build(true).make_array();
+}
+
+TColumnConverter ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
+    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowStringAsYqlTimestamp<true>(targetType, value, formatSettings)
+            : ArrowStringAsYqlTimestamp<false>(targetType, value, formatSettings);
+    };
+}
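+
+// Both string converters drive the NDB text readers over a
+// ReadBufferFromMemory view of each Arrow string cell; formatSettings
+// selects the accepted textual layout (ISO 8601, POSIX, or an explicit
+// format string), so a single converter covers every format mode the tests
+// in this patch exercise.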
+
+TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
+    // TODO: support more than 1 optional level
+    bool isOptional = false;
+    auto unpackedYqlType = UnpackOptional(yqlType, isOptional);
+    if (!unpackedYqlType->IsData()) {
+        return {};
+    }
+
+    auto slot = AS_TYPE(TDataType, unpackedYqlType)->GetDataSlot();
+    if (!slot) {
+        return {};
+    }
+
+    auto slotItem = *slot;
+    switch (originalType->id()) {
+        case arrow::Type::DATE32:
+            switch (slotItem) {
+                case NUdf::EDataSlot::Date:
+                    return ArrowDate32AsYqlDate(targetType, isOptional);
+                default:
+                    return {};
+            }
+            return {};
+        case arrow::Type::BINARY:
+            switch (slotItem) {
+                case NUdf::EDataSlot::Datetime:
+                    return ArrowStringAsYqlDateTime(targetType, isOptional, formatSettings);
+                case NUdf::EDataSlot::Timestamp:
+                    return ArrowStringAsYqlTimestamp(targetType, isOptional, formatSettings);
+                default:
+                    return {};
+            }
+            return {};
+        default:
+            return {};
+    }
+}
+
+}
+
+namespace NYql::NDq {
+
+TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
+    if (yqlType->IsPg()) {
+        auto pgType = AS_TYPE(TPgType, yqlType);
+        auto conv = BuildPgColumnConverter(originalType, pgType);
+        if (!conv) {
+            ythrow yexception() << "Arrow type: " << originalType->ToString() <<
+                " of field: " << columnName << " isn't compatible with PG type: " << NPg::LookupType(pgType->GetTypeId()).Name;
+        }
+
+        return conv;
+    }
+
+    if (auto customConverter = BuildCustomConverter(originalType, targetType, yqlType, formatSettings); customConverter) {
+        return customConverter;
+    }
+
+    if (targetType->Equals(originalType)) {
+        return {};
+    }
+
+    YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: "
+        << targetType->ToString() << ", got: " << originalType->ToString());
+
+    return [targetType](const std::shared_ptr<arrow::Array>& value) {
+        auto res = arrow::compute::Cast(*value, targetType);
+        THROW_ARROW_NOT_OK(res.status());
+        return std::move(res).ValueOrDie();
+    };
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.h b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.h
new file mode 100644
index 000000000000..4b85bf7ea2f2
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include
+#include
+
+namespace NYql::NDq {
+
+TColumnConverter BuildColumnConverter(
+    const std::string& columnName,
+    const std::shared_ptr<arrow::DataType>& originalType,
+    const std::shared_ptr<arrow::DataType>& targetType,
+    NKikimr::NMiniKQL::TType* yqlType,
+    const NDB::FormatSettings& formatSettings);
+
+} // namespace NYql::NDq
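
The intended call pattern for the new entry point, sketched outside the patch (targetField, originalType and targetType appear in the actor code below; column, rowSpecType and formatSettings are illustrative names):

    // Resolved once per column while matching the file schema against the
    // read schema; the resulting converter (possibly empty) is then reused
    // for every record batch.
    NYql::NDq::TColumnConverter converter = NYql::NDq::BuildColumnConverter(
        targetField->name(),  // column name, used only in error messages
        originalType,         // arrow type found in the file
        targetType,           // arrow type the YQL block reader expects
        rowSpecType,          // NKikimr::NMiniKQL::TType* from the row spec
        formatSettings);      // NDB::FormatSettings, e.g. ReadSpec->Settings
    if (converter) {
        column = converter(column); // an empty converter means "no change"
    }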
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 1849aeaafcee..1c58ffe536be 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -33,9 +33,10 @@
 #endif
 
+#include "yql_arrow_column_converters.h"
+#include "yql_s3_actors_util.h"
 #include "yql_s3_read_actor.h"
 #include "yql_s3_source_factory.h"
-#include "yql_s3_actors_util.h"
 
 #include
@@ -1122,76 +1123,6 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste
         inflightCounter);
 }
 
-template <bool isOptional>
-std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
-    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
-    ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
-    for (i64 i = 0; i < value->length(); ++i) {
-        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
-        if constexpr (isOptional) {
-            if (!item) {
-                builder.Add(item);
-                continue;
-            }
-        } else if (!item) {
-            throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
-        }
-
-        const i32 v = item.As<i32>();
-        if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
-            throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
-        }
-        builder.Add(NUdf::TBlockItem(static_cast<ui16>(v)));
-    }
-    return builder.Build(true).make_array();
-}
-
-TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) {
-    if (yqlType->IsPg()) {
-        auto pgType = AS_TYPE(TPgType, yqlType);
-        auto conv = BuildPgColumnConverter(originalType, pgType);
-        if (!conv) {
-            ythrow yexception() << "Arrow type: " << originalType->ToString() <<
-                " of field: " << columnName << " isn't compatible to PG type: " << NPg::LookupType(pgType->GetTypeId()).Name;
-        }
-
-        return conv;
-    }
-
-    if (originalType->id() == arrow::Type::DATE32) {
-        // TODO: support more than 1 optional level
-        bool isOptional = false;
-        auto unpackedYqlType = UnpackOptional(yqlType, isOptional);
-
-        // arrow date -> yql date
-        if (unpackedYqlType->IsData()) {
-            auto slot = AS_TYPE(TDataType, unpackedYqlType)->GetDataSlot();
-            if (slot == NUdf::EDataSlot::Date) {
-                return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
-                    if (isOptional) {
-                        return ArrowDate32AsYqlDate<true>(targetType, value);
-                    }
-                    return ArrowDate32AsYqlDate<false>(targetType, value);
-                };
-            }
-        }
-    }
-
-    if (targetType->Equals(originalType)) {
-        return {};
-    }
-
-    YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: "
-        << targetType->ToString() << ", got: " << originalType->ToString());
-
-    return [targetType](const std::shared_ptr<arrow::Array>& value) {
-        auto res = arrow::compute::Cast(*value, targetType);
-        THROW_ARROW_NOT_OK(res.status());
-        return std::move(res).ValueOrDie();
-    };
-}
-
 std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::RecordBatch> batch, std::vector<TColumnConverter>& columnConverters) {
     auto columns = batch->columns();
     for (size_t i = 0; i < columnConverters.size(); ++i) {
@@ -1571,7 +1502,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
                 columnIndices.push_back(srcFieldIndex);
                 auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
                 YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
-                columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
+                columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second, ReadSpec->Settings));
             }
         }
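
Per record batch, the converters are applied by rebuilding the batch with converted columns. A minimal sketch of that step, mirroring the pre-existing ConvertArrowColumns above (its loop body is elided in the hunk, so the skip-empty check and the final RecordBatch::Make call are assumptions):

    std::shared_ptr<arrow::RecordBatch> ConvertColumns(
            std::shared_ptr<arrow::RecordBatch> batch,
            std::vector<NYql::NDq::TColumnConverter>& columnConverters) {
        auto columns = batch->columns();
        for (size_t i = 0; i < columnConverters.size(); ++i) {
            if (auto& converter = columnConverters[i]) { // empty => keep as is
                columns[i] = converter(columns[i]);
            }
        }
        return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), std::move(columns));
    }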
diff --git a/ydb/tests/fq/s3/test_format_setting.py b/ydb/tests/fq/s3/test_format_setting.py
index cc7fb0b8bfde..f1267348db02 100644
--- a/ydb/tests/fq/s3/test_format_setting.py
+++ b/ydb/tests/fq/s3/test_format_setting.py
@@ -309,6 +309,7 @@ def create_sink_date_time_binding(self, client, connection_id, prefix, type_form
         ("timestamp/simple_iso/test.csv", "csv_with_names"),
         ("timestamp/simple_iso/test.tsv", "tsv_with_names"),
         ("timestamp/simple_iso/test.json", "json_each_row"),
+        ("timestamp/simple_iso/test.parquet", "parquet")
     ])
     def test_timestamp_simple_iso(self, kikimr, s3, client, filename, type_format):
         self.create_bucket_and_upload_file(filename, s3, kikimr)
@@ -357,6 +358,7 @@ def test_timestamp_simple_iso_insert(self, kikimr, s3, client, filename, type_fo
         ("common/simple_posix/test.csv", "csv_with_names"),
         ("common/simple_posix/test.tsv", "tsv_with_names"),
         ("common/simple_posix/test.json", "json_each_row"),
+        ("common/simple_posix/test.parquet", "parquet")
     ])
     def test_timestamp_simple_posix(self, kikimr, s3, client, filename, type_format):
         self.create_bucket_and_upload_file(filename, s3, kikimr)
@@ -380,7 +382,7 @@ def test_timestamp_simple_posix(self, kikimr, s3, client, filename, type_format)
     @pytest.mark.parametrize("filename, type_format", [
         ("common/simple_posix/test.csv", "csv_with_names"),
         ("common/simple_posix/test.tsv", "tsv_with_names"),
-        ("common/simple_posix/test.json", "json_each_row"),
+        ("common/simple_posix/test.json", "json_each_row")
     ])
     def test_timestamp_simple_posix_insert(self, kikimr, s3, client, filename, type_format):
         self.create_bucket_and_upload_file(filename, s3, kikimr)
@@ -404,7 +406,8 @@ def test_timestamp_simple_posix_insert(self, kikimr, s3, client, filename, type_
     @pytest.mark.parametrize("filename, type_format", [
         ("date_time/simple_iso/test.csv", "csv_with_names"),
         ("date_time/simple_iso/test.tsv", "tsv_with_names"),
-        ("date_time/simple_iso/test.json", "json_each_row")
+        ("date_time/simple_iso/test.json", "json_each_row"),
+        ("date_time/simple_iso/test.parquet", "parquet")
     ])
     def test_date_time_simple_iso(self, kikimr, s3, client, filename, type_format):
         self.create_bucket_and_upload_file(filename, s3, kikimr)
@@ -452,7 +455,8 @@ def test_date_time_simple_iso_insert(self, kikimr, s3, client, filename, type_fo
     @pytest.mark.parametrize("filename, type_format", [
         ("common/simple_posix/test.csv", "csv_with_names"),
         ("common/simple_posix/test.tsv", "tsv_with_names"),
-        ("common/simple_posix/test.json", "json_each_row")
+        ("common/simple_posix/test.json", "json_each_row"),
+        ("common/simple_posix/test.parquet", "parquet")
     ])
     def test_date_time_simple_posix(self, kikimr, s3, client, filename, type_format):
         self.create_bucket_and_upload_file(filename, s3, kikimr)
@@ -476,7 +480,8 @@ def test_date_time_simple_posix(self, kikimr, s3, client, filename, type_format)
     @pytest.mark.parametrize("filename, type_format", [
         ("common/simple_posix/test.csv", "csv_with_names"),
         ("common/simple_posix/test.tsv", "tsv_with_names"),
-        ("common/simple_posix/test.json", "json_each_row")
+        ("common/simple_posix/test.json", "json_each_row"),
+        ("common/simple_posix/test.parquet", "parquet")
     ])
     def test_date_time_simple_posix_insert(self, kikimr, s3, client, filename, type_format):
         self.create_bucket_and_upload_file(filename, s3, kikimr)
@@ -549,7 +554,8 @@ def test_timestamp_simple_format_insert(self, kikimr, s3, client, filename, type
     @pytest.mark.parametrize("filename, type_format", [
         ("common/simple_format/test.csv", "csv_with_names"),
         ("common/simple_format/test.tsv", "tsv_with_names"),
-        ("common/simple_format/test.json", "json_each_row")
+        ("common/simple_format/test.json", "json_each_row"),
+        ("common/simple_format/test.parquet", "parquet")
     ])
     def test_date_time_simple_format_insert(self, kikimr, s3, client, filename, type_format):
         self.create_bucket_and_upload_file(filename, s3, kikimr)
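
The parquet cases added above exercise the string converters end to end. For isolated experiments, the parsing core of ArrowStringAsYqlTimestamp boils down to the following sketch (the sample literal and local names are invented; formatSettings stands for the NDB::FormatSettings carried in ReadSpec->Settings, which decides whether ISO 8601 or POSIX layouts are accepted):

    TStringBuf text = "2024-01-26 16:49:16"; // one cell of the string column
    NDB::ReadBufferFromMemory rb{text.data(), text.size()};
    NDB::DateTime64 result = 0;
    // The same reader call the converter makes per row.
    readTextTimestamp64(result, 0, rb, DateLUT::instance(), formatSettings);
    // The converter then narrows the value and appends it to the output
    // block: builder.Add(NUdf::TBlockItem(static_cast<ui64>(result)));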