Created
November 10, 2015 17:06
-
-
Save billyshambrook/55dfe505e842e6dfffb0 to your computer and use it in GitHub Desktop.
Multiple channel pika consumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
import pika | |
logger = logging.getLogger(__name__) | |
class Channel(object):
    """A single consuming AMQP channel opened on a shared pika connection.

    Setup is a chain of callbacks: open the channel, declare the exchange,
    declare the queue, bind the queue, then start consuming. Each step is
    started from the completion callback of the previous one.
    """

    EXCHANGE = 'message'
    EXCHANGE_TYPE = 'topic'
    # Queue name template; '{}' is replaced with the channel identifier.
    QUEUE = 'text{}'
    ROUTING_KEY = 'example.text'

    def __init__(self, connection, identifier):
        """Remember the shared connection and this channel's identifier.

        Args:
            connection: Connection object the channel is opened on. It is
                also closed by ``on_channel_closed``.
            identifier: Value used as the log prefix and substituted into
                ``QUEUE`` to build the queue name.
        """
        self._connection = connection
        # Assigned in on_open_callback. Initialised to None so that the
        # ``if self._channel:`` guards below are safe before the channel
        # has been opened.
        self._channel = None
        self._consumer_tag = None
        self._id = identifier

    @property
    def _queue_name(self):
        """Return the queue name: ``QUEUE`` filled with the identifier."""
        return self.QUEUE.format(self._id)

    def run(self):
        """Ask the connection to open a channel.

        ``on_open_callback`` is registered as the open callback.
        """
        self._connection.channel(on_open_callback=self.on_open_callback)

    def on_open_callback(self, channel):
        """Store the opened channel and start declaring the exchange.

        Args:
            channel: The channel object handed to the open callback.
        """
        logger.info('[{}] Channel opened'.format(self._id))
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """Register ``on_channel_closed`` as the channel's close callback."""
        logger.info('[{}] Adding channel close callback'.format(self._id))
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reply_code, reply_text):
        """Log the channel closure and close the shared connection.

        Args:
            channel: The channel that was closed.
            reply_code: Close code passed to the callback.
            reply_text: Close reason passed to the callback.
        """
        logger.warning('[{}] channel {} was closed: ({}) {}'.format(
            self._id, channel, reply_code, reply_text))
        # Closing any one channel takes the whole connection down with it.
        self._connection.close()

    def setup_exchange(self, exchange_name):
        """Declare the exchange; continues in ``on_exchange_declareok``.

        Args:
            exchange_name: Name of the exchange to declare, with type
                ``EXCHANGE_TYPE``.
        """
        logger.info('[{}] Declaring exchange {}'.format(
            self._id, exchange_name))
        self._channel.exchange_declare(
            self.on_exchange_declareok, exchange_name, self.EXCHANGE_TYPE)

    def on_exchange_declareok(self, unused_frame):
        """Exchange declared: go on to declare this channel's queue."""
        logger.info('[{}] Exchange declared'.format(self._id))
        self.setup_queue(self._queue_name)

    def setup_queue(self, queue_name):
        """Declare the queue; continues in ``on_queue_declareok``.

        Args:
            queue_name: Name of the queue to declare.
        """
        logger.info('[{}] Declaring queue {}'.format(self._id, queue_name))
        self._channel.queue_declare(self.on_queue_declareok, queue_name)

    def on_queue_declareok(self, method_frame):
        """Queue declared: bind it to ``EXCHANGE`` with ``ROUTING_KEY``."""
        queue_name = self._queue_name
        logger.info('[{}] Binding {} to {} with {}'.format(
            self._id, self.EXCHANGE, queue_name, self.ROUTING_KEY))
        self._channel.queue_bind(
            self.on_bindok, queue_name, self.EXCHANGE, self.ROUTING_KEY)

    def on_bindok(self, unused_frame):
        """Queue bound: start consuming."""
        # The original call omitted .format(), so it logged a literal '[{}]'.
        logger.info('[{}] Queue bound'.format(self._id))
        self.start_consuming()

    def start_consuming(self):
        """Register the cancel callback and start consuming from the queue.

        The value returned by ``basic_consume`` is kept as the consumer tag
        so that ``stop_consuming`` can pass it to ``basic_cancel``.
        """
        logger.info('[{}] Issing consumer related RPC commands'.format(
            self._id))
        self.add_on_cancel_callback()
        self._consumer_tag = self._channel.basic_consume(
            self.on_message, self._queue_name)

    def add_on_cancel_callback(self):
        """Register ``on_consumer_cancelled`` as the cancel callback."""
        logger.info('[{}] Adding consumer cancellation callback'.format(
            self._id))
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

    def on_consumer_cancelled(self, method_frame):
        """Close the channel after the consumer was cancelled remotely.

        Args:
            method_frame: Frame passed to the cancel callback; only logged.
        """
        logger.info(
            '[{}] Consumer was cancelled remotely, shutting down {}'.format(
                self._id, method_frame))
        if self._channel:
            self._channel.close()

    def on_message(self, unused_channel, basic_deliver, properties, body):
        """Log a delivered message, wait five seconds, then acknowledge it.

        Args:
            unused_channel: Channel the message arrived on (not used).
            basic_deliver: Delivery info; its ``delivery_tag`` is logged and
                used for the acknowledgement.
            properties: Message properties; ``app_id`` is logged.
            body: Message payload; only logged.
        """
        logger.info('[{}] Received message # {} from {}: {}'.format(
            self._id, basic_deliver.delivery_tag, properties.app_id, body))
        # NOTE(review): presumably a stand-in for real work. The sleep
        # blocks whatever thread invokes this callback -- confirm that is
        # acceptable for the other channels on the same connection.
        time.sleep(5)
        self.acknowledge_message(basic_deliver.delivery_tag)

    def acknowledge_message(self, delivery_tag):
        """Acknowledge the message with the given delivery tag.

        Args:
            delivery_tag: Delivery tag of the message to acknowledge.
        """
        logger.info('[{}] Acknowledging message {}'.format(
            self._id, delivery_tag))
        self._channel.basic_ack(delivery_tag)

    def stop_consuming(self):
        """Cancel the consumer if a channel has been opened.

        Does nothing when the channel was never opened. Otherwise the
        shutdown continues in ``on_cancelok``.
        """
        if self._channel:
            logger.info(
                '[{}] Sending a Basic.Cancel RPC command to RabbitMQ'.format(
                    self._id))
            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)

    def on_cancelok(self, unused_frame):
        """Consumer cancellation acknowledged: close the channel."""
        logger.info(
            '[{}] RabbitMQ acknowledged the cancellation of the '
            'consumer'.format(self._id))
        self.close_channel()

    def close_channel(self):
        """Close the channel; ``on_channel_closed`` is its close callback."""
        logger.info('[{}] Closing channel'.format(self._id))
        self._channel.close()
