Skip to content

Instantly share code, notes, and snippets.

@vndee
Last active December 26, 2024 17:03
Show Gist options
  • Save vndee/22c7ad22fa21cc5919d7ac2f285faa3f to your computer and use it in GitHub Desktop.
Save vndee/22c7ad22fa21cc5919d7ac2f285faa3f to your computer and use it in GitHub Desktop.
from redis_data_structures import PriorityQueue
from enum import IntEnum
class TaskPriority(IntEnum):
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
class TaskScheduler:
def __init__(self):
self.pq = PriorityQueue("scheduled_tasks)
def add_task(self, task_type: str, data: dict, priority: TaskPriority):
task = {
"type": task_type,
"data": data
}
return self.pq.push(task, priority=priority)
def get_next_task(self):
return self.pq.pop()
# Usage
scheduler = TaskScheduler("security_update")
scheduler.add_task(
{"server": "prod-1", "patch": "security-001"},
TaskPriority.CRITICAL
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment