Skip to content

Commit

Permalink
YQ-3684 fixed error duplicate session id (ydb-platform#9583)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Sep 20, 2024
1 parent d7de8bd commit 8ec93b0
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 11 deletions.
2 changes: 2 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->UserToken
), IEventHandle::FlagTrackDelivery);

QueryState->PoolHandlerActor = MakeKqpWorkloadServiceId(SelfId().NodeId());
Become(&TKqpSessionActor::ExecuteState);
}

Expand Down Expand Up @@ -2421,6 +2422,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
hFunc(TEvents::TEvUndelivered, HandleNoop);
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop);
hFunc(TEvKqpExecuter::TEvStreamData, HandleNoop);
hFunc(NWorkload::TEvContinueRequest, HandleNoop);

// always come from WorkerActor
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
}

SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers);
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvStopPoolHandlerResponse(Database, PoolId));

Counters.OnCleanup(ResetCountersOnStrop);

Expand All @@ -184,16 +185,16 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
}

void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId));

auto event = std::move(ev->Get()->Event);
const TString& sessionId = event->Get()->SessionId;
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId, sessionId));

const TActorId& workerActorId = event->Sender;
if (!InFlightLimit) {
this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::PRECONDITION_FAILED, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Resource pool " << PoolId << " was disabled due to zero concurrent query limit")}));
return;
}

const TString& sessionId = event->Get()->SessionId;
if (LocalSessions.contains(sessionId)) {
this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)}));
return;
Expand Down
15 changes: 14 additions & 1 deletion ydb/core/kqp/workload_service/common/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct TEvPrivate {
EvFinishRequestInPool,
EvResignPoolHandler,
EvStopPoolHandler,
EvStopPoolHandlerResponse,
EvCancelRequest,
EvUpdatePoolSubscription,

Expand Down Expand Up @@ -128,13 +129,15 @@ struct TEvPrivate {
};

// Local event that identifies a place-request by database, pool id and session id.
// NOTE(review): two constructor signatures appear back to back below. Presumably the
// two-argument one is the pre-change side of the diff and only the three-argument one
// (the one that initializes SessionId) is live -- confirm against the repository.
struct TEvPlaceRequestIntoPoolResponse : public NActors::TEventLocal<TEvPlaceRequestIntoPoolResponse, EvPlaceRequestIntoPoolResponse> {
TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId)
TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId, const TString& sessionId)
: Database(database)
, PoolId(poolId)
, SessionId(sessionId)
{}

// All fields are immutable copies taken at construction time.
const TString Database;
const TString PoolId;
const TString SessionId;
};

struct TEvFinishRequestInPool : public NActors::TEventLocal<TEvFinishRequestInPool, EvFinishRequestInPool> {
Expand Down Expand Up @@ -173,6 +176,16 @@ struct TEvPrivate {
const bool ResetCounters;
};

// Local event that names the pool (database + pool id) a stop-handler response refers to.
struct TEvStopPoolHandlerResponse
    : public NActors::TEventLocal<TEvStopPoolHandlerResponse, EvStopPoolHandlerResponse>
{
    // Stores private copies of both identifiers; the arguments are not retained.
    TEvStopPoolHandlerResponse(const TString& database, const TString& poolId)
        : Database(database)
        , PoolId(poolId)
    {
    }

    const TString Database; // database the pool belongs to
    const TString PoolId;   // identifier of the pool inside that database
};

struct TEvCancelRequest : public NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> {
explicit TEvCancelRequest(const TString& sessionId)
: SessionId(sessionId)
Expand Down
47 changes: 40 additions & 7 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,21 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
// Handles a cleanup request for a session.
// If the session id is still listed in the database's PendingSessionIds, the request is
// queued in PendingCancelRequests and nothing else happens here. Otherwise the request is
// handed to the pool state, or answered with NOT_FOUND when no pool state exists.
void Handle(TEvCleanupRequest::TPtr& ev) {
const TString& database = ev->Get()->Database;
const TString& poolId = ev->Get()->PoolId;
const TString& sessionId = ev->Get()->SessionId;
// The session is still pending: park the cleanup request under its session id.
if (GetOrCreateDatabaseState(database)->PendingSessionIds.contains(sessionId)) {
LOG_D("Finished request with worker actor " << ev->Sender << ", wait for place request, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId);
GetOrCreateDatabaseState(database)->PendingCancelRequests[sessionId].emplace_back(std::move(ev));
return;
}

auto poolState = GetPoolState(database, poolId);
// Unknown pool: reply to the sender with NOT_FOUND.
if (!poolState) {
ReplyCleanupError(ev->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << poolId << " not found");
return;
}

LOG_D("Finished request with worker actor " << ev->Sender << ", Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId);
// NOTE(review): the next two statements both consume `ev`. Presumably the direct
// Send(ev->Forward(...)) is the pre-change side of the diff and only DoCleanupRequest
// is live -- confirm against the repository.
Send(ev->Forward(poolState->PoolHandler));
poolState->DoCleanupRequest(std::move(ev));
}

void Handle(TEvents::TEvWakeup::TPtr& ev) {
Expand Down Expand Up @@ -220,6 +227,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
hFunc(TEvPrivate::TEvTablesCreationFinished, Handle);
hFunc(TEvPrivate::TEvCpuLoadResponse, Handle);
hFunc(TEvPrivate::TEvResignPoolHandler, Handle);
hFunc(TEvPrivate::TEvStopPoolHandlerResponse, Handle);
)

private:
Expand All @@ -245,12 +253,16 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
const auto& event = ev->Get()->Event;
const TString& database = event->Get()->Database;
auto databaseState = GetOrCreateDatabaseState(database);
if (ev->Get()->DefaultPoolCreated) {
GetOrCreateDatabaseState(database)->HasDefaultPool = true;
databaseState->HasDefaultPool = true;
}

const TString& poolId = event->Get()->PoolId;
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
databaseState->RemovePendingSession(event->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) {
ReplyCleanupError(event->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << event->Get()->PoolId << " not found");
});
ReplyContinueError(event->Sender, ev->Get()->Status, ev->Get()->Issues);
return;
}
Expand All @@ -265,9 +277,19 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
void Handle(TEvPrivate::TEvPlaceRequestIntoPoolResponse::TPtr& ev) {
const TString& database = ev->Get()->Database;
const TString& poolId = ev->Get()->PoolId;
LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId);
const TString& sessionId = ev->Get()->SessionId;
LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << sessionId);

if (auto poolState = GetPoolState(database, poolId)) {
auto poolState = GetPoolState(database, poolId);
GetOrCreateDatabaseState(database)->RemovePendingSession(sessionId, [this, poolState](TEvCleanupRequest::TPtr event) {
if (poolState) {
poolState->DoCleanupRequest(std::move(event));
} else {
ReplyCleanupError(event->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << event->Get()->PoolId << " not found");
}
});

if (poolState) {
poolState->PlaceRequestRunning = false;
poolState->UpdateHandler();
poolState->StartPlaceRequest();
Expand Down Expand Up @@ -388,6 +410,17 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
}
}

// Handles a stop-handler response: logs it, decrements the ActivePools counter and,
// if the pool state still exists, removes the sender from its PreviousPoolHandlers.
void Handle(TEvPrivate::TEvStopPoolHandlerResponse::TPtr& ev) {
    const auto& response = *ev->Get();
    const TString& database = response.Database;
    const TString& poolId = response.PoolId;
    LOG_T("Got stop pool handler response, Database: " << database << ", PoolId: " << poolId);

    // The counter is decremented whether or not the pool state is still present.
    Counters.ActivePools->Dec();

    auto poolState = GetPoolState(database, poolId);
    if (!poolState) {
        return;
    }
    poolState->PreviousPoolHandlers.erase(ev->Sender);
}

private:
void InitializeWorkloadService() {
if (ServiceInitialized) {
Expand Down Expand Up @@ -441,15 +474,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
std::vector<TString> poolsToDelete;
poolsToDelete.reserve(PoolIdToState.size());
for (const auto& [poolKey, poolState] : PoolIdToState) {
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) {
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION && poolState.PendingRequests.empty()) {
CpuQuotaManager->CleanupHandler(poolState.PoolHandler);
Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler(true));
poolsToDelete.emplace_back(poolKey);
}
}
for (const auto& poolKey : poolsToDelete) {
PoolIdToState.erase(poolKey);
Counters.ActivePools->Dec();
}

if (!PoolIdToState.empty()) {
Expand Down Expand Up @@ -512,7 +544,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)}));
}

TDatabaseState* GetOrCreateDatabaseState(const TString& database) {
TDatabaseState* GetOrCreateDatabaseState(TString database) {
database = CanonizePath(database);
auto databaseIt = DatabaseToState.find(database);
if (databaseIt != DatabaseToState.end()) {
return &databaseIt->second;
Expand Down
26 changes: 26 additions & 0 deletions ydb/core/kqp/workload_service/kqp_workload_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ struct TDatabaseState {
bool& EnabledResourcePoolsOnServerless;

std::vector<TEvPlaceRequestIntoPool::TPtr> PendingRequersts = {};
std::unordered_set<TString> PendingSessionIds = {};
std::unordered_map<TString, std::vector<TEvCleanupRequest::TPtr>> PendingCancelRequests = {};
std::unordered_map<TString, std::unordered_set<TActorId>> PendingSubscriptions = {};
bool HasDefaultPool = false;
bool Serverless = false;
Expand All @@ -38,6 +40,7 @@ struct TDatabaseState {

void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) {
TString database = ev->Get()->Database;
PendingSessionIds.emplace(ev->Get()->SessionId);
PendingRequersts.emplace_back(std::move(ev));

if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) {
Expand Down Expand Up @@ -83,6 +86,14 @@ struct TDatabaseState {
StartPendingRequests();
}

// Marks `sessionId` as no longer pending and flushes every cleanup request that was
// queued for it.
//   sessionId - session whose pending state is dropped.
//   callback  - invoked once per queued cleanup request, in the order they were queued;
//               each request is moved into the callback.
// Afterwards neither PendingCancelRequests nor PendingSessionIds contains `sessionId`.
void RemovePendingSession(const TString& sessionId, std::function<void(TEvCleanupRequest::TPtr)> callback) {
    // Look the session up with find(): operator[] would default-insert an empty queue
    // for every session that never received a cleanup request, only to erase it again.
    if (const auto it = PendingCancelRequests.find(sessionId); it != PendingCancelRequests.end()) {
        // Detach the queue from the map before running callbacks, so that a callback
        // which touches PendingCancelRequests cannot disturb the range being iterated.
        auto queuedRequests = std::move(it->second);
        PendingCancelRequests.erase(it);
        for (auto& event : queuedRequests) {
            callback(std::move(event));
        }
    }
    PendingSessionIds.erase(sessionId);
}

private:
void StartPendingRequests() {
if (!EnabledResourcePoolsOnServerless && Serverless) {
Expand All @@ -98,6 +109,9 @@ struct TDatabaseState {

void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
for (const auto& ev : PendingRequersts) {
RemovePendingSession(ev->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) {
ActorContext.Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, {NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")}));
});
ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues));
}
PendingRequersts.clear();
Expand All @@ -112,6 +126,7 @@ struct TPoolState {
bool WaitingInitialization = false;
bool PlaceRequestRunning = false;
std::optional<TActorId> NewPoolHandler = std::nullopt;
std::unordered_set<TActorId> PreviousPoolHandlers = {};

ui64 InFlightRequests = 0;
TInstant LastUpdateTime = TInstant::Now();
Expand All @@ -122,6 +137,7 @@ struct TPoolState {
}

ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
PreviousPoolHandlers.insert(PoolHandler);
PoolHandler = *NewPoolHandler;
NewPoolHandler = std::nullopt;
InFlightRequests = 0;
Expand All @@ -143,6 +159,16 @@ struct TPoolState {
InFlightRequests--;
LastUpdateTime = TInstant::Now();
}

// Delivers a cleanup request to every handler of this pool: each entry of
// PreviousPoolHandlers receives a freshly built copy, and the original event is
// forwarded to the current PoolHandler.
void DoCleanupRequest(TEvCleanupRequest::TPtr event) {
    // Read the payload once; it stays valid until the event is forwarded below.
    const auto& request = *event->Get();
    for (const auto& previousHandler : PreviousPoolHandlers) {
        auto* requestCopy = new TEvCleanupRequest(
            request.Database, request.SessionId, request.PoolId,
            request.Duration, request.CpuConsumed
        );
        ActorContext.Send(previousHandler, requestCopy);
    }

    // Forwarding consumes the original event, so it has to be the last step.
    ActorContext.Send(event->Forward(PoolHandler));
}
};

struct TCpuQuotaManagerState {
Expand Down

0 comments on commit 8ec93b0

Please sign in to comment.