Skip to content

Instantly share code, notes, and snippets.

@bkietz
Created September 10, 2024 20:06
Show Gist options
  • Save bkietz/6678297afc8238826c8345ec723aade2 to your computer and use it in GitHub Desktop.
Save bkietz/6678297afc8238826c8345ec723aade2 to your computer and use it in GitHub Desktop.
Basic utility for annotating .arrow files
import html
import json
import os
import shutil
import struct
import subprocess
import textwrap
from pathlib import Path

import click
# struct format of the 8-byte prefix read before each encapsulated message:
# two little-endian signed 32-bit ints, unpacked as (continuation, length).
CONTINUATION_FORMAT = "<ii"
def hex_lines(contents, offset):
    """
    Yield one ``<pre>`` HTML line per 8-byte row of ``contents``.

    Each line shows the absolute position of the row (``offset`` plus the
    row's index into ``contents``, as at least four hex digits) followed by
    the row's bytes as space-separated ``0x..`` values. Nothing is yielded
    for empty ``contents``.
    """
    for start in range(0, len(contents), 8):
        row = contents[start : start + 8]
        rendered = " ".join(f"0x{value:02x}" for value in row)
        yield f"<pre>0x{offset + start:04x} => {rendered}</pre>"
class AnnotatedBytes:
    """
    Cursor over a byte string that renders each consumed range as HTML rows.

    ``contents`` is the full byte string; ``offset`` is the position of the
    next unread byte and is moved forward by :meth:`advance`.
    """

    def __init__(self, contents):
        self.contents = contents
        self.offset = 0

    def advance(self, length):
        """
        Get a byte range from the contents, along with an
        HTML template for the annotation.

        The cursor always moves forward by ``length``, even when fewer bytes
        remain; the returned bytes are whatever the slice actually contains.

        Returns ``(sub_bytes, template)``. ``template`` holds one table row
        per 8 bytes of hex dump plus a single ``{}`` placeholder, to be
        filled with ``template.format(note)``.

        Raises ``ValueError`` if ``length`` is negative.
        """
        if length < 0:
            # A negative length would silently rewind the cursor and slice an
            # unrelated range; reject it before touching any state.
            raise ValueError(f"cannot advance by a negative length: {length}")

        offset = self.offset
        self.offset += length
        sub_bytes = self.contents[offset : self.offset]

        lines = list(hex_lines(sub_bytes, offset)) if sub_bytes else []
        if not lines:
            # An empty range (zero length, or cursor already at/past the end)
            # produces no hex lines; emit one empty line so the annotation
            # still gets a row instead of failing to unpack below.
            lines = [f"<pre>0x{offset:04x} => </pre>"]
        first, *rest = lines

        rest_rows = "".join(f"<tr><td>{line}</td></tr>" for line in rest)
        template = (
            "\n"
            "<tr>\n"
            f"<td>{first}</td>\n"
            f"<td rowspan={len(rest) + 1}>\n"
            f"<a href=#{offset} name={offset}>🔗</a>\n"
            # Plain literal on purpose: this is the slot filled by .format().
            "{}\n"
            "</td>\n"
            "</tr>\n"
            f"{rest_rows}\n"
        )
        return sub_bytes, template
def parse_message(flatc, arrow_file, message_bytes, schema):
    """
    Decode one flatbuffer with ``flatc`` and return ``(json_object, annotation_text)``.

    ``message_bytes`` is written to ``<arrow_file.name>.tmp.bin`` (resolved
    against the current working directory). ``flatc`` is then run twice with
    ``schema``: once with ``--json`` and once with ``--annotate``. The results
    are read back from the sibling ``.tmp.json`` and ``.tmp.afb`` files. The
    temporary files are left in place.

    Raises ``RuntimeError`` if either flatc invocation exits with a non-zero
    status, so a failed run is reported directly instead of surfacing as a
    missing file or as stale output from a previous call.
    """
    stem = arrow_file.name + ".tmp"
    msg_bin = Path(stem + ".bin").resolve()
    msg_afb = Path(stem + ".afb").resolve()
    msg_json = Path(stem + ".json").resolve()
    msg_bin.write_bytes(message_bytes)

    _run_flatc(
        [
            flatc,
            "--json",
            "--strict-json",
            "--raw-binary",
            schema,
            "--",
            msg_bin,
        ]
    )
    _run_flatc(
        [
            flatc,
            "--annotate",
            schema,
            "--binary",
            msg_bin,
        ]
    )

    # Close the JSON file deterministically instead of leaking the handle.
    with open(msg_json, encoding="utf-8") as json_file:
        parsed = json.load(json_file)
    return parsed, msg_afb.read_text()


def _run_flatc(command):
    """Run one flatc command line; raise RuntimeError with its output on failure."""
    result = subprocess.run(command, capture_output=True)
    if result.returncode != 0:
        output = (result.stderr or result.stdout).decode(errors="replace").strip()
        raise RuntimeError(
            f"flatc exited with status {result.returncode} for {command}: {output}"
        )
# HTML written once at the top of the report. This is a ``str.format``
# template, so the literal CSS braces are doubled.
_PAGE_HEADER = """
<meta charset="utf-8">
<title>Annotation dump of {arrow_file}</title>
<h1>Annotation dump of {arrow_file}</h1>
<hr />
<style>
td:nth-child(even) {{
vertical-align: top;
padding: 0 1em;
margin: 0 1em;
border: 1px solid black;
border-radius: 10px;
}}
.detail {{
border: 1px dotted black;
border-radius: 10px;
}}
</style>
<table>
<tr>
<th>Bytes</th>
<th>Annotation</th>
</tr>
"""

# Annotation for one decoded flatbuffer: a heading plus two collapsible
# sections (JSON dump and flatc annotation). ``{{{label}}}`` renders as the
# label wrapped in literal braces.
_DETAILS_NOTE = """
{heading}
<br /><br />
<details>
<summary>{label} JSON</summary>
<pre class=detail>{json_text}</pre>
</details>
<br /><br />
<details>
<summary>flatc --annotate {{{label}}}</summary>
<pre class=detail>{afb_text}</pre>
</details>
"""

# Final table row (no byte column) followed by the closing table tag.
_TRAILER_ROW = """
<tr>
<td></td>
<td>{note}</td>
</tr>
</table>
"""


def _details_note(heading, label, parsed_json, afb_text):
    """Render the annotation for one decoded flatbuffer, HTML-escaping file-derived text."""
    return _DETAILS_NOTE.format(
        heading=html.escape(heading, quote=False),
        label=label,
        json_text=html.escape(json.dumps(parsed_json, indent=2), quote=False),
        afb_text=html.escape(afb_text, quote=False),
    )


