Skip to content

Commit

Permalink
Fix YSON codec to support complex YT types (ydb-platform#6396)
Browse files (browse the repository at this point in the history)
  • Loading branch information
avevad authored Jul 9, 2024
1 parent: cc8a4ed; commit: 2bbdb0f
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 105 deletions.
261 changes: 173 additions & 88 deletions ydb/library/yql/providers/common/codec/yql_codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -800,65 +800,102 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
case TType::EKind::Variant: {
auto varType = static_cast<TVariantType*>(type);
auto underlyingType = varType->GetUnderlyingType();
if (cmd == StringMarker) {
YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
auto name = ReadNextString(cmd, buf);
auto index = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
YQL_ENSURE(index, "Unexpected member: " << name);
YQL_ENSURE(static_cast<TStructType*>(underlyingType)->GetMemberType(*index)->IsVoid(), "Expected Void as underlying type");
return holderFactory.CreateVariantHolder(NUdf::TUnboxedValuePod::Zero(), *index);
}

CHECK_EXPECTED(cmd, BeginListSymbol);
cmd = buf.Read();
i64 index = 0;
if (isTableFormat) {
YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker);
if (cmd == Uint64Marker) {
index = buf.ReadVarUI64();
if (isTableFormat && (nativeYtTypeFlags & NTCF_COMPLEX)) {
CHECK_EXPECTED(cmd, BeginListSymbol);
cmd = buf.Read();
TType* type = nullptr;
i64 index = 0;
if (cmd == StringMarker) {
YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
auto structType = static_cast<TStructType*>(underlyingType);
auto nameBuffer = ReadNextString(cmd, buf);
auto foundIndex = structType->FindMemberIndex(nameBuffer);
YQL_ENSURE(foundIndex.Defined(), "Unexpected member: " << nameBuffer);
index = *foundIndex;
type = varType->GetAlternativeType(index);
} else {
index = buf.ReadVarI64();
YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker);
YQL_ENSURE(underlyingType->IsTuple(), "Expected tuple as underlying type");
if (cmd == Uint64Marker) {
index = buf.ReadVarUI64();
} else {
index = buf.ReadVarI64();
}
YQL_ENSURE(0 <= index && index < varType->GetAlternativesCount(), "Unexpected member index: " << index);
type = varType->GetAlternativeType(index);
}
} else {
if (cmd == BeginListSymbol) {
cmd = buf.Read();
CHECK_EXPECTED(cmd, ListItemSeparatorSymbol);
cmd = buf.Read();
auto value = ReadYsonValue(type, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
cmd = buf.Read();
if (cmd != EndListSymbol) {
CHECK_EXPECTED(cmd, ListItemSeparatorSymbol);
cmd = buf.Read();
CHECK_EXPECTED(cmd, EndListSymbol);
}
return holderFactory.CreateVariantHolder(value.Release(), index);
} else {
if (cmd == StringMarker) {
YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
auto name = ReadNextString(cmd, buf);
auto foundIndex = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
YQL_ENSURE(foundIndex, "Unexpected member: " << name);
index = *foundIndex;
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
cmd = buf.Read();
}
auto index = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
YQL_ENSURE(index, "Unexpected member: " << name);
YQL_ENSURE(static_cast<TStructType*>(underlyingType)->GetMemberType(*index)->IsVoid(), "Expected Void as underlying type");
return holderFactory.CreateVariantHolder(NUdf::TUnboxedValuePod::Zero(), *index);
}

CHECK_EXPECTED(cmd, EndListSymbol);
CHECK_EXPECTED(cmd, BeginListSymbol);
cmd = buf.Read();
i64 index = 0;
if (isTableFormat) {
YQL_ENSURE(cmd == Int64Marker || cmd == Uint64Marker);
if (cmd == Uint64Marker) {
index = buf.ReadVarUI64();
} else {
index = buf.ReadVarI64();
}
} else {
index = ReadNextSerializedNumber<ui64>(cmd, buf);
if (cmd == BeginListSymbol) {
cmd = buf.Read();
YQL_ENSURE(underlyingType->IsStruct(), "Expected struct as underlying type");
auto name = ReadNextString(cmd, buf);
auto foundIndex = static_cast<TStructType*>(underlyingType)->FindMemberIndex(name);
YQL_ENSURE(foundIndex, "Unexpected member: " << name);
index = *foundIndex;
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
cmd = buf.Read();
}

CHECK_EXPECTED(cmd, EndListSymbol);
} else {
index = ReadNextSerializedNumber<ui64>(cmd, buf);
}
}
}

YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " <<
varType->GetAlternativesCount() << " are available");
YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type");
TType* itemType;
if (underlyingType->IsTuple()) {
itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index);
}
else {
itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index);
}
YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " <<
varType->GetAlternativesCount() << " are available");
YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type");
TType* itemType;
if (underlyingType->IsTuple()) {
itemType = static_cast<TTupleType*>(underlyingType)->GetElementType(index);
}
else {
itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index);
}

EXPECTED(buf, ListItemSeparatorSymbol);
cmd = buf.Read();
auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
EXPECTED(buf, ListItemSeparatorSymbol);
cmd = buf.Read();
}
auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
cmd = buf.Read();
}

CHECK_EXPECTED(cmd, EndListSymbol);
return holderFactory.CreateVariantHolder(value.Release(), index);
CHECK_EXPECTED(cmd, EndListSymbol);
return holderFactory.CreateVariantHolder(value.Release(), index);
}
}

case TType::EKind::Data: {
Expand Down Expand Up @@ -1189,7 +1226,7 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
if (pos && cmd != '#') {
auto memberType = structType->GetMemberType(*pos);
auto unwrappedType = memberType;
if (isTableFormat && unwrappedType->IsOptional()) {
if (!(nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) && isTableFormat && unwrappedType->IsOptional()) {
unwrappedType = static_cast<TOptionalType*>(unwrappedType)->GetItemType();
}

Expand Down Expand Up @@ -1242,26 +1279,41 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
if (cmd == EntitySymbol) {
return NUdf::TUnboxedValuePod();
}

auto itemType = static_cast<TOptionalType*>(type)->GetItemType();
if (cmd != BeginListSymbol) {
auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
return value.Release().MakeOptional();
}
if (isTableFormat && (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX)) {
if (itemType->GetKind() == TType::EKind::Optional) {
CHECK_EXPECTED(cmd, BeginListSymbol);
cmd = buf.Read();
auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
cmd = buf.Read();
}
CHECK_EXPECTED(cmd, EndListSymbol);
return value.Release().MakeOptional();
} else {
return ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat).Release().MakeOptional();
}
} else {
if (cmd != BeginListSymbol) {
auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
return value.Release().MakeOptional();
}

cmd = buf.Read();
if (cmd == EndListSymbol) {
return NUdf::TUnboxedValuePod();
}
cmd = buf.Read();
if (cmd == EndListSymbol) {
return NUdf::TUnboxedValuePod();
}

auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
cmd = buf.Read();
if (cmd == ListItemSeparatorSymbol) {
auto value = ReadYsonValue(itemType, nativeYtTypeFlags, holderFactory, cmd, buf, isTableFormat);
cmd = buf.Read();
}
if (cmd == ListItemSeparatorSymbol) {
cmd = buf.Read();
}

CHECK_EXPECTED(cmd, EndListSymbol);
return value.Release().MakeOptional();
CHECK_EXPECTED(cmd, EndListSymbol);
return value.Release().MakeOptional();
}
}

case TType::EKind::Dict: {
Expand Down Expand Up @@ -2067,14 +2119,10 @@ void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, ui64 nativeYtType
switch (type->GetKind()) {
case TType::EKind::Variant: {
buf.Write(BeginListSymbol);
buf.Write(Uint64Marker);
auto index = value.GetVariantIndex();
buf.WriteVarUI64(index);
buf.Write(ListItemSeparatorSymbol);
auto varType = static_cast<TVariantType*>(type);
auto underlyingType = varType->GetUnderlyingType();
YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " <<
varType->GetAlternativesCount() << " are available");
auto index = value.GetVariantIndex();
YQL_ENSURE(index < varType->GetAlternativesCount(), "Bad variant alternative: " << index << ", only " << varType->GetAlternativesCount() << " are available");
YQL_ENSURE(underlyingType->IsTuple() || underlyingType->IsStruct(), "Wrong underlying type");
TType* itemType;
if (underlyingType->IsTuple()) {
Expand All @@ -2083,7 +2131,17 @@ void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, ui64 nativeYtType
else {
itemType = static_cast<TStructType*>(underlyingType)->GetMemberType(index);
}

if (!(nativeYtTypeFlags & NTCF_COMPLEX) || underlyingType->IsTuple()) {
buf.Write(Uint64Marker);
buf.WriteVarUI64(index);
} else {
auto structType = static_cast<TStructType*>(underlyingType);
auto varName = structType->GetMemberName(index);
buf.Write(StringMarker);
buf.WriteVarI32(varName.size());
buf.WriteMany(varName);
}
buf.Write(ListItemSeparatorSymbol);
WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetVariantItem(), false);
buf.Write(ListItemSeparatorSymbol);
buf.Write(EndListSymbol);
Expand Down Expand Up @@ -2315,13 +2373,26 @@ void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, ui64 nativeYtType

case TType::EKind::Struct: {
auto structType = static_cast<TStructType*>(type);
buf.Write(BeginListSymbol);
for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false);
buf.Write(ListItemSeparatorSymbol);
if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) {
buf.Write(BeginMapSymbol);
for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
buf.Write(StringMarker);
auto key = structType->GetMemberName(i);
buf.WriteVarI32(key.Size());
buf.WriteMany(key);
buf.Write(KeyValueSeparatorSymbol);
WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false);
buf.Write(KeyedItemSeparatorSymbol);
}
buf.Write(EndMapSymbol);
} else {
buf.Write(BeginListSymbol);
for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
WriteYsonValueInTableFormat(buf, structType->GetMemberType(i), nativeYtTypeFlags, value.GetElement(i), false);
buf.Write(ListItemSeparatorSymbol);
}
buf.Write(EndListSymbol);
}

buf.Write(EndListSymbol);
break;
}

Expand All @@ -2339,22 +2410,36 @@ void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, ui64 nativeYtType

case TType::EKind::Optional: {
auto itemType = static_cast<TOptionalType*>(type)->GetItemType();
if (!value) {
if (topLevel) {
buf.Write(BeginListSymbol);
buf.Write(EndListSymbol);
if (nativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) {
if (value) {
if (itemType->GetKind() == TType::EKind::Optional) {
buf.Write(BeginListSymbol);
}
WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false);
if (itemType->GetKind() == TType::EKind::Optional) {
buf.Write(ListItemSeparatorSymbol);
buf.Write(EndListSymbol);
}
} else {
buf.Write(EntitySymbol);
}
} else {
if (!value) {
if (topLevel) {
buf.Write(BeginListSymbol);
buf.Write(EndListSymbol);
}
else {
buf.Write(EntitySymbol);
}
}
else {
buf.Write(EntitySymbol);
buf.Write(BeginListSymbol);
WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false);
buf.Write(ListItemSeparatorSymbol);
buf.Write(EndListSymbol);
}
}
else {
buf.Write(BeginListSymbol);
WriteYsonValueInTableFormat(buf, itemType, nativeYtTypeFlags, value.GetOptionalValue(), false);
buf.Write(ListItemSeparatorSymbol);
buf.Write(EndListSymbol);
}

break;
}

Expand Down
15 changes: 13 additions & 2 deletions ydb/library/yql/providers/yt/codec/yt_codec_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,8 @@ class TYsonDecoder: public TMkqlReaderImpl::TDecoder {
return NUdf::TUnboxedValue();
}
auto& decoder = *SpecsCache_.GetSpecs().Inputs[TableIndex_];
auto val = ReadYsonValue(uwrappedType, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_, true);
return val.Release().MakeOptional();
auto val = ReadYsonValue((decoder.NativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) ? type : uwrappedType, decoder.NativeYtTypeFlags, SpecsCache_.GetHolderFactory(), cmd, Buf_, true);
return (decoder.NativeYtTypeFlags & ENativeTypeCompatFlags::NTCF_COMPLEX) ? val : val.Release().MakeOptional();
} else {
if (Y_LIKELY(cmd != EntitySymbol)) {
auto& decoder = *SpecsCache_.GetSpecs().Inputs[TableIndex_];
Expand Down Expand Up @@ -1759,9 +1759,20 @@ class TYsonEncoder: public TMkqlWriterImpl::TEncoder {
Buf_.WriteVarI32(field.Name.size());
Buf_.WriteMany(field.Name.data(), field.Name.size());
Buf_.Write(KeyValueSeparatorSymbol);

bool isOptionalFieldTypeV3 = field.Optional && (NativeYtTypeFlags_ & ENativeTypeCompatFlags::NTCF_COMPLEX);
bool wrapOptionalTypeV3 = isOptionalFieldTypeV3 && field.Type->GetKind() == TTypeBase::EKind::Optional;
if (wrapOptionalTypeV3) {
Buf_.Write(BeginListSymbol);
}

WriteYsonValueInTableFormat(Buf_, field.Type, NativeYtTypeFlags_, std::move(value), true);

if (wrapOptionalTypeV3) {
Buf_.Write(ListItemSeparatorSymbol);
Buf_.Write(EndListSymbol);
}

Buf_.Write(KeyedItemSeparatorSymbol);
}
Buf_.Write(EndMapSymbol);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2593,9 +2593,9 @@
],
"test.test[type_v3-insert_struct_v3_with_native--Debug]": [
{
"checksum": "a9d8d4df01650c1a934396299ba0f22c",
"checksum": "bafb7479c4d60b703c509ef3a5d162e6",
"size": 5262,
"uri": "https://{canondata_backend}/1942415/ad5ade59004e97b62f357b05c8baba968f696aa5/resource.tar.gz#test.test_type_v3-insert_struct_v3_with_native--Debug_/opt.yql_patched"
"uri": "https://{canondata_backend}/1899731/86910ab4cb76dcfc4b917458564f186e26f037d3/resource.tar.gz#test.test_type_v3-insert_struct_v3_with_native--Debug_/opt.yql_patched"
}
],
"test.test[type_v3-insert_struct_v3_with_native--Plan]": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2848,9 +2848,9 @@
],
"test.test[type_v3-append_struct-default.txt-Debug]": [
{
"checksum": "c1e215599582a1caaabbc23cd6ba0411",
"size": 4354,
"uri": "https://{canondata_backend}/1889210/2fbf7f68942208b15ab6eb23b14b78640f078541/resource.tar.gz#test.test_type_v3-append_struct-default.txt-Debug_/opt.yql_patched"
"checksum": "62c8aaaa648cae9143c87902a7b67c37",
"size": 4351,
"uri": "https://{canondata_backend}/1925821/68e7a27709136850ff3e2dbf29d04ceac681d1a8/resource.tar.gz#test.test_type_v3-append_struct-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[type_v3-append_struct-default.txt-Plan]": [
Expand Down
Loading

0 comments on commit 2bbdb0f

Please sign in to comment.