Last active
June 7, 2023 07:11
-
-
Save bburky/2d281a681f41c1965e208883161169de to your computer and use it in GitHub Desktop.
Offline Kubernetes manifest diff (does not use cluster state)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Offline `kubectl diff` style tool (does not use cluster state). Diff two | |
# local files containing templated manifests (e.g. kustomize or helm output). | |
# | |
# Resources in each file are matched by api, kind, namespace and name. This is | |
# also shown in the filename fields of the diff output. | |
# | |
# Usage: | |
# k8s-diff.py old-manifests.yaml new-manifests.yaml | |
# kustomize build . | k8s-diff.py /tmp/old-manifests.yaml - | |
# | |
# No syntax highlighting, just use VSCode's "diff" language, etc. | |
import yaml | |
import sys | |
import difflib | |
import argparse | |
# Labels that are deleted from every resource before diffing when --ignore is
# given. In the same mode, annotations whose name starts with "checksum/" are
# deleted as well; those are matched by prefix in the --ignore handling rather
# than being listed here.
IGNORE_LABELS = (
    "app.kubernetes.io/version",
    "helm.sh/chart",
)
# https://stackoverflow.com/a/33300001 | |
def str_presenter(dumper, data):
    """Represent a string for YAML output, using block style when multi-line.

    Args:
        dumper: Object exposing ``represent_scalar(tag, value, style=...)``.
        data: The string being serialized.

    Returns:
        Whatever ``dumper.represent_scalar`` returns for the string.
    """
    # A string counts as multi-line only when splitlines() finds more than one
    # line, so a single line with a trailing newline keeps the default style.
    spans_several_lines = len(data.splitlines()) > 1
    style_kwargs = {"style": "|"} if spans_several_lines else {}
    return dumper.represent_scalar("tag:yaml.org,2002:str", data, **style_kwargs)
# Register str_presenter as the representer for str values so that the
# yaml.dump() calls made in diff() emit multi-line strings in block style.
yaml.add_representer(str, str_presenter)
def name_resources(resources):
    """Index parsed manifest documents by their identifying fields.

    Args:
        resources: Iterable of parsed YAML documents (mappings). Falsy
            documents, such as the ``None`` produced by an empty YAML
            document, are skipped.

    Returns:
        A dict mapping ``(apiVersion, kind, namespace, name)`` tuples to the
        resource mapping. A missing or null ``apiVersion`` or ``namespace`` is
        stored as ``""`` so that every key is a tuple of comparable values
        (the keys are later sorted and joined with ``"/"``).

    Raises:
        KeyError: If a resource has no ``kind`` or no ``metadata.name``.
        ValueError: If two resources share the same key.
    """
    named_resources = {}
    for r in resources:
        if not r:
            continue
        metadata = r.get("metadata") or {}
        key = (
            # None here would break sorting against string keys and the
            # "/".join() used for diff headers, so fall back to "".
            r.get("apiVersion") or "",
            r["kind"],
            metadata.get("namespace") or "",
            metadata["name"],
        )
        # Explicit raise instead of assert: asserts vanish under ``python -O``
        # and a silent overwrite would hide one of the duplicates.
        if key in named_resources:
            raise ValueError(f"duplicate resource in input: {key!r}")
        named_resources[key] = r
    return named_resources
def resource_labels(resource):
    """Yield the label mappings found on a resource.

    Two locations are checked, in this order: ``metadata.labels`` and
    ``spec.template.metadata.labels``. The yielded dicts are the live objects
    inside ``resource``, so callers may mutate them in place.

    Args:
        resource: A parsed manifest document (mapping).

    Yields:
        Each label dict that exists. Locations that are absent, null, or not
        reachable because an intermediate value is not a mapping are skipped.
    """
    label_paths = (
        ("metadata", "labels"),
        ("spec", "template", "metadata", "labels"),
    )
    for path in label_paths:
        node = resource
        try:
            for part in path:
                node = node[part]
        except (KeyError, TypeError):
            # TypeError covers ``spec: null`` and non-mapping values such as a
            # string ``spec.template``, which cannot be indexed by key.
            continue
        # ``labels: null`` parses to None; callers test ``label in labels``,
        # so only real mappings are handed out.
        if isinstance(node, dict):
            yield node
    # TODO: maybe add jobTemplate pattern for CronJobs
def resource_annotations(resource):
    """Yield the annotation mappings found on a resource.

    Two locations are checked, in this order: ``metadata.annotations`` and
    ``spec.template.metadata.annotations``. The yielded dicts are the live
    objects inside ``resource``, so callers may mutate them in place.

    Args:
        resource: A parsed manifest document (mapping).

    Yields:
        Each annotation dict that exists. Locations that are absent, null, or
        not reachable because an intermediate value is not a mapping are
        skipped.
    """
    annotation_paths = (
        ("metadata", "annotations"),
        ("spec", "template", "metadata", "annotations"),
    )
    for path in annotation_paths:
        node = resource
        try:
            for part in path:
                node = node[part]
        except (KeyError, TypeError):
            # TypeError covers ``spec: null`` and non-mapping values such as a
            # string ``spec.template``, which cannot be indexed by key.
            continue
        # ``annotations: null`` parses to None; only real mappings are useful
        # to a caller that wants to inspect or delete entries.
        if isinstance(node, dict):
            yield node
def diff(r1, r2, r1_key, r2_key):
    """Write a unified diff between two resources to standard output.

    Each resource is serialized with ``yaml.dump`` (block style) and the two
    serializations are compared line by line. Nothing is written when they
    are identical.

    Args:
        r1: The old resource, or a falsy value when it does not exist.
        r2: The new resource, or a falsy value when it does not exist.
        r1_key: Key tuple identifying ``r1``; falsy gives an empty "from"
            file name in the diff header.
        r2_key: Key tuple identifying ``r2``; falsy gives an empty "to" file
            name in the diff header.

    The input file names in the header are read from the module-level
    ``args`` namespace (``args.old.name`` / ``args.new.name``).
    """

    def _dump_lines(resource):
        # splitlines() rather than split("\n"): yaml.dump output ends with a
        # newline, and split would leave a trailing empty element that shows
        # up as a spurious blank "+"/"-" line for added or removed resources.
        if not resource:
            return []
        return yaml.dump(resource, default_flow_style=False).splitlines()

    def _header(file_name, key):
        if not key:
            return ""
        # Tolerate None or non-string key parts instead of failing in join().
        parts = ("" if part is None else str(part) for part in key)
        return file_name + " " + "/".join(parts)

    for line in difflib.unified_diff(
        _dump_lines(r1),
        _dump_lines(r2),
        fromfile=_header(args.old.name, r1_key),
        tofile=_header(args.new.name, r2_key),
        lineterm="",
    ):
        sys.stdout.write(line + "\n")
# Command line: the old and new manifest files, plus an optional
# --ignore / --no-ignore switch (off by default).
parser = argparse.ArgumentParser()
parser.add_argument("old", type=argparse.FileType("r"))
parser.add_argument("new", type=argparse.FileType("r"))
parser.add_argument("--ignore", action=argparse.BooleanOptionalAction, default=False)
args = parser.parse_args()

# Parse every YAML document in each file and index it by its resource key.
old = name_resources(yaml.safe_load_all(args.old))
new = name_resources(yaml.safe_load_all(args.new))

if args.ignore:
    # Strip the ignored labels and checksum/* annotations from both sides so
    # they never show up in the diff.
    for resources in (old, new):
        for resource in resources.values():
            for labels in resource_labels(resource):
                for label in IGNORE_LABELS:
                    if label in labels:
                        del labels[label]
            for annotations in resource_annotations(resource):
                if not annotations:
                    continue
                # Collect the names first; the dict is mutated right after.
                checksum_names = [
                    name for name in annotations if name.startswith("checksum/")
                ]
                for annotation in checksum_names:
                    del annotations[annotation]

# Align the two sorted key lists; each opcode describes a run of keys that is
# present on both sides, only in the old file, or only in the new file.
old_keys = sorted(old)
new_keys = sorted(new)
matcher = difflib.SequenceMatcher(None, old_keys, new_keys)
for opcode, i1, i2, j1, j2 in matcher.get_opcodes():
    if opcode == "equal":
        for old_key, new_key in zip(old_keys[i1:i2], new_keys[j1:j2]):
            assert old_key == new_key
            diff(old[old_key], new[new_key], old_key, new_key)
    # A "replace" run is reported as removals followed by additions.
    if opcode in ("delete", "replace"):
        for old_key in old_keys[i1:i2]:
            diff(old[old_key], None, old_key, None)
    if opcode in ("insert", "replace"):
        for new_key in new_keys[j1:j2]:
            diff(None, new[new_key], None, new_key)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment