Dynamic thread assignment #31
This is currently not possible and might require some refactoring to do right. Is it important that the threads are closed? If this is just about producer/consumer matching, then rapidgzip should effectively do less work on its threads after some time if the consumer is too slow. If you have N threads and you are reading chunk I, then for sequential access the prefetcher will dispatch at most chunks up to I+N to the thread pool. So if you are reading only one chunk per second, only one chunk per second will be dispatched to the thread pool, and if the decompression of that chunk takes less than a second, you will effectively use only one thread of the pool.
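To make that throttling argument concrete, here is a small back-of-the-envelope sketch. The thread count, the timings, and the Little's-law framing are my own illustration, not rapidgzip code or measured values:

```cpp
#include <algorithm>
#include <iostream>

int main()
{
    /* Hypothetical numbers for illustration, not measured rapidgzip values. */
    const double threadCount = 16;                 /* N worker threads in the pool */
    const double chunksConsumedPerSecond = 1;      /* the (slow) consumer reads one chunk per second */
    const double decompressSecondsPerChunk = 0.1;  /* time one pool thread needs per chunk */

    /* In steady state the prefetch window (chunks I+1 .. I+N) only slides forward by one chunk
     * per consumer read, so at most one new chunk per read is dispatched to the pool. By Little's
     * law, the average number of busy threads is then bounded by the dispatch rate times the
     * per-chunk decompression time, capped at the pool size. */
    const double averageBusyThreads =
        std::min( threadCount, chunksConsumedPerSecond * decompressSecondsPerChunk );

    std::cout << "Average busy decompression threads: " << averageBusyThreads << "\n";
    /* Prints 0.1, i.e., effectively less than one thread of the pool is in use. */
    return 0;
}
```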
Thanks for the fast response! It's not important to close the threads; leaving them hanging around idle is fine.
Not yet. You can add it, e.g., like this:

diff --git a/src/core/BlockFetcher.hpp b/src/core/BlockFetcher.hpp
index 448b1cca..0f4c0bed 100644
--- a/src/core/BlockFetcher.hpp
+++ b/src/core/BlockFetcher.hpp
@@ -570,6 +570,12 @@ private:
         return resultFuture;
     }
 
+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        return m_threadPool.busyThreadCount();
+    }
+
 protected:
     [[nodiscard]] virtual BlockData
     decodeBlock( size_t blockOffset,
diff --git a/src/core/ThreadPool.hpp b/src/core/ThreadPool.hpp
index 3ac2d537..ff6c6414 100644
--- a/src/core/ThreadPool.hpp
+++ b/src/core/ThreadPool.hpp
@@ -178,6 +178,13 @@ public:
                                 [] ( size_t sum, const auto& tasks ) { return sum + tasks.second.size(); } );
     }
 
+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        const std::lock_guard lock( m_mutex );
+        return m_threadCount - m_idleThreadCount;
+    }
+
 private:
     /**
      * Does not lock! Therefore it is a private method that should only be called with a lock.
diff --git a/src/rapidgzip/ParallelGzipReader.hpp b/src/rapidgzip/ParallelGzipReader.hpp
index ac286d5d..ae13d5d8 100644
--- a/src/rapidgzip/ParallelGzipReader.hpp
+++ b/src/rapidgzip/ParallelGzipReader.hpp
@@ -933,6 +933,12 @@ public:
         m_deflateStreamCRC32s.insert_or_assign( endOfStreamOffsetInBytes, crc32 );
     }
 
+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        return m_chunkFetcher ? m_chunkFetcher->busyThreadCount() : 0;
+    }
+
 private:
     BlockFinder&
     blockFinder()
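With such an accessor in place, the dynamic reassignment from the question could be driven by polling it. The following is only a sketch of that pattern, not working rapidgzip code: the HypotheticalReader stub, its totalThreadCount constant, and the fixed busyThreadCount() return value stand in for the real ParallelGzipReader, whose construction and read() interface are not shown in this issue.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

/* Hypothetical stand-in for a reader that exposes the busyThreadCount() accessor sketched
 * in the diff above. In real code this would be a ParallelGzipReader instance. */
struct HypotheticalReader
{
    static constexpr size_t totalThreadCount = 16;

    [[nodiscard]] size_t
    busyThreadCount() const
    {
        return 3;  /* pretend 3 of the 16 decompression threads are currently busy */
    }
};

int main()
{
    HypotheticalReader reader;
    std::atomic<size_t> activeConsumers{ 0 };
    std::vector<std::jthread> consumers;  /* requires C++20 */

    /* Rebalancing loop: whenever the decompressor leaves pool threads idle, start additional
     * consumer workers that process already-decompressed data. */
    for ( int iteration = 0; iteration < 3; ++iteration ) {
        const size_t busy = reader.busyThreadCount();
        const size_t idle = HypotheticalReader::totalThreadCount - busy;

        while ( activeConsumers.load() < idle ) {
            ++activeConsumers;
            consumers.emplace_back( [&activeConsumers] () {
                /* ... process a block of decompressed data here ... */
                std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
                --activeConsumers;
            } );
        }

        std::cout << "Busy decompression threads: " << busy
                  << ", consumer workers started so far: " << consumers.size() << "\n";
        std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
    }
    return 0;  /* the std::jthread destructors join all consumer workers */
}
```

The only point of the sketch is that the consumer side sizes itself to totalThreadCount minus busyThreadCount() instead of relying on a fixed split between decompression and processing threads.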
Yes.
The question:
Is there a way to change the number of threads used after initialization?
The story behind the question:
Let's assume I have a program that reads from a large input gz file.
The decompressed content could be processed in parallel.
With standard zlib, the whole pipeline is limited by the single-threaded decompression bandwidth.
With rapidgzip I can improve the decompression bandwidth, but then the problem becomes how to assign threads.
I would like to dynamically adjust the number of threads the decompressor uses.
In this scenario, I would assign all threads to decompression at the beginning and then start using some of them to process the decompressed data. Depending on how much data has already been decompressed, I would dynamically reassign threads between the producer (decompressor) and the consumer.