C++ Concurrency Sandbox
Loading...
Searching...
No Matches
Priority.cpp
Go to the documentation of this file.
/**
 * @file Priority.cpp
 * @brief Implementation of a Thread Pool with Priority-Based Task Scheduling.
 *
 * This file demonstrates how to use a std::priority_queue so that, among the
 * tasks currently waiting in the queue, high-importance tasks are picked up by
 * worker threads before lower-importance ones, regardless of submission order.
 */
8
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
17
/**
 * @struct PriorityTask
 * @brief A callable task paired with a priority level.
 *
 * Instances are ordered by @ref priority only; the callable takes no part in
 * the comparison. Two tasks with equal priority compare as equivalent, so a
 * std::priority_queue gives no ordering guarantee between them.
 */
struct PriorityTask {
    int priority;               ///< Priority level (higher values indicate higher importance).
    std::function<void()> func; ///< The actual task/function to be executed (may be empty).

    /**
     * @brief Ordering used by @c std::priority_queue.
     *
     * @c std::priority_queue is a max-heap with respect to @c operator<, so
     * the task with the largest priority number ends up at the top.
     *
     * @param other The task to compare against.
     * @return true if this task has strictly lower priority than @p other.
     */
    bool operator<(const PriorityTask& other) const {
        return priority < other.priority;
    }
};
38
39/**
40 * @class PriorityThreadPool
41 * @brief A pool of worker threads that retrieves tasks based on priority.
42 * * Unlike a standard FIFO (First-In-First-Out) thread pool, this class
43 * uses a heap-based queue to prioritize certain tasks over others.
44 */
46private:
47 std::vector<std::thread> workers; ///< Persistent worker threads.
48 std::priority_queue<PriorityTask> tasks; ///< The internal heap-based task scheduler.
49
50 std::mutex queue_mtx; ///< Mutex to synchronize access to the priority queue.
51 std::condition_variable cv; ///< Coordinates worker sleep/wake cycles.
52 std::atomic<bool> stop; ///< Flag to trigger thread pool shutdown.
53
54 /**
55 * @brief The worker thread's internal loop.
56 * * Workers block until a task is available. When multiple tasks exist,
57 * the one with the highest @ref PriorityTask::priority is selected.
58 */
59 void workerLoop() {
60 while (true) {
61 PriorityTask pTask;
62 {
63 std::unique_lock<std::mutex> lock(this->queue_mtx);
64
65 // Wait for a task or a shutdown signal
66 while (!this->stop && this->tasks.empty()) {
67 this->cv.wait(lock);
68 }
69
70 // Exit condition: stop flag is set and no tasks remain
71 if (this->stop && this->tasks.empty()) return;
72
73 // Priority Queue uses .top() (peak at the highest priority element)
74 pTask = std::move(const_cast<PriorityTask&>(this->tasks.top()));
75 this->tasks.pop();
76 }
77 if (pTask.func) pTask.func();
78 }
79 }
80
81public:
82 /**
83 * @brief Initializes the pool and starts worker threads.
84 * @param num_threads The number of worker threads to maintain.
85 */
86 PriorityThreadPool(size_t num_threads) : stop(false) {
87 for (size_t i = 0; i < num_threads; ++i) {
88 workers.emplace_back(&PriorityThreadPool::workerLoop, this);
89 }
90 }
91
92 /**
93 * @brief Submits a task with a specific priority level.
94 * @param priority Importance level (e.g., 10 for urgent, 1 for background).
95 * @param f The function or lambda to execute.
96 */
97 void enqueue(int priority, std::function<void()> f) {
98 {
99 std::unique_lock<std::mutex> lock(queue_mtx);
100 tasks.push({priority, std::move(f)});
101 }
102 cv.notify_one();
103 }
104
105 /**
106 * @brief Gracefully joins all worker threads after processing remaining tasks.
107 */
109 stop = true;
110 cv.notify_all();
111 for (auto &w : workers) if (w.joinable()) w.join();
112 }
113};
114
/**
 * @struct PaymentTask
 * @brief A functor simulating a financial processing task.
 *
 * Being callable with no arguments, it converts to the
 * @c std::function<void()> expected by PriorityThreadPool::enqueue.
 */
struct PaymentTask {
    std::string type; ///< Description of the payment activity.

    /**
     * @brief Executes the simulated payment task.
     *
     * Prints the task description to standard output, then sleeps for 200 ms
     * to stand in for real work.
     */
    void operator()() const {
        // std::endl flushes, so each line appears as soon as the task starts.
        std::cout << "[Worker] Processing: " << type << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
};
130
131/**
132 * @brief Main function demonstrating priority-based out-of-order execution.
133 */
134int main() {
135 // We use 1 thread to force tasks to wait in the queue, making
136 // the priority ordering visible in the console output.
137 PriorityThreadPool pool(1);
138
139 std::cout << "Submitting tasks in random order...\n";
140
141 // Submit Low Priority first
142 pool.enqueue(1, PaymentTask{"Low: Reward Statement"});
143 pool.enqueue(1, PaymentTask{"Low: SMS Notification"});
144
145 // Submit High Priority last
146 pool.enqueue(10, PaymentTask{"HIGH: FRAUD DETECTION"});
147 pool.enqueue(10, PaymentTask{"HIGH: AUTHORIZATION"});
148
149 // The output will show HIGH priority tasks running before the LOW priority
150 // tasks that were added earlier.
151
152 std::this_thread::sleep_for(std::chrono::seconds(2));
153 return 0;
154}
int main()
Main function demonstrating priority-based out-of-order execution.
Definition Priority.cpp:134
A pool of worker threads that retrieves tasks based on priority.
Definition Priority.cpp:45
~PriorityThreadPool()
Gracefully joins all worker threads after processing remaining tasks.
Definition Priority.cpp:108
void enqueue(int priority, std::function< void()> f)
Submits a task with a specific priority level.
Definition Priority.cpp:97
PriorityThreadPool(size_t num_threads)
Initializes the pool and starts worker threads.
Definition Priority.cpp:86
A Functor simulating a financial processing task.
Definition Priority.cpp:119
std::string type
Description of the payment activity.
Definition Priority.cpp:120
void operator()() const
Executes the simulated payment task.
Definition Priority.cpp:125
A wrapper for a callable task associated with a priority level.
Definition Priority.cpp:23
int priority
Priority level (higher values indicate higher importance).
Definition Priority.cpp:24
bool operator<(const PriorityTask &other) const
Comparator for the priority queue.
Definition Priority.cpp:34
std::function< void()> func
The actual task/function to be executed.
Definition Priority.cpp:25