C++ Concurrency Sandbox
Loading...
Searching...
No Matches
BoundedBlockingQueue.cpp
Go to the documentation of this file.
/**
 * @file BoundedBlockingQueue.cpp
 * @brief Implementation of a thread-safe, bounded blocking queue.
 *
 * This file demonstrates the Producer-Consumer pattern using the standard C++
 * synchronization primitives std::mutex and std::condition_variable.
 */
7
8#include <iostream>
9#include <thread>
10#include <mutex>
11#include <condition_variable>
12#include <queue>
13#include <atomic>
14
/**
 * @class BoundedBlockingQueue
 * @brief A thread-safe FIFO queue of ints with a fixed capacity and blocking operations.
 *
 * This class ensures that:
 * - Producers block in push() while the queue is full.
 * - Consumers block in pop() while the queue is empty.
 * - shut_down() wakes every blocked thread. After shutdown push() is always
 *   rejected, while pop() keeps handing out items that were already queued
 *   and only fails once the queue is empty.
 *
 * @note A capacity of 0 leaves no room for any element, so every push() blocks
 *       until shut_down() is called.
 */
class BoundedBlockingQueue {
private:
    std::queue<int> buffer;               ///< The underlying queue storing the integer data.
    size_t max_capacity;                  ///< Maximum number of elements held in the buffer.
    std::mutex mtx;                       ///< Protects the buffer and every change of the shutdown flag.

    std::condition_variable not_full;     ///< Signals producers that space is available.
    std::condition_variable not_empty;    ///< Signals consumers that data is available.

    std::atomic<bool> is_shutdown{false}; ///< Set once by shut_down(); never reset.

public:
    /**
     * @brief Constructs a new BoundedBlockingQueue.
     * @param capacity The maximum number of elements the queue can hold.
     */
    BoundedBlockingQueue(size_t capacity) : max_capacity(capacity) {}

    /**
     * @brief Shuts down the queue and wakes up all blocked threads.
     *
     * Sets the shutdown flag and notifies all waiting producers and consumers
     * so that none of them stays blocked during program termination.
     * Calling it more than once is harmless.
     */
    void shut_down() {
        {
            // The flag has to change while mtx is held, even though it is atomic.
            // Otherwise a thread that has just evaluated its wait condition but
            // has not blocked yet would miss both the new value and the
            // notification below, and sleep forever.
            std::lock_guard<std::mutex> lock(mtx);
            is_shutdown = true;
        }
        not_full.notify_all();
        not_empty.notify_all();
    }

    /**
     * @brief Adds an element to the back of the queue, blocking while it is full.
     * @param val The integer to add.
     * @return true if the item was successfully pushed.
     * @return false if the queue was shut down before or during the push.
     */
    bool push(int val) {
        std::unique_lock<std::mutex> lock(mtx);

        // Sleep until there is room or the queue is being shut down.
        not_full.wait(lock, [this] {
            return buffer.size() < max_capacity || is_shutdown.load();
        });

        if (is_shutdown) {
            return false;
        }

        buffer.push(val);
        // Logged while the lock is held so the reported size matches this push
        // and log lines of different threads do not interleave.
        std::cout << "[Producer] Pushed : " << val << " | Size: " << buffer.size() << std::endl;

        not_empty.notify_one();
        return true;
    }

    /**
     * @brief Removes an element from the front of the queue, blocking while it is empty.
     * @param[out] output_val Reference where the consumed value will be stored;
     *                        left untouched when the call returns false.
     * @return true if an item was successfully consumed.
     * @return false if the queue is empty and shut down.
     */
    bool pop(int& output_val) {
        std::unique_lock<std::mutex> lock(mtx);

        // Sleep until there is data or the queue is being shut down.
        not_empty.wait(lock, [this] {
            return !buffer.empty() || is_shutdown.load();
        });

        // Woken with nothing to consume: this can only be the shutdown case.
        // Items still queued at shutdown are drained before reporting failure.
        if (buffer.empty()) {
            return false;
        }

        output_val = buffer.front();
        buffer.pop();
        std::cout << "[Consumer] : Consumed " << output_val << " | Size : " << buffer.size() << std::endl;

        not_full.notify_one();
        return true;
    }
};
101
102/**
103 * @brief Worker function for the Producer thread.
104 * @param q Pointer to the shared BoundedBlockingQueue.
105 */
107 for(int i=1; i<=30; ++i) {
108 if(!q->push(i)) {
109 std::cout << "[Producer] Shutdown detected. Stopping production." << std::endl;
110 return;
111 }
112 std::this_thread::sleep_for(std::chrono::milliseconds(25));
113 }
114 std::cout << "[Producer] Finished all items" << std::endl;
115 q->shut_down();
116 return;
117}
118
119/**
120 * @brief Worker function for the Consumer thread.
121 * @param q Pointer to the shared BoundedBlockingQueue.
122 */
124 for(int i=1; i<=100; ++i) {
125 int val;
126 bool pop_status = q->pop(val);
127 if(!pop_status) {
128 std::cout << "---[ConsumerTask] : SHUTDOWN DETECTED. STOPPING CONSUMPTION---" << std::endl;
129 break;
130 } else {
131 std::cout << "Consuming value..." << std::endl;
132 std::this_thread::sleep_for(std::chrono::milliseconds(100));
133 }
134 }
135 std::cout << "[Consumer] Finished my requirements! Triggering shutdown..." << std::endl;
136 q->shut_down();
137 return;
138}
139
140/**
141 * @brief Entry point of the program.
142 * Initializes the queue and manages the lifecycle of worker threads.
143 */
144int main() {
145 BoundedBlockingQueue queue(5);
146
147 std::thread producerThread(producerTask, &queue);
148 std::thread consumerThread(consumerTask, &queue);
149
150 if(producerThread.joinable())
151 producerThread.join();
152 if(consumerThread.joinable())
153 consumerThread.join();
154
155 std::cout << "All work finished." << std::endl;
156 return 0;
157}
void producerTask(BoundedBlockingQueue *q)
Worker function for the Producer thread.
void consumerTask(BoundedBlockingQueue *q)
Worker function for the Consumer thread.
int main()
Entry point of the program. Initializes the queue and manages the lifecycle of worker threads.
A thread-safe queue with a fixed capacity that supports blocking operations.
bool pop(int &output_val)
Removes an element from the front of the queue.
BoundedBlockingQueue(size_t capacity)
Constructs a new BoundedBlockingQueue.
void shut_down()
Shuts down the queue and wakes up all blocked threads.
bool push(int val)
Adds an element to the back of the queue.