52 std::vector<std::thread> workers;
53 std::thread monitor_thread;
54 std::vector<AgedTask> task_heap;
56 std::condition_variable cv;
57 std::atomic<bool> stop;
64 std::this_thread::sleep_for(milliseconds(1000));
66 std::lock_guard<std::mutex> lock(queue_mtx);
79 auto now = system_clock::now();
80 bool needs_reheap =
false;
81 for (
auto& task : task_heap) {
82 auto wait_duration = duration_cast<seconds>(now - task.arrival_time).count();
85 int age_bonus = (
static_cast<int>(wait_duration) / 2) * 20;
88 int new_prio = task.original_priority + age_bonus;
89 if(new_prio > task.current_priority) {
90 task.current_priority = new_prio;
96 std::make_heap(task_heap.begin(), task_heap.end());
107 std::unique_lock<std::mutex> lock(this->queue_mtx);
108 while (!this->stop && task_heap.empty()) {
111 if (this->stop && task_heap.empty())
return;
114 std::pop_heap(task_heap.begin(), task_heap.end());
115 activeTask = std::move(task_heap.back());
116 task_heap.pop_back();
118 if (activeTask.
func) {
119 std::cout <<
"[Worker] Starting: " << activeTask.
task_name
132 for (
size_t i = 0; i < threads; ++i) {
133 workers.emplace_back(&AgingPriorityPool::workerLoop,
this);
135 monitor_thread = std::thread(&AgingPriorityPool::monitorLoop,
this);
144 void enqueue(
int priority, std::string name, std::function<
void()> f) {
146 std::lock_guard<std::mutex> lock(queue_mtx);
151 newTask.
func = std::move(f);
154 task_heap.push_back(std::move(newTask));
155 std::push_heap(task_heap.begin(), task_heap.end());
166 if (monitor_thread.joinable()) monitor_thread.join();
167 for (
auto &w : workers)
if (w.joinable()) w.join();