# Weighted Priority Queue with Anti-Starvation Pop Logic
While working on a new feature recently, the requirements called for a task manager that accepts jobs from different app environments (Development, QA, Staging, and Production) and feeds them to a processing worker thread.
The main challenge was balancing task priority. Production tasks obviously needed to be processed first, but that could leave the other jobs starving in the queue, especially during periods of heavy load. So I implemented a rudimentary weighted priority queue with a 4:3:2:1 pop ratio, plus a few guardrails to make sure lower-priority tasks still get picked.
## Design Overview
The task manager processes jobs from different environments with varying priority. The goal is to ensure high-priority jobs are processed promptly, while still providing a fair chance for lower-priority jobs.
### Key Design Points
- Task Prioritization: Tasks are pulled from the queue based on environment priority. Production tasks are given the highest priority, followed by staging, QA, and development.
- Independent Queues per Environment: Each environment has its own queue, which helps separate tasks from different environments and ensures that tasks from higher-priority environments can be processed first without interference.
- In-memory Queue with MongoDB Sync: The task queue operates in memory for fast access, but periodically syncs to MongoDB to ensure data persistence and recovery in case of system failure.
- Fair Task Distribution with Independent Counters: Each environment has its own independent counter to track task processing. This helps maintain the 4:3:2:1 weighted ratio between environments.
By combining environment-based job prioritization with periodic syncing to MongoDB for fault tolerance, the task manager ensures both efficiency and fairness in handling tasks from different environments.
## How It Works
The task manager operates with a straightforward process flow designed to ensure tasks are processed fairly while prioritizing higher-priority environments. Here’s a breakdown of how it works:
- **Job Insertion**:
  - When a job is inserted, the environment (`env`) is checked, and the job is added to the corresponding queue.

```python
def insert(self, job_id: str, job_data: Dict[str, Any]) -> None:
    env = job_data.get("env")
    if env not in self.queues:
        return  # Ignore invalid environments
    with self.lock:
        self.queues[env].append((job_id, job_data))
```
- **Queue Management**:
  - Each environment has its own queue, which ensures the priorities of tasks from different environments don't get mixed up.
  - Independent counters are maintained for each environment to maintain the 4:3:2:1 ratio.
```python
def __init__(self) -> None:
    self.lock = Lock()  # guards all queue and counter mutations
    self.queues: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {
        "production": [],
        "staging": [],
        "qa": [],
        "development": [],
    }
    self.counters = {"production": 0, "staging": 0, "qa": 0, "development": 0}
```
- **Task Prioritization (Weighted Ratio)**:
  - The `pop_next_job` function pulls tasks based on a weighted ratio:
    - 4 production tasks
    - 3 staging tasks
    - 2 QA tasks
    - 1 development task
```python
def pop_next_job(self) -> Optional[Tuple[str, Dict[str, Any]]]:
    order = [
        "production", "production", "production", "production",
        "staging", "staging", "staging",
        "qa", "qa",
        "development",
    ]
    with self.lock:
        # Resume the weighted cycle where the last pop left off (the
        # counters advance by one per pop). Starting from index 0 on
        # every call would let production starve the other environments.
        start = sum(self.counters.values()) % len(order)
        for i in range(len(order)):
            env = order[(start + i) % len(order)]
            if self.queues[env]:
                self.counters[env] += 1
                return self.queues[env].pop(0)
        return None
```
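To see the ratio in action, here is a self-contained sketch mirroring the snippets above. The `WeightedQueue` name is mine, and the counter-sum rotation is one way to keep the cycle advancing across calls, not necessarily the production implementation:

```python
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple

ORDER = [
    "production", "production", "production", "production",
    "staging", "staging", "staging",
    "qa", "qa",
    "development",
]

class WeightedQueue:
    def __init__(self) -> None:
        self.lock = Lock()
        self.queues: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {
            env: [] for env in ("production", "staging", "qa", "development")
        }
        self.counters = {env: 0 for env in self.queues}

    def insert(self, job_id: str, job_data: Dict[str, Any]) -> None:
        env = job_data.get("env")
        if env not in self.queues:
            return  # ignore unknown environments
        with self.lock:
            self.queues[env].append((job_id, job_data))

    def pop_next_job(self) -> Optional[Tuple[str, Dict[str, Any]]]:
        with self.lock:
            # Total pops so far tells us where we are in the weighted cycle
            start = sum(self.counters.values()) % len(ORDER)
            for i in range(len(ORDER)):
                env = ORDER[(start + i) % len(ORDER)]
                if self.queues[env]:
                    self.counters[env] += 1
                    return self.queues[env].pop(0)
            return None

q = WeightedQueue()
for env in ("production", "staging", "qa", "development"):
    for n in range(5):
        q.insert(f"{env}-{n}", {"env": env})

# One full cycle of ten pops should follow the 4:3:2:1 weighting
popped = [q.pop_next_job()[1]["env"] for _ in range(10)]
```

With all four queues non-empty, one cycle of ten pops yields four production jobs, three staging, two QA, and one development job, which is exactly the anti-starvation guarantee described above.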
- **Periodic MongoDB Sync**:
  - The task queue is synced periodically with MongoDB to ensure persistence and data recovery.
```python
def sync_to_db(self) -> None:
    with self.lock:
        existing_jobs = {job.jobId: job for job in GazeTrackingJob.objects.all()}
        bulk_updates = []
        for env in self.queues:
            for job_id, job_data in self.queues[env]:
                job = existing_jobs.get(job_id, GazeTrackingJob(jobId=job_id))
                job.update(**job_data)
                bulk_updates.append(job)
        # Persist the collected changes in a single bulk write
        if bulk_updates:
            GazeTrackingJob.objects.bulk_update(bulk_updates)
```
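The post doesn't show how `sync_to_db` gets scheduled. One minimal way, assuming a background daemon thread is acceptable, is a stoppable timer loop; the `start_periodic_sync` helper below is a hypothetical sketch, not the production code:

```python
import threading
import time

def start_periodic_sync(sync_fn, interval_s: float = 30.0) -> threading.Event:
    """Call sync_fn every interval_s seconds on a daemon thread.

    Returns an Event; call .set() on it to stop the loop.
    """
    stop = threading.Event()

    def loop() -> None:
        # Event.wait doubles as an interruptible sleep: it returns False
        # on timeout (keep syncing) and True once stop is set (exit).
        while not stop.wait(interval_s):
            sync_fn()

    threading.Thread(target=loop, daemon=True).start()
    return stop

# Demo with a tiny interval and a recording stub in place of sync_to_db
calls = []
stop = start_periodic_sync(lambda: calls.append(1), interval_s=0.01)
time.sleep(0.08)
stop.set()
```

Using `Event.wait` for the sleep means a shutdown request interrupts the wait immediately instead of blocking for up to a full interval.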
## Limitations
- Limited Task Types
- Only supports basic task queueing logic. No support for scheduling, retry policies, delayed jobs, or result tracking.
- No Built-in Fault Tolerance Logic
- While syncing to MongoDB provides persistence, job acknowledgment, retries, or dead-letter queues are not built-in.
- Scalability Constraints
  - The in-memory model is fine for a single instance, but distributing this queue across multiple machines or processes would require additional coordination logic.
## Possible Improvements
- Backpressure Mechanism:
- Introduce limits on queue size to prevent memory bloat during high-volume job spikes.
- Job Expiry / TTL Support:
- Automatically discard stale or timed-out jobs based on createdAt or expiresAt fields.
- Batch Processing Support:
- Optimize throughput by allowing batch pops instead of one-by-one job fetching.
- Priority Override / Manual Injection:
- Support manual priority override for specific critical jobs during incident recovery or hotfix deployments.
- Redis Integration:
- Swap or complement the in-memory queue with Redis for distributed queue support and better fault tolerance.
- Metrics & Monitoring:
- Add instrumentation hooks to expose queue health, environment-specific load, and sync stats via Prometheus or similar.
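As a sketch of the backpressure idea, an insert can refuse jobs once a queue hits a size cap, letting the caller signal overload (e.g. an HTTP 429) back to the producer. The function name and the `MAX_QUEUE_SIZE` limit below are illustrative assumptions, not part of the existing code:

```python
from typing import Any, Dict, List, Tuple

MAX_QUEUE_SIZE = 3  # tiny for demonstration; a real cap would be much larger

def insert_with_backpressure(
    queues: Dict[str, List[Tuple[str, Dict[str, Any]]]],
    job_id: str,
    job_data: Dict[str, Any],
) -> bool:
    """Reject the job instead of growing the queue without bound.

    Returns False when the env is unknown or its queue is full, so the
    caller can apply backpressure to the producer.
    """
    env = job_data.get("env")
    if env not in queues or len(queues[env]) >= MAX_QUEUE_SIZE:
        return False
    queues[env].append((job_id, job_data))
    return True

queues = {"production": []}
results = [
    insert_with_backpressure(queues, f"j{i}", {"env": "production"})
    for i in range(5)
]
# The first three inserts succeed; the last two are rejected at the cap
```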
## Why a Custom Queue?
- Precise Control Over Logic
- Needed custom pull behavior (4:3:2:1) and per-environment counters, which might not be directly supported in existing queue packages.
- Minimal Dependencies
- Off-the-shelf solutions (like Bull, Agenda, or Bee-Queue) often bring Redis, workers, and task scheduling systems — overkill for this use case.
- Better Debuggability
- Custom logic is easier to reason about and debug since it’s all self-contained and transparent.
- MongoDB-Based Persistence
- Most mature queue systems are tightly coupled with Redis. In this case, persistence needed to be MongoDB-based to stay consistent with the existing infrastructure.
- Lightweight and Purpose-Built
- The goal was a minimal and fast in-memory queue with periodic sync. Building from scratch made it easy to optimize for this specific use case.
## TL;DR
- Built a custom in-memory task queue to handle multi-environment job ingestion with weighted priority (Prod:Staging:QA:Dev = 4:3:2:1).
- Each environment maintains its own task queue and independent pop counter to enforce ratio-based fairness.
- Queue is memory-first for low-latency access, with periodic MongoDB sync for persistence and crash recovery.
- A custom solution was preferred over existing libraries to tightly control behavior, avoid over-engineering, and ensure minimal dependencies.