C++ Concurrency Sandbox
Loading...
Searching...
No Matches
ThreadPool_Lambda.cpp
Go to the documentation of this file.
/**
 * @file ThreadPool_Lambda.cpp
 * @brief A high-performance, template-based Thread Pool with Future support.
 *
 * This file implements a pool of worker threads that can execute arbitrary
 * functions and return results asynchronously using C++11/14 features.
 */
7
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
/**
 * @typedef Task
 * @brief Type alias for a basic void function with no arguments.
 */
using Task = std::function<void()>; // C++11 alias-declaration; preferred over typedef
23
/**
 * @class ThreadPool_Lambda
 * @brief Manages a collection of threads that execute tasks from a shared queue.
 *
 * The pool provides a mechanism to offload work to background threads and
 * retrieve return values via std::future, preventing the main thread from blocking.
 * Not copyable: the pool exclusively owns its worker threads.
 */
class ThreadPool_Lambda {
private:
    /// Type-erased unit of work held in the queue (self-contained alias).
    using Task = std::function<void()>;

    std::vector<std::thread> workers;  ///< List of worker threads.
    std::queue<Task> tasks;            ///< Internal FIFO queue of pending tasks.
    std::mutex queue_mtx;              ///< Mutex to synchronize access to the task queue.
    std::condition_variable cv;        ///< Condition variable to notify workers of new tasks.
    std::atomic<bool> stop;            ///< Flag to signal pool shutdown.

    /**
     * @brief The core loop executed by every worker thread.
     *
     * Workers wait on the condition variable until a task is available or the
     * pool is stopping. They keep draining the queue until it is empty AND
     * @ref stop is set, so already-enqueued work completes before shutdown.
     */
    void workerLoop() {
        while (true) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(queue_mtx);
                // Predicate form handles spurious wakeups and re-checks on notify.
                cv.wait(lock, [this] { return stop || !tasks.empty(); });

                // Exit condition: stop flag is set and no tasks remain.
                if (stop && tasks.empty()) return;

                task = std::move(tasks.front());
                tasks.pop();
            }
            // Run outside the lock so other workers can dequeue concurrently.
            if (task) task();
        }
    }

public:
    /**
     * @brief Construct a new Thread Pool object.
     * @param num_threads The number of worker threads to spawn.
     */
    explicit ThreadPool_Lambda(size_t num_threads) : stop(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back(&ThreadPool_Lambda::workerLoop, this);
        }
    }

    // The pool owns threads and a mutex; copying is meaningless and unsafe.
    ThreadPool_Lambda(const ThreadPool_Lambda&) = delete;
    ThreadPool_Lambda& operator=(const ThreadPool_Lambda&) = delete;

    /**
     * @brief Enqueues a function for asynchronous execution.
     *
     * Wraps the callable in a packaged_task, pushes it to the queue, and
     * returns a future to track the result.
     *
     * @tparam F The type of the function to be executed.
     * @param f The function (or lambda/bind) to execute.
     * @return std::future<decltype(f())> A future to retrieve the function's result.
     */
    template<class F>
    auto enqueue(F f) -> std::future<decltype(f())> {
        // Determine the return type of the function dynamically.
        using ReturnType = decltype(f());

        // packaged_task is move-only; shared_ptr makes the wrapper lambda
        // copyable so it fits into std::function. Move f to avoid a copy.
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(std::move(f));

        // Extract the future "ticket" before handing the task to the queue.
        std::future<ReturnType> res = task->get_future();

        {
            std::unique_lock<std::mutex> lock(queue_mtx);
            // Wrap the task in a void() lambda so it fits our Task queue.
            tasks.emplace([task]() { (*task)(); });
        }

        cv.notify_one(); // Wake up one idle worker
        return res;
    }

    /**
     * @brief Destroy the Thread Pool object.
     *
     * Signals all workers to stop and joins them. Tasks already in the
     * queue will be completed before destruction finishes.
     */
    ~ThreadPool_Lambda() {
        {
            // Set the flag while holding the mutex: otherwise a worker that
            // has just evaluated its wait predicate (stop still false) could
            // miss notify_all and block in cv.wait forever (lost wakeup).
            std::unique_lock<std::mutex> lock(queue_mtx);
            stop = true;
        }
        cv.notify_all(); // Wake up all threads to observe the stop flag
        for (std::thread &worker : workers) {
            if (worker.joinable()) worker.join();
        }
    }
};
118
/**
 * @brief Simulates a file upload operation.
 * @param name Name of the file to upload.
 * @return true on successful "upload".
 */
bool uploadFile(std::string name) {
    // Announce the transfer, then block to mimic network latency.
    std::cout << "Uploading " << name << "..." << std::endl;
    const auto simulated_latency = std::chrono::milliseconds(1000);
    std::this_thread::sleep_for(simulated_latency);
    return true;
}
129
130/**
131 * @brief Main function demonstrating ThreadPool_Lambda usage with futures.
132 */
133int main() {
134 ThreadPool_Lambda pool(4);
135
136 // Enqueue tasks and store the futures
137 std::future<bool> f1 = pool.enqueue(std::bind(uploadFile, "Agreement.pdf"));
138 std::future<bool> f2 = pool.enqueue(std::bind(uploadFile, "Receipt.pdf"));
139
140 std::cout << "Doing other work..." << std::endl;
141
142 // Retrieve results; these calls block until the corresponding tasks finish
143 if (f1.get() && f2.get()) {
144 std::cout << "All files uploaded successfully!" << std::endl;
145 }
146
147 return 0;
148}
std::function< void()> Task
bool uploadFile(std::string name)
Simulates a file upload operation.
int main()
Main function demonstrating ThreadPool_Lambda usage with futures.
~ThreadPool_Lambda()
Destroy the Thread Pool object.
auto enqueue(F f) -> std::future< decltype(f())>
Enqueues a function for asynchronous execution.
ThreadPool_Lambda(size_t num_threads)
Construct a new Thread Pool object.