Small python script to produce+consume to a kafka broker. Tests that the broker is working.
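As a usage sketch (the file name kafka_produce_consume.py is an assumption, not shown in the gist): running python kafka_produce_consume.py --broker localhost:9092 --topic test -vv sends one random 20-character message and then scans the topic for it. The script exits 0 if the message comes back, 1 if it is not found within the first --number messages, and 2 on bad arguments.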
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import logging
import os
import random
import string
import sys

from kafka import KafkaConsumer, KafkaProducer


def _exit_with_error(message, code=2):
    """
    Helper function: prints a message and exits with the given status code (default 2).

    :return: None
    """
    print message
    sys.exit(code)  # exit code 2 is the conventional shell code for incorrect usage


def _get_arguments(args=sys.argv[1:]):
    """
    Gets command line arguments, validates them and parses them into useful
    formats

    :return: Command line arguments
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser(
        description="Kafka Producer + Consumer tester"
    )

    parser.add_argument("-t", "--topic",
                        help="Topic to produce to, default is 'test'",
                        required=False,
                        default="test"
                        )
    parser.add_argument("-ti", "--timeout",
                        help="Timeout in seconds for produce/consume operations, default 30",
                        type=int,
                        required=False,
                        default=30
                        )
    parser.add_argument("-b", "--broker",
                        help="Broker connection string, default 'localhost:9092'",
                        required=False,
                        default="localhost:9092"
                        )
    parser.add_argument("-n", "--number",
                        help="Maximum number of messages to read while looking for our test message, default 30",
                        type=int,
                        required=False,
                        default=30
                        )
    parser.add_argument("-l", "--log-path",
                        help="The path to an output log file",
                        required=False
                        )
    parser.add_argument("-v", "--verbosity",
                        action='count',
                        help="Level of verbosity for output log (-v...-vv)",
                        required=False
                        )
    args = parser.parse_args(args=args)

    # log path: the directory must be writable, and if the file already
    # exists it must be writable too
    if args.log_path:
        args.log_path = os.path.abspath(args.log_path)
        if not os.access(os.path.dirname(args.log_path), os.W_OK) \
                or (os.path.isfile(args.log_path)
                    and not os.access(args.log_path, os.W_OK)):
            parser.print_help()
            _exit_with_error("Log path '{}' doesn't seem to be writable".format(args.log_path))

    # verbosity: map the number of -v flags to a logging level
    if not args.verbosity or args.verbosity <= 0:
        args.verbosity = logging.WARNING
    elif args.verbosity == 1:
        args.verbosity = logging.INFO
    elif args.verbosity == 2:
        args.verbosity = logging.DEBUG
    else:
        args.verbosity = logging.NOTSET

    return args


def _setup_logging(file_path=None, level=logging.WARNING):
    """
    Sets up the root logger and level

    :param str file_path: file path for the (optional) log file
    :param int level: logging level as per the \
        `logging module \
        <https://docs.python.org/2/library/logging.html#logging-levels>`_
    :return: None
    """
    log_formatter = logging.Formatter(
        "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(name)s: %(message)s"
    )
    root_logger = logging.getLogger('')

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(log_formatter)
    root_logger.addHandler(console_handler)

    if file_path:
        file_handler = logging.FileHandler(file_path)
        file_handler.setFormatter(log_formatter)
        root_logger.addHandler(file_handler)

    root_logger.setLevel(level=level)


def _main():
    args = _get_arguments()
    _setup_logging(file_path=args.log_path, level=args.verbosity)
    logger = logging.getLogger(__name__)
    logger.debug("Starting up...")

    # random payload so we can recognize our own message in the topic
    random_str = ''.join(random.choice(string.ascii_lowercase) for i in xrange(20))
    logger.debug(random_str)
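    # Produce the test message and block until the broker acknowledges it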
    producer = KafkaProducer(
        bootstrap_servers=args.broker,
        client_id='kafka_produce_consume',
    )
    future = producer.send(
        topic=args.topic,
        value=random_str,
    )
    result = future.get(timeout=args.timeout)
    logger.info(result)
    producer.close(timeout=args.timeout)
    logger.info("Produced message successfully: {}".format(random_str))
    consumer = KafkaConsumer(
        args.topic,
        auto_offset_reset="earliest",
        bootstrap_servers=args.broker,
    )
    for i, message in enumerate(consumer):
        if i >= args.number:
            logger.error("Reached maximum number of messages in topic, did not find our message")
            consumer.close()
            sys.exit(1)
        elif message.value == random_str:
            logger.info("Found message!")
            break

    consumer.close()
    # @TODO add topic deletion once admin interface is exposed in kafka-python
    sys.exit(0)


if __name__ == "__main__":
    _main()
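
The @TODO near the end refers to kafka-python's admin interface, which later releases expose as KafkaAdminClient. A minimal sketch of what that cleanup could look like, assuming an installed kafka-python version that ships kafka.admin and a broker with delete.topic.enable=true (the broker address and topic name below are just the script's defaults):

# Hedged sketch, not part of the gist file above: delete the test topic after the check.
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092",
                         client_id="kafka_produce_consume")
admin.delete_topics(["test"])  # topic name matches the script's default '-t test'
admin.close()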