Last active
December 2, 2024 15:03
-
-
Save WalBeh/964619f5de3cc1c932050afc815b3686 to your computer and use it in GitHub Desktop.
Automate JFR recording and heap-dump handling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import argparse
import logging
import logging.handlers
import os
import re
import shlex
import subprocess
import sys
import time

import boto3
from botocore.exceptions import NoCredentialsError
from kubernetes import client, config
from kubernetes.config.config_exception import ConfigException
SLEEPY_TIME = 301  # Seconds to wait so the 300 s JFR recordings finish and are written
S3_BUCKET = 'cratedb-cloud-heapdumps'
EXPIRATION = 3600  # Lifetime of each pre-signed upload URL, in seconds

# Configure logging: always log to the console, and additionally to the local
# syslog daemon when its socket exists on this machine.
logger = logging.getLogger('jfr_logger')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(name)s: %(levelname)s %(message)s')

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

# macOS exposes the syslog socket at /var/run/syslog; most Linux systems use /dev/log.
# A missing syslog socket is not fatal: the script still logs to the console.
SYSLOG_ADDRESS = '/var/run/syslog' if sys.platform == 'darwin' else '/dev/log'
if os.path.exists(SYSLOG_ADDRESS):
    try:
        syslog_handler = logging.handlers.SysLogHandler(address=SYSLOG_ADDRESS)
    except OSError as e:
        logger.warning("Failed to create SysLogHandler at %s: %s; logging to console only",
                       SYSLOG_ADDRESS, e)
    else:
        syslog_handler.setFormatter(formatter)
        logger.addHandler(syslog_handler)
else:
    logger.warning("Syslog socket %s not found; logging to console only", SYSLOG_ADDRESS)
def load_kube_config(kube_context: "str | None" = None):
    """Load the local kube-config, optionally selecting a specific context.

    Args:
        kube_context: Name of the kube-config context to activate. ``None``
            leaves the choice to the kubernetes client library.

    Exits the process with status 1 if the configuration cannot be loaded.
    """
    try:
        config.load_kube_config(context=kube_context)
    except ConfigException as e:
        # Report through the logger (console + syslog) rather than stdout.
        logger.error(f"Error loading kube-config: {e}")
        sys.exit(1)
def _selector_to_string(selector):
    """Render a Kubernetes label selector object as a label-selector query string.

    Handles both ``match_labels`` (``key=value``) and ``match_expressions``
    (``In``/``NotIn``/``Exists``/``DoesNotExist``). Either may be ``None``.
    """
    parts = [f"{key}={value}" for key, value in (selector.match_labels or {}).items()]
    for req in selector.match_expressions or []:
        values = ','.join(req.values or [])
        if req.operator == 'In':
            parts.append(f"{req.key} in ({values})")
        elif req.operator == 'NotIn':
            parts.append(f"{req.key} notin ({values})")
        elif req.operator == 'Exists':
            parts.append(req.key)
        elif req.operator == 'DoesNotExist':
            parts.append(f"!{req.key}")
        else:
            raise ValueError(f"Unsupported label selector operator: {req.operator}")
    return ','.join(parts)


def get_statefulset_pods(apps_api, core_api, namespace, statefulset_name):
    """Return all pods selected by a StatefulSet's label selector.

    Args:
        apps_api: ``client.AppsV1Api`` instance used to read the StatefulSet.
        core_api: ``client.CoreV1Api`` instance used to list pods.
        namespace: Namespace containing the StatefulSet.
        statefulset_name: Name of the StatefulSet.

    Returns:
        The ``items`` list of the pod listing, in the order the API returned it.

    Exits the process with status 1 on an API error or an empty selector.
    """
    try:
        sts = apps_api.read_namespaced_stateful_set(statefulset_name, namespace)
        label_selector = _selector_to_string(sts.spec.selector)
        if not label_selector:
            # An empty selector would match every pod in the namespace.
            logger.error(f"StatefulSet {statefulset_name} has an empty label selector")
            sys.exit(1)
        logger.debug(f"Label selector: {label_selector}")
        pods = core_api.list_namespaced_pod(namespace, label_selector=label_selector)
    except client.exceptions.ApiException as e:
        logger.error(f"Error fetching StatefulSet pods: {e}")
        sys.exit(1)
    return pods.items
def generate_presigned_url(bucket_name, object_name, region='us-east-1', expiration=EXPIRATION):
    """Return a pre-signed URL that permits an HTTP PUT of one S3 object.

    Credentials are resolved from a default ``boto3.Session()``.

    Args:
        bucket_name: Target S3 bucket.
        object_name: Object key to be written.
        region: AWS region for the S3 client.
        expiration: URL lifetime in seconds.

    Returns:
        The pre-signed URL string.

    Exits the process with status 1 if no AWS credentials are available.
    """
    s3_client = boto3.Session().client('s3', region_name=region)
    try:
        return s3_client.generate_presigned_url(
            'put_object',
            Params={'Bucket': bucket_name, 'Key': object_name},
            ExpiresIn=expiration,
        )
    except NoCredentialsError:
        logger.error("Credentials not available.")
        sys.exit(1)
# Matches an http(s) URL together with its query string. For pre-signed S3 URLs
# the query string carries the signature and credential parameters.
_URL_QUERY_RE = re.compile(r"""(https?://[^\s"'?\\]+)\?[^\s"'\\]*""")


def _redact_urls(text):
    """Replace the query string of every http(s) URL in *text* with '<redacted>'."""
    return _URL_QUERY_RE.sub(r"\1?<redacted>", text)


def exec_command_in_pod(namespace, pod, container, command):
    """
    Execute a shell command in a Kubernetes pod using kubectl exec.

    The command is run as ``/bin/sh -c <command>`` in the given container.
    URL query strings (e.g. pre-signed S3 signatures) are redacted before the
    command is logged, so upload credentials do not end up in syslog.

    Args:
        namespace (str): Kubernetes namespace.
        pod (str): Pod name.
        container (str): Container name.
        command (str): Command to execute in the pod.

    Returns:
        str: The command's stdout, stripped.

    Raises:
        RuntimeError: If kubectl exits with a non-zero status.
    """
    cmd = [
        "kubectl", "exec",
        "-n", namespace,
        pod,
        "-c", container,
        "--",
        "/bin/sh", "-c", command,
    ]
    pod_index = pod.split('-')[-1]  # Get the index from pod name
    logger.info(f"Executing command in pod {pod_index}: {_redact_urls(command)}")
    try:
        result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # errors='replace': tool output is not guaranteed to be valid UTF-8.
        stderr_output = e.stderr.decode(errors='replace').strip()
        logger.error(f"Failed to execute command in pod {pod_index}: {stderr_output}")
        raise RuntimeError(f"kubectl exec failed: {stderr_output}") from e
    output = result.stdout.decode(errors='replace').strip()
    logger.info(f"Command output: {output}")
    return output
HEAPDUMP_DIR = '/resource/heapdump'


def _capture_diagnostics(namespace, pod_name, pod_index):
    """Start a 300 s JFR recording and write a heap dump inside one pod."""
    jfr_path = f'{HEAPDUMP_DIR}/pod-{pod_index}.jfr'
    hprof_path = f'{HEAPDUMP_DIR}/pod-{pod_index}.hprof'

    # Remove leftovers from a previous run. -f keeps a missing file (e.g. on a
    # first run) from failing the command and aborting the pod.
    exec_command_in_pod(namespace, pod_name, 'crate', f'rm -f {jfr_path}')
    # The recording lasts 300 s; SLEEPY_TIME must stay longer than this.
    jfr_command = (
        f'su crate -c "/crate/jdk/bin/jcmd 1 JFR.start duration=300s '
        f'filename={jfr_path} settings=profile"'
    )
    exec_command_in_pod(namespace, pod_name, 'crate', jfr_command)

    exec_command_in_pod(namespace, pod_name, 'crate', f'rm -f {hprof_path}')
    heapdump_command = f'su crate -c "/crate/jdk/bin/jcmd 1 GC.heap_dump {hprof_path}"'
    exec_command_in_pod(namespace, pod_name, 'crate', heapdump_command)


def _upload_diagnostics(namespace, pod_name, pod_index):
    """Upload a pod's .hprof and .jfr files to S3 through pre-signed PUT URLs."""
    for suffix in ('hprof', 'jfr'):
        url = generate_presigned_url(S3_BUCKET, f"{pod_name}.{suffix}")
        local_path = f'{HEAPDUMP_DIR}/pod-{pod_index}.{suffix}'
        # --fail makes curl exit non-zero on HTTP errors (e.g. 403 from S3)
        # instead of reporting success.
        curl_command = (
            f"curl --fail --silent --show-error -X PUT "
            f"-T {shlex.quote(local_path)} {shlex.quote(url)}"
        )
        # Quote the whole curl command once more for `su -c`.
        exec_command_in_pod(namespace, pod_name, 'crate', f"su crate -c {shlex.quote(curl_command)}")


def _list_uploads(prefix):
    """Best-effort listing of uploaded objects whose key starts with *prefix*."""
    s3_ls_command = ["aws", "s3", "ls", f"s3://{S3_BUCKET}/{prefix}"]
    logger.info(f"Running command: {' '.join(s3_ls_command)}")
    try:
        result = subprocess.run(s3_ls_command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to run command: {' '.join(s3_ls_command)}\n"
                     f"Error: {e.stderr.decode(errors='replace').strip()}")
    except OSError as e:
        # e.g. the aws CLI is not installed.
        logger.error(f"Failed to run command: {' '.join(s3_ls_command)}\nError: {e}")
    else:
        logger.info(f"Command output:\n{result.stdout.decode(errors='replace').strip()}")


def main():
    """Capture a JFR recording and a heap dump from every pod of a StatefulSet and upload them to S3.

    Pods are processed independently: a failure on one pod is logged and the
    remaining pods are still handled. Exits with status 1 if no pods were
    found or if any pod failed.
    """
    parser = argparse.ArgumentParser(description='Kubernetes pod automation tool')
    parser.add_argument('--ns', required=True, help='Kubernetes namespace')
    parser.add_argument('--sts', required=True, help='StatefulSet name')
    parser.add_argument('--kube-context', help='Kubernetes context (optional)')
    args = parser.parse_args()

    load_kube_config(args.kube_context)
    core_v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()

    pods = get_statefulset_pods(apps_v1, core_v1, args.ns, args.sts)
    logger.info(f"Found {len(pods)} pods in StatefulSet {args.sts}")
    if not pods:
        logger.error("No pods to process; exiting.")
        sys.exit(1)

    captured = []  # (pod_name, pod_index) of pods whose capture succeeded
    failed = []
    for pod in pods:
        pod_name = pod.metadata.name
        pod_index = pod_name.split('-')[-1]  # StatefulSet ordinal
        logger.info(f"Processing pod: {pod_name}")
        try:
            _capture_diagnostics(args.ns, pod_name, pod_index)
        except Exception as e:  # top-level per-pod boundary: log and continue
            logger.error(f"Skipping pod {pod_name}: {e}")
            failed.append(pod_name)
        else:
            captured.append((pod_name, pod_index))

    if captured:
        logger.info("Waiting %s seconds for jfr, before resuming file uploads to S3 %s.", SLEEPY_TIME, S3_BUCKET)
        time.sleep(SLEEPY_TIME)
        logger.info("Resuming file uploads.")

    for pod_name, pod_index in captured:
        try:
            _upload_diagnostics(args.ns, pod_name, pod_index)
        except Exception as e:  # top-level per-pod boundary: log and continue
            logger.error(f"Upload failed for pod {pod_name}: {e}")
            failed.append(pod_name)
        else:
            logger.info(f"Uploaded {pod_name}.hprof and {pod_name}.jfr to s3://{S3_BUCKET}/")

    _list_uploads(args.sts)

    if failed:
        logger.error(f"Process completed with failures for pods: {', '.join(failed)}")
        sys.exit(1)
    logger.info(f"Process completed. Output files are in s3://{S3_BUCKET}/{args.sts}-*.jfr/.hprof")
if __name__ == "__main__":
    # Script entry point. main()'s return value becomes the exit status;
    # a None return means a normal exit with status 0.
    sys.exit(main())
Author
WalBeh
commented
Dec 2, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment