Skip to content

Commit

Permalink
Fix columnshard schemaversion (ydb-platform#4152)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Apr 27, 2024
1 parent 1b8616b commit 7eca2bb
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 67 deletions.
6 changes: 3 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1891,12 +1891,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
default:
YQL_ENSURE(false, "unknown source type");
}
} else if (StreamResult && stageInfo.Meta.IsOlap()) {
} else if (StreamResult && stageInfo.Meta.IsOlap() && stage.SinksSize() == 0) {
BuildScanTasksFromShards(stageInfo);
} else if (stageInfo.Meta.ShardOperations.empty()) {
BuildComputeTasks(stageInfo, std::max<ui32>(ShardsOnNode.size(), ResourceSnapshot.size()));
} else if (stageInfo.Meta.IsSysView()) {
BuildSysViewScanTasks(stageInfo);
} else if (stageInfo.Meta.ShardOperations.empty() || stage.SinksSize() > 0) {
BuildComputeTasks(stageInfo, std::max<ui32>(ShardsOnNode.size(), ResourceSnapshot.size()));
} else {
BuildDatashardTasks(stageInfo);
}
Expand Down
65 changes: 34 additions & 31 deletions ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class TKqpTableResolver : public TActorBootstrapped<TKqpTableResolver> {
<< "Failed to resolve table " << entry.TableId << " keys: " << entry.Status << '.'));
return;
}

for (auto stageId : stageIds) {
TasksGraph.GetStageInfo(stageId).Meta.ColumnTableInfoPtr = entry.ColumnTableInfo;
}
Expand Down Expand Up @@ -155,39 +156,41 @@ class TKqpTableResolver : public TActorBootstrapped<TKqpTableResolver> {

for (auto& pair : TasksGraph.GetStagesInfo()) {
auto& stageInfo = pair.second;

if (!stageInfo.Meta.ShardOperations.empty()) {
YQL_ENSURE(stageInfo.Meta.TableId);
YQL_ENSURE(stageInfo.Meta.ShardOperations.size() == 1);
auto operation = *stageInfo.Meta.ShardOperations.begin();

const auto& tableInfo = stageInfo.Meta.TableConstInfo;
Y_ENSURE(tableInfo);
stageInfo.Meta.TableKind = tableInfo->TableKind;

stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, operation);

if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) {
TableRequestIds[stageInfo.Meta.TableId].emplace_back(pair.first);
auto& entry = requestNavigate->ResultSet.emplace_back();
entry.TableId = stageInfo.Meta.TableId;
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable;
}

auto& entry = request->ResultSet.emplace_back(std::move(stageInfo.Meta.ShardKey));
entry.UserData = EncodeStageInfo(stageInfo);
switch (operation) {
case TKeyDesc::ERowOperation::Read:
entry.Access = NACLib::EAccessRights::SelectRow;
break;
case TKeyDesc::ERowOperation::Update:
entry.Access = NACLib::EAccessRights::UpdateRow;
break;
case TKeyDesc::ERowOperation::Erase:
entry.Access = NACLib::EAccessRights::EraseRow;
break;
default:
YQL_ENSURE(false, "Unsupported row operation mode: " << (ui32)operation);
YQL_ENSURE(!stageInfo.Meta.ShardOperations.empty());

for (const auto& operation : stageInfo.Meta.ShardOperations) {
const auto& tableInfo = stageInfo.Meta.TableConstInfo;
Y_ENSURE(tableInfo);
stageInfo.Meta.TableKind = tableInfo->TableKind;

stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, operation);

if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) {
TableRequestIds[stageInfo.Meta.TableId].emplace_back(pair.first);
auto& entry = requestNavigate->ResultSet.emplace_back();
entry.TableId = stageInfo.Meta.TableId;
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable;
}

auto& entry = request->ResultSet.emplace_back(std::move(stageInfo.Meta.ShardKey));
entry.UserData = EncodeStageInfo(stageInfo);
switch (operation) {
case TKeyDesc::ERowOperation::Read:
entry.Access = NACLib::EAccessRights::SelectRow;
break;
case TKeyDesc::ERowOperation::Update:
entry.Access = NACLib::EAccessRights::UpdateRow;
break;
case TKeyDesc::ERowOperation::Erase:
entry.Access = NACLib::EAccessRights::EraseRow;
break;
default:
YQL_ENSURE(false, "Unsupported row operation mode: " << (ui32)operation);
}
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
}
}

