Dynamic thread assignment #31

Open
marekkokot opened this issue Jan 5, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@marekkokot

The question:
Is there a way to change the number of threads used after initialization?

The story behind the question:
Let's assume I have a program that reads from a large input gz file.
The decompressed content could be processed in parallel.
Using the standard zlib, parallelization is limited by the decompression bandwidth.
Using rapidgzip, I could improve decompression bandwidth, but the problem is how I assign threads.
I would like to adjust the number of threads the decompressor uses dynamically.
In this scenario, I would assign all threads to decompression at the beginning and then start using some of them to process the decompressed data. Depending on how much data has already been decompressed, I would dynamically reassign threads between the producer (decompressor) and the consumers.

@mxmlnkn
Owner

mxmlnkn commented Jan 5, 2024

This is currently not possible and might require some refactoring to do right. Is it important that the threads are actually closed? If this is only about matching producer and consumer speed, then rapidgzip should effectively do less work on its threads after some time if the consumer is too slow. If you have N threads and you are reading chunk I, then, for sequential access, the prefetcher dispatches chunks only up to I+N to the thread pool. So, if you read only one chunk per second, then only one chunk per second is dispatched to the thread pool. If decompressing that chunk takes less than a second, then you effectively use only one thread of the thread pool.
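
The arithmetic in this comment can be written down as a small back-of-the-envelope model. This is only a sketch: the function names `lastDispatchedChunk` and `estimateBusyThreads` are hypothetical and not part of rapidgzip, and the model assumes sequential reads, a constant decode time per chunk, and a consumer that reads at a constant rate.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helpers for illustration only; none of these names exist in rapidgzip.

/** Highest chunk index dispatched to the thread pool while chunk currentChunk is being read. */
[[nodiscard]] inline std::size_t
lastDispatchedChunk( std::size_t currentChunk,
                     std::size_t threadCount )
{
    return currentChunk + threadCount;
}

/**
 * Rough steady-state estimate of the average number of busy decompression threads.
 * Assumption: every read of one chunk lets the prefetcher dispatch exactly one new chunk.
 */
[[nodiscard]] inline double
estimateBusyThreads( std::size_t threadCount,
                     double      secondsToDecodeChunk,
                     double      secondsBetweenReads )
{
    assert( secondsToDecodeChunk >= 0 );
    const auto poolSize = static_cast<double>( threadCount );
    if ( secondsBetweenReads <= 0 ) {
        return poolSize;  // The consumer never waits, so the pool size is the only limit.
    }
    // One chunk is dispatched per read, and each chunk occupies a thread for its decode time.
    return std::min( poolSize, secondsToDecodeChunk / secondsBetweenReads );
}
```

Under these assumptions, `estimateBusyThreads( 16, 0.5, 1.0 )` gives 0.5: with one read per second and half a second of decoding per chunk, the pool does the work of half a thread on average, no matter how many threads it owns.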

@marekkokot
Author

Thanks for the fast response! It's not important to close the threads; leaving them idle is fine.
Is there a way to check the number of active rapidgzip threads? I will probably need to experiment with this a little. My general goal is to never exceed a given number of threads in use, but to use as many of them as possible. So, for example, if my consumer is too slow, I will use more consumers. As I understand it, when I add a new consumer, rapidgzip will also start to use more threads. Thank you again for your response.

@mxmlnkn
Owner

mxmlnkn commented Jan 5, 2024

> Is there a way to check the number of active rapidgzip threads?

Not yet. You can add it, e.g., like this:

diff --git a/src/core/BlockFetcher.hpp b/src/core/BlockFetcher.hpp
index 448b1cca..0f4c0bed 100644
--- a/src/core/BlockFetcher.hpp
+++ b/src/core/BlockFetcher.hpp
@@ -570,6 +570,12 @@ private:
         return resultFuture;
     }
 
+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        return m_threadPool.busyThreadCount();
+    }
+
 protected:
     [[nodiscard]] virtual BlockData
     decodeBlock( size_t blockOffset,
diff --git a/src/core/ThreadPool.hpp b/src/core/ThreadPool.hpp
index 3ac2d537..ff6c6414 100644
--- a/src/core/ThreadPool.hpp
+++ b/src/core/ThreadPool.hpp
@@ -178,6 +178,13 @@ public:
                                 [] ( size_t sum, const auto& tasks ) { return sum + tasks.second.size(); } );
     }
 
+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        const std::lock_guard lock( m_mutex );
+        return m_threadCount - m_idleThreadCount;
+    }
+
 private:
     /**
      * Does not lock! Therefore it is a private method that should only be called with a lock.
diff --git a/src/rapidgzip/ParallelGzipReader.hpp b/src/rapidgzip/ParallelGzipReader.hpp
index ac286d5d..ae13d5d8 100644
--- a/src/rapidgzip/ParallelGzipReader.hpp
+++ b/src/rapidgzip/ParallelGzipReader.hpp
@@ -933,6 +933,12 @@ public:
         m_deflateStreamCRC32s.insert_or_assign( endOfStreamOffsetInBytes, crc32 );
     }
 
+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        return m_chunkFetcher ? m_chunkFetcher->busyThreadCount() : 0;
+    }
+
 private:
     BlockFinder&
     blockFinder()

> As I understand it, when I add a new consumer, rapidgzip will also start to use more threads.

Yes.
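
With a `busyThreadCount()` accessor like the one in the patch above, the "stay within a thread budget" goal could be approximated by sizing the consumer side from the threads the decompressor is not currently using. The following is a minimal sketch under that assumption; `allowedConsumerCount` is a hypothetical helper and not part of rapidgzip.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper for illustration only; not part of rapidgzip.

/**
 * Number of consumer threads that may run for a given total thread budget.
 * busyDecompressionThreads is assumed to be a recent snapshot,
 * e.g., the value returned by the busyThreadCount() method from the patch above.
 */
[[nodiscard]] inline std::size_t
allowedConsumerCount( std::size_t threadBudget,
                      std::size_t busyDecompressionThreads )
{
    const std::size_t spareThreads = threadBudget > busyDecompressionThreads
                                     ? threadBudget - busyDecompressionThreads
                                     : 0;
    // Always keep one consumer. Without any reader, no further chunks are requested,
    // the decompression threads never become idle again, and the pipeline stalls.
    return std::max<std::size_t>( 1, spareThreads );
}
```

A consumer manager could poll this periodically and start or park consumers accordingly. The busy count is only a snapshot and can change right after it is read, so the budget holds only approximately. Because one consumer is always kept, the total can exceed the budget by one when every decompression thread is busy; creating the reader with one thread fewer than the budget would avoid that.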

@mxmlnkn mxmlnkn added the enhancement New feature or request label Jan 20, 2024
2 participants