Skip to content

Commit

Permalink
Add database name to pg_database view (ydb-platform#3453)
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort authored Apr 26, 2024
1 parent 541d23a commit 40891f8
Show file tree
Hide file tree
Showing 25 changed files with 384 additions and 96 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);

IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
: TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
: TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
, ComputeCtx(settings.StatsMode)
, FederatedQuerySetup(federatedQuerySetup)
{
Expand Down Expand Up @@ -280,10 +280,11 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings)
{
return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup);
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings);
}

} // namespace NKqp
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);

void DoBootstrap();

Expand Down Expand Up @@ -68,7 +68,7 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);

} // namespace NKqp
} // namespace NKikimr
12 changes: 8 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, EnableOlapSink(enableOlapSink)
, UseEvWrite(useEvWrite)
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
{
Target = creator;

Expand Down Expand Up @@ -2431,7 +2433,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
.AllowSinglePartitionOpt = singlePartitionOptAllowed,
.UserRequestContext = GetUserRequestContext(),
.FederatedQuerySetup = FederatedQuerySetup,
.OutputChunkMaxSize = Request.OutputChunkMaxSize
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
.GUCSettings = GUCSettings
});

auto err = Planner->PlanExecution();
Expand Down Expand Up @@ -2637,6 +2640,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
bool EnableOlapSink = false;
bool UseEvWrite = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;

bool HasExternalSources = false;
bool SecretSnapshotRequired = false;
Expand Down Expand Up @@ -2681,11 +2685,11 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext,
enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup);
enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings);
}

} // namespace NKqp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);

IActor* CreateKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
Expand Down
29 changes: 24 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback);
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup);
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr
);
}

TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
Expand All @@ -107,13 +112,27 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup);
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr
);

case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex);
return CreateKqpScanExecuter(
std::move(request), database, userToken, counters, aggregation,
executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext,
statementResultIndex
);

case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup);
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, true,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex,
federatedQuerySetup, GUCSettings
);

default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1983,7 +1983,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);

IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
, UserRequestContext(args.UserRequestContext)
, FederatedQuerySetup(args.FederatedQuerySetup)
, OutputChunkMaxSize(args.OutputChunkMaxSize)
, GUCSettings(std::move(args.GUCSettings))
{
if (!Database) {
// a piece of magic for tests
Expand Down Expand Up @@ -203,6 +204,10 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.SetOutputChunkMaxSize(OutputChunkMaxSize);
}

if (GUCSettings) {
request.SetSerializedGUCSettings(GUCSettings->SerializeToString());
}

return result;
}

Expand Down Expand Up @@ -349,7 +354,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op
limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit * 2, limit);

auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory,
settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr(), FederatedQuerySetup);
settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr(), FederatedQuerySetup, GUCSettings);

if (optimizeProtoForLocalExecution) {
TVector<google::protobuf::Message*>& taskSourceSettings = static_cast<TKqpComputeActor*>(computeActor)->MutableTaskSourceSettings();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class TKqpPlanner {
const TIntrusivePtr<TUserRequestContext>& UserRequestContext;
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
const ui64 OutputChunkMaxSize = 0;
const TGUCSettings::TPtr GUCSettings;
};

TKqpPlanner(TKqpPlanner::TArgs&& args);
Expand Down Expand Up @@ -124,6 +125,7 @@ class TKqpPlanner {
TIntrusivePtr<TUserRequestContext> UserRequestContext;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const ui64 OutputChunkMaxSize;
const TGUCSettings::TPtr GUCSettings;

public:
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
.AllowSinglePartitionOpt = false,
.UserRequestContext = GetUserRequestContext(),
.FederatedQuerySetup = std::nullopt,
.OutputChunkMaxSize = Request.OutputChunkMaxSize
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
.GUCSettings = nullptr
});

LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,13 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
taskCtx.ComputeActorId = Register(computeActor);
info.MutableActorIds().emplace_back(taskCtx.ComputeActorId);
} else {
std::shared_ptr<TGUCSettings> GUCSettings;
if (ev->Get()->Record.HasSerializedGUCSettings()) {
GUCSettings = std::make_shared<TGUCSettings>(ev->Get()->Record.GetSerializedGUCSettings());
}
if (Y_LIKELY(!CaFactory)) {
computeActor = CreateKqpComputeActor(request.Executer, txId, &dqTask, AsyncIoFactory,
runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId), ev->Get()->Arena, FederatedQuerySetup);
runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId), ev->Get()->Arena, FederatedQuerySetup, GUCSettings);
taskCtx.ComputeActorId = Register(computeActor);
} else {
computeActor = CaFactory->CreateKqpComputeActor(request.Executer, txId, &dqTask,
Expand Down
15 changes: 11 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
, QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
, GUCSettings(std::make_shared<TGUCSettings>())
{
RequestCounters = MakeIntrusive<TKqpRequestCounters>();
RequestCounters->Counters = Counters;
Expand All @@ -185,6 +186,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
FillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get();
FillSettings.Format = IDataProvider::EResultFormat::Custom;
FillSettings.FormatDetails = TString(KikimrMkqlProtoFormat);
FillGUCSettings();

auto optSessionId = TryDecodeYdbSessionId(SessionId);
YQL_ENSURE(optSessionId, "Can't decode ydb session Id");
Expand Down Expand Up @@ -1168,6 +1170,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
return false;
}

void FillGUCSettings() {
if (Settings.Database) {
GUCSettings->Set("ydb_database", Settings.Database.substr(1, Settings.Database.Size() - 1));
}
}

void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) {
YQL_ENSURE(QueryState);

Expand Down Expand Up @@ -1205,7 +1213,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(),
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()),
QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
Settings.TableService.GetEnableOlapSink(), useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup);
Settings.TableService.GetEnableOlapSink(), useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings);

auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
Expand Down Expand Up @@ -2420,7 +2428,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
bool HasOlapTable = false;
bool HasOltpTable = false;

TGUCSettings::TPtr GUCSettings = std::make_shared<TGUCSettings>();
TGUCSettings::TPtr GUCSettings;
};

} // namespace
Expand All @@ -2436,8 +2444,7 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
{
return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup,
std::move(asyncIoFactory), std::move(moduleResolverState), counters,
queryServiceConfig, metadataProviderConfig, kqpTempTablesAgentActor
);
queryServiceConfig, metadataProviderConfig, kqpTempTablesAgentActor);
}

}
Expand Down
Loading

0 comments on commit 40891f8

Please sign in to comment.