Skip to content

Instantly share code, notes, and snippets.

@ourway
Created December 28, 2024 05:47
Show Gist options
  • Save ourway/632ca23c025adaec64979952b9c57295 to your computer and use it in GitHub Desktop.
Save ourway/632ca23c025adaec64979952b9c57295 to your computer and use it in GitHub Desktop.
"""
PROCESS EXPLANATION:
--------------------
1. The `PackageGenerator` creates new packages and sends them to the `DeliveryManager`.
2. The `DeliveryManager` receives new packages and then checks if there are available `Deliverer` workers.
3. The `Deliverer` workers pick up packages, attempt to deliver them, and report back success or failure.
4. When a package delivery fails, it is re-inserted into the front of the queue by the `DeliveryManager`.
5. The system continues until the package queue is empty, all packages are delivered, and the generator
has signaled that it is done generating packages.
6. The `DeliveryManager` provides a condition variable (`cv`) to wait until all packages are delivered.
7. Finally, the system prints out a summary report of successful deliveries, failed attempts, and
remaining packages.
"""
import queue
import random
import threading
import time
import uuid
from typing import Any, Dict, List, Optional, Set, Tuple
class GenServer:
"""A generic server class that handles messages in a mailbox using a dedicated thread."""
def __init__(self) -> None:
"""
Initializes the GenServer with:
- a mailbox (Queue) for incoming messages
- a thread that will run the server loop
- a flag indicating whether it is running
"""
self.mailbox: queue.Queue = queue.Queue()
self.thread: Optional[threading.Thread] = None
self.running: bool = False
def start(self, *args: Any, **kwargs: Any) -> None:
"""
Starts the server thread if not already running.
It runs the `_loop` method as a new thread.
"""
if self.running:
return
self.running = True
self.thread = threading.Thread(target=self._loop, args=args, kwargs=kwargs)
# Using non-daemon so threads aren't just killed when main returns
self.thread.daemon = False
self.thread.start()
def stop(self) -> None:
"""
Stops the server, if running, by sending a special stop command
to the mailbox. Then waits for the thread to join.
"""
if not self.running:
return
self.running = False
self.mailbox.put(('_command', 'stop'))
self.thread.join()
def cast(self, message: Any) -> None:
"""
Sends a message to the server's mailbox.
In OTP Erlang terms, this is analogous to casting a message
without expecting a return value.
"""
self.mailbox.put(message)
def _loop(self, *args: Any, **kwargs: Any) -> None:
"""Internal loop that processes messages from the mailbox until 'stop' is received."""
state = self.init(*args, **kwargs)
while self.running:
try:
while True:
msg = self.mailbox.get_nowait()
if isinstance(msg, tuple) and msg == ('_command', 'stop'):
# If we receive the stop command, exit the loop
self.running = False
break
elif isinstance(msg, dict):
# Process dictionary messages with handle_cast
state = self.handle_cast(msg, state)
except queue.Empty:
# No messages left to process
pass
# Slight sleep to avoid hogging the CPU
time.sleep(0.01)
self.terminate(state)
def init(self, *args: Any, **kwargs: Any) -> Any:
"""One-time initialization logic for child classes to override if needed."""
return None
def handle_cast(self, message: Dict[str, Any], state: Any) -> Any:
"""Defines how the server should handle incoming dictionary messages."""
print(f"Unhandled cast: {message}")
return state
def terminate(self, state: Any) -> None:
"""Cleanup logic when the server is shutting down."""
pass
class DeliveryManager(GenServer):
"""Manages a queue of packages and coordinates with multiple deliverers."""
def init(self) -> None:
"""
Initializes delivery-related state:
- self.package_queue: list of pending packages
- self.deliverers: dict mapping deliverer IDs to their GenServer references
- self.failed_attempts: track how many total delivery failures occurred
- self.successful_deliveries: track how many deliveries succeeded
- self.delivered_ids: set of IDs that have been successfully delivered
- self.in_flight: how many deliveries are currently being attempted
- self.generator_done: boolean flag indicating no more packages will be generated
- self.lock: a threading lock to control concurrent access to shared state
- self.cv: a condition variable to notify waiting threads when all deliveries finish
"""
self.package_queue: List[Dict[str, Any]] = []
self.deliverers: Dict[str, 'Deliverer'] = {}
self.failed_attempts: int = 0
self.successful_deliveries: int = 0
self.delivered_ids: Set[str] = set()
self.in_flight: int = 0
self.generator_done: bool = False # indicates the generator is done sending packages
self.lock: threading.Lock = threading.Lock()
self.cv: threading.Condition = threading.Condition(self.lock)
return None
def handle_cast(self, message: Dict[str, Any], state: Any) -> Any:
"""Processes various commands related to package delivery."""
cmd = message.get("command")
if cmd == "new_package":
# A new package has arrived; add it to the queue
pkg = message["package"]
with self.lock:
self.package_queue.append(pkg)
self.assign_packages()
elif cmd == "deliverer_available":
# A deliverer is now available for a new package
d_id = message["deliverer_id"]
d_ref = message["deliverer_ref"]
with self.lock:
self.deliverers[d_id] = d_ref
self.assign_packages()
elif cmd == "delivery_failed":
# A package delivery has failed; requeue if not yet delivered
pkg = message["package"]
with self.lock:
self.failed_attempts += 1
if pkg["id"] not in self.delivered_ids:
# Put the package back at the front of the queue
self.package_queue.insert(0, pkg)
self.in_flight -= 1
self.assign_packages()
elif cmd == "delivery_successful":
# A package has been delivered successfully
pkg_id = message["package_id"]
with self.lock:
if pkg_id not in self.delivered_ids:
self.delivered_ids.add(pkg_id)
self.successful_deliveries += 1
self.in_flight -= 1
self.assign_packages()
elif cmd == "generator_finished":
# No more new packages will arrive
with self.lock:
self.generator_done = True
self.assign_packages()
return state
def assign_packages(self) -> None:
"""
Assigns packages to available deliverers. If no deliverers are free or no packages are pending,
it does nothing. Also notifies waiting threads when all deliveries are complete.
"""
with self.lock:
# Assign as many packages as possible while both queue and deliverers exist
while self.package_queue and self.deliverers:
pkg = self.package_queue.pop(0)
(d_id, d_ref) = self.deliverers.popitem()
self.in_flight += 1
d_ref.cast({"command": "deliver", "package": pkg})
# If there are no packages left, nothing in flight, and the generator is finished:
# we can notify any threads waiting on condition variable (e.g., main thread).
if not self.package_queue and self.in_flight == 0 and self.generator_done:
self.cv.notify_all()
def wait_until_all_delivered(self, timeout: Optional[float] = None) -> None:
"""
Blocks the calling thread until all packages have been delivered or the optional
timeout expires. This is achieved with the condition variable `cv`.
"""
with self.cv:
start = time.time()
while True:
# We'll keep waiting if there's still something in queue or in flight
# or if the generator hasn't signaled it's done.
still_not_done = (self.package_queue or self.in_flight or not self.generator_done)
if not still_not_done:
# All deliveries are complete
break
if timeout is not None:
elapsed = time.time() - start
if elapsed >= timeout:
break
self.cv.wait(timeout - elapsed)
else:
self.cv.wait()
def terminate(self, state: Any) -> None:
"""
Prints a delivery report summarizing total deliveries, failures, and remaining packages.
"""
with self.lock:
print("\n--- Delivery Report ---")
print(f"Successful Deliveries: {self.successful_deliveries}")
print(f"Failed Attempts: {self.failed_attempts}")
print(f"Packages remaining in queue: {len(self.package_queue)}")
print("-----------------------\n")
class Deliverer(GenServer):
"""A deliverer server that attempts to deliver packages and reports success or failure."""
def init(self, deliverer_id: str, manager: DeliveryManager) -> None:
"""
Initializes a deliverer with a unique ID and a reference to the DeliveryManager.
Immediately notifies the manager that this deliverer is available.
"""
self.deliverer_id: str = deliverer_id
self.manager: DeliveryManager = manager
# Let the manager know I'm available
self.manager.cast({
"command": "deliverer_available",
"deliverer_id": self.deliverer_id,
"deliverer_ref": self
})
return None
def handle_cast(self, message: Dict[str, Any], state: Any) -> Any:
"""Receives delivery commands and triggers the package delivery."""
if message.get("command") == "deliver":
pkg = message["package"]
self.deliver_package(pkg)
return state
def deliver_package(self, pkg: Dict[str, Any]) -> None:
"""
Attempts to deliver the package. There's a random chance of failure, and in either
case, the manager is informed about the delivery result.
"""
# Simulate some random delivery time
time.sleep(random.uniform(0.05, 0.15)) # Adjust for faster testing
fail_chance = 0.3 # 30% chance of failing
if random.random() < fail_chance:
print(f"{self.deliverer_id} FAILED: {pkg['id']}")
self.manager.cast({
"command": "delivery_failed",
"package": pkg
})
else:
print(f"{self.deliverer_id} DELIVERED: {pkg['id']}")
self.manager.cast({
"command": "delivery_successful",
"package_id": pkg["id"]
})
# After a delivery attempt, re-announce availability
self.manager.cast({
"command": "deliverer_available",
"deliverer_id": self.deliverer_id,
"deliverer_ref": self
})
class PackageGenerator(GenServer):
"""Generates a specified number of packages and sends them to the DeliveryManager."""
def init(self, manager: DeliveryManager, total: int = 10) -> None:
"""
Sets up the generator with a reference to the DeliveryManager and a default package
total if none is specified.
"""
self.manager: DeliveryManager = manager
self.total: int = total
return None
def handle_cast(self, message: Dict[str, Any], state: Any) -> Any:
"""Handles a request to generate a certain number of packages."""
cmd = message.get("command")
if cmd == "generate_packages":
n = message.get("num_packages", self.total)
for _ in range(n):
pkg_id = str(uuid.uuid4())
self.manager.cast({
"command": "new_package",
"package": {"id": pkg_id, "name": "some-item"}
})
# Notify the manager that no more packages will be generated
self.manager.cast({"command": "generator_finished"})
return state
if __name__ == "__main__":
# 1. Start the DeliveryManager
manager = DeliveryManager()
manager.start()
# 2. Start the PackageGenerator (with a 'total' argument if desired)
generator = PackageGenerator()
generator.start(manager, total=10)
# 3. Start multiple Deliverers
deliverers = []
for i in range(4):
d = Deliverer()
d.start(f"Deliverer-{i+1}", manager)
deliverers.append(d)
# 4. Command the generator to create and enqueue packages
# Adjust 'num_packages' as needed
generator.cast({"command": "generate_packages", "num_packages": 32})
print("Main thread waiting for all packages to be delivered...")
# 5. Wait until everything is delivered or a timeout occurs
manager.wait_until_all_delivered(timeout=30)
# 6. Stop all servers gracefully
generator.stop()
for d in deliverers:
d.stop()
manager.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment