C++ Concurrency Sandbox
Loading...
Searching...
No Matches
MessagePassing.cpp
Go to the documentation of this file.
1/**
2 * @file MessagePassingThreadPool.cpp
3 * @brief Thread Pool implementation using a centralized Result Queue (Inbox pattern).
4 * * This file demonstrates a decoupled architecture where worker threads push results
5 * to a thread-safe "Inbox," and the main thread consumes them asynchronously.
6 */
7
8#include <iostream>
9#include <vector>
10#include <thread>
11#include <functional>
12#include <mutex>
13#include <queue>
14#include <condition_variable>
15#include <atomic>
16#include <utility>
17
18/**
19 * @class ResultQueue
20 * @brief A thread-safe template queue used for inter-thread communication.
21 * * This acts as the "Inbox." Producers (workers) push results into it,
22 * and the Consumer (main thread) pops them out.
23 * @tparam T The type of data being passed between threads.
24 */
25template <typename T>
27private:
28 std::queue<T> queue; ///< Internal storage for the results.
29 std::mutex mtx; ///< Mutex to protect the internal queue.
30 std::condition_variable cv; ///< Condition variable to signal the main thread.
31
32public:
33 /**
34 * @brief Pushes a result into the queue and notifies the waiting thread.
35 * @param result The data object to be sent to the inbox.
36 */
37 void push(T result) {
38 {
39 std::lock_guard<std::mutex> lock(mtx);
40 queue.push(result);
41 }
42 cv.notify_one();
43 }
44
45 /**
46 * @brief Blocks until a result is available and then removes it from the queue.
47 * @return T The retrieved result object.
48 */
49 T pop() {
50 std::unique_lock<std::mutex> lock(mtx);
51 // Wait until the inbox is not empty
52 cv.wait(lock, [this] { return !this->queue.empty(); });
53 T val = std::move(this->queue.front());
54 queue.pop();
55 return val;
56 }
57};
58
59/**
60 * @typedef Task
61 * @brief Alias for a generic callable object.
62 */
63typedef std::function<void()> Task;
64
65/**
66 * @class ThreadPool
67 * @brief A pool of persistent workers that execute generic Task objects.
68 * * This pool is oblivious to the return types of tasks; it simply executes them.
69 * Communication of results is handled externally by the tasks themselves.
70 */
71class ThreadPool {
72private:
73 std::vector<std::thread> workers; ///< Collection of persistent threads.
74 std::queue<Task> tasks; ///< Queue of incoming work units.
75 std::mutex queue_mtx; ///< Mutex for task queue safety.
76 std::condition_variable cv; ///< Signals workers when work is available.
77 std::atomic<bool> stop; ///< Shutdown flag.
78
79 /**
80 * @brief Persistent loop for worker threads to fetch and execute tasks.
81 */
82 void workerLoop() {
83 while (true) {
84 Task task;
85 {
86 std::unique_lock<std::mutex> lock(this->queue_mtx);
87 while (!this->stop && this->tasks.empty()) {
88 this->cv.wait(lock);
89 }
90 if (this->stop && this->tasks.empty()) return;
91
92 task = std::move(this->tasks.front());
93 this->tasks.pop();
94 }
95 if (task) task();
96 }
97 }
98
99public:
100 /**
101 * @brief Constructs the pool and starts the worker threads.
102 * @param num_threads Number of threads to spawn.
103 */
104 ThreadPool(size_t num_threads) : stop(false) {
105 for (size_t i = 0; i < num_threads; ++i) {
106 workers.emplace_back(&ThreadPool::workerLoop, this);
107 }
108 }
109
110 /**
111 * @brief Enqueues a Task (functor/lambda) into the work queue.
112 * @param t The task to be executed by a worker.
113 */
114 void enqueue(Task t) {
115 {
116 std::unique_lock<std::mutex> lock(queue_mtx);
117 tasks.push(std::move(t));
118 }
119 cv.notify_one();
120 }
121
122 /**
123 * @brief Joins all threads for a clean shutdown.
124 */
126 stop = true;
127 cv.notify_all();
128 for (std::thread &worker : workers) {
129 if (worker.joinable()) worker.join();
130 }
131 }
132};
133
134/**
135 * @struct IsPrimeCalculator
136 * @brief A Functor (Function Object) that calculates primality.
137 * * Instead of returning a value, it holds a pointer to a @ref ResultQueue
138 * and "pushes" its result there once finished.
139 */
141 int n; ///< The number to check for primality.
142 ResultQueue<std::pair<bool, int>>* outputQueue; ///< Pointer to the shared inbox.
143
144 /**
145 * @brief Overloaded operator() to perform the calculation.
146 * * This allows the struct instance to be treated as a void() task by the @ref ThreadPool.
147 */
148 void operator()() const {
149 bool prime = true;
150 if (n <= 1) prime = false;
151 else {
152 for (int i = 2; i * i <= n; i++) {
153 if (n % i == 0) {
154 prime = false;
155 break;
156 }
157 }
158 }
159 // Send the result to the orchestrator (Main Thread)
160 outputQueue->push({prime, n});
161 }
162};
163
164/**
165 * @brief Orchestrates the mass calculation and collection of results.
166 * * Uses the hardware concurrency count to optimize the thread pool size.
167 */
168int main() {
169 int core_count = std::thread::hardware_concurrency();
170 ThreadPool pool(core_count);
172
173 std::cout << "System started with " << core_count << " workers.\n";
174
175 // Submission: Asynchronous and non-blocking
176 for (int i = 1; i <= 500; ++i) {
177 IsPrimeCalculator taskObj;
178 taskObj.n = i;
179 taskObj.outputQueue = &results;
180 pool.enqueue(taskObj);
181 }
182
183 // Collection: The "Inbox" processing loop
184 for (int i = 1; i <= 500; ++i) {
185 std::pair<bool, int> res = results.pop();
186 if (res.first) {
187 std::cout << "[Main] Received Result: " << res.second << " is PRIME\n";
188 }
189 }
190
191 std::cout << "All tasks processed asynchronously!\n";
192 return 0;
193}
std::function< void()> Task
int main()
Orchestrates the mass calculation and collection of results.
std::function< void()> Task
Type alias for a generic void-returning function. Used internally by the worker threads to process th...
A thread-safe template queue used for inter-thread communication.
T pop()
Blocks until a result is available and then removes it from the queue.
void push(T result)
Pushes a result into the queue and notifies the waiting thread.
Manages a collection of threads that execute tasks from a shared queue.
~ThreadPool()
Joins all threads for a clean shutdown.
ThreadPool(size_t num_threads)
Constructs the pool and starts the worker threads.
void enqueue(Task t)
Enqueues a Task (functor/lambda) into the work queue.
A Functor (Function Object) that calculates primality.
int n
The number to check for primality.
void operator()() const
Overloaded operator() to perform the calculation.
ResultQueue< std::pair< bool, int > > * outputQueue
Pointer to the shared inbox.