11#include <condition_variable>
25 std::queue<int> buffer;
29 std::condition_variable not_full;
30 std::condition_variable not_empty;
32 std::atomic<bool> is_shutdown{
false};
48 not_full.notify_all();
49 not_empty.notify_all();
60 std::unique_lock<std::mutex> lock(mtx);
63 while(buffer.size() >= max_capacity && !is_shutdown) {
67 if(is_shutdown)
return false;
70 std::cout <<
"[Producer] Pushed : " << val <<
" | Size: " << buffer.size() << std::endl;
72 not_empty.notify_one();
82 bool pop(
int& output_val) {
83 std::unique_lock<std::mutex> lock(mtx);
86 while(buffer.empty() && !is_shutdown) {
91 if(buffer.empty() && is_shutdown)
return false;
93 output_val = buffer.front();
95 std::cout <<
"[Consumer] : Consumed " << output_val <<
" | Size : " << buffer.size() << std::endl;
97 not_full.notify_one();
107 for(
int i=1; i<=30; ++i) {
109 std::cout <<
"[Producer] Shutdown detected. Stopping production." << std::endl;
112 std::this_thread::sleep_for(std::chrono::milliseconds(25));
114 std::cout <<
"[Producer] Finished all items" << std::endl;
124 for(
int i=1; i<=100; ++i) {
126 bool pop_status = q->
pop(val);
128 std::cout <<
"---[ConsumerTask] : SHUTDOWN DETECTED. STOPPING CONSUMPTION---" << std::endl;
131 std::cout <<
"Consuming value..." << std::endl;
132 std::this_thread::sleep_for(std::chrono::milliseconds(100));
135 std::cout <<
"[Consumer] Finished my requirements! Triggering shutdown..." << std::endl;
150 if(producerThread.joinable())
151 producerThread.join();
152 if(consumerThread.joinable())
153 consumerThread.join();
155 std::cout <<
"All work finished." << std::endl;
void producerTask(BoundedBlockingQueue *q)
Worker function for the Producer thread.
void consumerTask(BoundedBlockingQueue *q)
Worker function for the Consumer thread.
int main()
Entry point of the program. Initializes the queue and manages the lifecycle of worker threads.
BoundedBlockingQueue
A thread-safe queue with a fixed capacity that supports blocking operations.
bool pop(int &output_val)
Removes an element from the front of the queue.
BoundedBlockingQueue(size_t capacity)
Constructs a new BoundedBlockingQueue.
void shut_down()
Shuts down the queue and wakes up all blocked threads.
bool push(int val)
Adds an element to the back of the queue.