C++ Concurrency Sandbox
Loading...
Searching...
No Matches
AgingPriority.cpp
Go to the documentation of this file.
/**
 * @file AgingPriorityPool.cpp
 * @brief Implementation of a thread pool with dynamic task aging to prevent starvation.
 *
 * This file demonstrates a scheduling system in which a background monitor
 * thread periodically increases the priority of waiting tasks based on how
 * long they have been resident in the heap.
 */
8
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
18
19using namespace std::chrono;
20
/**
 * @struct AgedTask
 * @brief Represents a task whose priority can increase over time.
 *
 * Holds the priority assigned at submission, the current (aged) priority,
 * and the arrival timestamp needed to compute the aging bonus.
 */
struct AgedTask {
    int original_priority{0};                              ///< The priority assigned at submission.
    int current_priority{0};                               ///< The boosted priority after aging.
    std::chrono::system_clock::time_point arrival_time{};  ///< Timestamp of when the task entered the pool.
    std::function<void()> func;                            ///< The callable task logic.
    std::string task_name;                                 ///< Human-readable name for logging.

    /**
     * @brief Heap comparator.
     *
     * Orders tasks by @ref current_priority only. The standard heap
     * algorithms build a max-heap with respect to `operator<`, so the task
     * with the highest @ref current_priority ends up at the front of the heap.
     *
     * @param other Task to compare against.
     * @return true if this task has a strictly lower current priority than @p other.
     */
    bool operator<(const AgedTask& other) const {
        return current_priority < other.current_priority;
    }
};
42
43/**
44 * @class AgingPriorityPool
45 * @brief A priority-based thread pool that implements an Aging Algorithm.
46 * * This pool uses a `std::vector` as a binary heap and a dedicated monitor thread
47 * to "age" tasks, ensuring that low-priority tasks eventually gain enough
48 * priority to be executed even under high load.
49 */
51private:
52 std::vector<std::thread> workers; ///< Worker threads processing the heap.
53 std::thread monitor_thread; ///< Background thread responsible for aging logic.
54 std::vector<AgedTask> task_heap; ///< Internal storage managed as a Max-Heap.
55 std::mutex queue_mtx; ///< Protects heap integrity during aging and execution.
56 std::condition_variable cv; ///< Signals workers when tasks are added or aged.
57 std::atomic<bool> stop; ///< Shutdown flag.
58
59 /**
60 * @brief Background loop that triggers priority aging every second.
61 */
62 void monitorLoop() {
63 while (!stop) {
64 std::this_thread::sleep_for(milliseconds(1000));
65 {
66 std::lock_guard<std::mutex> lock(queue_mtx);
67 applyAging();
68 }
69 cv.notify_all(); // Re-wake workers as heap order may have shifted
70 }
71 }
72
73 /**
74 * @brief Recalculates priorities for all waiting tasks.
75 * * If a task has waited for a multiple of 2 seconds, its priority is
76 * significantly boosted. If any priorities change, the heap is rebuilt.
77 */
78 void applyAging() {
79 auto now = system_clock::now();
80 bool needs_reheap = false;
81 for (auto& task : task_heap) {
82 auto wait_duration = duration_cast<seconds>(now - task.arrival_time).count();
83
84 // Boost priority by 20 for every 2 seconds of waiting
85 int age_bonus = (static_cast<int>(wait_duration) / 2) * 20;
86
87 if (age_bonus > 0) {
88 int new_prio = task.original_priority + age_bonus;
89 if(new_prio > task.current_priority) {
90 task.current_priority = new_prio;
91 needs_reheap = true;
92 }
93 }
94 }
95 if (needs_reheap) {
96 std::make_heap(task_heap.begin(), task_heap.end());
97 }
98 }
99
100 /**
101 * @brief Core worker loop that pops the highest priority task from the heap.
102 */
103 void workerLoop() {
104 while (true) {
105 AgedTask activeTask;
106 {
107 std::unique_lock<std::mutex> lock(this->queue_mtx);
108 while (!this->stop && task_heap.empty()) {
109 cv.wait(lock);
110 }
111 if (this->stop && task_heap.empty()) return;
112
113 // Move top of heap to the back, then pop
114 std::pop_heap(task_heap.begin(), task_heap.end());
115 activeTask = std::move(task_heap.back());
116 task_heap.pop_back();
117 }
118 if (activeTask.func) {
119 std::cout << "[Worker] Starting: " << activeTask.task_name
120 << " | Priority: " << activeTask.current_priority << std::endl;
121 activeTask.func();
122 }
123 }
124 }
125
126public:
127 /**
128 * @brief Constructs the pool and starts both workers and the aging monitor.
129 * @param threads Number of worker threads to spawn.
130 */
131 AgingPriorityPool(size_t threads) : stop(false) {
132 for (size_t i = 0; i < threads; ++i) {
133 workers.emplace_back(&AgingPriorityPool::workerLoop, this);
134 }
135 monitor_thread = std::thread(&AgingPriorityPool::monitorLoop, this);
136 }
137
138 /**
139 * @brief Enqueues a task with an initial priority.
140 * @param priority Initial importance level.
141 * @param name Name for identifying the task in logs.
142 * @param f The function to execute.
143 */
144 void enqueue(int priority, std::string name, std::function<void()> f) {
145 {
146 std::lock_guard<std::mutex> lock(queue_mtx);
147 AgedTask newTask;
148 newTask.original_priority = priority;
149 newTask.current_priority = priority;
150 newTask.arrival_time = system_clock::now();
151 newTask.func = std::move(f);
152 newTask.task_name = name;
153
154 task_heap.push_back(std::move(newTask));
155 std::push_heap(task_heap.begin(), task_heap.end());
156 }
157 cv.notify_one();
158 }
159
160 /**
161 * @brief Gracefully shuts down the monitor and worker threads.
162 */
164 stop = true;
165 cv.notify_all();
166 if (monitor_thread.joinable()) monitor_thread.join();
167 for (auto &w : workers) if (w.joinable()) w.join();
168 }
169};
170
/**
 * @struct HeavyTask
 * @brief Functor that simulates a long-running workload.
 *
 * Invoking the functor blocks the calling thread for a fixed duration, which
 * keeps a worker busy so the queue can fill up and the aging monitor can run.
 */
