add thread pool for vsag #253

Closed
inabao opened this issue Dec 24, 2024 · 0 comments
Labels
kind/feature New feature or request version/0.13

inabao commented Dec 24, 2024

Background
We want to make resource management in VSAG more flexible, and threads are one of the most important resources involved. We would like to let users implement and inject their own thread pool, so that they can manage thread resources themselves.

Interface
We will provide a simple abstract interface:

#include <cstddef>
#include <functional>
#include <future>

class ThreadPool {
public:
    /**
      * Blocks until all tasks in the thread pool have completed.
      *
      * This function will wait until all tasks that have been
      * enqueued to the thread pool are finished executing.
      */
    virtual void
    WaitUntilEmpty() = 0;

    /**
      * Sets the limit on the size of the task queue.
      *
      * @param limit The maximum size of the task queue.
      *              Tasks exceeding this size may be rejected or blocked,
      *              depending on the specific implementation.
      */
    virtual void
    SetQueueSizeLimit(std::size_t limit) = 0;

    /**
      * Sets the limit on the size of the thread pool.
      *
      * @param limit The maximum number of worker threads in the pool.
      *              No additional threads will be created beyond this limit.
      */
    virtual void
    SetPoolSize(std::size_t limit) = 0;

    /**
      * Destructor.
      *
      * Cleans up resources used by the thread pool.
      */
    virtual ~ThreadPool() = default;

    /**
      * Enqueues a new task to be executed by the thread pool.
      *
      * @param task A callable object that takes no parameters and
      *             represents the task to be executed.
      * @return std::future<void> A future object representing the
      *                           asynchronous execution of the task,
      *                           which can be used to wait for the task
      *                           to finish and to check its status.
      */
    virtual std::future<void>
    Enqueue(std::function<void(void)> task) = 0;
};
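
To illustrate how code written against this interface might look, here is a minimal sketch. It assumes the interface exactly as declared above. `InlineThreadPool` and `ChunkedSum` are hypothetical names introduced only for this illustration and are not part of VSAG. The inline pool runs each task on the calling thread, which is probably the smallest implementation that satisfies the interface, and the caller depends only on `Enqueue` and the returned futures.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <numeric>
#include <utility>
#include <vector>

// Interface as proposed in this issue.
class ThreadPool {
public:
    virtual void WaitUntilEmpty() = 0;
    virtual void SetQueueSizeLimit(std::size_t limit) = 0;
    virtual void SetPoolSize(std::size_t limit) = 0;
    virtual ~ThreadPool() = default;
    virtual std::future<void> Enqueue(std::function<void(void)> task) = 0;
};

// Hypothetical implementation: every task runs immediately on the caller's thread.
class InlineThreadPool : public ThreadPool {
public:
    void WaitUntilEmpty() override {}                 // nothing is ever pending
    void SetQueueSizeLimit(std::size_t) override {}   // no queue to limit
    void SetPoolSize(std::size_t) override {}         // no worker threads
    std::future<void> Enqueue(std::function<void(void)> task) override {
        std::packaged_task<void()> wrapped(std::move(task));
        std::future<void> done = wrapped.get_future();
        wrapped();  // an exception thrown by the task is stored in the future
        return done;
    }
};

// Hypothetical caller: sums `data` in `chunks` pieces, one task per piece.
// It works with any ThreadPool implementation that is injected.
long long ChunkedSum(ThreadPool& pool, const std::vector<int>& data, std::size_t chunks) {
    assert(chunks > 0);
    std::vector<long long> partial(chunks, 0);
    std::vector<std::future<void>> pending;
    const std::size_t step = (data.size() + chunks - 1) / chunks;
    for (std::size_t c = 0; c < chunks; ++c) {
        const std::size_t begin = std::min(c * step, data.size());
        const std::size_t end = std::min(begin + step, data.size());
        pending.push_back(pool.Enqueue([&data, &partial, c, begin, end] {
            auto first = data.begin() + static_cast<std::ptrdiff_t>(begin);
            auto last = data.begin() + static_cast<std::ptrdiff_t>(end);
            partial[c] = std::accumulate(first, last, 0LL);  // each task writes its own slot
        }));
    }
    for (auto& f : pending) {
        f.get();  // wait for the task and rethrow anything it threw
    }
    return std::accumulate(partial.begin(), partial.end(), 0LL);
}
```

Because `ChunkedSum` takes a `ThreadPool&`, swapping the inline pool for a real multi-threaded one should not require changes on the caller's side.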

Usage Example
Below is a simple implementation example:

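#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

class MyThreadPool : public ThreadPool {
public:
    explicit MyThreadPool(std::size_t threads) {
        for (std::size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                for (;;) {
                    std::function<void(void)> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        // Sleep until there is work to do or the pool is shutting down
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        if (this->stop && this->tasks.empty())
                            return;

                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                        ++this->active;  // the task now counts as running
                    }

                    // Run outside the lock; exceptions are captured by the packaged_task
                    task();

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        --this->active;
                    }
                    // Let WaitUntilEmpty() re-check whether everything has finished
                    this->idle.notify_all();
                }
            });
        }
    }

    ~MyThreadPool() override {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        // Workers drain the remaining tasks before exiting
        for (std::thread &worker : workers) {
            worker.join();
        }
    }

    std::future<void> Enqueue(std::function<void(void)> func) override {
        // packaged_task is move-only, so hold it in a shared_ptr to store it in std::function
        auto task = std::make_shared<std::packaged_task<void()>>(std::move(func));
        std::future<void> res = task->get_future();

        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    void WaitUntilEmpty() override {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // Wait for both queued and currently running tasks to finish
        idle.wait(lock, [this] { return tasks.empty() && active == 0; });
    }

    void SetQueueSizeLimit(std::size_t limit) override {
        // This example does not implement queue throttling
    }

    void SetPoolSize(std::size_t limit) override {
        // This example does not implement dynamic adjustment of thread pool size
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void(void)>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;  // signals workers: new task or shutdown
    std::condition_variable idle;       // signals WaitUntilEmpty(): a task finished
    std::size_t active = 0;             // number of tasks currently executing
    bool stop = false;
};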
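
The example above leaves `SetQueueSizeLimit` empty. As a rough sketch of how the two policies mentioned in the interface comment ("rejected or blocked") could be realized, the following queue supports both. `BoundedTaskQueue` and all of its member names are hypothetical and not part of VSAG. Treating a limit of 0 as "unbounded" is an assumption of this sketch, not something the interface specifies.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical helper: a task queue whose size limit can be changed at runtime.
class BoundedTaskQueue {
public:
    using Task = std::function<void(void)>;

    // Assumption of this sketch: a limit of 0 means "no limit".
    void SetLimit(std::size_t limit) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            limit_ = limit;
        }
        not_full_.notify_all();  // a larger limit may unblock waiting producers
    }

    // Rejecting policy: returns false instead of waiting when the queue is full.
    bool TryPush(Task task) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (IsFull()) return false;
        tasks_.push(std::move(task));
        return true;
    }

    // Blocking policy: waits until a consumer frees a slot or the limit is raised.
    void Push(Task task) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return !IsFull(); });
        assert(!IsFull());
        tasks_.push(std::move(task));
    }

    // Moves the oldest task into `out`; returns false if the queue is empty.
    bool TryPop(Task& out) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (tasks_.empty()) return false;
            out = std::move(tasks_.front());
            tasks_.pop();
        }
        not_full_.notify_one();  // one slot became free
        return true;
    }

    std::size_t Size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return tasks_.size();
    }

private:
    // Caller must hold mutex_.
    bool IsFull() const { return limit_ != 0 && tasks_.size() >= limit_; }

    mutable std::mutex mutex_;
    std::condition_variable not_full_;
    std::queue<Task> tasks_;
    std::size_t limit_ = 0;
};
```

A pool could forward `SetQueueSizeLimit` to `SetLimit` and choose `TryPush` or `Push` inside `Enqueue`, depending on whether callers should see an error or be slowed down when the queue is full.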

Implementation
The specific implementation will be provided through the following PR:
