Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd committed Jan 18, 2024
1 parent df7bfaf commit 2209648
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 62 deletions.
139 changes: 77 additions & 62 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,11 @@ class TKqpRequestsQueue {
};

class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
enum ECacheType {
ByUid,
ByQuery,
ByAst,
};
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_COMPILE_SERVICE;
Expand Down Expand Up @@ -566,26 +571,25 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
if (request.Uid) {
Counters->ReportCompileRequestGet(dbCounters);

if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) {
auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache);
if (compileResult) {
Y_ENSURE(compileResult->Query);
if (compileResult->Query->UserSid == userSid) {
Counters->ReportQueryCacheHit(dbCounters, true);

LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid"
<< ", sender: " << ev->Sender
<< ", queryUid: " << *request.Uid);

ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
return;
} else {
LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
<< ", sender: " << ev->Sender
<< ", queryUid: " << *request.Uid
<< ", expected sid: " << compileResult->Query->UserSid
<< ", actual sid: " << userSid);
}
auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache);
compileResult = WithCache(std::move(compileResult), request.TempTablesState);
if (compileResult) {
Y_ENSURE(compileResult->Query);
if (compileResult->Query->UserSid == userSid) {
Counters->ReportQueryCacheHit(dbCounters, true);

LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid"
<< ", sender: " << ev->Sender
<< ", queryUid: " << *request.Uid);

ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
return;
} else {
LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
<< ", sender: " << ev->Sender
<< ", queryUid: " << *request.Uid
<< ", expected sid: " << compileResult->Query->UserSid
<< ", actual sid: " << userSid);
}
}

Expand All @@ -611,19 +615,18 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
Y_ENSURE(query.UserSid == userSid);
}

auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
compileResult = WithCache(std::move(compileResult), request.TempTablesState);

if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) {
auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
if (compileResult) {
Counters->ReportQueryCacheHit(dbCounters, true);
if (compileResult) {
Counters->ReportQueryCacheHit(dbCounters, true);

LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text"
<< ", sender: " << ev->Sender
<< ", queryUid: " << compileResult->Uid);
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text"
<< ", sender: " << ev->Sender
<< ", queryUid: " << compileResult->Uid);

ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
return;
}
ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
return;
}

CollectDiagnostics = request.CollectDiagnostics;
Expand Down Expand Up @@ -677,10 +680,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
auto dbCounters = request.DbCounters;
Counters->ReportRecompileRequestGet(dbCounters);

TKqpCompileResult::TConstPtr compileResult = nullptr;
if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) {
compileResult = QueryCache.FindByUid(request.Uid, false);
}
TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false);
compileResult = WithCache(std::move(compileResult), request.TempTablesState);

if (compileResult || request.Query) {
Counters->ReportCompileRequestCompile(dbCounters);
Expand Down Expand Up @@ -745,27 +746,12 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache;

bool hasTempTables = compileRequest.TempTablesState
&& (!compileRequest.TempTablesState->TempTables.empty());
if (compileResult->PreparedQuery) {
hasTempTables = compileResult->PreparedQuery->HasTempTables(compileRequest.TempTablesState);
}
bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState) != nullptr;

try {
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
if (!hasTempTables) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.Replace(compileResult);
} else if (keepInCache) {
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) {
Counters->CompileQueryCacheEvicted->Inc();
}
if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) {
if (InsertPreparingQuery(compileResult, compileRequest.KeepInCache)) {
Counters->CompileQueryCacheEvicted->Inc();
};
}
}
UpdateQueryCache(compileResult, keepInCache);
}

if (ev->Get()->ReplayMessage) {
Expand Down Expand Up @@ -833,25 +819,54 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
StartCheckQueriesTtlTimer();
}

TKqpCompileResult::TConstPtr WithCache(
TKqpCompileResult::TConstPtr cacheResult, TKqpTempTablesState::TConstPtr tempTablesState) {
if (!cacheResult) {
return nullptr;
}
if (!cacheResult->PreparedQuery) {
return cacheResult;
}
auto hasTempTables = cacheResult->PreparedQuery->HasTempTables(tempTablesState);
if (hasTempTables) {
return nullptr;
}
return cacheResult;
}

void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.Replace(compileResult);
} else if (keepInCache) {
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) {
Counters->CompileQueryCacheEvicted->Inc();
}
if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) {
if (InsertPreparingQuery(compileResult, true)) {
Counters->CompileQueryCacheEvicted->Inc();
};
}
}
}

void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) {
auto& parseResult = ev->Get()->AstResult;
auto& query = ev->Get()->Query;
auto compileRequest = RequestsQueue.FinishActiveRequest(query);
if (parseResult && parseResult->Ast->IsOk()) {
if (!compileRequest.TempTablesState || compileRequest.TempTablesState->TempTables.empty()) {
auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache);
if (compileResult) {
Counters->ReportQueryCacheHit(compileRequest.DbCounters, true);
auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache);
compileResult = WithCache(std::move(compileResult), compileRequest.TempTablesState);
if (compileResult) {
Counters->ReportQueryCacheHit(compileRequest.DbCounters, true);

LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast"
<< ", sender: " << compileRequest.Sender
<< ", queryUid: " << compileResult->Uid);
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast"
<< ", sender: " << compileRequest.Sender
<< ", queryUid: " << compileResult->Uid);

compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues);
compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues);

ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
}
ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
}
}
Counters->ReportQueryCacheHit(compileRequest.DbCounters, false);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/query_data/kqp_prepared_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField<
}

bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState) const {
if (!tempTablesState) {
return false;
}
auto tempTables = THashSet<TString>();
for (const auto& [path, info] : tempTablesState->TempTables) {
tempTables.insert(path.second + *tempTablesState->SessionId);
Expand Down

0 comments on commit 2209648

Please sign in to comment.