for (auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
YQL_ENSURE(sink.GetOutputIndex() == 0);
YQL_ENSURE(stage.SinksSize() == 1);
meta.TableId = MakeTableId(settings.GetTable());
meta.TablePath = settings.GetTable().GetPath();
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Update);
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
}
}

bool stageAdded = tasksGraph.AddStageInfo(
TStageInfo(stageId, stage.InputsSize() + stage.SourcesSize(), stage.GetOutputsCount(), std::move(meta)));
YQL_ENSURE(stageAdded);
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ NavigateEntryResult CreateNavigateEntry(const TString& path,
}
}
entry.Path = SplitPath(currentPath);

entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable;
entry.SyncVersion = true;
entry.ShowPrivatePath = settings.WithPrivateTables_;
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/kqp/opt/kqp_opt_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,19 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
auto input = program.Body();

if (sinkEffect) {
const auto rowArgument = Build<TCoArgument>(ctx, node.Pos())
.Name("row")
.Done();

stageInput = Build<TDqStage>(ctx, node.Pos())
.Inputs(stage.Inputs())
.Inputs()
.Add(dqUnion)
.Build()
.Program()
.Args(program.Args())
.Body(input)
.Args({rowArgument})
.Body<TCoToFlow>()
.Input(rowArgument)
.Build()
.Build()
.Outputs<TDqStageOutputsList>()
.Add<TDqSink>()
Expand All @@ -350,6 +358,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
.SinkIndex().Build("0")
.Done();
} else if (InplaceUpdateEnabled(*kqpCtx.Config, table, node.Columns()) && IsMapWrite(table, input, ctx)) {
// TODO: inplace update for sink
stageInput = Build<TKqpCnMapShard>(ctx, node.Pos())
.Output()
.Stage(stage)
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
{
Settings.GetTable().GetOwnerId(),
Settings.GetTable().GetTableId(),
SchemeEntry->Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable
? Settings.GetTable().GetVersion() + 1 // TODO: SchemeShard returns wrong version for columnshard.
: Settings.GetTable().GetVersion()
Settings.GetTable().GetVersion()
},
Serializer->GetWriteColumnIds(),
payloadIndex,
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ class TKqpQueryState : public TNonCopyable {
addTable(source.GetReadRangesSource().GetTable());
}
}

for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
addTable(settings.GetTable());
}
}
}

for (const auto& table : phyTx.GetTables()) {
Expand Down
25 changes: 0 additions & 25 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2373,14 +2373,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
CREATE TABLE `/Root/DataShard1` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
CREATE TABLE `/Root/ColumnShard1` (
Col1 Uint64 NOT NULL,
Col2 String,
Expand All @@ -2398,23 +2390,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16);
CREATE TABLE `/Root/ColumnShard3` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16);
CREATE TABLE `/Root/DataShard2` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
WITH (UNIFORM_PARTITIONS = 3, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}
}

SchemaVersion = schemaDesc.GetVersion();
KeyColumnTypes.resize(schemaDesc.KeyColumnNamesSize());
for (ui32 i : xrange(schemaDesc.KeyColumnNamesSize())) {
auto* pcolid = nameToId.FindPtr(schemaDesc.GetKeyColumnNames(i));
Expand Down Expand Up @@ -1778,7 +1779,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
entry.CreateStep = CreateStep;

if (entry.RequestType == TNavigate::TEntry::ERequestType::ByPath) {
if (Kind == TNavigate::KindTable) {
if (Kind == TNavigate::KindTable || Kind == TNavigate::KindColumnTable) {
entry.TableId = TTableId(PathId.OwnerId, PathId.LocalPathId, SchemaVersion);
} else {
entry.TableId = TTableId(PathId.OwnerId, PathId.LocalPathId);
Expand Down

0 comments on commit 7eca2bb

Please sign in to comment.