Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd committed Oct 3, 2024
1 parent 442e587 commit 690d91e
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 40 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
replayMessage.InsertValue("table_metadata", TString(NJson::WriteJson(tablesMeta, false)));
replayMessage.InsertValue("table_meta_serialization_type", EMetaSerializationType::EncodedProto);

GUCSettings->ExportToJson(replayMessage);
//GUCSettings->ExportToJson(replayMessage);

TString message(NJson::WriteJson(replayMessage, /*formatOutput*/ false));
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_ACTOR, "[" << SelfId() << "]: "
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
info.MutableActorIds().emplace_back(result);
return result;
} else {
std::shared_ptr<TGUCSettings> GUCSettings;
if (!args.SerializedGUCSettings.empty()) {
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
}
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings,
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, args.GUCSettings,
std::move(args.SchedulingOptions));
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
TlsActivationContext->AsActorContext().Register(computeActor);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ struct IKqpNodeComputeActorFactory {
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;
NWilson::TTraceId TraceId;
TIntrusivePtr<NActors::TProtoArenaHolder> Arena;
const TString& SerializedGUCSettings;
const TGUCSettings::TPtr GUCSettings;
const ui32 NumberOfTasks;
const ui64 OutputChunkMaxSize;
const NKikimr::NKqp::NRm::EKqpMemoryPool MemoryPool;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.SetOutputChunkMaxSize(OutputChunkMaxSize);
}

if (SerializedGUCSettings) {
request.SetSerializedGUCSettings(SerializedGUCSettings);
if (GUCSettings) {
ExportToProto(*GUCSettings, *request.MutableGUCSettings());
}

request.SetSchedulerGroup(UserRequestContext->PoolId);
Expand Down Expand Up @@ -453,7 +453,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
.RuntimeSettings = settings,
.TraceId = NWilson::TTraceId(ExecuterSpan.GetTraceId()),
.Arena = TasksGraph.GetMeta().GetArenaIntrusivePtr(),
.SerializedGUCSettings = SerializedGUCSettings,
.GUCSettings = GUCSettings,
.NumberOfTasks = computeTasksSize,
.OutputChunkMaxSize = OutputChunkMaxSize,
.MemoryPool = NRm::EKqpMemoryPool::DataQuery,
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,30 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
}
}

void ImportFromProto(TGUCSettings& settings, const NKikimrKqp::TEvStartKqpTasksRequest::TGUCSettings& proto) {
for (const auto& [settingName, settingValue] : proto.GetSettings()) {
settings.Settings_[settingName] = settingValue;
}
for (const auto& [settingName, settingValue] : proto.GetRollbackSettings()) {
settings.RollbackSettings_[settingName] = settingValue;
}
for (const auto& [settingName, settingValue] : proto.GetSessionSettings()) {
settings.SessionSettings_[settingName] = settingValue;
}
}

void ExportToProto(const TGUCSettings& settings, NKikimrKqp::TEvStartKqpTasksRequest::TGUCSettings& proto) {
for (const auto& setting : settings.Settings_) {
proto.MutableSettings()->insert({setting.first.c_str(), setting.second.c_str()});
}
for (const auto& setting : settings.RollbackSettings_) {
proto.MutableRollbackSettings()->insert({setting.first.c_str(), setting.second.c_str()});
}
for (const auto& setting : settings.SessionSettings_) {
proto.MutableSessionSettings()->insert({setting.first.c_str(), setting.second.c_str()});
}
}

void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, bool enableSpilling) {
switch (output.Type) {
case TTaskOutputType::Map:
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransact
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling);

void ImportFromProto(TGUCSettings& settings, const NKikimrKqp::TEvStartKqpTasksRequest::TGUCSettings& proto);
void ExportToProto(const TGUCSettings& settings, NKikimrKqp::TEvStartKqpTasksRequest::TGUCSettings& proto);

template<typename Proto>
TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, TIntrusiveConstPtr<TTableConstInfo> tableInfo) {
TVector<TTaskMeta::TColumn> columns;
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
#include <ydb/core/kqp/executer_actor/kqp_tasks_graph.h>
#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
Expand Down Expand Up @@ -194,8 +195,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {

NComputeActor::TComputeStagesWithScan computesByStage;

const TString& serializedGUCSettings = ev->Get()->Record.HasSerializedGUCSettings() ?
ev->Get()->Record.GetSerializedGUCSettings() : "";
std::shared_ptr<TGUCSettings> GUCSettings;
if (ev->Get()->Record.HasGUCSettings()) {
GUCSettings = std::make_shared<TGUCSettings>();
ImportFromProto(*GUCSettings, ev->Get()->Record.GetGUCSettings());
}

auto schedulerNow = TlsActivationContext->Monotonic();

Expand Down Expand Up @@ -259,7 +263,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
.RuntimeSettings = runtimeSettingsBase,
.TraceId = NWilson::TTraceId(ev->TraceId),
.Arena = ev->Get()->Arena,
.SerializedGUCSettings = serializedGUCSettings,
.GUCSettings = GUCSettings,
.NumberOfTasks = tasksCount,
.OutputChunkMaxSize = msg.GetOutputChunkMaxSize(),
.MemoryPool = memoryPool,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/node_service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PEERDIR(
ydb/core/kqp/common
ydb/core/kqp/compute_actor
ydb/core/kqp/counters
ydb/core/kqp/executer_actor
ydb/core/mind
ydb/core/protos
ydb/core/tablet
Expand Down
32 changes: 15 additions & 17 deletions ydb/library/yql/core/pg_settings/guc_settings.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "guc_settings.h"
#include <library/cpp/json/json_reader.h>


void TGUCSettings::Setup(const std::unordered_map<std::string, std::string>& runtimeSettings) {
Expand Down Expand Up @@ -29,29 +30,26 @@ void TGUCSettings::RollBack() {
Settings_ = SessionSettings_ = RollbackSettings_;
}

void TGUCSettings::ExportToProto(NKikimrKqp::TEvStartKqpTasksRequest::TGUCSettings& value) const {
TString TGUCSettings::SerializeToString() const {
NJson::TJsonValue gucJson;
NJson::TJsonValue settings(NJson::JSON_MAP);
for (const auto& setting : Settings_) {
value.MutableSettings()->insert({setting.first.c_str(), setting.second.c_str()});
settings[setting.first] = setting.second;
}
NJson::TJsonValue rollbackSettings(NJson::JSON_MAP);
for (const auto& setting : RollbackSettings_) {
value.MutableRollbackSettings()->insert({setting.first.c_str(), setting.second.c_str()});
rollbackSettings[setting.first] = setting.second;
}
NJson::TJsonValue sessionSettings(NJson::JSON_MAP);
for (const auto& setting : SessionSettings_) {
value.MutableSessionSettings()->insert({setting.first.c_str(), setting.second.c_str()});
}
}

void TGUCSettings::ImportFromProto(const NKikimrKqp::TEvStartKqpTasksRequest::TGUCSettings& value)
{
for (const auto& [settingName, settingValue] : value.GetSettings()) {
Settings_[settingName] = settingValue;
}
for (const auto& [settingName, settingValue] : value.GetRollbackSettings()) {
RollbackSettings_[settingName] = settingValue;
}
for (const auto& [settingName, settingValue] : value.GetSessionSettings()) {
SessionSettings_[settingName] = settingValue;
sessionSettings[setting.first] = setting.second;
}
NJson::TJsonValue gucSettings(NJson::JSON_MAP);
gucSettings.InsertValue("settings", std::move(settings));
gucSettings.InsertValue("rollback_settings", std::move(rollbackSettings));
gucSettings.InsertValue("session_settings", std::move(sessionSettings));
gucJson.InsertValue("guc_settings", std::move(gucSettings));
return WriteJson(gucJson);
}

bool TGUCSettings::operator==(const TGUCSettings& other) const {
Expand Down
10 changes: 4 additions & 6 deletions ydb/library/yql/core/pg_settings/guc_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@

#include <util/generic/hash.h>

#include <ydb/core/protos/kqp.pb.h>
#include <library/cpp/json/json_writer.h>

class TGUCSettings {
public:
struct TGUCSettings {
TGUCSettings() = default;

using TPtr = std::shared_ptr<TGUCSettings>;
Expand All @@ -19,12 +18,11 @@ class TGUCSettings {
void Set(const std::string&, const std::string&, bool isLocal = false);
void Commit();
void RollBack();
void ExportToProto(NKikimrKqp::TEvStartKqpTasksRequest::TGUCSettings& value) const;
void ImportFromProto(const NKikimrKqp::TEvStartKqpTasksRequest::TGUCSettings& value);
TString SerializeToString() const;

size_t GetHash() const noexcept;
bool operator==(const TGUCSettings& other) const;
private:

std::unordered_map<std::string, std::string> Settings_;
std::unordered_map<std::string, std::string> RollbackSettings_;
std::unordered_map<std::string, std::string> SessionSettings_;
Expand Down
4 changes: 0 additions & 4 deletions ydb/library/yql/core/pg_settings/ya.make
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
LIBRARY()

PEERDIR(
ydb/core/protos
)

SRCS(
guc_settings.cpp
)
Expand Down

0 comments on commit 690d91e

Please sign in to comment.