Skip to content

Commit

Permalink
get rid of external arenas in TEvQueryResponse event
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed Sep 25, 2024
1 parent 8ebf595 commit b42da72
Show file tree
Hide file tree
Showing 59 changed files with 230 additions and 283 deletions.
8 changes: 4 additions & 4 deletions ydb/core/client/server/msgbus_server_pq_metacache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "stale response with generation " << ev->Cookie << ", actual is " << Generation->Val());
return;
}
const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;

if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE,
Expand All @@ -272,7 +272,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

void HandleCheckVersionResult(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {

const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
Expand All @@ -291,7 +291,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
void HandleGetTopicsResult(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "HandleGetTopicsResult");

const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
TString path, dc;
Expand Down Expand Up @@ -710,7 +710,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
void ProcessNodesInfoWaitersQueue(bool status, const TActorContext& ctx) {
if (DynamicNodesMapping == nullptr) {
Y_ABORT_UNLESS(!status);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
}
while(!NodesMappingWaiters.empty()) {
ctx.Send(NodesMappingWaiters.front(),
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,10 @@ class TGRpcRequestWrapperImpl
return Ctx_->GetArena();
}

TIntrusivePtr<NActors::TProtoArenaHolder> GetArenaPtr() override {
return Ctx_->GetArenaPtr();
}

//! Allocate Result message using protobuf arena allocator
//! The memory will be freed automaticaly after destroying
//! corresponding request.
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/base/iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class IRequestCtxMtSafe : public virtual IRequestCtxBaseMtSafe {
virtual void SetFinishAction(std::function<void()>&& cb) = 0;
// Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety
virtual google::protobuf::Arena* GetArena() = 0;
virtual TIntrusivePtr<NActors::TProtoArenaHolder> GetArenaPtr() = 0;
};

}
Expand Down
24 changes: 12 additions & 12 deletions ydb/core/grpc_services/grpc_integrity_trails.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table:

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request, const auto& response) {
auto& record = response->Get()->Record.GetRef();
auto& record = response->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
Expand Down Expand Up @@ -65,7 +65,7 @@ inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table:
Y_UNUSED(request);

auto log = [](const auto& traceId, const auto& response) {
auto& record = response->Get()->Record.GetRef();
auto& record = response->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
Expand Down Expand Up @@ -98,8 +98,8 @@ inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table:

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::CommitTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request, const auto& response) {
const auto& record = response->Get()->Record.GetRef();
const auto& record = response->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
Expand Down Expand Up @@ -131,8 +131,8 @@ inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table:

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::RollbackTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request, const auto& response) {
const auto& record = response->Get()->Record.GetRef();
const auto& record = response->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
Expand All @@ -150,7 +150,7 @@ inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table:
// ExecuteYqlScript/StreamExecuteYqlScript
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, const TActorContext& ctx) {
Y_UNUSED(request);

auto log = [](const auto& traceId) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
Expand All @@ -164,9 +164,9 @@ inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Script

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
Y_UNUSED(request);

auto log = [](const auto& traceId, const auto& response) {
const auto& record = response->Get()->Record.GetRef();
const auto& record = response->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
Expand Down Expand Up @@ -206,14 +206,14 @@ inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query:
}

auto log = [](const auto& traceId, const auto& request, const auto& response) {
const auto& record = response->Get()->Record.GetRef();
const auto& record = response->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "ExecuteQueryResponse", ss);

if (request.tx_control().tx_selector_case() == Ydb::Query::TransactionControl::kBeginTx) {
LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
}
Expand Down Expand Up @@ -252,7 +252,7 @@ inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query:
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "ExecuteSrciptResponse", ss);
LogKeyValue("Type", "ExecuteSrciptResponse", ss);
LogKeyValue("Status", ToString(response->Get()->Status), ss);
LogKeyValue("Issues", ToString(response->Get()->Issues), ss, /*last*/ true);
return ss.Str();
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/grpc_services/local_grpc/local_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class TContextBase : public NYdbGrpc::IRequestContextBase {
TContextBase(std::shared_ptr<IRequestCtx> baseRequest)
: BaseRequest_{std::move(baseRequest)}
, AuthState_{/*needAuth*/true}
, Arena_(MakeIntrusive<NActors::TProtoArenaHolder>())
{}

virtual void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) = 0;
Expand Down Expand Up @@ -55,7 +56,11 @@ class TContextBase : public NYdbGrpc::IRequestContextBase {
}

google::protobuf::Arena* GetArena() override {
return &Arena_;
return Arena_->Get();
}

TIntrusivePtr<NActors::TProtoArenaHolder> GetArenaPtr() override {
return Arena_;
}

void AddTrailingMetadata(const TString& key, const TString& value) override {
Expand Down Expand Up @@ -101,7 +106,7 @@ class TContextBase : public NYdbGrpc::IRequestContextBase {
NYdbGrpc::TAuthState AuthState_;

NYql::TIssueManager IssueManager_;
google::protobuf::Arena Arena_;
TIntrusivePtr<NActors::TProtoArenaHolder> Arena_;
};

template<typename TReq, typename TResp>
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
, DatabaseName(databaseName)
, RequestType(requestType)
, InternalCall(internalCall)
, Arena(MakeIntrusive<NActors::TProtoArenaHolder>())
{
if (token && !token->empty()) {
InternalToken = new NACLib::TUserToken(*token);
Expand Down Expand Up @@ -189,7 +190,11 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
}

google::protobuf::Arena* GetArena() override {
return &Arena;
return Arena->Get();
}

TIntrusivePtr<NActors::TProtoArenaHolder> GetArenaPtr() override {
return Arena;
}

const google::protobuf::Message* GetRequest() const override {
Expand Down Expand Up @@ -284,7 +289,7 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
TIntrusiveConstPtr<NACLib::TUserToken> InternalToken;
const TString EmptySerializedTokenMessage_;
TMap<TString, TString> PeerMeta;
google::protobuf::Arena Arena;
TIntrusivePtr<NActors::TProtoArenaHolder> Arena;
};

template<class TRequest>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);

auto& record = ev->Get()->Record.GetRef();
auto& record = ev->Get()->Record;

const auto& issueMessage = record.GetResponse().GetQueryIssues();

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/query/rpc_kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class TBeginTransactionRPC : public TActorBootstrapped<TBeginTransactionRPC> {
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
FillCommonKqpRespFields(record, Request.get());

auto beginTxResult = TEvBeginTransactionRequest::AllocateResult<Ydb::Query::BeginTransactionResponse>(Request);
Expand Down Expand Up @@ -216,7 +216,7 @@ class TFinishTransactionRPC : public TActorBootstrapped<TFinishTransactionRPC>
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
FillCommonKqpRespFields(record, Request.get());

NYql::TIssues issues;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_begin_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class TBeginTransactionRPC : public TRpcKqpRequestActor<TBeginTransactionRPC, TE
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);

const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_commit_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TCommitTransactionRPC : public TRpcKqpRequestActor<TCommitTransactionRPC,
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);

const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);

auto& record = ev->Get()->Record.GetRef();
auto& record = ev->Get()->Record;
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_execute_scheme_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class TExecuteSchemeQueryRPC : public TRpcKqpRequestActor<TExecuteSchemeQueryRPC
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
AddServerHintsIfAny(record);

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActor<TExecuteYqlScriptRPC, TE
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);

const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_explain_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TExplainDataQueryRPC : public TRpcKqpRequestActor<TExplainDataQueryRPC, TE
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
AddServerHintsIfAny(record);

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_explain_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class TExplainYqlScriptRPC : public TRpcKqpRequestActor<TExplainYqlScriptRPC, TE
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
AddServerHintsIfAny(record);

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_prepare_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TPrepareDataQueryRPC : public TRpcKqpRequestActor<TPrepareDataQueryRPC, TE
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_rollback_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class TRollbackTransactionRPC : public TRpcKqpRequestActor<TRollbackTransactionR
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);

const auto& record = ev->Get()->Record.GetRef();
const auto& record = ev->Get()->Record;
AddServerHintsIfAny(record);

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
auto& record = ev->Get()->Record.GetRef();
auto& record = ev->Get()->Record;

NYql::TIssues issues;
const auto& issueMessage = record.GetResponse().GetQueryIssues();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ class TStreamExecuteYqlScriptRPC
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);

auto& record = ev->Get()->Record.GetRef();
auto& record = ev->Get()->Record;

NYql::TIssues issues;
const auto& issueMessage = record.GetResponse().GetQueryIssues();
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/kafka_proxy/actors/control_plane_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ inline TRetentionsConversionResult ConvertRetentions(std::optional<TString> rete
RETENTION_MS_CONFIG_NAME,
[&result](std::optional<ui64> retention) -> void { result.Ms = retention; }
);

convertRetention(
retentionBytes,
RETENTION_BYTES_CONFIG_NAME,
[&result](std::optional<ui64> retention) -> void { result.Bytes = retention; }
);

return result;
}

Expand Down Expand Up @@ -107,7 +107,7 @@ inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTo
} else {
return std::optional<THolder<TEvKafka::TEvTopicModificationResponse>>();
}
}
}

template<class T>
inline std::unordered_set<TString> ExtractDuplicates(
Expand All @@ -134,7 +134,7 @@ class TAlterTopicActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<T, U
public:

TAlterTopicActor(
TActorId requester,
TActorId requester,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName)
Expand Down Expand Up @@ -232,6 +232,10 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
return nullptr;
};

TIntrusivePtr<NActors::TProtoArenaHolder> GetArenaPtr() override {
return nullptr;
}

bool HasClientCapability(const TString& capability) const override {
Y_UNUSED(capability);
return false;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
return nullptr;
};

TIntrusivePtr<NActors::TProtoArenaHolder> GetArenaPtr() override {
return nullptr;
}

bool HasClientCapability(const TString& capability) const override {
Y_UNUSED(capability);
return false;
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/kqp/common/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ struct TEvKqp {

struct TEvDataQueryStreamPartAck : public TEventLocal<TEvDataQueryStreamPartAck, TKqpEvents::EvDataQueryStreamPartAck> {};

template <typename TProto>
using TProtoArenaHolder = NPrivateEvents::TProtoArenaHolder<TProto>;

using TEvQueryResponse = NPrivateEvents::TEvQueryResponse;

struct TEvListSessionsRequest: public TEventPB<TEvListSessionsRequest, NKikimrKqp::TEvListSessionsRequest,
Expand Down
Loading

0 comments on commit b42da72

Please sign in to comment.