Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed SegmentedQueues not being cleaned up on session purge #833

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

hylkevds
Copy link
Collaborator

Segmented queues were closed, but not cleared on session close. This resulted in segments not being returned to the pool, and thus disk-space not being released. The queues were also not removed from the QueuePool, resulting in a memory leak.

Relates #734

@hylkevds hylkevds requested a review from andsel April 30, 2024 06:49
Copy link
Collaborator

@andsel andsel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @hylkevds thanks for this PR. I would also add unit test on the unsafequeues.Queue or unsafequeues.QueuePool to verify that a queue purge or close give back the segments to be reused.

subscriptionsDirectory.removeSharedSubscriptionsForClient(session.getClientID());
}

void remove(String clientID) {
final Session old = pool.remove(clientID);
if (old != null) {
unsubscribe(old);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that pool's session is the same instance of current session?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only situation where it could happen that there are two sessions with the same clientID is when a client re-connects with cleanSession=true. In that case the old session is purged before the new session is created.

I can change the code to only remove from the pool if the session is the same as the one in the pool.

That just lead me to a different potential issue, with the sessionExpirationService timer. I think that timer could trigger right as a client is reconnecting, and as it is based on the clientID, it could kill a session that was just made. Shouldn't removeExpiredSession check if the session really is expired, and not reconnected? And probably do the cleanup on the session thread?

Copy link
Collaborator

@andsel andsel Jun 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that timer could trigger right as a client is reconnecting, and as it is based on the clientID, it could kill a session that was just made. Shouldn't removeExpiredSession check if the session really is expired, and not reconnected? And probably do the cleanup on the session thread?

Why do you think that that case could happen? Looking at the code, when a session disconnect it's also untracked from the session expiration service. When it reconnects it's tracked again, reinserting the reference into the pool managed by the session expiration service.
In this way all session tracked by the expiration service are session present in the session registry, and so can't be the case of a duplication.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The session expiration service only contains disconnected sessions that have cleanSession=false. Sessions that are connected don't need to be tracked since they can't expire, and sessions with cleanSession=true are cleaned up on disconnect.

Imagine a disconnected session with cleanSession=false. The expiration service finds it in the queue and removes it for cleaning, but there is a time interval between it being removed from the queue and it being actually cleaned. Either because there is a large batch of sessions to expire, or because the thread is put in hold. In that time the user reconnects, and a new session is made that inherits the subscriptions and clientID of the old session. Now the expiration thread is reactivated and kills the thread with the stored clientID...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds you are right! Very good explanation of the case. I think this aspect should be kept out of this PR, and an issue tracking the problem should opened so be solved. Thanks for pointing it out 👍

@hylkevds
Copy link
Collaborator Author

Hi @hylkevds thanks for this PR. I would also add unit test on the unsafequeues.Queue or unsafequeues.QueuePool to verify that a queue purge or close give back the segments to be reused.

I'll look into making some tests for this.

@hylkevds hylkevds force-pushed the fix_segmentedQueueCleanup branch from c22869c to c20f3c5 Compare May 25, 2024 19:19
Segmented queues were closed, but not cleared on session close. This
resulted in segments not being returned to the pool, and thus disk-space
not being released. The queues were also not removed from the QueuePool,
resulting in a memory leak.
@hylkevds hylkevds force-pushed the fix_segmentedQueueCleanup branch from c20f3c5 to 9acaf3a Compare August 3, 2024 17:22
@hylkevds
Copy link
Collaborator Author

hylkevds commented Aug 3, 2024

@andsel: I added a test to verify the queue cleanup and page-file re-use.

Copy link
Collaborator

@andsel andsel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @hylkevds the changes are almost good, just a couple of nitpicks on the log messages, then the PR is done.

final QueueName queueName = new QueueName(name);
final LinkedList<SegmentRef> segmentRefs = queueSegments.remove(queueName);
SegmentRef segmentRef = segmentRefs.pollLast();
segmentsAllocationLock.lock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a debug line here, like

LOG.debug("Purging segments for queue {}", queueName);

segmentsAllocationLock.lock();
try {
while (segmentRef != null) {
LOG.debug("Consumed tail segment {} from queue {}", segmentRef, queueName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rewrite this log as

Suggested change
LOG.debug("Consumed tail segment {} from queue {}", segmentRef, queueName);
LOG.debug("Purging segment {} from queue {}", segmentRef, queueName);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, I added both logging lines.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants