Created
December 28, 2024 05:47
-
-
Save ourway/632ca23c025adaec64979952b9c57295 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
PROCESS EXPLANATION: | |
-------------------- | |
1. The `PackageGenerator` creates new packages and sends them to the `DeliveryManager`. | |
2. The `DeliveryManager` receives new packages and then checks if there are available `Deliverer` workers. | |
3. The `Deliverer` workers pick up packages, attempt to deliver them, and report back success or failure. | |
4. When a package delivery fails, it is re-inserted into the front of the queue by the `DeliveryManager`. | |
5. The system continues until the package queue is empty, all packages are delivered, and the generator | |
has signaled that it is done generating packages. | |
6. The `DeliveryManager` provides a condition variable (`cv`) to wait until all packages are delivered. | |
7. Finally, the system prints out a summary report of successful deliveries, failed attempts, and | |
remaining packages. | |
""" | |
import queue | |
import random | |
import threading | |
import time | |
import uuid | |
from typing import Any, Dict, List, Optional, Set, Tuple | |
class GenServer: | |
"""A generic server class that handles messages in a mailbox using a dedicated thread.""" | |
def __init__(self) -> None: | |
""" | |
Initializes the GenServer with: | |
- a mailbox (Queue) for incoming messages | |
- a thread that will run the server loop | |
- a flag indicating whether it is running | |
""" | |
self.mailbox: queue.Queue = queue.Queue() | |
self.thread: Optional[threading.Thread] = None | |
self.running: bool = False | |
def start(self, *args: Any, **kwargs: Any) -> None: | |
""" | |
Starts the server thread if not already running. | |
It runs the `_loop` method as a new thread. | |
""" | |
if self.running: | |
return | |
self.running = True | |
self.thread = threading.Thread(target=self._loop, args=args, kwargs=kwargs) | |
# Using non-daemon so threads aren't just killed when main returns | |
self.thread.daemon = False | |
self.thread.start() | |
def stop(self) -> None: | |
""" | |
Stops the server, if running, by sending a special stop command | |
to the mailbox. Then waits for the thread to join. | |
""" | |
if not self.running: | |
return | |
self.running = False | |
self.mailbox.put(('_command', 'stop')) | |
self.thread.join() | |
def cast(self, message: Any) -> None: | |
""" | |
Sends a message to the server's mailbox. | |
In OTP Erlang terms, this is analogous to casting a message | |
without expecting a return value. | |
""" | |
self.mailbox.put(message) | |
def _loop(self, *args: Any, **kwargs: Any) -> None: | |
"""Internal loop that processes messages from the mailbox until 'stop' is received.""" | |
state = self.init(*args, **kwargs) | |
while self.running: | |
try: | |
while True: | |
msg = self.mailbox.get_nowait() | |
if isinstance(msg, tuple) and msg == ('_command', 'stop'): | |
# If we receive the stop command, exit the loop | |
self.running = False | |
break | |
elif isinstance(msg, dict): | |
# Process dictionary messages with handle_cast | |
state = self.handle_cast(msg, state) | |
except queue.Empty: | |
# No messages left to process | |
pass | |
# Slight sleep to avoid hogging the CPU | |
time.sleep(0.01) | |
self.terminate(state) | |
def init(self, *args: Any, **kwargs: Any) -> Any: | |
"""One-time initialization logic for child classes to override if needed.""" | |
return None | |
def handle_cast(self, message: Dict[str, Any], state: Any) -> Any: | |
"""Defines how the server should handle incoming dictionary messages.""" | |
print(f"Unhandled cast: {message}") | |
return state | |
def terminate(self, state: Any) -> None: | |
"""Cleanup logic when the server is shutting down.""" | |
pass | |
class DeliveryManager(GenServer): | |
"""Manages a queue of packages and coordinates with multiple deliverers.""" | |
def init(self) -> None: | |
""" | |
Initializes delivery-related state: | |
- self.package_queue: list of pending packages | |
- self.deliverers: dict mapping deliverer IDs to their GenServer references | |
- self.failed_attempts: track how many total delivery failures occurred | |
- self.successful_deliveries: track how many deliveries succeeded | |
- self.delivered_ids: set of IDs that have been successfully delivered | |
- self.in_flight: how many deliveries are currently being attempted | |
- self.generator_done: boolean flag indicating no more packages will be generated | |
- self.lock: a threading lock to control concurrent access to shared state | |
- self.cv: a condition variable to notify waiting threads when all deliveries finish | |
""" | |
self.package_queue: List[Dict[str, Any]] = [] | |
self.deliverers: Dict[str, 'Deliverer'] = {} | |
self.failed_attempts: int = 0 | |
self.successful_deliveries: int = 0 | |
self.delivered_ids: Set[str] = set() | |
self.in_flight: int = 0 | |
self.generator_done: bool = False # indicates the generator is done sending packages | |
self.lock: threading.Lock = threading.Lock() | |
self.cv: threading.Condition = threading.Condition(self.lock) | |
return None | |
def handle_cast(self, message: Dict[str, Any], state: Any) -> Any: | |
"""Processes various commands related to package delivery.""" | |
cmd = message.get("command") | |
if cmd == "new_package": | |
# A new package has arrived; add it to the queue | |
pkg = message["package"] | |
with self.lock: | |
self.package_queue.append(pkg) | |
self.assign_packages() | |
elif cmd == "deliverer_available": | |
# A deliverer is now available for a new package | |
d_id = message["deliverer_id"] | |
d_ref = message["deliverer_ref"] | |
with self.lock: | |
self.deliverers[d_id] = d_ref | |
self.assign_packages() | |
elif cmd == "delivery_failed": | |
# A package delivery has failed; requeue if not yet delivered | |
pkg = message["package"] | |
with self.lock: | |
self.failed_attempts += 1 | |
if pkg["id"] not in self.delivered_ids: | |
# Put the package back at the front of the queue | |
self.package_queue.insert(0, pkg) | |
self.in_flight -= 1 | |
self.assign_packages() | |
elif cmd == "delivery_successful": | |
# A package has been delivered successfully | |
pkg_id = message["package_id"] | |
with self.lock: | |
if pkg_id not in self.delivered_ids: | |
self.delivered_ids.add(pkg_id) | |
self.successful_deliveries += 1 | |
self.in_flight -= 1 | |
self.assign_packages() | |
elif cmd == "generator_finished": | |
# No more new packages will arrive | |
with self.lock: | |
self.generator_done = True | |
self.assign_packages() | |
return state | |
def assign_packages(self) -> None: | |
""" | |
Assigns packages to available deliverers. If no deliverers are free or no packages are pending, | |
it does nothing. Also notifies waiting threads when all deliveries are complete. | |
""" | |
with self.lock: | |
# Assign as many packages as possible while both queue and deliverers exist | |
while self.package_queue and self.deliverers: | |
pkg = self.package_queue.pop(0) | |
(d_id, d_ref) = self.deliverers.popitem() | |
self.in_flight += 1 | |
d_ref.cast({"command": "deliver", "package": pkg}) | |
# If there are no packages left, nothing in flight, and the generator is finished: | |
# we can notify any threads waiting on condition variable (e.g., main thread). | |
if not self.package_queue and self.in_flight == 0 and self.generator_done: | |
self.cv.notify_all() | |
def wait_until_all_delivered(self, timeout: Optional[float] = None) -> None: | |
""" | |
Blocks the calling thread until all packages have been delivered or the optional | |
timeout expires. This is achieved with the condition variable `cv`. | |
""" | |
with self.cv: | |
start = time.time() | |
while True: | |
# We'll keep waiting if there's still something in queue or in flight | |
# or if the generator hasn't signaled it's done. | |
still_not_done = (self.package_queue or self.in_flight or not self.generator_done) | |
if not still_not_done: | |
# All deliveries are complete | |
break | |
if timeout is not None: | |
elapsed = time.time() - start | |
if elapsed >= timeout: | |
break | |
self.cv.wait(timeout - elapsed) | |
else: | |
self.cv.wait() | |
def terminate(self, state: Any) -> None: | |
""" | |
Prints a delivery report summarizing total deliveries, failures, and remaining packages. | |
""" | |
with self.lock: | |
print("\n--- Delivery Report ---") | |
print(f"Successful Deliveries: {self.successful_deliveries}") | |
print(f"Failed Attempts: {self.failed_attempts}") | |
print(f"Packages remaining in queue: {len(self.package_queue)}") | |
print("-----------------------\n") | |
class Deliverer(GenServer): | |
"""A deliverer server that attempts to deliver packages and reports success or failure.""" | |
def init(self, deliverer_id: str, manager: DeliveryManager) -> None: | |
""" | |
Initializes a deliverer with a unique ID and a reference to the DeliveryManager. | |
Immediately notifies the manager that this deliverer is available. | |
""" | |
self.deliverer_id: str = deliverer_id | |
self.manager: DeliveryManager = manager | |
# Let the manager know I'm available | |
self.manager.cast({ | |
"command": "deliverer_available", | |
"deliverer_id": self.deliverer_id, | |
"deliverer_ref": self | |
}) | |
return None | |
def handle_cast(self, message: Dict[str, Any], state: Any) -> Any: | |
"""Receives delivery commands and triggers the package delivery.""" | |
if message.get("command") == "deliver": | |
pkg = message["package"] | |
self.deliver_package(pkg) | |
return state | |
def deliver_package(self, pkg: Dict[str, Any]) -> None: | |
""" | |
Attempts to deliver the package. There's a random chance of failure, and in either | |
case, the manager is informed about the delivery result. | |
""" | |
# Simulate some random delivery time | |
time.sleep(random.uniform(0.05, 0.15)) # Adjust for faster testing | |
fail_chance = 0.3 # 30% chance of failing | |
if random.random() < fail_chance: | |
print(f"{self.deliverer_id} FAILED: {pkg['id']}") | |
self.manager.cast({ | |
"command": "delivery_failed", | |
"package": pkg | |
}) | |
else: | |
print(f"{self.deliverer_id} DELIVERED: {pkg['id']}") | |
self.manager.cast({ | |
"command": "delivery_successful", | |
"package_id": pkg["id"] | |
}) | |
# After a delivery attempt, re-announce availability | |
self.manager.cast({ | |
"command": "deliverer_available", | |
"deliverer_id": self.deliverer_id, | |
"deliverer_ref": self | |
}) | |
class PackageGenerator(GenServer): | |
"""Generates a specified number of packages and sends them to the DeliveryManager.""" | |
def init(self, manager: DeliveryManager, total: int = 10) -> None: | |
""" | |
Sets up the generator with a reference to the DeliveryManager and a default package | |
total if none is specified. | |
""" | |
self.manager: DeliveryManager = manager | |
self.total: int = total | |
return None | |
def handle_cast(self, message: Dict[str, Any], state: Any) -> Any: | |
"""Handles a request to generate a certain number of packages.""" | |
cmd = message.get("command") | |
if cmd == "generate_packages": | |
n = message.get("num_packages", self.total) | |
for _ in range(n): | |
pkg_id = str(uuid.uuid4()) | |
self.manager.cast({ | |
"command": "new_package", | |
"package": {"id": pkg_id, "name": "some-item"} | |
}) | |
# Notify the manager that no more packages will be generated | |
self.manager.cast({"command": "generator_finished"}) | |
return state | |
if __name__ == "__main__": | |
# 1. Start the DeliveryManager | |
manager = DeliveryManager() | |
manager.start() | |
# 2. Start the PackageGenerator (with a 'total' argument if desired) | |
generator = PackageGenerator() | |
generator.start(manager, total=10) | |
# 3. Start multiple Deliverers | |
deliverers = [] | |
for i in range(4): | |
d = Deliverer() | |
d.start(f"Deliverer-{i+1}", manager) | |
deliverers.append(d) | |
# 4. Command the generator to create and enqueue packages | |
# Adjust 'num_packages' as needed | |
generator.cast({"command": "generate_packages", "num_packages": 32}) | |
print("Main thread waiting for all packages to be delivered...") | |
# 5. Wait until everything is delivered or a timeout occurs | |
manager.wait_until_all_delivered(timeout=30) | |
# 6. Stop all servers gracefully | |
generator.stop() | |
for d in deliverers: | |
d.stop() | |
manager.stop() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment