Created
February 2, 2018 15:11
-
-
Save stephnr/ca9b44bc5cadae8bb6e6b422d5ec2670 to your computer and use it in GitHub Desktop.
Tail a Kinesis stream and print its base64-decoded records
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# ANSI escape sequences used to colorize terminal output.
# $'...' quoting makes bash store the real ESC byte instead of the two
# characters "\e", so the values can be passed to printf as plain %s data.
red=$'\e[1;31m'
green=$'\e[1;32m'
yellow=$'\e[1;33m'
blue=$'\e[1;34m'
magenta=$'\e[1;35m'
cyan=$'\e[1;36m'
dim=$'\e[2m'
# Resets every attribute set by the sequences above.
end=$'\e[0m'
# Detect if jq is installed; the script parses every AWS CLI response with it.
if ! command -v jq >/dev/null 2>&1; then
  # Diagnostics go to stderr so they never mix with record output on stdout.
  # Colors are passed as %s arguments rather than embedded in the format.
  printf '%sjq%s command is required but is not installed.\n\n' \
    "${yellow}" "${end}" >&2
  printf '%sPlease download from here >>%s %s%s%s\n' \
    "${dim}" "${end}" "${cyan}" 'https://stedolan.github.io/jq/download/' "${end}" >&2
  exit 1
fi

# The AWS CLI is just as essential: without it every later step fails with a
# misleading "Stream is not active!" message, so fail early and clearly.
if ! command -v aws >/dev/null 2>&1; then
  printf '%saws%s command is required but is not installed.\n' \
    "${yellow}" "${end}" >&2
  exit 1
fi
#######################################
# Print the help text describing every supported flag.
# Globals:   dim, cyan, end (read) - color escape sequences
# Arguments: none
# Outputs:   usage text on stdout
#######################################
print_usage() {
  # Only short flags are parsed by this script, so the example must use -s;
  # a long --stream-name option is not accepted.
  printf 'Usage: %stailKinesisRecords -s my-stream [options]%s\n\n' "${dim}" "${end}"
  printf "tails a kinesis stream and prints the data contents to stdout\n\n"
  printf '%sFlags:%s\n\n' "${cyan}" "${end}"
  printf "%-30s%s\n" "-s STREAM_NAME" "${dim}the name of the kinesis stream${end}"
  printf "%-30s%s\n" "-i IDX" "${dim}the index of the shard to use. [Default: 0]${end}"
  printf "%-30s%s\n" "-t <TRIM_HORIZON|LATEST>" "${dim}the type of kinesis iterator. Defaults to LATEST${end}"
  printf "%-30s%s\n" "-n COUNT" "${dim}how many records to be returned from each call to kinesis. [Default: 1]${end}"
  printf "%-30s%s\n" "-h URL" "${dim}host url of the kinesis stream. Defaults to AWS${end}"
  printf "%-30s%s\n" "-p AWS_PROFILE_NAME" "${dim}name of the AWS profile to use. Defaults to value of AWS_DEFAULT_PROFILE variable${end}"
}

# Show help when called with no arguments or with the literal word "help".
# Note that -h is NOT help: it is the endpoint (host) URL flag.
if [[ $# -eq 0 || "$1" == "help" ]]; then
  print_usage
  exit 0
fi
# Defaults
EndpointUrl=""
Limit=1
Profile="${AWS_DEFAULT_PROFILE:-}"
ShardId=0
ShardIteratorType="LATEST"
# Leading ':' selects getopts' silent mode: the script reports bad flags
# itself through the ':' and '?' arms of the case statement below.
MY_OPTS=":s:i:t:n:h:p:"

#######################################
# Parse command-line flags into the script's global settings.
# Globals:   MY_OPTS (read)
#            StreamName, ShardId, ShardIteratorType, Limit, EndpointUrl,
#            Profile (written)
# Arguments: the command-line arguments to parse
# Outputs:   a message on stderr for an unknown flag, a missing argument or
#            an invalid value
# Returns:   0 on success; exits the script with status 1 on a fatal error
#######################################
parse_args() {
  local opt
  # A local OPTIND makes the function safe to call more than once.
  local OPTIND=1
  while getopts "${MY_OPTS}" opt; do
    case "${opt}" in
      s) StreamName=${OPTARG} ;;
      i)
        if [[ ${OPTARG} =~ ^[[:digit:]]+$ ]]; then
          ShardId=${OPTARG}
        else
          # Keep the previous value (as before), but no longer silently.
          echo "Ignoring non-numeric shard index: ${OPTARG} (using ${ShardId})" >&2
        fi
        ;;
      t)
        # Only the iterator types that need no extra parameters are usable.
        case "${OPTARG}" in
          TRIM_HORIZON | LATEST) ShardIteratorType=${OPTARG} ;;
          *)
            echo "Invalid iterator type: ${OPTARG} (expected TRIM_HORIZON or LATEST)" >&2
            exit 1
            ;;
        esac
        ;;
      n)
        if [[ ! ${OPTARG} =~ ^[1-9][0-9]*$ ]]; then
          echo "Invalid record count: ${OPTARG} (expected a positive integer)" >&2
          exit 1
        fi
        Limit=${OPTARG}
        ;;
      h) EndpointUrl=${OPTARG} ;;
      p) Profile=${OPTARG} ;;
      :)
        # Silent mode reports a flag with a missing argument as ':'.
        echo "Option -${OPTARG} requires an argument" >&2
        exit 1
        ;;
      \?)
        echo "Invalid option: -${OPTARG}" >&2
        exit 1
        ;;
    esac
  done
}

parse_args "$@"
# A stream name is mandatory; abort before any AWS call is attempted.
if [[ -z "${StreamName}" ]]; then
  printf "\nNo stream name was provided. Aborting.\n" >&2
  exit 1
fi

# Turn the bare profile name into the AWS CLI global option. Without the
# "--profile" prefix the name reaches aws as a stray positional argument and
# every call is rejected.
# NOTE: Profile and EndpointUrl are expanded UNQUOTED by the aws calls so
# that they split into separate words; values containing whitespace are
# therefore not supported.
if [[ -n "${Profile}" ]]; then
  Profile="--profile ${Profile}"
fi

# A custom endpoint also gets --no-verify-ssl, which disables TLS
# certificate verification for the calls that use it.
if [[ -n "${EndpointUrl}" ]]; then
  EndpointUrl="--endpoint-url ${EndpointUrl} --no-verify-ssl"
fi
# Look the stream up once; its status and shard list are both read from this
# single response.
# Profile and EndpointUrl each hold zero or more CLI words and must stay
# unquoted so that they are word-split.
# shellcheck disable=SC2086
StreamDescription=$(aws kinesis describe-stream --stream-name "${StreamName}" ${Profile} ${EndpointUrl} \
  | jq -r ".StreamDescription")

# A failed lookup leaves StreamDescription empty, which also ends up here.
Status=$(jq -r ".StreamStatus" <<<"${StreamDescription}")
if [[ "${Status}" != "ACTIVE" ]]; then
  echo "Stream is not active!" >&2
  exit 1
fi

# Until this point ShardId holds the numeric INDEX chosen with -i (digits
# only); it is now replaced by the real shard id found at that index.
ShardIndex=${ShardId}
Shard=$(jq -r ".Shards[${ShardIndex}]" <<<"${StreamDescription}")
# "// empty" yields an empty string instead of the text "null" when the index
# is past the end of the shard list.
ShardId=$(jq -r ".ShardId // empty" <<<"${Shard}")
if [[ -z "${ShardId}" ]]; then
  echo "No shard found at index ${ShardIndex} of stream ${StreamName}" >&2
  exit 1
fi

# Use the same profile and endpoint as every other call so the iterator is
# requested with the same credentials and against the same service.
# shellcheck disable=SC2086
Iterator=$(aws kinesis get-shard-iterator --stream-name "${StreamName}" --shard-id "${ShardId}" \
  --shard-iterator-type "${ShardIteratorType}" ${Profile} ${EndpointUrl} \
  | jq -r ".ShardIterator // empty")
if [[ -z "${Iterator}" ]]; then
  echo "Could not obtain a shard iterator for ${ShardId}" >&2
  exit 1
fi

printf "Beginning to tail kinesis stream : [%s] at %s ...\n" "${StreamName}" "${ShardId}"
# Poll the shard every two seconds and print the decoded payload of every
# record, until the shard is closed or a request fails.
while :; do
  # Profile and EndpointUrl each hold zero or more CLI words and must stay
  # unquoted so that they are word-split.
  # shellcheck disable=SC2086
  if ! response=$(aws kinesis get-records --shard-iterator "${Iterator}" --limit "${Limit}" ${Profile} ${EndpointUrl}); then
    # Without a valid response there is no next iterator to continue with.
    echo "get-records failed; stopping." >&2
    exit 1
  fi

  record_count=$(jq -r ".Records | length" <<<"${response}")
  if ((${record_count:-0} > 0)); then
    # printf, because bash's echo prints "\n" literally without -e.
    printf 'Milliseconds Behind: %s\n\n' "$(jq -r ".MillisBehindLatest" <<<"${response}")"

    # Decode each record on its own: base64 text that is concatenated across
    # records has padding in the middle and does not decode reliably.
    # --decode is the spelling shared by GNU coreutils and macOS base64
    # (the short flag differs: -d versus -D).
    while IFS= read -r encoded; do
      printf '%s' "${encoded}" | base64 --decode
      printf '\n'
    done < <(jq -r ".Records[].Data" <<<"${response}")
  fi

  # Kinesis returns a null NextShardIterator once a closed shard is
  # exhausted; "// empty" turns that into an empty string.
  Iterator=$(jq -r ".NextShardIterator // empty" <<<"${response}")
  if [[ -z "${Iterator}" ]]; then
    echo "No further shard iterator returned (shard closed?); stopping." >&2
    exit 0
  fi

  sleep 2
done
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment