Skip to content

Commit

Permalink
s3 arrow converters have been added (ydb-platform#1249)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored Jan 26, 2024
1 parent 098713b commit 39db476
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 77 deletions.
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ ADDINCL(
YQL_LAST_ABI_VERSION()

SRCS(
yql_arrow_column_converters.cpp
yql_s3_actors_util.cpp
yql_s3_applicator_actor.cpp
yql_s3_sink_factory.cpp
Expand Down
211 changes: 211 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#include "yql_arrow_column_converters.h"

#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_binary.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/cast.h>
#include <contrib/libs/apache/arrow/cpp/src/parquet/exception.h>

#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <ydb/library/yql/public/udf/arrow/block_builder.h>
#include <ydb/library/yql/public/udf/arrow/block_item.h>
#include <ydb/library/yql/public/udf/arrow/block_reader.h>
#include <ydb/library/yql/utils/yql_panic.h>

#ifdef THROW
#undef THROW
#endif

#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"

#include <ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h>

#pragma clang diagnostic pop

namespace {

// Evaluates an arrow::Status expression and rethrows any failure as a
// yexception carrying the status text, so arrow errors propagate through
// the surrounding YQL code as ordinary exceptions.
#define THROW_ARROW_NOT_OK(status) \
do \
{ \
if (::arrow::Status _s = (status); !_s.ok()) \
throw yexception() << _s.ToString(); \
} while (false)

using namespace NYql;
using namespace NKikimr::NMiniKQL;

// Converts an arrow DATE32 array (days since epoch, i32) into a YQL Date
// block (ui16). Values outside [0, MAX_DATE] and nulls in a non-optional
// column raise parquet::ParquetException.
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
    const i64 rowCount = value->length();
    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> dateBuilder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), rowCount);
    ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> blockReader;
    for (i64 row = 0; row < rowCount; ++row) {
        const NUdf::TBlockItem item = blockReader.GetItem(*value->data(), row);
        if (!item) {
            if constexpr (isOptional) {
                // Null stays null in the optional output column.
                dateBuilder.Add(item);
                continue;
            } else {
                throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
            }
        }
        const i32 days = item.As<i32>();
        if (days < 0 || days > ::NYql::NUdf::MAX_DATE) {
            throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << days);
        }
        dateBuilder.Add(NUdf::TBlockItem(static_cast<ui16>(days)));
    }
    return dateBuilder.Build(true).make_array();
}

// Wraps the templated DATE32 -> Date conversion into a runtime-dispatched
// TColumnConverter; `isOptional` selects the template instantiation.
TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
        if (isOptional) {
            return ArrowDate32AsYqlDate<true>(targetType, value);
        }
        return ArrowDate32AsYqlDate<false>(targetType, value);
    };
}

// Parses an arrow BINARY (string) array into a YQL Datetime block (ui32,
// seconds). Each string is parsed with the ClickHouse DataTypeDateTime text
// parser under the supplied format settings. Nulls in a non-optional column
// raise parquet::ParquetException.
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
    ::NYql::NUdf::TFixedSizeArrayBuilder<ui32, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
    ::NYql::NUdf::TStringBlockReader<arrow::BinaryType, isOptional, NKikimr::NUdf::EDataSlot::String> reader;
    for (i64 i = 0; i < value->length(); ++i) {
        NUdf::TBlockItem item = reader.GetItem(*value->data(), i);

        if constexpr (isOptional) {
            if (!item) {
                builder.Add(item);
                continue;
            }
        } else if (!item) {
            // Fixed copy-pasted message: this is the datetime converter, not the date one.
            throw parquet::ParquetException(TStringBuilder() << "null value for datetime could not be represented in non-optional type");
        }

        auto ref = item.AsStringRef();
        NDB::ReadBufferFromMemory rb{ref.Data(), ref.Size()};
        uint32_t result = 0;
        parseImpl<NDB::DataTypeDateTime>(result, rb, nullptr, formatSettings);
        builder.Add(NUdf::TBlockItem(static_cast<ui32>(result)));
    }
    return builder.Build(true).make_array();
}

// Wraps the templated string -> Datetime conversion into a runtime-dispatched
// TColumnConverter; format settings are captured by value for the lambda.
TColumnConverter ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& value) {
        if (isOptional) {
            return ArrowStringAsYqlDateTime<true>(targetType, value, formatSettings);
        }
        return ArrowStringAsYqlDateTime<false>(targetType, value, formatSettings);
    };
}

// Parses an arrow BINARY (string) array into a YQL Timestamp block (ui64).
// Each string is parsed via the ClickHouse DateTime64 text reader (scale 0)
// using the server-default DateLUT time zone. Nulls in a non-optional column
// raise parquet::ParquetException.
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
    ::NYql::NUdf::TFixedSizeArrayBuilder<ui64, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
    ::NYql::NUdf::TStringBlockReader<arrow::BinaryType, isOptional, NKikimr::NUdf::EDataSlot::String> reader;
    for (i64 i = 0; i < value->length(); ++i) {
        NUdf::TBlockItem item = reader.GetItem(*value->data(), i);

        if constexpr (isOptional) {
            if (!item) {
                builder.Add(item);
                continue;
            }
        } else if (!item) {
            // Fixed copy-pasted message: this is the timestamp converter, not the date one.
            throw parquet::ParquetException(TStringBuilder() << "null value for timestamp could not be represented in non-optional type");
        }

        auto ref = item.AsStringRef();
        NDB::ReadBufferFromMemory rb{ref.Data(), ref.Size()};
        NDB::DateTime64 result = 0;
        readTextTimestamp64(result, 0, rb, DateLUT::instance(), formatSettings);
        builder.Add(NUdf::TBlockItem(static_cast<ui64>(result)));
    }
    return builder.Build(true).make_array();
}

// Wraps the templated string -> Timestamp conversion into a runtime-dispatched
// TColumnConverter; format settings are captured by value for the lambda.
TColumnConverter ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& value) {
        if (isOptional) {
            return ArrowStringAsYqlTimestamp<true>(targetType, value, formatSettings);
        }
        return ArrowStringAsYqlTimestamp<false>(targetType, value, formatSettings);
    };
}

// Selects a special-cased converter for (arrow source type, YQL target slot)
// pairs that arrow's generic Cast does not cover:
//   DATE32 -> Date, BINARY -> Datetime, BINARY -> Timestamp.
// Returns an empty TColumnConverter when no custom conversion applies, letting
// the caller fall back to the generic cast path.
TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
    // TODO: support more than 1 optional level
    bool isOptional = false;
    auto unpackedYqlType = UnpackOptional(yqlType, isOptional);
    if (!unpackedYqlType->IsData()) {
        return {};
    }

    auto slot = AS_TYPE(TDataType, unpackedYqlType)->GetDataSlot();
    if (!slot) {
        return {};
    }

    // Removed the unreachable `return {};` that followed each inner switch:
    // every case of those switches (including `default`) already returns.
    switch (originalType->id()) {
    case arrow::Type::DATE32:
        switch (*slot) {
        case NUdf::EDataSlot::Date:
            return ArrowDate32AsYqlDate(targetType, isOptional);
        default:
            return {};
        }
    case arrow::Type::BINARY:
        switch (*slot) {
        case NUdf::EDataSlot::Datetime:
            return ArrowStringAsYqlDateTime(targetType, isOptional, formatSettings);
        case NUdf::EDataSlot::Timestamp:
            return ArrowStringAsYqlTimestamp(targetType, isOptional, formatSettings);
        default:
            return {};
        }
    default:
        return {};
    }
}

}

namespace NYql::NDq {

// Builds the converter used when reading an S3 column whose arrow type
// differs from the YQL-expected one. Resolution order: PG conversion for PG
// types (throws if incompatible), then the custom converters, then identity
// (empty converter) for equal types, and finally a generic arrow Cast, which
// must be possible per CanCast or YQL_ENSURE fires.
TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
    if (yqlType->IsPg()) {
        const auto pgType = AS_TYPE(TPgType, yqlType);
        if (auto pgConverter = BuildPgColumnConverter(originalType, pgType)) {
            return pgConverter;
        }
        ythrow yexception() << "Arrow type: " << originalType->ToString() <<
            " of field: " << columnName << " isn't compatible to PG type: " << NPg::LookupType(pgType->GetTypeId()).Name;
    }

    if (auto customConverter = BuildCustomConverter(originalType, targetType, yqlType, formatSettings)) {
        return customConverter;
    }

    // No conversion needed when the physical types already match.
    if (targetType->Equals(originalType)) {
        return {};
    }

    YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: "
        << targetType->ToString() << ", got: " << originalType->ToString());

    return [targetType](const std::shared_ptr<arrow::Array>& value) {
        auto castResult = arrow::compute::Cast(*value, targetType);
        THROW_ARROW_NOT_OK(castResult.status());
        return std::move(castResult).ValueOrDie();
    };
}

} // namespace NYql::NDq
15 changes: 15 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <ydb/library/yql/parser/pg_wrapper/interface/arrow.h>
#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h>

namespace NYql::NDq {

// Builds a converter from the arrow type a column was read with
// (`originalType`) to the arrow type the query expects (`targetType`),
// guided by the column's YQL type and the text-parsing format settings
// (used for string -> Datetime/Timestamp conversions).
// Returns an empty converter when the types already match; throws
// (yexception / YQL_ENSURE) when no compatible conversion exists.
// `columnName` is only used to make error messages identifiable.
TColumnConverter BuildColumnConverter(
    const std::string& columnName,
    const std::shared_ptr<arrow::DataType>& originalType,
    const std::shared_ptr<arrow::DataType>& targetType,
    NKikimr::NMiniKQL::TType* yqlType,
    const NDB::FormatSettings& formatSettings);

} // namespace NYql::NDq
75 changes: 3 additions & 72 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@

#endif

#include "yql_arrow_column_converters.h"
#include "yql_s3_actors_util.h"
#include "yql_s3_read_actor.h"
#include "yql_s3_source_factory.h"
#include "yql_s3_actors_util.h"

#include <ydb/library/services/services.pb.h>

Expand Down Expand Up @@ -1122,76 +1123,6 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste
inflightCounter);
}

template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
for (i64 i = 0; i < value->length(); ++i) {
const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
if constexpr (isOptional) {
if (!item) {
builder.Add(item);
continue;
}
} else if (!item) {
throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
}

const i32 v = item.As<i32>();
if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
}
builder.Add(NUdf::TBlockItem(static_cast<ui16>(v)));
}
return builder.Build(true).make_array();
}

TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) {
if (yqlType->IsPg()) {
auto pgType = AS_TYPE(TPgType, yqlType);
auto conv = BuildPgColumnConverter(originalType, pgType);
if (!conv) {
ythrow yexception() << "Arrow type: " << originalType->ToString() <<
" of field: " << columnName << " isn't compatible to PG type: " << NPg::LookupType(pgType->GetTypeId()).Name;
}

return conv;
}

if (originalType->id() == arrow::Type::DATE32) {
// TODO: support more than 1 optional level
bool isOptional = false;
auto unpackedYqlType = UnpackOptional(yqlType, isOptional);

// arrow date -> yql date
if (unpackedYqlType->IsData()) {
auto slot = AS_TYPE(TDataType, unpackedYqlType)->GetDataSlot();
if (slot == NUdf::EDataSlot::Date) {
return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
if (isOptional) {
return ArrowDate32AsYqlDate<true>(targetType, value);
}
return ArrowDate32AsYqlDate<false>(targetType, value);
};
}
}
}

if (targetType->Equals(originalType)) {
return {};
}

YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: "
<< targetType->ToString() << ", got: " << originalType->ToString());


return [targetType](const std::shared_ptr<arrow::Array>& value) {
auto res = arrow::compute::Cast(*value, targetType);
THROW_ARROW_NOT_OK(res.status());
return std::move(res).ValueOrDie();
};
}

std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::RecordBatch> batch, std::vector<TColumnConverter>& columnConverters) {
auto columns = batch->columns();
for (size_t i = 0; i < columnConverters.size(); ++i) {
Expand Down Expand Up @@ -1571,7 +1502,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
columnIndices.push_back(srcFieldIndex);
auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second, ReadSpec->Settings));
}
}

Expand Down
16 changes: 11 additions & 5 deletions ydb/tests/fq/s3/test_format_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def create_sink_date_time_binding(self, client, connection_id, prefix, type_form
("timestamp/simple_iso/test.csv", "csv_with_names"),
("timestamp/simple_iso/test.tsv", "tsv_with_names"),
("timestamp/simple_iso/test.json", "json_each_row"),
("timestamp/simple_iso/test.parquet", "parquet")
])
def test_timestamp_simple_iso(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -357,6 +358,7 @@ def test_timestamp_simple_iso_insert(self, kikimr, s3, client, filename, type_fo
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.parquet", "parquet")
])
def test_timestamp_simple_posix(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand All @@ -380,7 +382,7 @@ def test_timestamp_simple_posix(self, kikimr, s3, client, filename, type_format)
@pytest.mark.parametrize("filename, type_format", [
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.json", "json_each_row")
])
def test_timestamp_simple_posix_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand All @@ -404,7 +406,8 @@ def test_timestamp_simple_posix_insert(self, kikimr, s3, client, filename, type_
@pytest.mark.parametrize("filename, type_format", [
("date_time/simple_iso/test.csv", "csv_with_names"),
("date_time/simple_iso/test.tsv", "tsv_with_names"),
("date_time/simple_iso/test.json", "json_each_row")
("date_time/simple_iso/test.json", "json_each_row"),
("date_time/simple_iso/test.parquet", "parquet")
])
def test_date_time_simple_iso(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -452,7 +455,8 @@ def test_date_time_simple_iso_insert(self, kikimr, s3, client, filename, type_fo
@pytest.mark.parametrize("filename, type_format", [
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row")
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.parquet", "parquet")
])
def test_date_time_simple_posix(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand All @@ -476,7 +480,8 @@ def test_date_time_simple_posix(self, kikimr, s3, client, filename, type_format)
@pytest.mark.parametrize("filename, type_format", [
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row")
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.parquet", "parquet")
])
def test_date_time_simple_posix_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -549,7 +554,8 @@ def test_timestamp_simple_format_insert(self, kikimr, s3, client, filename, type
@pytest.mark.parametrize("filename, type_format", [
("common/simple_format/test.csv", "csv_with_names"),
("common/simple_format/test.tsv", "tsv_with_names"),
("common/simple_format/test.json", "json_each_row")
("common/simple_format/test.json", "json_each_row"),
("common/simple_format/test.parquet", "parquet")
])
def test_date_time_simple_format_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down

0 comments on commit 39db476

Please sign in to comment.