Sid's Blog

Weighted Priority Queue with Anti-Starvation Pop Logic

While working on a new feature recently, the requirements called for a task manager that accepts jobs from different app environments - Development, QA, Staging, & Production - and feeds them to a processing worker thread.

The main challenge was balancing task priority. Production tasks obviously needed to be processed first, but that could leave the other jobs starving in the queue, especially during periods of heavy load. So I implemented a rudimentary weighted priority queue with a 4:3:2:1 pop ratio and a few guardrails to make sure lower-priority tasks still got picked.


Table of Contents

  1. Key Design Points
  2. How It Works
  3. Limitations
  4. Possible Improvements
  5. Why a Custom Queue?
  6. TL;DR

Design Overview

The task manager processes jobs from different environments with varying priority. The goal is to ensure high-priority jobs are processed promptly, while still providing a fair chance for lower-priority jobs.

Key Design Points

  1. Task Prioritization: Tasks are pulled from the queue based on environment priority. Production tasks are given the highest priority, followed by staging, QA, and development.

  2. Independent Queues per Environment: Each environment has its own queue, which helps separate tasks from different environments and ensures that tasks from higher-priority environments can be processed first without interference.

  3. In-memory Queue with MongoDB Sync: The task queue operates in memory for fast access, but periodically syncs to MongoDB to ensure data persistence and recovery in case of system failure.

  4. Fair Task Distribution with Independent Counters: Each environment has its own independent counter to track task processing. This helps maintain the 4:3:2:1 weighted ratio between environments.

By combining environment-based job prioritization with periodic syncing to MongoDB for fault tolerance, the task manager handles tasks from different environments efficiently and fairly.
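
To put numbers on the 4:3:2:1 ratio, here is a quick sketch of what it means under sustained load. It assumes every queue stays backlogged and that one cycle serves each environment its full quota in priority order; the helper names `expected_shares` and `max_pops_between_turns` are made up for this illustration and are not part of the task manager.

```python
from fractions import Fraction
from typing import Dict

# Pop quota per environment in one cycle (the 4:3:2:1 ratio from the post).
WEIGHTS: Dict[str, int] = {"production": 4, "staging": 3, "qa": 2, "development": 1}


def expected_shares(weights: Dict[str, int]) -> Dict[str, Fraction]:
    """Fraction of all pops each environment receives when every queue is backlogged."""
    total = sum(weights.values())
    return {env: Fraction(weight, total) for env, weight in weights.items()}


def max_pops_between_turns(weights: Dict[str, int], env: str) -> int:
    """Pops handed to other environments between two consecutive turns of `env`."""
    return sum(weights.values()) - weights[env]


if __name__ == "__main__":
    for env, share in expected_shares(WEIGHTS).items():
        print(f"{env}: {float(share):.0%} of pops")
    print("development waits at most",
          max_pops_between_turns(WEIGHTS, "development"), "pops")
```

Under those assumptions production gets 40% of the pops, staging 30%, QA 20%, and development 10%, and a development job at the head of its queue never waits behind more than 9 other pops.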


How It Works

The task manager operates with a straightforward process flow designed to ensure tasks are processed fairly while prioritizing higher-priority environments. Here’s a breakdown of how it works:

  1. Job Insertion:
def insert(self, job_id: str, job_data: Dict[str, Any]) -> None:
    env = job_data.get("env")
    if env not in self.queues:
        return  # Ignore invalid environments

    with self.lock:
        self.queues[env].append((job_id, job_data))
  2. Queue Management:
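def __init__(self) -> None:
    # One FIFO list per environment, listed from highest to lowest priority.
    self.queues: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {
        "production": [],
        "staging": [],
        "qa": [],
        "development": [],
    }
    # Per-environment pop counters used for the 4:3:2:1 ratio.
    self.counters = {"production": 0, "staging": 0, "qa": 0, "development": 0}
    # Guards queues and counters; needs "import threading" at module level.
    self.lock = threading.Lock()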
  3. Task Prioritization (Weighted Ratio):
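def pop_next_job(self) -> Optional[Tuple[str, Dict[str, Any]]]:
    # Pops allowed per environment in one cycle (the 4:3:2:1 ratio).
    weights = {"production": 4, "staging": 3, "qa": 2, "development": 1}
    with self.lock:
        for _ in range(2):  # the second pass runs after the counters reset
            for env, quota in weights.items():
                # Serve the highest-priority env that still has quota left.
                if self.queues[env] and self.counters[env] < quota:
                    self.counters[env] += 1
                    return self.queues[env].pop(0)
            # Every non-empty queue has used its quota: start a new cycle.
            self.counters = dict.fromkeys(self.counters, 0)
    return None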
  4. Periodic MongoDB Sync:
def sync_to_db(self) -> None:
    with self.lock:
        existing_jobs = {job.jobId: job for job in GazeTrackingJob.objects.all()}
        bulk_updates = []

        for env in self.queues:
            for job_id, job_data in self.queues[env]:
                job = existing_jobs.get(job_id, GazeTrackingJob(jobId=job_id))
                job.update(**job_data)
                bulk_updates.append(job)

        # Write all queued jobs back in one bulk update
        if bulk_updates:
            GazeTrackingJob.objects.bulk_update(bulk_updates)

Limitations

  1. Limited Task Types
  2. No Built-in Fault Tolerance Logic
  3. Scalability Constraints

Possible Improvements

  1. Backpressure Mechanism
  2. Job Expiry / TTL Support
  3. Batch Processing Support
  4. Priority Override / Manual Injection
  5. Redis Integration
  6. Metrics & Monitoring
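
As a rough idea of how the first two items could look, here is a sketch of a single-environment queue with a size cap (backpressure) and a per-job time-to-live. Everything in it is hypothetical: the `BoundedTTLQueue` class, its `offer` and `pop` methods, and the `max_size`, `ttl_seconds`, and `clock` parameters are illustrative names, and locking is omitted on the assumption that the task manager would call this under its existing lock.

```python
import time
from collections import deque
from typing import Any, Callable, Deque, Dict, Optional, Tuple


class BoundedTTLQueue:
    """Hypothetical per-environment queue with a size cap and job expiry."""

    def __init__(self, max_size: int, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self.items: Deque[Tuple[float, str, Dict[str, Any]]] = deque()

    def offer(self, job_id: str, job_data: Dict[str, Any]) -> bool:
        """Backpressure: refuse the job instead of growing without bound."""
        if len(self.items) >= self.max_size:
            return False
        self.items.append((self.clock(), job_id, job_data))
        return True

    def pop(self) -> Optional[Tuple[str, Dict[str, Any]]]:
        """Return the oldest job that has not expired, discarding stale ones."""
        now = self.clock()
        while self.items:
            enqueued_at, job_id, job_data = self.items.popleft()
            if now - enqueued_at <= self.ttl_seconds:
                return job_id, job_data
        return None
```

Returning `False` from `offer` leaves the decision to the caller, which can retry later, drop the job, or report the overload upstream.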

Why a Custom Queue?

  1. Precise Control Over Logic
  2. Minimal Dependencies
  3. Better Debuggability
  4. MongoDB-Based Persistence
  5. Lightweight and Purpose-Built

TL;DR

