From 40891f81fac247f5686535afad0a0e0a5c397c4e Mon Sep 17 00:00:00 2001 From: qrort <31865255+qrort@users.noreply.github.com> Date: Fri, 26 Apr 2024 14:12:10 +0300 Subject: [PATCH] Add database name to pg_database view (#3453) --- .../kqp/compute_actor/kqp_compute_actor.h | 2 +- .../compute_actor/kqp_pure_compute_actor.cpp | 9 +- .../compute_actor/kqp_pure_compute_actor.h | 4 +- .../kqp/executer_actor/kqp_data_executer.cpp | 12 +- ydb/core/kqp/executer_actor/kqp_executer.h | 2 +- .../kqp/executer_actor/kqp_executer_impl.cpp | 29 +++- .../kqp/executer_actor/kqp_executer_impl.h | 2 +- ydb/core/kqp/executer_actor/kqp_planner.cpp | 7 +- ydb/core/kqp/executer_actor/kqp_planner.h | 2 + .../kqp/executer_actor/kqp_scan_executer.cpp | 3 +- .../kqp/node_service/kqp_node_service.cpp | 6 +- .../kqp/session_actor/kqp_session_actor.cpp | 15 ++- ydb/core/kqp/ut/pg/pg_catalog_ut.cpp | 53 ++++++++ ydb/core/protos/kqp.proto | 1 + .../yql/core/pg_settings/guc_settings.cpp | 16 +++ .../yql/core/pg_settings/guc_settings.h | 4 + .../dq/actors/compute/dq_compute_actor_impl.h | 4 +- ydb/library/yql/minikql/mkql_alloc.h | 7 + ydb/library/yql/minikql/ya.make | 1 + .../yql/parser/pg_wrapper/comp_factory.cpp | 117 ++++++++++------ .../yql/parser/pg_wrapper/interface/context.h | 5 + .../yql/parser/pg_wrapper/memory_context.h | 34 +++++ .../yql/parser/pg_wrapper/syscache.cpp | 125 +++++++++++++----- ydb/library/yql/sql/pg/pg_sql.cpp | 10 +- ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 10 ++ 25 files changed, 384 insertions(+), 96 deletions(-) create mode 100644 ydb/library/yql/parser/pg_wrapper/memory_context.h diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 564339c4c6c1..9fabe602e696 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -47,7 +47,7 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 1bc343f9f5da..716771c94614 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup) - : TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena)) + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) + : TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings) , ComputeCtx(settings.StatsMode) , FederatedQuerySetup(federatedQuerySetup) { @@ -280,10 +280,11 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup) + const std::optional& federatedQuerySetup, + const TGUCSettings::TPtr& GUCSettings) { return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory), - settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup); + settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings); } } // namespace NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h index f0b68549b882..590a9bcab774 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h @@ -29,7 +29,7 @@ class TKqpComputeActor : public TDqSyncComputeActorBase { IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); void DoBootstrap(); @@ -68,7 +68,7 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 5897685d94dc..4367c860e96e 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -129,13 +129,15 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup) + const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, + const TGUCSettings::TPtr& GUCSettings) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) , EnableOlapSink(enableOlapSink) , UseEvWrite(useEvWrite) , FederatedQuerySetup(federatedQuerySetup) + , GUCSettings(GUCSettings) { Target = creator; @@ -2431,7 +2433,8 @@ class TKqpDataExecuter : public TKqpExecuterBasePlanExecution(); @@ -2637,6 +2640,7 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; + const TGUCSettings::TPtr GUCSettings; bool HasExternalSources = false; bool SecretSnapshotRequired = false; @@ -2681,11 +2685,11 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup) + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext, - enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup); + enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index c41cf4414efd..a3ffd0524601 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -104,7 +104,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); IActor* CreateKqpSchemeExecuter( TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 5817fc44224f..4e4bbb8e74e5 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -84,12 +84,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup) + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup); + return CreateKqpDataExecuter( + std::move(request), database, userToken, counters, false, + aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, + maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr + ); } TMaybe txsType; @@ -107,13 +112,27 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup); + return CreateKqpDataExecuter( + std::move(request), database, userToken, counters, false, + aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, + maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr + ); case NKqpProto::TKqpPhyTx::TYPE_SCAN: - return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex); + return CreateKqpScanExecuter( + std::move(request), database, userToken, counters, aggregation, + executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, + statementResultIndex + ); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup); + return CreateKqpDataExecuter( + std::move(request), database, userToken, counters, true, + aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, + maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, + federatedQuerySetup, GUCSettings + ); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 8f6b007e166c..ab2e544bce8b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1983,7 +1983,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 6c3a5ac51d6b..627f6eb4c590 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -79,6 +79,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) , UserRequestContext(args.UserRequestContext) , FederatedQuerySetup(args.FederatedQuerySetup) , OutputChunkMaxSize(args.OutputChunkMaxSize) + , GUCSettings(std::move(args.GUCSettings)) { if (!Database) { // a piece of magic for tests @@ -203,6 +204,10 @@ std::unique_ptr TKqpPlanner::SerializeReque request.SetOutputChunkMaxSize(OutputChunkMaxSize); } + if (GUCSettings) { + request.SetSerializedGUCSettings(GUCSettings->SerializeToString()); + } + return result; } @@ -349,7 +354,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op limits.MemoryQuotaManager = std::make_shared(limit * 2, limit); auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory, - settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr(), FederatedQuerySetup); + settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr(), FederatedQuerySetup, GUCSettings); if (optimizeProtoForLocalExecution) { TVector& taskSourceSettings = static_cast(computeActor)->MutableTaskSourceSettings(); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index b9cb5b6a1c10..fabda6bf1bf9 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -62,6 +62,7 @@ class TKqpPlanner { const TIntrusivePtr& UserRequestContext; const std::optional& FederatedQuerySetup; const ui64 OutputChunkMaxSize = 0; + const TGUCSettings::TPtr GUCSettings; }; TKqpPlanner(TKqpPlanner::TArgs&& args); @@ -124,6 +125,7 @@ class TKqpPlanner { TIntrusivePtr UserRequestContext; const std::optional FederatedQuerySetup; const ui64 OutputChunkMaxSize; + const TGUCSettings::TPtr GUCSettings; public: static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 98ce4d188c42..790488bcb6be 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -338,7 +338,8 @@ class TKqpScanExecuter : public TKqpExecuterBase { taskCtx.ComputeActorId = Register(computeActor); info.MutableActorIds().emplace_back(taskCtx.ComputeActorId); } else { + std::shared_ptr GUCSettings; + if (ev->Get()->Record.HasSerializedGUCSettings()) { + GUCSettings = std::make_shared(ev->Get()->Record.GetSerializedGUCSettings()); + } if (Y_LIKELY(!CaFactory)) { computeActor = CreateKqpComputeActor(request.Executer, txId, &dqTask, AsyncIoFactory, - runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId), ev->Get()->Arena, FederatedQuerySetup); + runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId), ev->Get()->Arena, FederatedQuerySetup, GUCSettings); taskCtx.ComputeActorId = Register(computeActor); } else { computeActor = CaFactory->CreateKqpComputeActor(request.Executer, txId, &dqTask, diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 01a5d3c1d136..2320ffa09ab4 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -174,6 +174,7 @@ class TKqpSessionActor : public TActorBootstrapped { , QueryServiceConfig(queryServiceConfig) , MetadataProviderConfig(metadataProviderConfig) , KqpTempTablesAgentActor(kqpTempTablesAgentActor) + , GUCSettings(std::make_shared()) { RequestCounters = MakeIntrusive(); RequestCounters->Counters = Counters; @@ -185,6 +186,7 @@ class TKqpSessionActor : public TActorBootstrapped { FillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get(); FillSettings.Format = IDataProvider::EResultFormat::Custom; FillSettings.FormatDetails = TString(KikimrMkqlProtoFormat); + FillGUCSettings(); auto optSessionId = TryDecodeYdbSessionId(SessionId); YQL_ENSURE(optSessionId, "Can't decode ydb session Id"); @@ -1168,6 +1170,12 @@ class TKqpSessionActor : public TActorBootstrapped { return false; } + void FillGUCSettings() { + if (Settings.Database) { + GUCSettings->Set("ydb_database", Settings.Database.substr(1, Settings.Database.Size() - 1)); + } + } + void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) { YQL_ENSURE(QueryState); @@ -1205,7 +1213,7 @@ class TKqpSessionActor : public TActorBootstrapped { RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - Settings.TableService.GetEnableOlapSink(), useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup); + Settings.TableService.GetEnableOlapSink(), useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -2420,7 +2428,7 @@ class TKqpSessionActor : public TActorBootstrapped { bool HasOlapTable = false; bool HasOltpTable = false; - TGUCSettings::TPtr GUCSettings = std::make_shared(); + TGUCSettings::TPtr GUCSettings; }; } // namespace @@ -2436,8 +2444,7 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, { return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup, std::move(asyncIoFactory), std::move(moduleResolverState), counters, - queryServiceConfig, metadataProviderConfig, kqpTempTablesAgentActor - ); + queryServiceConfig, metadataProviderConfig, kqpTempTablesAgentActor); } } diff --git a/ydb/core/kqp/ut/pg/pg_catalog_ut.cpp b/ydb/core/kqp/ut/pg/pg_catalog_ut.cpp index c554e7c80770..09fe6f99fc79 100644 --- a/ydb/core/kqp/ut/pg/pg_catalog_ut.cpp +++ b/ydb/core/kqp/ut/pg/pg_catalog_ut.cpp @@ -352,6 +352,59 @@ Y_UNIT_TEST_SUITE(PgCatalog) { UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); } } + + Y_UNIT_TEST(PgDatabase) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto result = db.ExecuteQuery(R"( + SELECT datname FROM pg_database + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets"); + CompareYson(R"([ + ["template1"];["template0"];["postgres"];["Root"] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + { + auto result = db.ExecuteQuery(R"( + SELECT current_catalog; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets"); + CompareYson(R"([ + ["Root"] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + { + auto result = db.ExecuteQuery(R"( + SELECT current_catalog = current_database(); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets"); + CompareYson(R"([ + ["t"] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(PgRoles) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto result = db.ExecuteQuery(R"( + SELECT rolname FROM pg_roles + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets"); + // userName not set to GUCSettings yet + CompareYson(R"([ + ["postgres"] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index cd981044a3dc..7b769718a6b8 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -573,6 +573,7 @@ message TEvStartKqpTasksRequest { optional TKqpSnapshot Snapshot = 5; optional bool StartAllOrFail = 6 [default = true]; optional uint64 OutputChunkMaxSize = 7 [default = 0]; // 0 - use some default value + optional string SerializedGUCSettings = 8; } message TEvStartKqpTasksResponse { diff --git a/ydb/library/yql/core/pg_settings/guc_settings.cpp b/ydb/library/yql/core/pg_settings/guc_settings.cpp index a46b1d5f7bd8..6f49da4bbde8 100644 --- a/ydb/library/yql/core/pg_settings/guc_settings.cpp +++ b/ydb/library/yql/core/pg_settings/guc_settings.cpp @@ -1,4 +1,14 @@ #include "guc_settings.h" +#include + +TGUCSettings::TGUCSettings(const TString &serialized) { + if (!serialized.Empty()) { + NJson::TJsonValue gucJson; + Y_ENSURE(NJson::ReadJsonTree(serialized, &gucJson), "Error parsing GUCSettings"); + this->ImportFromJson(gucJson); + } +} + void TGUCSettings::Setup(const std::unordered_map& runtimeSettings) { RollbackSettings_ = runtimeSettings; @@ -73,6 +83,12 @@ void TGUCSettings::ImportFromJson(const NJson::TJsonValue& value) } } +TString TGUCSettings::SerializeToString() const { + NJson::TJsonValue gucJson; + this->ExportToJson(gucJson); + return WriteJson(gucJson); +} + bool TGUCSettings::operator==(const TGUCSettings& other) const { return Settings_ == other.Settings_ && RollbackSettings_ == other.RollbackSettings_ && diff --git a/ydb/library/yql/core/pg_settings/guc_settings.h b/ydb/library/yql/core/pg_settings/guc_settings.h index 5555c62da95c..91081fcadd1a 100644 --- a/ydb/library/yql/core/pg_settings/guc_settings.h +++ b/ydb/library/yql/core/pg_settings/guc_settings.h @@ -11,6 +11,9 @@ class TGUCSettings { public: + TGUCSettings() = default; + TGUCSettings(const TString& serialized); + using TPtr = std::shared_ptr; void Setup(const std::unordered_map& runtimeSettings); std::optional Get(const std::string&) const; @@ -19,6 +22,7 @@ class TGUCSettings { void RollBack(); void ExportToJson(NJson::TJsonValue& value) const; void ImportFromJson(const NJson::TJsonValue& value); + TString SerializeToString() const; size_t GetHash() const noexcept; bool operator==(const TGUCSettings& other) const; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 0e873f9b18c2..ca4b1b5ac3e0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -164,7 +164,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped bool ownMemoryQuota = true, bool passExceptions = false, const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr, NWilson::TTraceId traceId = {}, - TIntrusivePtr arena = nullptr) + TIntrusivePtr arena = nullptr, + const TGUCSettings::TPtr& GUCSettings = nullptr) : ExecuterId(executerId) , TxId(txId) , Task(task, std::move(arena)) @@ -187,6 +188,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped true, false ); + Alloc->SetGUCSettings(GUCSettings); InitMonCounters(taskCounters); if (ownMemoryQuota) { MemoryQuota = InitMemoryQuota(); diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index df5150f1ce13..c63b5a486925 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -1,6 +1,7 @@ #pragma once #include "aligned_page_pool.h" #include "mkql_mem_info.h" +#include #include #include #include @@ -206,6 +207,12 @@ class TScopedAlloc { bool IsAttached() const { return AttachedCount_ > 0; } + void SetGUCSettings(const TGUCSettings::TPtr& GUCSettings) { + Acquire(); + PgSetGUCSettings(MyState_.MainContext, GUCSettings); + Release(); + } + private: const bool InitiallyAcquired_; TAllocState MyState_; diff --git a/ydb/library/yql/minikql/ya.make b/ydb/library/yql/minikql/ya.make index 2169423da105..a68a0309b3a7 100644 --- a/ydb/library/yql/minikql/ya.make +++ b/ydb/library/yql/minikql/ya.make @@ -63,6 +63,7 @@ PEERDIR( library/cpp/yson ydb/library/binary_json ydb/library/dynumber + ydb/library/yql/core/pg_settings ydb/library/yql/minikql/dom ydb/library/yql/parser/pg_catalog ydb/library/yql/parser/pg_wrapper/interface diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 871d5418a7fb..feb08771e26a 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -1,4 +1,6 @@ +#include #include +#include #include #include #include @@ -91,19 +93,6 @@ TVPtrHolder TVPtrHolder::Instance; // use 'false' for native format static __thread bool NeedCanonizeFp = false; -struct TMainContext { - MemoryContextData Data; - MemoryContextData ErrorData; - MemoryContext PrevCurrentMemoryContext = nullptr; - MemoryContext PrevErrorContext = nullptr; - MemoryContext PrevCacheMemoryContext = nullptr; - RecordCacheState CurrentRecordCacheState = { NULL, NULL, NULL, 0, 0, INVALID_TUPLEDESC_IDENTIFIER }; - RecordCacheState PrevRecordCacheState; - TimestampTz StartTimestamp; - pg_stack_base_t PrevStackBase; - TString LastError; -}; - NUdf::TUnboxedValue CreatePgString(i32 typeLen, ui32 targetTypeId, TStringBuf data) { // typname => 'cstring', typlen => '-2' // typname = > 'text', typlen => '-1' @@ -286,6 +275,30 @@ class TPgInternal0 : public TMutableComputationNode { class TPgTableContent : public TMutableComputationNode { typedef TMutableComputationNode TBaseComputation; +private: + static NUdf::TUnboxedValuePod MakePgDatabaseDatnameColumn(ui32 index) { + std::string content; + switch (index) { + case 1: { + content = "template1"; + break; + } + case 2: { + content = "template0"; + break; + } + case 3: { + content = "postgres"; + break; + } + case 4: { + Y_ENSURE(PGGetGUCSetting("ydb_database")); + content = *PGGetGUCSetting("ydb_database"); + break; + } + } + return PointerDatumToPod((Datum)(MakeFixedString(content, NAMEDATALEN))); + } public: TPgTableContent( TComputationMutables& mutables, @@ -317,8 +330,7 @@ class TPgTableContent : public TMutableComputationNode { {"datdba", [](ui32) { return ScalarDatumToPod(ObjectIdGetDatum(1)); }}, {"datistemplate", [](ui32 index) { return ScalarDatumToPod(BoolGetDatum(index < 3)); }}, {"datallowconn", [](ui32 index) { return ScalarDatumToPod(BoolGetDatum(index != 2)); }}, - {"datname", [](ui32 index) { return PointerDatumToPod((Datum)(MakeFixedString( - index == 1 ? "template1" : (index == 2 ? "template0" : "postgres"), NAMEDATALEN))); }}, + {"datname", MakePgDatabaseDatnameColumn}, {"encoding", [](ui32) { return ScalarDatumToPod(Int32GetDatum(PG_UTF8)); }}, {"datcollate", [](ui32) { return PointerDatumToPod((Datum)(MakeFixedString("C", NAMEDATALEN))); }}, {"datctype", [](ui32) { return PointerDatumToPod((Datum)(MakeFixedString("C", NAMEDATALEN))); }}, @@ -386,18 +398,20 @@ class TPgTableContent : public TMutableComputationNode { ApplyFillers(AllPgTablesFillers, Y_ARRAY_SIZE(AllPgTablesFillers), PgTablesFillers_); } else if (Table_ == "pg_roles") { static const std::pair AllPgRolesFillers[] = { - {"rolname", []() { return PointerDatumToPod((Datum)MakeFixedString("postgres", NAMEDATALEN)); }}, - {"oid", []() { return ScalarDatumToPod(ObjectIdGetDatum(1)); }}, - {"rolbypassrls", []() { return ScalarDatumToPod(BoolGetDatum(true)); }}, - {"rolsuper", []() { return ScalarDatumToPod(BoolGetDatum(true)); }}, - {"rolinherit", []() { return ScalarDatumToPod(BoolGetDatum(true)); }}, - {"rolcreaterole", []() { return ScalarDatumToPod(BoolGetDatum(true)); }}, - {"rolcreatedb", []() { return ScalarDatumToPod(BoolGetDatum(true)); }}, - {"rolcanlogin", []() { return ScalarDatumToPod(BoolGetDatum(true)); }}, - {"rolreplication", []() { return ScalarDatumToPod(BoolGetDatum(true)); }}, - {"rolconnlimit", []() { return ScalarDatumToPod(Int32GetDatum(-1)); }}, - {"rolvaliduntil", []() { return NUdf::TUnboxedValuePod(); }}, - {"rolconfig", []() { return PointerDatumToPod(MakeArrayOfText({ + {"rolname", [](ui32 index) { + return PointerDatumToPod((Datum)MakeFixedString(index == 1 ? "postgres" : *PGGetGUCSetting("ydb_user"), NAMEDATALEN)); + }}, + {"oid", [](ui32) { return ScalarDatumToPod(ObjectIdGetDatum(1)); }}, + {"rolbypassrls", [](ui32) { return ScalarDatumToPod(BoolGetDatum(true)); }}, + {"rolsuper", [](ui32) { return ScalarDatumToPod(BoolGetDatum(true)); }}, + {"rolinherit", [](ui32) { return ScalarDatumToPod(BoolGetDatum(true)); }}, + {"rolcreaterole", [](ui32) { return ScalarDatumToPod(BoolGetDatum(true)); }}, + {"rolcreatedb", [](ui32) { return ScalarDatumToPod(BoolGetDatum(true)); }}, + {"rolcanlogin", [](ui32) { return ScalarDatumToPod(BoolGetDatum(true)); }}, + {"rolreplication", [](ui32) { return ScalarDatumToPod(BoolGetDatum(true)); }}, + {"rolconnlimit", [](ui32) { return ScalarDatumToPod(Int32GetDatum(-1)); }}, + {"rolvaliduntil", [](ui32) { return NUdf::TUnboxedValuePod(); }}, + {"rolconfig", [](ui32) { return PointerDatumToPod(MakeArrayOfText({ "search_path=public", "default_transaction_isolation=serializable", "standard_conforming_strings=on", @@ -549,7 +563,8 @@ class TPgTableContent : public TMutableComputationNode { rows.emplace_back(row); }); } else if (Table_ == "pg_database") { - for (ui32 index = 1; index <= 3; ++index) { + ui32 tableSize = PGGetGUCSetting("ydb_database") ? 4 : 3; + for (ui32 index = 1; index <= tableSize; ++index) { NUdf::TUnboxedValue* items; auto row = compCtx.HolderFactory.CreateDirectArrayHolder(PgDatabaseFillers_.size(), items); for (ui32 i = 0; i < PgDatabaseFillers_.size(); ++i) { @@ -739,16 +754,18 @@ class TPgTableContent : public TMutableComputationNode { rows.emplace_back(row); } } else if (Table_ == "pg_roles") { - NUdf::TUnboxedValue* items; - auto row = compCtx.HolderFactory.CreateDirectArrayHolder(PgRolesFillers_.size(), items); - for (ui32 i = 0; i < PgRolesFillers_.size(); ++i) { - if (PgRolesFillers_[i]) { - items[i] = PgRolesFillers_[i](); + for (ui32 index = 1; index <= 1; ++index) { + NUdf::TUnboxedValue* items; + auto row = compCtx.HolderFactory.CreateDirectArrayHolder(PgRolesFillers_.size(), items); + for (ui32 i = 0; i < PgRolesFillers_.size(); ++i) { + if (PgRolesFillers_[i]) { + items[i] = PgRolesFillers_[i](index); + } } - } - sysFiller.Fill(items); - rows.emplace_back(row); + sysFiller.Fill(items); + rows.emplace_back(row); + } } else if (Table_ == "pg_stat_database") { for (ui32 index = 0; index <= 1; ++index) { NUdf::TUnboxedValue* items; @@ -899,7 +916,7 @@ class TPgTableContent : public TMutableComputationNode { TVector PgNamespaceFillers_; using TPgAmFiller = NUdf::TUnboxedValuePod(*)(const NPg::TAmDesc&); TVector PgAmFillers_; - using TPgRolesFiller = NUdf::TUnboxedValuePod(*)(); + using TPgRolesFiller = NUdf::TUnboxedValuePod(*)(ui32 index); TVector PgRolesFillers_; using TPgDatabaseStatFiller = NUdf::TUnboxedValuePod(*)(ui32 index); TVector PgDatabaseStatFillers_; @@ -3096,6 +3113,7 @@ TComputationNodeFactory GetPgFactory() { const auto cluster = clusterData->AsValue().AsStringRef(); const auto table = tableData->AsValue().AsStringRef(); const auto returnType = callable.GetType()->GetReturnType(); + return new TPgTableContent(ctx.Mutables, cluster, table, returnType); } @@ -4796,6 +4814,9 @@ void PgAcquireThreadContext(void* ctx) { SetParallelStartTimestamps(main->StartTimestamp, main->StartTimestamp); main->PrevStackBase = set_stack_base(); yql_error_report_active = true; + if (main->GUCSettings && main->GUCSettings->Get("ydb_database")) { + MyDatabaseId = 4; + } } } @@ -4809,7 +4830,29 @@ void PgReleaseThreadContext(void* ctx) { LoadRecordCacheState(&main->PrevRecordCacheState); restore_stack_base(main->PrevStackBase); yql_error_report_active = false; + MyDatabaseId = 3; + } +} + +void PgSetGUCSettings(void* ctx, const TGUCSettings::TPtr& GUCSettings) { + if (ctx && GUCSettings) { + auto main = (TMainContext*)ctx; + main->GUCSettings = GUCSettings; + if (main->GUCSettings->Get("ydb_database")) { + MyDatabaseId = 4; + } + } + PgCreateSysCacheEntries(ctx); +} + +std::optional PGGetGUCSetting(const std::string& key) { + if (TlsAllocState) { + auto ctx = (TMainContext*)TlsAllocState->MainContext; + if (ctx && ctx->GUCSettings) { + return ctx->GUCSettings->Get(key); + } } + return std::nullopt; } extern "C" void yql_prepare_error(const char* msg) { diff --git a/ydb/library/yql/parser/pg_wrapper/interface/context.h b/ydb/library/yql/parser/pg_wrapper/interface/context.h index 7cde36b3baad..8d72e7854a7d 100644 --- a/ydb/library/yql/parser/pg_wrapper/interface/context.h +++ b/ydb/library/yql/parser/pg_wrapper/interface/context.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace NKikimr { namespace NMiniKQL { @@ -14,5 +15,9 @@ void PgReleaseThreadContext(void* ctx); void* PgInitializeContext(const std::string_view& contextType); void PgDestroyContext(const std::string_view& contextType, void* ctx); +void PgSetGUCSettings(void* ctx, const TGUCSettings::TPtr& GUCSettings); +std::optional PGGetGUCSetting(const std::string& key); + +void PgCreateSysCacheEntries(void* ctx); } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/library/yql/parser/pg_wrapper/memory_context.h b/ydb/library/yql/parser/pg_wrapper/memory_context.h new file mode 100644 index 000000000000..9cdd933f17ff --- /dev/null +++ b/ydb/library/yql/parser/pg_wrapper/memory_context.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include + +extern "C" { +#include "c.h" +#include "postgres.h" +#include "access/htup.h" +#include "datatype/timestamp.h" +#include "miscadmin.h" +#include "utils/palloc.h" +#include "nodes/memnodes.h" +#include "utils/typcache.h" +} + +#undef TypeName +#undef Max + +struct TMainContext { + MemoryContextData Data; + MemoryContextData ErrorData; + MemoryContext PrevCurrentMemoryContext = nullptr; + MemoryContext PrevErrorContext = nullptr; + MemoryContext PrevCacheMemoryContext = nullptr; + RecordCacheState CurrentRecordCacheState = { NULL, NULL, NULL, 0, 0, INVALID_TUPLEDESC_IDENTIFIER }; + RecordCacheState PrevRecordCacheState; + TimestampTz StartTimestamp; + pg_stack_base_t PrevStackBase; + TString LastError; + TGUCSettings::TPtr GUCSettings; + HeapTuple CurrentDatabaseName = nullptr; + HeapTuple PrevDatabaseName = nullptr; +}; diff --git a/ydb/library/yql/parser/pg_wrapper/syscache.cpp b/ydb/library/yql/parser/pg_wrapper/syscache.cpp index 649b56b76367..e6faa5ed954a 100644 --- a/ydb/library/yql/parser/pg_wrapper/syscache.cpp +++ b/ydb/library/yql/parser/pg_wrapper/syscache.cpp @@ -1,3 +1,8 @@ +#include +#include +#include +#include + #define SortBy PG_SortBy #define TypeName PG_TypeName @@ -36,7 +41,6 @@ extern "C" { #include "arena_ctx.h" #include "utils.h" -#include #include #include #include @@ -181,6 +185,7 @@ struct TSysCacheItem { TMaybe RangeMap1; TupleDesc Desc; CatCList* EmptyCList = nullptr; + std::function(const THeapTupleKey&)> PgThreadContextLookup = nullptr; }; struct TSysCache { @@ -461,6 +466,31 @@ struct TSysCache { } + static HeapTuple MakePgDatabaseHeapTuple(ui32 oid, const char* name) { + TupleDesc tupleDesc = CreateTemplateTupleDesc(Natts_pg_database); + FillAttr(tupleDesc, Anum_pg_database_oid, OIDOID); + FillAttr(tupleDesc, Anum_pg_database_datname, NAMEOID); + FillAttr(tupleDesc, Anum_pg_database_datdba, OIDOID); + FillAttr(tupleDesc, Anum_pg_database_encoding, INT4OID); + FillAttr(tupleDesc, Anum_pg_database_datcollate, NAMEOID); + FillAttr(tupleDesc, Anum_pg_database_datctype, NAMEOID); + FillAttr(tupleDesc, Anum_pg_database_datistemplate, BOOLOID); + FillAttr(tupleDesc, Anum_pg_database_datallowconn, BOOLOID); + FillAttr(tupleDesc, Anum_pg_database_datconnlimit, INT4OID); + FillAttr(tupleDesc, Anum_pg_database_datlastsysoid, OIDOID); + FillAttr(tupleDesc, Anum_pg_database_datfrozenxid, XIDOID); + FillAttr(tupleDesc, Anum_pg_database_datminmxid, XIDOID); + FillAttr(tupleDesc, Anum_pg_database_dattablespace, OIDOID); + FillAttr(tupleDesc, Anum_pg_database_datacl, ACLITEMARRAYOID); + Datum values[Natts_pg_database]; + bool nulls[Natts_pg_database]; + Zero(values); + std::fill_n(nulls, Natts_pg_database, true); + FillDatum(Natts_pg_database, values, nulls, Anum_pg_database_oid, (Datum)oid); + FillDatum(Natts_pg_database, values, nulls, Anum_pg_database_datname, (Datum)MakeFixedString(name, NAMEDATALEN)); + return heap_form_tuple(tupleDesc, values, nulls); + } + void InitializeDatabase() { TupleDesc tupleDesc = CreateTemplateTupleDesc(Natts_pg_database); FillAttr(tupleDesc, Anum_pg_database_oid, OIDOID); @@ -489,6 +519,7 @@ struct TSysCache { std::fill_n(nulls, Natts_pg_database, true); FillDatum(Natts_pg_database, values, nulls, Anum_pg_database_oid, (Datum)oid); const char* name = nullptr; + switch (oid) { case 1: name = "template1"; break; case 2: name = "template0"; break; @@ -502,6 +533,19 @@ struct TSysCache { Y_ENSURE(strcmp(NameStr(row->datname), name) == 0); lookupMap.emplace(key, h); } + + //add specific lookup for 4 to cacheItem. save heaptuple to MainContext_. + auto threadContextLookup = [&] (const THeapTupleKey& key) -> std::optional { + if (std::get<0>(key) == 4 && NKikimr::NMiniKQL::TlsAllocState) { + auto ctx = (TMainContext*)NKikimr::NMiniKQL::TlsAllocState->MainContext; + if (ctx && ctx->CurrentDatabaseName) { + return ctx->CurrentDatabaseName; + } + } + return std::nullopt; + }; + + cacheItem->PgThreadContextLookup = std::move(threadContextLookup); } void InitializeAuthId() { @@ -523,35 +567,39 @@ struct TSysCache { auto key = THeapTupleKey(1, 0, 0, 0); - const char* rolname = "postgres"; - const ui32 oid = 1; - Datum values[Natts_pg_authid]; - bool nulls[Natts_pg_authid]; - Zero(values); - std::fill_n(nulls, Natts_pg_authid, true); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_oid, (Datum)oid); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolname, (Datum)MakeFixedString(rolname, NAMEDATALEN)); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolsuper, BoolGetDatum(true)); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolinherit, BoolGetDatum(true)); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolcreaterole, BoolGetDatum(true)); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolcreatedb, BoolGetDatum(true)); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolcanlogin, BoolGetDatum(true)); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolreplication, BoolGetDatum(true)); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolbypassrls, BoolGetDatum(true)); - FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolconnlimit, Int32GetDatum(-1)); - HeapTuple h = heap_form_tuple(tupleDesc, values, nulls); - auto row = (Form_pg_authid) GETSTRUCT(h); - Y_ENSURE(row->oid == oid); - Y_ENSURE(strcmp(NameStr(row->rolname), rolname) == 0); - Y_ENSURE(row->rolsuper); - Y_ENSURE(row->rolinherit); - Y_ENSURE(row->rolcreaterole); - Y_ENSURE(row->rolcreatedb); - Y_ENSURE(row->rolcanlogin); - Y_ENSURE(row->rolreplication); - Y_ENSURE(row->rolbypassrls); - Y_ENSURE(row->rolconnlimit == -1); - lookupMap.emplace(key, h); + //do the same in next PR + // auto userName = *NKikimr::NMiniKQL::PGGetGUCSetting("ydb_user"); + for (ui32 oid = 1; oid <= 1; ++oid) { + const char* rolname = "postgres"; + // const char* rolname = oid == 1 ? "postgres" : userName.c_str(); + Datum values[Natts_pg_authid]; + bool nulls[Natts_pg_authid]; + Zero(values); + std::fill_n(nulls, Natts_pg_authid, true); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_oid, (Datum)oid); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolname, (Datum)MakeFixedString(rolname, NAMEDATALEN)); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolsuper, BoolGetDatum(true)); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolinherit, BoolGetDatum(true)); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolcreaterole, BoolGetDatum(true)); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolcreatedb, BoolGetDatum(true)); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolcanlogin, BoolGetDatum(true)); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolreplication, BoolGetDatum(true)); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolbypassrls, BoolGetDatum(true)); + FillDatum(Natts_pg_authid, values, nulls, Anum_pg_authid_rolconnlimit, Int32GetDatum(-1)); + HeapTuple h = heap_form_tuple(tupleDesc, values, nulls); + auto row = (Form_pg_authid) GETSTRUCT(h); + Y_ENSURE(row->oid == oid); + Y_ENSURE(strcmp(NameStr(row->rolname), rolname) == 0); + Y_ENSURE(row->rolsuper); + Y_ENSURE(row->rolinherit); + Y_ENSURE(row->rolcreaterole); + Y_ENSURE(row->rolcreatedb); + Y_ENSURE(row->rolcanlogin); + Y_ENSURE(row->rolreplication); + Y_ENSURE(row->rolbypassrls); + Y_ENSURE(row->rolconnlimit == -1); + lookupMap.emplace(key, h); + } } void InitializeNameNamespaces() { @@ -619,7 +667,18 @@ struct TSysCache { } } +namespace NKikimr { +namespace NMiniKQL { +void PgCreateSysCacheEntries(void* ctx) { + auto main = (TMainContext*)ctx; + if (main->GUCSettings && main->GUCSettings->Get("ydb_database")) { + main->CurrentDatabaseName = NYql::TSysCache::MakePgDatabaseHeapTuple(4, main->GUCSettings->Get("ydb_database")->c_str()); + } +} + +} //namespace NKikimr +} //namespace NMiniKQL HeapTuple SearchSysCache(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4) { Y_ENSURE(cacheId >= 0 && cacheId < SysCacheSize); @@ -627,10 +686,14 @@ HeapTuple SearchSysCache(int cacheId, Datum key1, Datum key2, Datum key3, Datum if (!cacheItem) { return nullptr; } - const auto& lookupMap = cacheItem->LookupMap; auto it = lookupMap.find(std::make_tuple(key1, key2, key3, key4)); if (it == lookupMap.end()) { + if (cacheItem->PgThreadContextLookup) { + if (auto value = cacheItem->PgThreadContextLookup(std::make_tuple(key1, key2, key3, key4))) { + return *value; + } + } return nullptr; } diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index ecebd02d65f4..5991ecbc873f 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -3397,8 +3397,14 @@ class TConverter : public IPGParseEvents { case SVFOP_CURRENT_ROLE: case SVFOP_USER: return L(A("PgConst"), QA("postgres"), L(A("PgType"), QA("name"))); - case SVFOP_CURRENT_CATALOG: - return L(A("PgConst"), QA("postgres"), L(A("PgType"), QA("name"))); + case SVFOP_CURRENT_CATALOG: { + std::optional database; + if (Settings.GUCSettings) { + database = Settings.GUCSettings->Get("ydb_database"); + } + + return L(A("PgConst"), QA(database ? *database : "postgres"), L(A("PgType"), QA("name"))); + } case SVFOP_CURRENT_SCHEMA: { std::optional searchPath; if (Settings.GUCSettings) { diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index b966aefeb30c..51f836064ea9 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -155,6 +155,16 @@ void PgReleaseThreadContext(void* ctx) { Y_UNUSED(ctx); } +void PgSetGUCSettings(void* ctx, const TGUCSettings::TPtr& GUCSettings) { + Y_UNUSED(ctx); + Y_UNUSED(GUCSettings); +} + +std::optional PGGetGUCSetting(const std::string& key) { + Y_UNUSED(key); + throw yexception() << "PG types are not supported"; +} + ui64 PgValueSize(const NUdf::TUnboxedValuePod& value, i32 typeLen) { Y_UNUSED(typeLen); Y_UNUSED(value);