class Consumer(object):
    """Owns one pika ``SelectConnection`` and the channels opened on it.

    ``run`` connects and starts the connection's ioloop. Once the connection
    is open, ``channel_count`` ``Channel`` objects are created on it. If the
    connection closes while the consumer is not stopping, a reconnect is
    scheduled five seconds later.
    """

    def __init__(self, amqp_url, channel_count=2):
        """Store the connection settings; nothing is connected yet.

        Args:
            amqp_url: AMQP URL passed to ``pika.URLParameters``.
            channel_count: Number of ``Channel`` objects to open each time
                the connection opens. Defaults to 2, the value that was
                previously hard-coded.
        """
        self._connection = None
        self._closing = False
        self._channels = []
        self._url = amqp_url
        self._channel_count = channel_count

    def run(self):
        """Connect and start the connection's ioloop."""
        self._connection = self.connect()
        self._connection.ioloop.start()

    def connect(self):
        """Create and return a new ``pika.SelectConnection``.

        ``on_connection_open`` is passed as the open callback.
        """
        # NOTE(review): the URL is logged verbatim, including any
        # credentials embedded in it.
        logger.info('Connecting to {}'.format(self._url))
        # stop_ioloop_on_close=False: this class stops the ioloop itself in
        # on_connection_closed and reconnect.
        return pika.SelectConnection(
            pika.URLParameters(self._url),
            self.on_connection_open,
            stop_ioloop_on_close=False)

    def on_connection_open(self, unused_connection):
        """Connection opened: register the close callback, open channels."""
        logger.info('Connection opened')
        self.add_on_connection_close_callback()
        self.open_channels()

    def add_on_connection_close_callback(self):
        """Register ``on_connection_closed`` on the connection."""
        logger.info('Adding connection close callback')
        self._connection.add_on_close_callback(self.on_connection_closed)

    def on_connection_closed(self, connection, reply_code, reply_text):
        """Forget the channels, then stop the ioloop or schedule a reconnect.

        Args:
            connection: The connection that was closed.
            reply_code: Close code passed to the callback.
            reply_text: Close reason passed to the callback.
        """
        self._channels = []
        if self._closing:
            self._connection.ioloop.stop()
        else:
            logger.warning(
                'Connection closed, reopening in 5 seconds: ({}) {}'.format(
                    reply_code, reply_text))
            self._connection.add_timeout(5, self.reconnect)

    def reconnect(self):
        """Stop the old ioloop and, unless stopping, connect again."""
        self._connection.ioloop.stop()
        if not self._closing:
            self._connection = self.connect()
            self._connection.ioloop.start()

    def open_channels(self):
        """Create and start ``channel_count`` channels on the connection.

        Channels are identified by ``'0'``, ``'1'``, ... in creation order.
        """
        logger.info('Creating new channels.')
        for i in range(self._channel_count):
            channel = Channel(self._connection, str(i))
            channel.run()
            self._channels.append(channel)

    def stop(self):
        """Cancel every channel's consumer and run the ioloop again.

        The ioloop is restarted so the cancel/close callbacks can run;
        ``on_connection_closed`` stops it once ``_closing`` is set.
        """
        logger.info('Stopping')
        self._closing = True
        if self._connection is None:
            # No connection was ever created, so there is no ioloop to run
            # and nothing to close.
            logger.info('Stopped')
            return
        for channel in self._channels:
            channel.stop_consuming()
        self._connection.ioloop.start()
        logger.info('Stopped')

    def close_connection(self):
        """Close the connection."""
        logger.info('Closing connection')
        self._connection.close()
def main():
    """Entrypoint: set up logging and run a consumer against localhost.

    A ``KeyboardInterrupt`` raised while the consumer is running is turned
    into a call to ``Consumer.stop``.
    """
    logging.basicConfig(format='%(message)s', level=logging.INFO)
    amqp_consumer = Consumer('amqp://guest:guest@localhost:5672/%2F')
    try:
        amqp_consumer.run()
    except KeyboardInterrupt:
        # Ctrl-C: hand over to the consumer's own shutdown sequence.
        amqp_consumer.stop()
# Run the consumer only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment