From 7eca2bb871b003f2e122fdc9631ebcc83aec86da Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Sat, 27 Apr 2024 13:33:06 +0300 Subject: [PATCH] Fix columnshard schemaversion (#4152) --- .../kqp/executer_actor/kqp_data_executer.cpp | 6 +- .../kqp/executer_actor/kqp_table_resolver.cpp | 65 ++++++++++--------- .../kqp/executer_actor/kqp_tasks_graph.cpp | 13 ++++ ydb/core/kqp/gateway/kqp_metadata_loader.cpp | 1 - ydb/core/kqp/opt/kqp_opt_effects.cpp | 15 ++++- ydb/core/kqp/runtime/kqp_write_actor.cpp | 4 +- ydb/core/kqp/session_actor/kqp_query_state.h | 9 +++ ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 25 ------- ydb/core/tx/scheme_board/cache.cpp | 3 +- 9 files changed, 74 insertions(+), 67 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 4367c860e96e..43859f56f3d0 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1891,12 +1891,12 @@ class TKqpDataExecuter : public TKqpExecuterBase(ShardsOnNode.size(), ResourceSnapshot.size())); } else if (stageInfo.Meta.IsSysView()) { BuildSysViewScanTasks(stageInfo); + } else if (stageInfo.Meta.ShardOperations.empty() || stage.SinksSize() > 0) { + BuildComputeTasks(stageInfo, std::max(ShardsOnNode.size(), ResourceSnapshot.size())); } else { BuildDatashardTasks(stageInfo); } diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp index 539bf9a57051..c0a7f214c31a 100644 --- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp @@ -83,6 +83,7 @@ class TKqpTableResolver : public TActorBootstrapped { << "Failed to resolve table " << entry.TableId << " keys: " << entry.Status << '.')); return; } + for (auto stageId : stageIds) { TasksGraph.GetStageInfo(stageId).Meta.ColumnTableInfoPtr = entry.ColumnTableInfo; } @@ -155,39 +156,41 @@ class TKqpTableResolver : public TActorBootstrapped { for (auto& pair : TasksGraph.GetStagesInfo()) { auto& stageInfo = pair.second; + if (!stageInfo.Meta.ShardOperations.empty()) { YQL_ENSURE(stageInfo.Meta.TableId); - YQL_ENSURE(stageInfo.Meta.ShardOperations.size() == 1); - auto operation = *stageInfo.Meta.ShardOperations.begin(); - - const auto& tableInfo = stageInfo.Meta.TableConstInfo; - Y_ENSURE(tableInfo); - stageInfo.Meta.TableKind = tableInfo->TableKind; - - stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, operation); - - if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) { - TableRequestIds[stageInfo.Meta.TableId].emplace_back(pair.first); - auto& entry = requestNavigate->ResultSet.emplace_back(); - entry.TableId = stageInfo.Meta.TableId; - entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; - entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable; - } - - auto& entry = request->ResultSet.emplace_back(std::move(stageInfo.Meta.ShardKey)); - entry.UserData = EncodeStageInfo(stageInfo); - switch (operation) { - case TKeyDesc::ERowOperation::Read: - entry.Access = NACLib::EAccessRights::SelectRow; - break; - case TKeyDesc::ERowOperation::Update: - entry.Access = NACLib::EAccessRights::UpdateRow; - break; - case TKeyDesc::ERowOperation::Erase: - entry.Access = NACLib::EAccessRights::EraseRow; - break; - default: - YQL_ENSURE(false, "Unsupported row operation mode: " << (ui32)operation); + YQL_ENSURE(!stageInfo.Meta.ShardOperations.empty()); + + for (const auto& operation : stageInfo.Meta.ShardOperations) { + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + Y_ENSURE(tableInfo); + stageInfo.Meta.TableKind = tableInfo->TableKind; + + stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, operation); + + if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) { + TableRequestIds[stageInfo.Meta.TableId].emplace_back(pair.first); + auto& entry = requestNavigate->ResultSet.emplace_back(); + entry.TableId = stageInfo.Meta.TableId; + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable; + } + + auto& entry = request->ResultSet.emplace_back(std::move(stageInfo.Meta.ShardKey)); + entry.UserData = EncodeStageInfo(stageInfo); + switch (operation) { + case TKeyDesc::ERowOperation::Read: + entry.Access = NACLib::EAccessRights::SelectRow; + break; + case TKeyDesc::ERowOperation::Update: + entry.Access = NACLib::EAccessRights::UpdateRow; + break; + case TKeyDesc::ERowOperation::Erase: + entry.Access = NACLib::EAccessRights::EraseRow; + break; + default: + YQL_ENSURE(false, "Unsupported row operation mode: " << (ui32)operation); + } } } } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 9dca2bdba8c5..4ebf45030ad9 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -126,6 +126,19 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector()) { + NKikimrKqp::TKqpTableSinkSettings settings; + YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings"); + YQL_ENSURE(sink.GetOutputIndex() == 0); + YQL_ENSURE(stage.SinksSize() == 1); + meta.TableId = MakeTableId(settings.GetTable()); + meta.TablePath = settings.GetTable().GetPath(); + meta.ShardOperations.insert(TKeyDesc::ERowOperation::Update); + meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId); + } + } + bool stageAdded = tasksGraph.AddStageInfo( TStageInfo(stageId, stage.InputsSize() + stage.SourcesSize(), stage.GetOutputsCount(), std::move(meta))); YQL_ENSURE(stageAdded); diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index 8256ea307219..32ec2f48ef84 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -43,7 +43,6 @@ NavigateEntryResult CreateNavigateEntry(const TString& path, } } entry.Path = SplitPath(currentPath); - entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable; entry.SyncVersion = true; entry.ShowPrivatePath = settings.WithPrivateTables_; diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp index 9ed42ac0d3f4..aba0708cd91f 100644 --- a/ydb/core/kqp/opt/kqp_opt_effects.cpp +++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp @@ -321,11 +321,19 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const auto input = program.Body(); if (sinkEffect) { + const auto rowArgument = Build(ctx, node.Pos()) + .Name("row") + .Done(); + stageInput = Build(ctx, node.Pos()) - .Inputs(stage.Inputs()) + .Inputs() + .Add(dqUnion) + .Build() .Program() - .Args(program.Args()) - .Body(input) + .Args({rowArgument}) + .Body() + .Input(rowArgument) + .Build() .Build() .Outputs() .Add() @@ -350,6 +358,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const .SinkIndex().Build("0") .Done(); } else if (InplaceUpdateEnabled(*kqpCtx.Config, table, node.Columns()) && IsMapWrite(table, input, ctx)) { + // TODO: inplace update for sink stageInput = Build(ctx, node.Pos()) .Output() .Stage(stage) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index aa34b4149188..95fc8adda773 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -645,9 +645,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N { Settings.GetTable().GetOwnerId(), Settings.GetTable().GetTableId(), - SchemeEntry->Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable - ? Settings.GetTable().GetVersion() + 1 // TODO: SchemeShard returns wrong version for columnshard. - : Settings.GetTable().GetVersion() + Settings.GetTable().GetVersion() }, Serializer->GetWriteColumnIds(), payloadIndex, diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 1acd5ab35007..05b89f21185d 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -242,6 +242,15 @@ class TKqpQueryState : public TNonCopyable { addTable(source.GetReadRangesSource().GetTable()); } } + + for (const auto& sink : stage.GetSinks()) { + if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) { + YQL_ENSURE(sink.GetInternalSink().GetSettings().Is()); + NKikimrKqp::TKqpTableSinkSettings settings; + YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings"); + addTable(settings.GetTable()); + } + } } for (const auto& table : phyTx.GetTables()) { diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index b21575fed540..abb948244198 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2373,14 +2373,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { PARTITION BY HASH(Col1) WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); - CREATE TABLE `/Root/DataShard1` ( - Col1 Uint64 NOT NULL, - Col2 String, - Col3 Int32 NOT NULL, - PRIMARY KEY (Col1) - ) - WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); - CREATE TABLE `/Root/ColumnShard1` ( Col1 Uint64 NOT NULL, Col2 String, @@ -2398,23 +2390,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { ) PARTITION BY HASH(Col1) WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16); - - CREATE TABLE `/Root/ColumnShard3` ( - Col1 Uint64 NOT NULL, - Col2 String, - Col3 Int32 NOT NULL, - PRIMARY KEY (Col1) - ) - PARTITION BY HASH(Col1) - WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16); - - CREATE TABLE `/Root/DataShard2` ( - Col1 Uint64 NOT NULL, - Col2 String, - Col3 Int32 NOT NULL, - PRIMARY KEY (Col1) - ) - WITH (UNIFORM_PARTITIONS = 3, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3); )"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index ce41d8fd6258..e036e5c9af0a 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -851,6 +851,7 @@ class TSchemeCache: public TMonitorableActor { } } + SchemaVersion = schemaDesc.GetVersion(); KeyColumnTypes.resize(schemaDesc.KeyColumnNamesSize()); for (ui32 i : xrange(schemaDesc.KeyColumnNamesSize())) { auto* pcolid = nameToId.FindPtr(schemaDesc.GetKeyColumnNames(i)); @@ -1778,7 +1779,7 @@ class TSchemeCache: public TMonitorableActor { entry.CreateStep = CreateStep; if (entry.RequestType == TNavigate::TEntry::ERequestType::ByPath) { - if (Kind == TNavigate::KindTable) { + if (Kind == TNavigate::KindTable || Kind == TNavigate::KindColumnTable) { entry.TableId = TTableId(PathId.OwnerId, PathId.LocalPathId, SchemaVersion); } else { entry.TableId = TTableId(PathId.OwnerId, PathId.LocalPathId);