struct HeavyTask {
    int duration_ms; ///< How long, in milliseconds, one invocation blocks the caller.

    /**
     * @brief Simulates work by putting the calling thread to sleep for @ref duration_ms.
     */
    void operator()() const {
        const std::chrono::milliseconds pause{duration_ms};
        std::this_thread::sleep_for(pause);
    }
};
187
188/**
189 * @brief The Main Test Harness for the Aging Priority Pool.
190 * * This demonstration performs the following steps:
191 * 1. Spawns a pool with a single thread to force queueing.
192 * 2. Submits a long-running "Blocking Task" to hold the worker.
193 * 3. Submits a "Starved" low-priority task.
194 * 4. Floods the queue with "Medium" priority tasks.
195 * 5. Observes the aging monitor as it boosts the starved task ahead of the flood.
196 * * @return int Execution status code.
197 */
198int main() {
199 // We use ONLY 1 thread so that every task after the first one
200 // MUST wait in the heap, allowing us to observe the aging process.
201 AgingPriorityPool pool(1);
202
203 std::cout << "--- STARTING AGING DEMONSTRATION ---\n";
204
205 // Step 1: Block the worker for 4 seconds
206 std::cout << "Step 1: Submitting 'BLOCKING_TASK' (Prio: 100)...\n";
207 pool.enqueue(100, "BLOCKING_TASK", HeavyTask{4000});
208
209 // Step 2: Add a task that would normally wait forever (Starvation)
210 std::cout << "Step 2: Submitting 'STARVED_REWARD_TASK' (Prio: 20)...\n";
211 pool.enqueue(20, "STARVED_REWARD_TASK", HeavyTask{500});
212
213 // Step 3: Flood the system with medium tasks (Prio: 50)
214 std::cout << "Step 3: Flooding queue with 20 'MEDIUM_TASKS' (Prio: 50)...\n";
215 for(int i = 1; i <= 20; ++i) {
216 pool.enqueue(50, "MEDIUM_TASK_" + std::to_string(i), HeavyTask{1000});
217 }
218
219 std::cout << "\n--- OBSERVATION PERIOD ---\n";
220 std::cout << "The 'STARVED' task starts at Priority 20.\n";
221 std::cout << "Every 2 seconds, it gains +20 priority.\n";
222 std::cout << "Wait ~4 seconds: It becomes Priority 60 and jumps ahead of all Medium Tasks!\n\n";
223
224 // Keep the main thread alive to watch the console logs as workers process the aged tasks
225 std::this_thread::sleep_for(std::chrono::seconds(20));
226
227 return 0;
228}
int main()
The Main Test Harness for the Aging Priority Pool.
A priority-based thread pool that implements an Aging Algorithm.
AgingPriorityPool(size_t threads)
Constructs the pool and starts both workers and the aging monitor.
void enqueue(int priority, std::string name, std::function< void()> f)
Enqueues a task with an initial priority.
~AgingPriorityPool()
Gracefully shuts down the monitor and worker threads.
Represents a task whose priority can increase over time.
int original_priority
The priority assigned at submission.
system_clock::time_point arrival_time
Timestamp of when the task entered the pool.
bool operator<(const AgedTask &other) const
Heap comparator. Higher current_priority values are moved to the front of the heap.
int current_priority
The boosted priority after aging.
std::function< void()> func
The callable task logic.
std::string task_name
Human-readable name for logging.
A simulated CPU-intensive workload functor.
int duration_ms
Time in milliseconds the task will "occupy" the worker.
void operator()() const
Executes the simulated work by sleeping the worker thread.