# arrow_file:    the Arrow IPC file to annotate (must exist).
# arrow_src_dir: directory containing format/Message.fbs and format/File.fbs.
# flatc:         flatc executable; looked up on PATH lazily when not given, so
#                importing this module no longer spawns a subprocess.
@click.command(context_settings={"help_option_names": ["-h", "--help"]})
@click.argument(
    "arrow_file",
    type=click.Path(exists=True, dir_okay=False, file_okay=True, resolve_path=True),
)
@click.option(
    "--arrow_src_dir",
    type=click.Path(dir_okay=True, file_okay=False, resolve_path=True),
    default=Path.home() / "arrow",
)
@click.option(
    "--flatc",
    type=click.Path(dir_okay=False, file_okay=True, resolve_path=True),
    default=lambda: shutil.which("flatc"),
)
def annotate(arrow_file, arrow_src_dir, flatc):
    """
    Annotate an arrow file

    Writes <arrow_file name>.html into the current directory: a table that
    pairs each byte range of the file with a note describing it.
    """
    if not flatc:
        raise click.UsageError("flatc was not found on PATH; pass --flatc explicitly")

    arrow_file = Path(arrow_file)
    out_path = Path(arrow_file.name + ".html").resolve()
    print(f"annotating {arrow_file} into {out_path}")
    print(f" {flatc=}")

    annotated = AnnotatedBytes(arrow_file.read_bytes())
    total = len(annotated.contents)
    if total == 0:
        raise click.ClickException(f"{arrow_file} is empty")

    # utf-8 is explicit because the row templates contain a non-ASCII emoji;
    # the context manager flushes what was written even if a later step fails.
    with open(out_path, "w", encoding="utf-8") as out_file:

        def write(template, note):
            out_file.write(template.format(note))

        out_file.write(
            _PAGE_HEADER.format(arrow_file=html.escape(str(arrow_file), quote=False))
        )

        MAGIC = b"ARROW1\x00\x00"
        magic, template = annotated.advance(8)
        if magic == MAGIC:
            write(template, "Arrow IPC file magic")
        else:
            write(
                template,
                f"!!!ERROR!!! should be Arrow IPC file magic ({MAGIC!r})",
            )

        while True:
            # read the stream
            if total - annotated.offset < 8:
                # Not enough bytes left for a (continuation, length) prefix;
                # leave whatever remains to the footer handling below.
                break
            encapsulation_cookie, template = annotated.advance(8)
            continuation, length = struct.unpack(
                CONTINUATION_FORMAT, encapsulation_cookie
            )
            if continuation != -1:
                write(
                    template,
                    "\n"
                    f"!!!ERROR!!! expected continuation=-1, got {continuation=}\n"
                    "<br /><br />\n"
                    "rewinding to see if there's a footer\n",
                )
                annotated.offset -= 8
                break
            if length == 0:
                write(template, "encapsulated stream EOS")
                break
            if length < 0:
                # A negative length cannot be consumed; stop reading messages.
                write(template, f"!!!ERROR!!! negative message length={length}")
                break
            write(
                template,
                f"encapsulated stream message cookie, length={length}=0x{length:x}",
            )

            message_bytes, template = annotated.advance(length)
            message_json, message_afb = parse_message(
                flatc,
                arrow_file,
                message_bytes,
                Path(arrow_src_dir) / "format/Message.fbs",
            )
            body_length = message_json.get("bodyLength", 0)
            write(
                template,
                _details_note(
                    f'Message.{message_json["header_type"]}, {body_length=}',
                    "Message",
                    message_json,
                    message_afb,
                ),
            )
            if body_length <= 0 or annotated.offset >= total:
                # No body, or the file ends before the declared body starts.
                continue
            _, template = annotated.advance(body_length)
            write(template, "body bytes")

        if annotated.offset >= total:
            # Truncated file: nothing is left that could be a footer.
            out_file.write(
                _TRAILER_ROW.format(
                    note="!!!ERROR!!! file ends before a footer was found"
                )
            )
            return

        # Everything that remains is handed to flatc as the footer; the slice
        # stops at the end of the contents regardless of the requested length.
        footer_bytes, template = annotated.advance(len(annotated.contents))
        footer_json, footer_afb = parse_message(
            flatc, arrow_file, footer_bytes, Path(arrow_src_dir) / "format/File.fbs"
        )
        write(template, _details_note("Footer", "Footer", footer_json, footer_afb))

        # Last 10 bytes of the file: a 4-byte length followed by 6 magic bytes.
        footer_cookie = annotated.contents[-4 - 6 :]
        (footer_length,) = struct.unpack("<i", footer_cookie[:4])
        out_file.write(
            _TRAILER_ROW.format(
                note=html.escape(
                    f"{footer_length=} ({len(footer_bytes)=}) "
                    f"magic={footer_cookie[4:]!r}",
                    quote=False,
                )
            )
        )
if __name__ == "__main__":
    # Invoked as a script: hand control to the click command.
    annotate()
@bkietz
Copy link
Author

bkietz commented Sep 10, 2024

Screenshot from 2024-08-26 13-50-39

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment