Last active
June 3, 2022 00:24
-
-
Save rjriel/2ba676b75b9f7c5660de44500c94baf1 to your computer and use it in GitHub Desktop.
Replay to a Temporary Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create a Python 3 virtual environment
python3 -m venv venv
# Activate the virtual environment
source venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# The sample code assumes you are running a broker in a local environment.
# If you are not, uncomment the lines below and replace the < > placeholders
# with your actual values.
# export SOLACE_HOST=<host>
# export SOLACE_VPN=<message vpn>
# export SOLACE_USERNAME=<client username>
# export SOLACE_PASSWORD=<client password>
# Run the sample
python temporary_subscriber.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
autopep8==1.6.0 | |
certifi==2020.12.5 | |
chardet==4.0.0 | |
idna==2.10 | |
pycodestyle==2.8.0 | |
requests==2.25.1 | |
solace-pubsubplus==1.3.0 | |
toml==0.10.2 | |
urllib3==1.26.4 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
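# Consumer that binds to an exclusive durable queue and requests a replay of all messages.
# The receiver is configured to create the queue on start if it is missing; an existing queue on the broker is reused.
# Note: the topic subscription is added to the queue by the receiver when it starts.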
# See https://docs.solace.com/Solace-PubSub-Messaging-APIs/API-Developer-Guide/Adding-Topic-Subscriptio.htm for more details | |
from datetime import datetime | |
import os | |
import platform | |
import time | |
from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent | |
from solace.messaging.resources.queue import Queue | |
from solace.messaging.config.retry_strategy import RetryStrategy | |
from solace.messaging.config.replay_strategy import ReplayStrategy | |
from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy | |
from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver | |
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage | |
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError | |
from solace.messaging.resources.topic_subscription import TopicSubscription | |
# On Windows, request an unbuffered stdout through the PYTHONUNBUFFERED variable.
# NOTE(review): Python reads PYTHONUNBUFFERED at interpreter startup, so setting
# it here presumably only affects child processes -- confirm.
if platform.uname().system == 'Windows':
    os.environ["PYTHONUNBUFFERED"] = "1"
# Handle received messages
class MessageHandlerImpl(MessageHandler):
    """Message handler that prints the destination and payload of each message."""

    def on_message(self, message: InboundMessage):
        """Print the destination name and payload of ``message``.

        The payload is read as a string when one is available; otherwise the
        bytes payload is read and decoded to a string before printing.
        """
        # Read the string form once and fall back to bytes only when it is absent.
        payload = message.get_payload_as_string()
        if payload is None:
            payload = message.get_payload_as_bytes()

        # Decode binary payloads so the printed output is always text.
        if isinstance(payload, (bytes, bytearray)):
            print(f"Received a message of type: {type(payload)}. Decoding to string")
            payload = payload.decode()

        topic = message.get_destination_name()
        print("\n" + f"Received message on: {topic}")
        print("\n" + f"Message payload: {payload} \n")
        # Uncomment to print the full message dump:
        # print("\n" + f"Message dump: {message} \n")
# Listener for messaging-service events (reconnection attempts, reconnections, interruptions)
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    """Print the name, cause and message of every service event received."""

    @staticmethod
    def _print_details(event: ServiceEvent):
        """Print the cause and the message carried by ``event``."""
        cause = event.get_cause()
        print(f"Error cause: {cause}")
        text = event.get_message()
        print(f"Message: {text}")

    def on_reconnected(self, e: ServiceEvent):
        """Report a reconnected event."""
        print("\non_reconnected")
        self._print_details(e)

    def on_reconnecting(self, e: ServiceEvent):
        """Report a reconnecting event."""
        print("\non_reconnecting")
        self._print_details(e)

    def on_service_interrupted(self, e: ServiceEvent):
        """Report a service-interrupted event."""
        print("\non_service_interrupted")
        self._print_details(e)
# Broker configuration. Each value is read from the environment and falls back
# to a local default when the variable is unset or empty.
broker_props = {
    "solace.messaging.transport.host": os.environ.get('SOLACE_HOST') or "tcp://localhost:55555,tcp://localhost:55554",
    "solace.messaging.service.vpn-name": os.environ.get('SOLACE_VPN') or "default",
    "solace.messaging.authentication.scheme.basic.username": os.environ.get('SOLACE_USERNAME') or "default",
    "solace.messaging.authentication.scheme.basic.password": os.environ.get('SOLACE_PASSWORD') or "default"
}

# Build a messaging service with a reconnection strategy of 20 retries over an interval of 3 seconds
# Note: The reconnection strategy could also be configured using the broker properties object
messaging_service = MessagingService.builder().from_properties(broker_props)\
    .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20, 3000))\
    .build()

# Blocking connect thread
messaging_service.connect()
print(f'Messaging Service connected? {messaging_service.is_connected}')

# Event handling for the messaging service
service_handler = ServiceEventHandler()
messaging_service.add_reconnection_listener(service_handler)
messaging_service.add_reconnection_attempt_listener(service_handler)
messaging_service.add_service_interruption_listener(service_handler)

# Queue name.
# NOTE: This assumes that a queue template exists with the name filter matching temp-training/>
queue_name = "temp-training/browse-game-scans"
durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)

# Bound before the try block so the finally clause can always inspect it, even
# when building or starting the receiver raises.
persistent_receiver = None
try:
    # The topic we are subscribing to is to get all games scanned in all stores in the APAC region
    topic_sub = [TopicSubscription.of("gameco/retail/apac/*/game/*/scanned")]
    replay_strategy = ReplayStrategy.all_messages()

    # Build a receiver and bind it to the queue; missing resources are created on start
    persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
        .with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START)\
        .with_subscriptions(topic_sub)\
        .with_message_replay(replay_strategy)\
        .with_message_auto_acknowledgement()\
        .build(durable_exclusive_queue)
    persistent_receiver.start()

    # Callback for received messages
    persistent_receiver.receive_async(MessageHandlerImpl())
    print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')

    # Keep the main thread alive until the user interrupts it
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('\nKeyboardInterrupt received')
# Handle API exception
except PubSubPlusClientError as exception:
    print(f'\nMake sure queue {queue_name} exists on broker!')
    # Surface the underlying error instead of discarding it
    print(f'Error: {exception}')
finally:
    if persistent_receiver and persistent_receiver.is_running():
        print('\nTerminating receiver')
        persistent_receiver.terminate(grace_period=0)
    print('\nDisconnecting Messaging Service')
    messaging_service.disconnect()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment