diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index be9e4ade5f0f..336462e80b90 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1506,11 +1506,11 @@ class TKqpGatewayProxy : public IKikimrGateway { } if (settings.SequenceSettings.DataType) { if (settings.SequenceSettings.DataType == "int8") { - seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::BIGINT); + seqDesc->SetDataType("pgint8"); } else if (settings.SequenceSettings.DataType == "int4") { - seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::INTEGER); + seqDesc->SetDataType("pgint4"); } else if (settings.SequenceSettings.DataType == "int2") { - seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::SMALLINT); + seqDesc->SetDataType("pgint2"); } } @@ -1635,11 +1635,11 @@ class TKqpGatewayProxy : public IKikimrGateway { } if (settings.SequenceSettings.DataType) { if (settings.SequenceSettings.DataType == "int8") { - seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::BIGINT); + seqDesc->SetDataType("pgint8"); } else if (settings.SequenceSettings.DataType == "int4") { - seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::INTEGER); + seqDesc->SetDataType("pgint4"); } else if (settings.SequenceSettings.DataType == "int2") { - seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::SMALLINT); + seqDesc->SetDataType("pgint2"); } } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 3049e19739b1..27e44221474c 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1408,11 +1408,6 @@ message TSequenceDescription { optional sint64 NextValue = 1; optional bool NextUsed = 2; } - enum EDataType { - BIGINT = 0; - INTEGER = 1; - SMALLINT = 2; - } optional string Name = 1; // mandatory optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard optional uint64 Version = 3; // incremented every time sequence is altered @@ -1425,7 +1420,7 @@ message TSequenceDescription { optional sint64 Increment = 9; // increment at each call, defaults to 1 optional bool Cycle = 10; // true when cycle on overflow is allowed optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating - optional EDataType DataType = 12; // data type of the sequence, defaults to BIGINT + optional string DataType = 12; // data type of the sequence: Int64/pgint8, Int32/pgint4, Int16/pgint2, defaults to Int64/pgint8 } message TSequenceSharding { diff --git a/ydb/core/tx/datashard/export_common.cpp b/ydb/core/tx/datashard/export_common.cpp index c72bd371fd7a..cf999f1f76a7 100644 --- a/ydb/core/tx/datashard/export_common.cpp +++ b/ydb/core/tx/datashard/export_common.cpp @@ -66,7 +66,12 @@ TMaybe GenYdbScheme( FillPartitioningSettings(scheme, tableDesc); FillKeyBloomFilter(scheme, tableDesc); FillReadReplicasSettings(scheme, tableDesc); - FillSequenceDescription(scheme, tableDesc); + + TString error; + Ydb::StatusIds::StatusCode status; + if (!FillSequenceDescription(scheme, tableDesc, status, error)) { + return Nothing(); + } return scheme; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp index 1fbc9ed2a27f..1d7a71f7b1d4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp @@ -227,7 +227,8 @@ class TPropose: public TSubOperationState { std::optional GetAlterSequenceDescription( const NKikimrSchemeOp::TSequenceDescription& sequence, const NKikimrSchemeOp::TSequenceDescription& alter, - TString& errStr, NKikimrScheme::EStatus& status) { + const NScheme::TTypeRegistry& typeRegistry, bool pgTypesEnabled, + TString& errStr) { NKikimrSchemeOp::TSequenceDescription result = sequence; @@ -240,28 +241,72 @@ std::optional GetAlterSequenceDescription } i64 dataTypeMaxValue, dataTypeMinValue; - switch (dataType) { - case NKikimrSchemeOp::TSequenceDescription::BIGINT: { - dataTypeMaxValue = Max(); - dataTypeMinValue = Min(); - break; + + auto typeName = NMiniKQL::AdaptLegacyYqlType(dataType); + const NScheme::IType* type = typeRegistry.GetType(typeName); + if (type) { + if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { + errStr = Sprintf("Type '%s' specified for sequence '%s' is no longer supported", dataType.data(), sequence.GetName().data()); + return std::nullopt; + } + + switch (type->GetTypeId()) { + case NScheme::NTypeIds::Int16: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + case NScheme::NTypeIds::Int32: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + case NScheme::NTypeIds::Int64: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + default: { + errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data()); + return std::nullopt; + } + } + } else { + auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); + if (!typeDesc) { + errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data()); + return std::nullopt; } - case NKikimrSchemeOp::TSequenceDescription::INTEGER: { - dataTypeMaxValue = Max(); - dataTypeMinValue = Min(); - break; + if (!pgTypesEnabled) { + errStr = Sprintf("Type '%s' specified for sequence '%s', but support for pg types is disabled (EnableTablePgTypes feature flag is off)", dataType.data(), sequence.GetName().data()); + return std::nullopt; } - case NKikimrSchemeOp::TSequenceDescription::SMALLINT: { - dataTypeMaxValue = Max(); - dataTypeMinValue = Min(); - break; + switch (NPg::PgTypeIdFromTypeDesc(typeDesc)) { + case INT2OID: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + case INT4OID: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + case INT8OID: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + default: { + errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data()); + return std::nullopt; + } } } if (maxValue != Max() && maxValue != Max() && maxValue != Max()) { if (maxValue > dataTypeMaxValue) { errStr = Sprintf("MAXVALUE (%ld) is out of range for sequence", maxValue); - status = NKikimrScheme::StatusInvalidParameter; return std::nullopt; } } else { @@ -271,7 +316,6 @@ std::optional GetAlterSequenceDescription if (minValue != Min() && minValue != Min() && minValue != Min()) { if (minValue < dataTypeMinValue) { errStr = Sprintf("MINVALUE (%ld) is out of range for sequence", minValue); - status = NKikimrScheme::StatusInvalidParameter; return std::nullopt; } } else { @@ -287,19 +331,16 @@ std::optional GetAlterSequenceDescription if (maxValue > dataTypeMaxValue) { errStr = Sprintf("MAXVALUE (%ld) is out of range for sequence", maxValue); - status = NKikimrScheme::StatusInvalidParameter; return std::nullopt; } if (minValue < dataTypeMinValue) { errStr = Sprintf("MINVALUE (%ld) is out of range for sequence", minValue); - status = NKikimrScheme::StatusInvalidParameter; return std::nullopt; } if (minValue >= maxValue) { errStr = Sprintf("MINVALUE (%ld) must be less than MAXVALUE (%ld)", minValue, maxValue); - status = NKikimrScheme::StatusInvalidParameter; return std::nullopt; } @@ -310,12 +351,10 @@ std::optional GetAlterSequenceDescription if (startValue > maxValue) { errStr = Sprintf("START value (%ld) cannot be greater than MAXVALUE (%ld)", startValue, maxValue); - status = NKikimrScheme::StatusInvalidParameter; return std::nullopt; } if (startValue < minValue) { errStr = Sprintf("START value (%ld) cannot be less than MINVALUE (%ld)", startValue, minValue); - status = NKikimrScheme::StatusInvalidParameter; return std::nullopt; } @@ -468,9 +507,11 @@ class TAlterSequence: public TSubOperation { return result; } + const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; auto description = GetAlterSequenceDescription( - sequenceInfo->Description, sequenceAlter, errStr, status); + sequenceInfo->Description, sequenceAlter, *typeRegistry, context.SS->EnableTablePgTypes, errStr); if (!description) { + status = NKikimrScheme::StatusInvalidParameter; result->SetError(status, errStr); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp index 5d6839d6c3b1..b0b48d0716ce 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp @@ -247,29 +247,77 @@ class TPropose: public TSubOperationState { }; // fill sequence description with default values -NKikimrSchemeOp::TSequenceDescription FillSequenceDescription(const NKikimrSchemeOp::TSequenceDescription& descr) { - NKikimrSchemeOp::TSequenceDescription result = descr; - - if (!result.HasDataType()) { - result.SetDataType(NKikimrSchemeOp::TSequenceDescription::BIGINT); +std::optional FillSequenceDescription(const NKikimrSchemeOp::TSequenceDescription& sequence, + const NScheme::TTypeRegistry& typeRegistry, bool pgTypesEnabled, + TString& errStr) { + NKikimrSchemeOp::TSequenceDescription result = sequence; + + TString dataType; + if (!sequence.HasDataType()) { + errStr = Sprintf("Type is not specified for sequence '%s'", sequence.GetName().data()); + return std::nullopt; } i64 dataTypeMaxValue, dataTypeMinValue; - switch (result.GetDataType()) { - case NKikimrSchemeOp::TSequenceDescription::BIGINT: { - dataTypeMaxValue = Max(); - dataTypeMinValue = Min(); - break; + auto typeName = NMiniKQL::AdaptLegacyYqlType(dataType); + const NScheme::IType* type = typeRegistry.GetType(typeName); + if (type) { + if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { + errStr = Sprintf("Type '%s' specified for sequence '%s' is no longer supported", dataType.data(), sequence.GetName().data()); + return std::nullopt; } - case NKikimrSchemeOp::TSequenceDescription::INTEGER: { - dataTypeMaxValue = Max(); - dataTypeMinValue = Min(); - break; + + switch (type->GetTypeId()) { + case NScheme::NTypeIds::Int16: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + case NScheme::NTypeIds::Int32: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + case NScheme::NTypeIds::Int64: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + default: { + errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data()); + return std::nullopt; + } + } + } else { + auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); + if (!typeDesc) { + errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data()); + return std::nullopt; } - case NKikimrSchemeOp::TSequenceDescription::SMALLINT: { - dataTypeMaxValue = Max(); - dataTypeMinValue = Min(); - break; + if (!pgTypesEnabled) { + errStr = Sprintf("Type '%s' specified for sequence '%s', but support for pg types is disabled (EnableTablePgTypes feature flag is off)", dataType.data(), sequence.GetName().data()); + return std::nullopt; + } + switch (NPg::PgTypeIdFromTypeDesc(typeDesc)) { + case INT2OID: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + case INT4OID: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + case INT8OID: { + dataTypeMaxValue = Max(); + dataTypeMinValue = Min(); + break; + } + default: { + errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data()); + return std::nullopt; + } } } @@ -511,7 +559,15 @@ class TCreateSequence : public TSubOperation { TSequenceInfo::TPtr sequenceInfo = new TSequenceInfo(0); TSequenceInfo::TPtr alterData = sequenceInfo->CreateNextVersion(); - alterData->Description = FillSequenceDescription(descr); + const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; + auto description = FillSequenceDescription( + descr, *typeRegistry, context.SS->EnableTablePgTypes, errStr); + if (!description) { + status = NKikimrScheme::StatusInvalidParameter; + result->SetError(status, errStr); + return result; + } + alterData->Description = *description; if (shardsToCreate) { sequenceShard = context.SS->RegisterShardInfo( diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index 05b5244adda9..bc9943538e6e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -53,7 +53,9 @@ THolder CreateTablePropose( const auto& fromSequence = column.from_sequence(); auto* seqDesc = indexedTable->MutableSequenceDescription()->Add(); - FillSequenceDescription(*seqDesc, fromSequence, status, error); + if (!FillSequenceDescription(*seqDesc, fromSequence, status, error)) { + return nullptr; + } break; } diff --git a/ydb/core/tx/sequenceshard/schema.h b/ydb/core/tx/sequenceshard/schema.h index 0c43f3e4709d..08d8be97dd6f 100644 --- a/ydb/core/tx/sequenceshard/schema.h +++ b/ydb/core/tx/sequenceshard/schema.h @@ -43,10 +43,6 @@ namespace NSequenceShard { using Type = ESequenceState; static constexpr ESequenceState Default = ESequenceState::Active; }; - struct DataType : Column<11, NScheme::NTypeIds::Uint8> { - using Type = ESequenceState; - static constexpr ESequenceState Default = ESequenceState::Active; - }; struct MovedTo : Column<12, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey; diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 3c83ec943457..3e8eca9af5f8 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -1426,7 +1427,7 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, return true; } -void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in) { +bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) { THashMap sequences; for (const auto& sequenceDescription : in.GetSequences()) { @@ -1466,20 +1467,54 @@ void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrS } if (sequenceDescription.HasDataType()) { auto* dataType = fromSequence->mutable_data_type(); - switch (sequenceDescription.GetDataType()) { - case NKikimrSchemeOp::TSequenceDescription::BIGINT: { - dataType->set_type_id(Ydb::Type::INT64); - break; + auto* typeDesc = NPg::TypeDescFromPgTypeName(sequenceDescription.GetDataType()); + if (typeDesc) { + auto* pg = dataType->mutable_pg_type(); + auto typeId = NPg::PgTypeIdFromTypeDesc(typeDesc); + switch (typeId) { + case INT2OID: + case INT4OID: + case INT8OID: + break; + default: { + TString sequenceType = NPg::PgTypeNameFromTypeDesc(typeDesc); + status = Ydb::StatusIds::BAD_REQUEST; + error = Sprintf( + "Invalid type name %s for sequence: %s", sequenceType.c_str(), sequenceDescription.GetName().data() + ); + return false; + break; + } } - case NKikimrSchemeOp::TSequenceDescription::INTEGER: { - dataType->set_type_id(Ydb::Type::INT32); - break; + pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc)); + pg->set_type_modifier(NPg::TypeModFromPgTypeName(sequenceDescription.GetDataType())); + pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc)); + pg->set_typlen(0); + pg->set_typmod(0); + } else { + NYql::NProto::TypeIds protoType; + if (!NYql::NProto::TypeIds_Parse(sequenceDescription.GetDataType(), &protoType)) { + status = Ydb::StatusIds::BAD_REQUEST; + error = Sprintf( + "Invalid type name %s for sequence: %s", sequenceDescription.GetDataType().data(), sequenceDescription.GetName().data() + ); + return false; } - case NKikimrSchemeOp::TSequenceDescription::SMALLINT: { - dataType->set_type_id(Ydb::Type::INT16); - break; + switch (protoType) { + case NYql::NProto::TypeIds::Int16: + case NYql::NProto::TypeIds::Int32: + case NYql::NProto::TypeIds::Int64: { + NMiniKQL::ExportPrimitiveTypeToProto(protoType, *dataType); + break; + } + default: { + status = Ydb::StatusIds::BAD_REQUEST; + error = Sprintf( + "Invalid type name %s for sequence: %s", sequenceDescription.GetDataType().data(), sequenceDescription.GetName().data() + ); + return false; + } } - default: break; } } break; @@ -1490,6 +1525,7 @@ void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrS default: break; } } + return true; } bool FillSequenceDescription(NKikimrSchemeOp::TSequenceDescription& out, const Ydb::Table::SequenceDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) { @@ -1525,31 +1561,36 @@ bool FillSequenceDescription(NKikimrSchemeOp::TSequenceDescription& out, const Y } switch (typeInfo.GetTypeId()) { - case NScheme::NTypeIds::Int16: { - out.SetDataType(NKikimrSchemeOp::TSequenceDescription::SMALLINT); - break; - } - case NScheme::NTypeIds::Int32: { - out.SetDataType(NKikimrSchemeOp::TSequenceDescription::INTEGER); - break; - } + case NScheme::NTypeIds::Int16: + case NScheme::NTypeIds::Int32: case NScheme::NTypeIds::Int64: { - out.SetDataType(NKikimrSchemeOp::TSequenceDescription::BIGINT); + out.SetDataType(NScheme::TypeName(typeInfo, typeMod)); break; } case NScheme::NTypeIds::Pg: { - TString sequenceType = NPg::PgTypeNameFromTypeDesc(typeInfo.GetTypeDesc()); - status = Ydb::StatusIds::BAD_REQUEST; - error = Sprintf( - "Invalid type name %s for sequence", sequenceType.c_str() - ); - return false; + switch (NPg::PgTypeIdFromTypeDesc(typeInfo.GetTypeDesc())) { + case INT2OID: + case INT4OID: + case INT8OID: { + out.SetDataType(NScheme::TypeName(typeInfo, typeMod)); + break; + } + default: { + TString sequenceType = NPg::PgTypeNameFromTypeDesc(typeInfo.GetTypeDesc()); + status = Ydb::StatusIds::BAD_REQUEST; + error = Sprintf( + "Invalid type name %s for sequence: %s", sequenceType.c_str(), out.GetName().data() + ); + return false; + } + } + break; } default: { TString sequenceType = NScheme::TypeName(typeInfo.GetTypeId()); status = Ydb::StatusIds::BAD_REQUEST; error = Sprintf( - "Invalid type name %s for sequence", sequenceType.c_str() + "Invalid type name %s for sequence: %s", sequenceType.c_str(), out.GetName().data() ); return false; } diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 8734e6b3595c..2b605d796e80 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -130,8 +130,8 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, // out -void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, - const NKikimrSchemeOp::TTableDescription& in); +bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in, + Ydb::StatusIds::StatusCode& status, TString& error); // in bool FillSequenceDescription( diff --git a/ydb/core/ydb_convert/ya.make b/ydb/core/ydb_convert/ya.make index 3cf7bc08e6fd..ad7cde9af58a 100644 --- a/ydb/core/ydb_convert/ya.make +++ b/ydb/core/ydb_convert/ya.make @@ -13,6 +13,7 @@ SRCS( PEERDIR( ydb/core/base ydb/core/engine + ydb/core/formats/arrow/switch ydb/library/yql/core ydb/core/protos ydb/core/scheme