diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index b7ea38429b..244e8bba81 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -259,6 +259,9 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { } if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 { + // Stop the current fetcher before replacing it. + r.fetcher.Stop() + if r.fetcher == r { // This method has been called before, no need to switch the fetcher. return