C++ Concurrency Sandbox
Loading...
Searching...
No Matches
ThreadPool_Advanced.cpp
Go to the documentation of this file.
/**
 * @file ThreadPool_Advanced.cpp
 * @brief Advanced Thread Pool implementation with dynamic return types and CLI test harness.
 *
 * This file demonstrates a sophisticated thread pool that uses std::packaged_task
 * and std::future to facilitate asynchronous execution of various tasks with
 * different signatures and return types.
 */
8
#include <algorithm> // For std::reverse
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
21
/**
 * @brief Type alias for a generic, type-erased callable taking no arguments
 *        and returning nothing.
 *
 * A default-constructed Task is empty and converts to false in a boolean
 * context; invoking an empty Task throws std::bad_function_call.
 */
using Task = std::function<void()>;
28
/**
 * @brief Global mutex intended to serialize console logging.
 *
 * Code that writes to stdout from several threads should hold this mutex
 * (e.g. via std::lock_guard) for the duration of one complete message so
 * that output from different threads is not interleaved.
 */
std::mutex log_mtx;
34
/**
 * @struct TaskWrapper
 * @brief A copyable functor that forwards its call to a shared
 *        std::packaged_task<void()>.
 *
 * Holding the task through a std::shared_ptr makes the wrapper copyable even
 * though std::packaged_task itself is move-only, so it can be stored in
 * containers of copyable callables. Only tasks of signature void() can be
 * wrapped.
 */
struct TaskWrapper {
    std::shared_ptr<std::packaged_task<void()>> ptr; ///< Shared pointer to the task; may be null.

    /**
     * @brief Executes the wrapped task.
     *
     * Does nothing when @ref ptr is null. A std::packaged_task can only be
     * run once, so invoking the same wrapped task a second time makes the
     * standard library throw std::future_error.
     */
    void operator()() {
        if (ptr) {
            (*ptr)();
        }
    }
};
52
/**
 * @class ThreadPool_Advanced
 * @brief A pool of persistent worker threads for concurrent task execution.
 *
 * The pool owns a fixed set of threads that pull type-erased jobs from a
 * mutex-protected FIFO queue. Callables with arbitrary return types are
 * submitted through @ref enqueue, which hands back a std::future for the
 * result. On destruction the pool lets the workers drain every job that is
 * still queued and then joins them.
 *
 * The pool is neither copyable nor movable: its workers hold a pointer to it.
 */
class ThreadPool_Advanced {
private:
    std::vector<std::thread> workers;        ///< Collection of persistent worker threads.
    std::queue<std::function<void()>> tasks; ///< Pending jobs (same type as the file-level Task alias).
    std::mutex queue_mtx;                    ///< Mutex protecting the task queue and stop transitions.
    std::condition_variable cv;              ///< Signals workers about new jobs or shutdown.
    std::atomic<bool> stop;                  ///< Shutdown flag.

    /**
     * @brief The main loop for worker threads.
     *
     * Blocks until a job is available or shutdown has been requested. A
     * worker only exits once shutdown is requested AND the queue is empty, so
     * every job queued before shutdown is still executed.
     */
    void workerLoop() {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(queue_mtx);
                // The predicate form re-checks the condition after every
                // wake-up, which also covers spurious wake-ups.
                cv.wait(lock, [this] { return stop || !tasks.empty(); });

                // The wait only returns with an empty queue when stop is set.
                if (tasks.empty()) return;

                job = std::move(tasks.front());
                tasks.pop();
            }
            // Run outside the lock so other workers can dequeue meanwhile.
            if (job) job();
        }
    }

    /**
     * @brief Requests shutdown, wakes every worker and joins them.
     *
     * The flag is set while holding queue_mtx. Without the lock a worker
     * could evaluate its wait predicate, be preempted, miss the
     * notify_all() below and then sleep forever, deadlocking join().
     */
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(queue_mtx);
            stop = true;
        }
        cv.notify_all();
        for (std::thread &worker : workers) {
            if (worker.joinable()) worker.join();
        }
    }

public:
    /**
     * @brief Initializes the thread pool with a specified number of threads.
     * @param num_threads Number of worker threads to create. With 0 no worker
     *        is started, so enqueued tasks never run and waiting on their
     *        futures blocks forever.
     *
     * If creating a thread throws, the workers that were already started are
     * stopped and joined before the exception is rethrown; otherwise the
     * still-joinable std::thread objects would call std::terminate when
     * destroyed.
     */
    ThreadPool_Advanced(size_t num_threads) : stop(false) {
        try {
            workers.reserve(num_threads);
            for (size_t i = 0; i < num_threads; ++i) {
                workers.emplace_back(&ThreadPool_Advanced::workerLoop, this);
            }
        } catch (...) {
            shutdown();
            throw;
        }
    }

    ThreadPool_Advanced(const ThreadPool_Advanced &) = delete;
    ThreadPool_Advanced &operator=(const ThreadPool_Advanced &) = delete;

    /**
     * @brief Adds a task to the pool and returns a future for the result.
     *
     * @tparam F Type of a callable invocable with no arguments.
     * @param f The task to execute.
     * @return std::future<decltype(f())> A future that yields the task's
     *         return value, or rethrows the exception the task exited with.
     */
    template<class F>
    auto enqueue(F f) -> std::future<decltype(f())> {
        using ReturnType = decltype(f());

        // std::packaged_task stores the return value or exception. It is
        // move-only, so it is held through a shared_ptr to keep the queued
        // closure copyable, as std::function requires.
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(std::move(f));

        std::future<ReturnType> res = task->get_future();

        {
            std::lock_guard<std::mutex> lock(queue_mtx);
            // The closure captures the shared_ptr by value to keep the task alive.
            tasks.emplace([task]() { (*task)(); });
        }

        cv.notify_one();
        return res;
    }

    /**
     * @brief Gracefully shuts down the thread pool.
     *
     * Sets the stop flag, notifies all workers, and joins them. Jobs that are
     * still queued are executed before the workers exit.
     */
    ~ThreadPool_Advanced() {
        shutdown();
    }
};
141
142// --- Tasks Documentation ---
143
/**
 * @brief Simulates an intensive multiplication operation.
 *
 * Blocks the calling thread for 500 ms to imitate a long-running computation
 * and then returns the product.
 *
 * @param a First factor.
 * @param b Second factor.
 * @return Product of a and b. The multiplication is done in int, so the
 *         caller must make sure the product fits (signed overflow is
 *         undefined behaviour).
 */
int multiply(int a, int b) {
    // Artificial latency standing in for real work.
    const std::chrono::milliseconds simulated_work(500);
    std::this_thread::sleep_for(simulated_work);
    return a * b;
}
154
/**
 * @brief Checks if a number is prime by trial division.
 * @param n The integer to check. Values less than or equal to 1 are reported
 *          as not prime.
 * @return A pair containing a boolean (is_prime) and the original number.
 */
std::pair<bool, int> isPrime(int n) {
    if (n <= 1) return {false, n};
    // "i <= n / i" is equivalent to "i * i <= n" for positive i, but cannot
    // overflow int when n is close to INT_MAX.
    for (int i = 2; i <= n / i; ++i) {
        if (n % i == 0) return {false, n};
    }
    return {true, n};
}
167
168// (Remaining utility functions fetchMetadata, encryptData, etc. documented similarly)
169
170/**
171 * @brief Main entry point to demonstrate the Request-Response pattern.
172 * * Submits various tasks to the pool and retrieves results using the future objects.
173 */
174int main() {
175 int persistent_threads = std::thread::hardware_concurrency();
176 ThreadPool_Advanced pool(persistent_threads);
177 std::cout << "Using " << persistent_threads << " threads for the program..." << std::endl;
178
179 // Submission Phase
180 std::future<int> result1 = pool.enqueue(std::bind(multiply, 10, 5));
181 auto f2 = pool.enqueue(std::bind(isPrime, 11));
182
183 // Response Phase
184 std::cout << "Multiplication Result: " << result1.get() << std::endl;
185
186 return 0;
187}
std::pair< bool, int > isPrime(int n)
Checks if a number is prime.
int multiply(int a, int b)
Simulates an intensive multiplication operation.
std::mutex log_mtx
Global mutex for synchronized console logging. Prevents race conditions and interleaved output when m...
std::function< void()> Task
Type alias for a generic void-returning function. Used internally by the worker threads to process th...
int main()
Main entry point to demonstrate the Request-Response pattern.
A pool of persistent worker threads for concurrent task execution.
auto enqueue(F f) -> std::future< decltype(f())>
Adds a task to the pool and returns a future for the result.
~ThreadPool_Advanced()
Gracefully shuts down the thread pool.
ThreadPool_Advanced(size_t num_threads)
Initializes the thread pool with a specified number of threads.
A functor that wraps a packaged_task shared pointer.
std::shared_ptr< std::packaged_task< void()> > ptr
Pointer to the task.
void operator()()
Executes the wrapped task.