Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd committed Jul 17, 2024
1 parent 7e90390 commit e56b5aa
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 89 deletions.
12 changes: 6 additions & 6 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1506,11 +1506,11 @@ class TKqpGatewayProxy : public IKikimrGateway {
}
if (settings.SequenceSettings.DataType) {
if (settings.SequenceSettings.DataType == "int8") {
seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::BIGINT);
seqDesc->SetDataType("pgint8");
} else if (settings.SequenceSettings.DataType == "int4") {
seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::INTEGER);
seqDesc->SetDataType("pgint4");
} else if (settings.SequenceSettings.DataType == "int2") {
seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::SMALLINT);
seqDesc->SetDataType("pgint2");
}
}

Expand Down Expand Up @@ -1635,11 +1635,11 @@ class TKqpGatewayProxy : public IKikimrGateway {
}
if (settings.SequenceSettings.DataType) {
if (settings.SequenceSettings.DataType == "int8") {
seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::BIGINT);
seqDesc->SetDataType("pgint8");
} else if (settings.SequenceSettings.DataType == "int4") {
seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::INTEGER);
seqDesc->SetDataType("pgint4");
} else if (settings.SequenceSettings.DataType == "int2") {
seqDesc->SetDataType(NKikimrSchemeOp::TSequenceDescription::SMALLINT);
seqDesc->SetDataType("pgint2");
}
}

Expand Down
7 changes: 1 addition & 6 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1408,11 +1408,6 @@ message TSequenceDescription {
optional sint64 NextValue = 1;
optional bool NextUsed = 2;
}
enum EDataType {
BIGINT = 0;
INTEGER = 1;
SMALLINT = 2;
}
optional string Name = 1; // mandatory
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
optional uint64 Version = 3; // incremented every time sequence is altered
Expand All @@ -1425,7 +1420,7 @@ message TSequenceDescription {
optional sint64 Increment = 9; // increment at each call, defaults to 1
optional bool Cycle = 10; // true when cycle on overflow is allowed
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
optional EDataType DataType = 12; // data type of the sequence, defaults to BIGINT
optional string DataType = 12; // data type of the sequence: Int64/pgint8, Int32/pgint4, Int16/pgint2, defaults to Int64/pgint8
}

message TSequenceSharding {
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ TMaybe<Ydb::Table::CreateTableRequest> GenYdbScheme(
FillPartitioningSettings(scheme, tableDesc);
FillKeyBloomFilter(scheme, tableDesc);
FillReadReplicasSettings(scheme, tableDesc);
FillSequenceDescription(scheme, tableDesc);

TString error;
Ydb::StatusIds::StatusCode status;
if (!FillSequenceDescription(scheme, tableDesc, status, error)) {
return Nothing();
}

return scheme;
}
Expand Down
85 changes: 63 additions & 22 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class TPropose: public TSubOperationState {

std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription(
const NKikimrSchemeOp::TSequenceDescription& sequence, const NKikimrSchemeOp::TSequenceDescription& alter,
TString& errStr, NKikimrScheme::EStatus& status) {
const NScheme::TTypeRegistry& typeRegistry, bool pgTypesEnabled,
TString& errStr) {

NKikimrSchemeOp::TSequenceDescription result = sequence;

Expand All @@ -240,28 +241,72 @@ std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription
}

i64 dataTypeMaxValue, dataTypeMinValue;
switch (dataType) {
case NKikimrSchemeOp::TSequenceDescription::BIGINT: {
dataTypeMaxValue = Max<i64>();
dataTypeMinValue = Min<i64>();
break;

auto typeName = NMiniKQL::AdaptLegacyYqlType(dataType);
const NScheme::IType* type = typeRegistry.GetType(typeName);
if (type) {
if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
errStr = Sprintf("Type '%s' specified for sequence '%s' is no longer supported", dataType.data(), sequence.GetName().data());
return std::nullopt;
}

switch (type->GetTypeId()) {
case NScheme::NTypeIds::Int16: {
dataTypeMaxValue = Max<i16>();
dataTypeMinValue = Min<i16>();
break;
}
case NScheme::NTypeIds::Int32: {
dataTypeMaxValue = Max<i32>();
dataTypeMinValue = Min<i32>();
break;
}
case NScheme::NTypeIds::Int64: {
dataTypeMaxValue = Max<i64>();
dataTypeMinValue = Min<i64>();
break;
}
default: {
errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
}
} else {
auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
if (!typeDesc) {
errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
case NKikimrSchemeOp::TSequenceDescription::INTEGER: {
dataTypeMaxValue = Max<i32>();
dataTypeMinValue = Min<i32>();
break;
if (!pgTypesEnabled) {
errStr = Sprintf("Type '%s' specified for sequence '%s', but support for pg types is disabled (EnableTablePgTypes feature flag is off)", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
case NKikimrSchemeOp::TSequenceDescription::SMALLINT: {
dataTypeMaxValue = Max<i16>();
dataTypeMinValue = Min<i16>();
break;
switch (NPg::PgTypeIdFromTypeDesc(typeDesc)) {
case INT2OID: {
dataTypeMaxValue = Max<i16>();
dataTypeMinValue = Min<i16>();
break;
}
case INT4OID: {
dataTypeMaxValue = Max<i32>();
dataTypeMinValue = Min<i32>();
break;
}
case INT8OID: {
dataTypeMaxValue = Max<i64>();
dataTypeMinValue = Min<i64>();
break;
}
default: {
errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
}
}

if (maxValue != Max<i16>() && maxValue != Max<i32>() && maxValue != Max<i64>()) {
if (maxValue > dataTypeMaxValue) {
errStr = Sprintf("MAXVALUE (%ld) is out of range for sequence", maxValue);
status = NKikimrScheme::StatusInvalidParameter;
return std::nullopt;
}
} else {
Expand All @@ -271,7 +316,6 @@ std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription
if (minValue != Min<i16>() && minValue != Min<i32>() && minValue != Min<i64>()) {
if (minValue < dataTypeMinValue) {
errStr = Sprintf("MINVALUE (%ld) is out of range for sequence", minValue);
status = NKikimrScheme::StatusInvalidParameter;
return std::nullopt;
}
} else {
Expand All @@ -287,19 +331,16 @@ std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription

if (maxValue > dataTypeMaxValue) {
errStr = Sprintf("MAXVALUE (%ld) is out of range for sequence", maxValue);
status = NKikimrScheme::StatusInvalidParameter;
return std::nullopt;
}

if (minValue < dataTypeMinValue) {
errStr = Sprintf("MINVALUE (%ld) is out of range for sequence", minValue);
status = NKikimrScheme::StatusInvalidParameter;
return std::nullopt;
}

if (minValue >= maxValue) {
errStr = Sprintf("MINVALUE (%ld) must be less than MAXVALUE (%ld)", minValue, maxValue);
status = NKikimrScheme::StatusInvalidParameter;
return std::nullopt;
}

Expand All @@ -310,12 +351,10 @@ std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription

if (startValue > maxValue) {
errStr = Sprintf("START value (%ld) cannot be greater than MAXVALUE (%ld)", startValue, maxValue);
status = NKikimrScheme::StatusInvalidParameter;
return std::nullopt;
}
if (startValue < minValue) {
errStr = Sprintf("START value (%ld) cannot be less than MINVALUE (%ld)", startValue, minValue);
status = NKikimrScheme::StatusInvalidParameter;
return std::nullopt;
}

Expand Down Expand Up @@ -468,9 +507,11 @@ class TAlterSequence: public TSubOperation {
return result;
}

const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
auto description = GetAlterSequenceDescription(
sequenceInfo->Description, sequenceAlter, errStr, status);
sequenceInfo->Description, sequenceAlter, *typeRegistry, context.SS->EnableTablePgTypes, errStr);
if (!description) {
status = NKikimrScheme::StatusInvalidParameter;
result->SetError(status, errStr);
return result;
}
Expand Down
94 changes: 75 additions & 19 deletions ydb/core/tx/schemeshard/schemeshard__operation_create_sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,29 +247,77 @@ class TPropose: public TSubOperationState {
};

// fill sequence description with default values
NKikimrSchemeOp::TSequenceDescription FillSequenceDescription(const NKikimrSchemeOp::TSequenceDescription& descr) {
NKikimrSchemeOp::TSequenceDescription result = descr;

if (!result.HasDataType()) {
result.SetDataType(NKikimrSchemeOp::TSequenceDescription::BIGINT);
std::optional<NKikimrSchemeOp::TSequenceDescription> FillSequenceDescription(const NKikimrSchemeOp::TSequenceDescription& sequence,
const NScheme::TTypeRegistry& typeRegistry, bool pgTypesEnabled,
TString& errStr) {
NKikimrSchemeOp::TSequenceDescription result = sequence;

TString dataType;
if (!sequence.HasDataType()) {
errStr = Sprintf("Type is not specified for sequence '%s'", sequence.GetName().data());
return std::nullopt;
}

i64 dataTypeMaxValue, dataTypeMinValue;
switch (result.GetDataType()) {
case NKikimrSchemeOp::TSequenceDescription::BIGINT: {
dataTypeMaxValue = Max<i64>();
dataTypeMinValue = Min<i64>();
break;
auto typeName = NMiniKQL::AdaptLegacyYqlType(dataType);
const NScheme::IType* type = typeRegistry.GetType(typeName);
if (type) {
if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
errStr = Sprintf("Type '%s' specified for sequence '%s' is no longer supported", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
case NKikimrSchemeOp::TSequenceDescription::INTEGER: {
dataTypeMaxValue = Max<i32>();
dataTypeMinValue = Min<i32>();
break;

switch (type->GetTypeId()) {
case NScheme::NTypeIds::Int16: {
dataTypeMaxValue = Max<i16>();
dataTypeMinValue = Min<i16>();
break;
}
case NScheme::NTypeIds::Int32: {
dataTypeMaxValue = Max<i32>();
dataTypeMinValue = Min<i32>();
break;
}
case NScheme::NTypeIds::Int64: {
dataTypeMaxValue = Max<i64>();
dataTypeMinValue = Min<i64>();
break;
}
default: {
errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
}
} else {
auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
if (!typeDesc) {
errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
case NKikimrSchemeOp::TSequenceDescription::SMALLINT: {
dataTypeMaxValue = Max<i16>();
dataTypeMinValue = Min<i16>();
break;
if (!pgTypesEnabled) {
errStr = Sprintf("Type '%s' specified for sequence '%s', but support for pg types is disabled (EnableTablePgTypes feature flag is off)", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
switch (NPg::PgTypeIdFromTypeDesc(typeDesc)) {
case INT2OID: {
dataTypeMaxValue = Max<i16>();
dataTypeMinValue = Min<i16>();
break;
}
case INT4OID: {
dataTypeMaxValue = Max<i32>();
dataTypeMinValue = Min<i32>();
break;
}
case INT8OID: {
dataTypeMaxValue = Max<i64>();
dataTypeMinValue = Min<i64>();
break;
}
default: {
errStr = Sprintf("Type '%s' specified for sequence '%s' is not supported", dataType.data(), sequence.GetName().data());
return std::nullopt;
}
}
}

Expand Down Expand Up @@ -511,7 +559,15 @@ class TCreateSequence : public TSubOperation {

TSequenceInfo::TPtr sequenceInfo = new TSequenceInfo(0);
TSequenceInfo::TPtr alterData = sequenceInfo->CreateNextVersion();
alterData->Description = FillSequenceDescription(descr);
const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
auto description = FillSequenceDescription(
descr, *typeRegistry, context.SS->EnableTablePgTypes, errStr);
if (!description) {
status = NKikimrScheme::StatusInvalidParameter;
result->SetError(status, errStr);
return result;
}
alterData->Description = *description;

if (shardsToCreate) {
sequenceShard = context.SS->RegisterShardInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
const auto& fromSequence = column.from_sequence();

auto* seqDesc = indexedTable->MutableSequenceDescription()->Add();
FillSequenceDescription(*seqDesc, fromSequence, status, error);
if (!FillSequenceDescription(*seqDesc, fromSequence, status, error)) {
return nullptr;
}

break;
}
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/tx/sequenceshard/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ namespace NSequenceShard {
using Type = ESequenceState;
static constexpr ESequenceState Default = ESequenceState::Active;
};
struct DataType : Column<11, NScheme::NTypeIds::Uint8> {
using Type = ESequenceState;
static constexpr ESequenceState Default = ESequenceState::Active;
};
struct MovedTo : Column<12, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<OwnerId, LocalId>;
Expand Down
Loading

0 comments on commit e56b5aa

Please sign in to comment.