From bde8e8a9276c52d16034547ebe00e31a0598e73a Mon Sep 17 00:00:00 2001 From: gotjosh Date: Thu, 10 Oct 2024 16:05:04 +0100 Subject: [PATCH] bugfix: Stop the current fetcher before replacing it Otherwise, this causes a memory leak as we leave lingering around the resources we used to catch up as quickly as possible as the ingester was starting. part of https://github.com/grafana/mimir-squad/issues/2475 Signed-off-by: gotjosh --- pkg/storage/ingest/reader.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index b7ea38429b..244e8bba81 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -259,6 +259,9 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { } if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 { + // Stop the current fetcher before replacing it. + r.fetcher.Stop() + if r.fetcher == r { // This method has been called before, no need to switch the fetcher. return