C++ Concurrency Sandbox
Loading...
Searching...
No Matches
BasicThreadPool.cpp
Go to the documentation of this file.
/**
 * @file BasicThreadPool.cpp
 * @brief A basic implementation of a persistent worker thread pool.
 *
 * This file demonstrates the core architecture of a thread pool: a fixed
 * number of threads waiting on a shared task queue using the Monitor Pattern
 * (mutex + condition variable).
 */
8
9#include <iostream>
10#include <vector>
11#include <thread>
12#include <functional>
13#include <mutex>
14#include <queue>
15#include <condition_variable>
16#include <atomic>
17#include <chrono>
18
/**
 * @brief Type-erased unit of work: any callable invocable as `void()`.
 */
using Task = std::function<void()>;

/**
 * @brief File-wide mutex that serializes writes to the console.
 */
std::mutex log_mtx;
29
/**
 * @class ThreadPool
 * @brief Manages a set of persistent threads to execute fire-and-forget tasks.
 *
 * Tasks are submitted with enqueue() and run on one of a fixed number of
 * worker threads; the caller receives no return value or completion signal.
 * Destroying the pool drains every task still in the queue and then joins
 * all workers.
 *
 * @note A task must not let an exception escape: it would propagate out of
 *       the worker's thread function, which terminates the program.
 */
class ThreadPool {
public:
    /// Callable with a `void()` signature (the same type as the file-level Task alias).
    using Task = std::function<void()>;

private:
    std::vector<std::thread> workers; ///< The pool of persistent threads.
    std::queue<Task> tasks;           ///< FIFO buffer of pending jobs.

    std::mutex queue_mtx;             ///< Protects `tasks` and `stop`.
    std::condition_variable cv;       ///< Wakes workers on new work or shutdown.
    bool stop = false;                ///< Shutdown flag; only accessed under `queue_mtx`.

    /**
     * @brief The internal loop run by each worker thread.
     *
     * Blocks on the condition variable until a task is available or shutdown
     * has been requested. Returns only once shutdown has been requested AND
     * the queue is empty, so pending tasks are always drained first.
     */
    void workerLoop() {
        for (;;) {
            Task task;

            {
                std::unique_lock<std::mutex> lock(queue_mtx);

                // The predicate form re-checks the condition after every
                // wakeup, which also absorbs spurious wakeups.
                cv.wait(lock, [this] { return stop || !tasks.empty(); });

                // The wait only ends with an empty queue when `stop` is set.
                if (tasks.empty()) {
                    return;
                }

                task = std::move(tasks.front());
                tasks.pop();
            }

            // Execute the task outside the mutex to allow parallelism.
            // An empty std::function is skipped rather than invoked.
            if (task) {
                task();
            }
        }
    }

    /**
     * @brief Requests shutdown, wakes every worker and joins them all.
     *
     * `stop` is set while holding `queue_mtx`. Without the lock, a worker
     * that has already evaluated the wait condition but has not yet blocked
     * would miss the notification and sleep forever, hanging join().
     */
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(queue_mtx);
            stop = true;
        }
        cv.notify_all();

        for (std::thread &worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

public:
    /**
     * @brief Constructs the pool and spawns its worker threads.
     * @param num_threads The number of worker threads to maintain in the pool.
     *        With 0 threads, enqueued tasks are never executed.
     * @throws Whatever std::thread's constructor or the vector allocation
     *         throws; workers that were already started are stopped and
     *         joined before the exception propagates.
     */
    explicit ThreadPool(size_t num_threads) {
        try {
            workers.reserve(num_threads);
            for (size_t i = 0; i < num_threads; ++i) {
                workers.emplace_back(&ThreadPool::workerLoop, this);
            }
        } catch (...) {
            // Destroying a joinable std::thread calls std::terminate, so the
            // workers created so far must be joined before rethrowing.
            shutdown();
            throw;
        }
    }

    // Workers hold a pointer to this object, so the pool must stay put.
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    /**
     * @brief Submits a task to the queue for execution.
     *
     * Moves the task into the internal queue and notifies a single worker
     * thread. The call does not wait for the task to run.
     *
     * @param t The task to execute (must match the void() signature). An
     *          empty task is accepted and silently skipped by the worker.
     */
    void enqueue(Task t) {
        {
            std::lock_guard<std::mutex> lock(queue_mtx);
            tasks.push(std::move(t));
        }
        // Notify after unlocking so the woken worker can take the mutex at once.
        cv.notify_one();
    }

    /**
     * @brief Drains the remaining tasks, then stops and joins all workers.
     */
    ~ThreadPool() {
        shutdown();
    }
};
118
119/**
120 * @brief Simulates a CPU-bound data processing task.
121 * @param id The task identifier.
122 */
123void dataProcessingTask(int id) {
124 {
125 std::lock_guard<std::mutex> lock(log_mtx);
126 std::cout << "[Task " << id << "] is being processed by a worker..." << std::endl;
127 }
128 std::this_thread::sleep_for(std::chrono::milliseconds(300));
129}
130
131/**
132 * @brief Main function demonstrating mass task submission.
133 */
134int main() {
135 ThreadPool pool(12);
136
137 {
138 std::lock_guard<std::mutex> lock(log_mtx);
139 std::cout << "--- System Initialized with 12 Worker Threads ---" << std::endl;
140 }
141
142 // Submit 1000 tasks
143 for (int i = 1; i <= 1000; ++i) {
144 pool.enqueue(std::bind(dataProcessingTask, i));
145 }
146
147 std::this_thread::sleep_for(std::chrono::seconds(5));
148
149 return 0;
150}
std::mutex log_mtx
Global mutex for synchronized console logging.
std::function< void()> Task
void dataProcessingTask(int id)
Simulates a CPU-bound data processing task.
int main()
Main function demonstrating mass task submission.
std::function< void()> Task
Manages a collection of threads that execute tasks from a shared queue.
~ThreadPool()
Destructor that ensures all threads finish current work before exiting.
ThreadPool(size_t num_threads)
Construct a new Thread Pool and spawns worker threads.
void enqueue(Task t)
Submits a task to the queue for execution.