@WalBeh
Last active December 2, 2024 15:03
Automate JFR + heap dump handling: start a JFR recording and take a heap dump on every pod of a StatefulSet, then upload the artifacts to S3 via pre-signed URLs.
#!/usr/bin/env python3
import argparse
import logging
import logging.handlers
import subprocess
import sys
import time

import boto3
from botocore.exceptions import NoCredentialsError
from kubernetes import client, config
from kubernetes.config.config_exception import ConfigException

SLEEPY_TIME = 301  # Sleep until the 300s JFR recording is written, plus a small buffer
S3_BUCKET = 'cratedb-cloud-heapdumps'
EXPIRATION = 3600  # Lifetime of the pre-signed URL, in seconds

# Configure syslog logging
logger = logging.getLogger('jfr_logger')
logger.setLevel(logging.INFO)

# On macOS, the syslog socket is typically at '/var/run/syslog'
try:
    syslog_handler = logging.handlers.SysLogHandler(address='/var/run/syslog')
except Exception as e:
    logger.error(f"Failed to create SysLogHandler: {e}")
    sys.exit(1)

formatter = logging.Formatter('%(asctime)s %(name)s: %(levelname)s %(message)s')
syslog_handler.setFormatter(formatter)
logger.addHandler(syslog_handler)

# Optionally, add a StreamHandler to also output logs to the console
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

def load_kube_config(kube_context: str = None):
    """Load Kubernetes configuration for the specified context."""
    try:
        config.load_kube_config(context=kube_context)
    except ConfigException as e:
        logger.error(f"Error loading kube-config: {e}")
        sys.exit(1)

def get_statefulset_pods(apps_api, core_api, namespace, statefulset_name):
    """Get all pods belonging to a StatefulSet."""
    try:
        # Retrieve the StatefulSet to get its label selector
        sts = apps_api.read_namespaced_stateful_set(statefulset_name, namespace)
        label_selector = ','.join([f"{k}={v}" for k, v in sts.spec.selector.match_labels.items()])
        logger.debug(f"Label selector: {label_selector}")
        # List pods using the StatefulSet's label selector
        pods = core_api.list_namespaced_pod(namespace, label_selector=label_selector)
        return pods.items
    except client.exceptions.ApiException as e:
        logger.error(f"Error fetching StatefulSet pods: {e}")
        sys.exit(1)

def generate_presigned_url(bucket_name, object_name, region='us-east-1', expiration=EXPIRATION):
    """Generate a pre-signed S3 PUT URL so pods can upload without AWS credentials."""
    # Create a session using the ambient (e.g. temporary) credentials
    session = boto3.Session()
    s3_client = session.client('s3', region_name=region)
    try:
        response = s3_client.generate_presigned_url(
            'put_object',
            Params={'Bucket': bucket_name, 'Key': object_name},
            ExpiresIn=expiration,
        )
    except NoCredentialsError:
        logger.error("AWS credentials not available.")
        sys.exit(1)
    return response

def exec_command_in_pod(namespace, pod, container, command):
    """
    Execute a command in a Kubernetes pod using kubectl exec.

    Args:
        namespace (str): Kubernetes namespace.
        pod (str): Pod name.
        container (str): Container name.
        command (str): Command to execute in the pod.
    """
    cmd = [
        "kubectl", "exec",
        "-n", namespace,
        pod,
        "-c", container,
        "--",
        "/bin/sh", "-c", command,
    ]
    pod_index = pod.split('-')[-1]  # Ordinal suffix of the pod name
    try:
        logger.info(f"Executing command in pod {pod_index}: {command}")
        result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        logger.info(f"Command output: {result.stdout.decode().strip()}")
    except subprocess.CalledProcessError as e:
        stderr_output = e.stderr.decode().strip()
        logger.error(f"Failed to execute command in pod {pod_index}: {stderr_output}")
        raise Exception(f"kubectl exec failed: {stderr_output}") from e

def main():
    parser = argparse.ArgumentParser(description='Kubernetes pod automation tool')
    parser.add_argument('--ns', required=True, help='Kubernetes namespace')
    parser.add_argument('--sts', required=True, help='StatefulSet name')
    parser.add_argument('--kube-context', help='Kubernetes context (optional)')
    args = parser.parse_args()

    # Load Kubernetes configuration
    load_kube_config(args.kube_context)

    # Create Kubernetes API clients
    core_v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()

    # Get all pods from the StatefulSet
    pods = get_statefulset_pods(apps_v1, core_v1, args.ns, args.sts)
    logger.info(f"Found {len(pods)} pods in StatefulSet {args.sts}")

    # Iterate over each pod to start JFR and create a heap dump
    for pod in pods:
        pod_name = pod.metadata.name
        pod_index = pod_name.split('-')[-1]  # Get the index from the pod name
        logger.info(f"Processing pod: {pod_name}")

        # Remove any stale recording; -f so a missing file is not an error
        rm_jfr = f'rm -f /resource/heapdump/pod-{pod_index}.jfr'
        exec_command_in_pod(args.ns, pod_name, 'crate', rm_jfr)

        # Start JFR recording (PID 1 is the crate process)
        jfr_command = (
            f'su crate -c "/crate/jdk/bin/jcmd 1 JFR.start duration=300s '
            f'filename=/resource/heapdump/pod-{pod_index}.jfr settings=profile"'
        )
        exec_command_in_pod(args.ns, pod_name, 'crate', jfr_command)

        # Remove any stale heap dump
        rm_hprof = f'rm -f /resource/heapdump/pod-{pod_index}.hprof'
        exec_command_in_pod(args.ns, pod_name, 'crate', rm_hprof)

        # Create heap dump
        heapdump_command = (
            f'su crate -c "/crate/jdk/bin/jcmd 1 GC.heap_dump '
            f'/resource/heapdump/pod-{pod_index}.hprof"'
        )
        exec_command_in_pod(args.ns, pod_name, 'crate', heapdump_command)

    logger.info("Waiting %s seconds for the JFR recordings to finish before uploading files to S3 bucket %s.", SLEEPY_TIME, S3_BUCKET)
    time.sleep(SLEEPY_TIME)
    logger.info("Starting file uploads.")

    # Upload files from each pod to S3 via pre-signed URLs
    for pod in pods:
        pod_name = pod.metadata.name
        pod_index = pod_name.split('-')[-1]

        # Upload heap dump
        dest_file = f"{pod_name}.hprof"
        url = generate_presigned_url(S3_BUCKET, dest_file)
        upload_command = (
            f'su crate -c "curl -X PUT -T /resource/heapdump/pod-{pod_index}.hprof \\"{url}\\""'
        )
        exec_command_in_pod(args.ns, pod_name, 'crate', upload_command)

        # Upload JFR recording
        dest_file = f"{pod_name}.jfr"
        url = generate_presigned_url(S3_BUCKET, dest_file)
        upload_command = (
            f'su crate -c "curl -X PUT -T /resource/heapdump/pod-{pod_index}.jfr \\"{url}\\""'
        )
        exec_command_in_pod(args.ns, pod_name, 'crate', upload_command)

    logger.info(f"Process completed. Output files are in s3://{S3_BUCKET}/ as <pod-name>.jfr/.hprof")

    # List the uploaded objects
    s3_ls_command = f"aws s3 ls s3://{S3_BUCKET}/{args.sts}"
    try:
        logger.info(f"Running command: {s3_ls_command}")
        result = subprocess.run(s3_ls_command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        logger.info(f"Command output:\n{result.stdout.decode().strip()}")
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to run command: {s3_ls_command}\nError: {e.stderr.decode().strip()}")


if __name__ == "__main__":
    main()
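
The script is invoked as, for example, ./jfr_dump.py --ns my-namespace --sts crate-data-hot --kube-context my-cluster (the script name and arguments here are illustrative). To fetch the uploaded artifacts back down later, the pre-signed URL mechanism works in reverse with 'get_object'. A minimal sketch, not part of the gist, assuming the same bucket, ambient AWS credentials, and the third-party requests package:

import boto3
import requests

S3_BUCKET = 'cratedb-cloud-heapdumps'

def download_artifact(object_name: str, dest_path: str, expiration: int = 3600):
    # 'get_object' is the read-side counterpart of the script's 'put_object' URLs
    s3_client = boto3.Session().client('s3', region_name='us-east-1')
    url = s3_client.generate_presigned_url(
        'get_object',
        Params={'Bucket': S3_BUCKET, 'Key': object_name},
        ExpiresIn=expiration,
    )
    # Stream to disk; heap dumps can be many GiB
    with requests.get(url, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        with open(dest_path, 'wb') as fh:
            for chunk in resp.iter_content(chunk_size=1 << 20):
                fh.write(chunk)

# Hypothetical pod name, for illustration only:
# download_artifact('crate-data-hot-0.hprof', './crate-data-hot-0.hprof')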
WalBeh commented Dec 2, 2024

# Provider configuration
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.78.0"
    }
  }
}

provider "aws" {
  region = "us-east-1"
}

# S3 Bucket Creation
resource "aws_s3_bucket" "heapdumps" {
  bucket = "cratedb-cloud-heapdumps"

  # Optional: Add tags for better resource management
  tags = {
    Name        = "CrateDB Cloud Heap Dumps"
    Environment = "Production"
    ManagedBy   = "Terraform"
  }
}

# Server-side encryption configuration for the S3 bucket
resource "aws_s3_bucket_server_side_encryption_configuration" "heapdumps" {
  bucket = aws_s3_bucket.heapdumps.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

# IAM Policy for Read Access
resource "aws_iam_policy" "heapdumps_read_access" {
  name        = "cratedb-heapdumps-read-access"
  path        = "/"
  description = "Read-only access to CrateDB heap dumps S3 bucket"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:ListBucket",
          "s3:GetBucketLocation"
        ]
        Resource = [
          aws_s3_bucket.heapdumps.arn,
          "${aws_s3_bucket.heapdumps.arn}/*"
        ]
      }
    ]
  })
}

# Attach Read Policy to the developers Group
resource "aws_iam_group_policy_attachment" "employees_heapdumps_access" {
  group      = "developers"
  policy_arn = aws_iam_policy.heapdumps_read_access.arn
}

# IAM Policy for Sysadmins (Full S3 Bucket Access)
resource "aws_iam_policy" "heapdumps_sysadmin_access" {
  name        = "cratedb-heapdumps-sysadmin-access"
  path        = "/"
  description = "Full access to CrateDB heap dumps S3 bucket for sysadmins"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:DeleteObject",
          "s3:ListBucket",
          "s3:GetBucketLocation",
          "s3:GetBucketVersioning",
          "s3:PutBucketVersioning"
        ]
        Resource = [
          aws_s3_bucket.heapdumps.arn,
          "${aws_s3_bucket.heapdumps.arn}/*"
        ]
      }
    ]
  })
}

# # Attach Policy to Sysadmins Group
# resource "aws_iam_group_policy_attachment" "sysadmins_heapdumps_access" {
#   group      = "sysadmins"
#   policy_arn = aws_iam_policy.heapdumps_sysadmin_access.arn
# }



# Optional: Bucket Public Access Block for additional security
resource "aws_s3_bucket_public_access_block" "heapdumps" {
  bucket = aws_s3_bucket.heapdumps.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_versioning" "heapdumps" {
  bucket = aws_s3_bucket.heapdumps.id

  versioning_configuration {
    status = "Enabled"
  }
}

# Lifecycle policy to remove files older than 60 days
resource "aws_s3_bucket_lifecycle_configuration" "heapdumps" {
  bucket = aws_s3_bucket.heapdumps.id

  rule {
    id     = "RemoveOldFiles"
    status = "Enabled"

    # Empty filter: apply the rule to all objects in the bucket
    filter {}

    expiration {
      days = 60
    }

    # With versioning enabled, expiration alone only adds delete markers;
    # also expire noncurrent versions so old dumps are actually removed
    noncurrent_version_expiration {
      noncurrent_days = 60
    }
  }
}
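
After terraform apply, the resulting bucket settings can be sanity-checked from Python. A minimal sketch, assuming ambient AWS credentials that may read the bucket configuration:

import boto3

bucket = 'cratedb-cloud-heapdumps'
s3 = boto3.client('s3', region_name='us-east-1')

# Default encryption: expect an AES256 SSE rule
enc = s3.get_bucket_encryption(Bucket=bucket)
print(enc['ServerSideEncryptionConfiguration']['Rules'])

# Lifecycle: expect the 60-day "RemoveOldFiles" rule
lc = s3.get_bucket_lifecycle_configuration(Bucket=bucket)
print(lc['Rules'])

# Versioning: expect Status == 'Enabled'
ver = s3.get_bucket_versioning(Bucket=bucket)
print(ver.get('Status'))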
