Created
February 12, 2025 13:10
-
-
Save matthieubulte/0b135555c9bfba101b34f00bc0873f14 to your computer and use it in GitHub Desktop.
jupyc.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# %% | |
from jupyc import c | |
# %% | |
%%c | |
int x = 5; | |
int y = 2; | |
x + y | |
# OUT: 7 | |
# %% | |
%%c | |
int mod2(int y) { | |
return y & 1; | |
} | |
mod2(3) | |
# OUT: 1 | |
# %% | |
%%c | |
for (int i = 0; i < 10; i++) { | |
x += i; | |
} | |
printf("x = %d\n", x); | |
# OUT: x = 50 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ctypes | |
import subprocess | |
import os | |
from typing import Tuple, List, Dict, Optional | |
from IPython.core.magic import register_cell_magic | |
from IPython.core.magic_arguments import argument, magic_arguments, parse_argstring | |
from pycparser import c_parser, c_ast, c_generator | |
from copy import deepcopy | |
def parse_statements(code: str) -> List[c_ast.Node]:
    """Parse ``code`` as a run of C statements.

    Statements are not valid at file scope, so the snippet is placed inside
    the body of a throwaway ``void __wrap__()`` function before parsing.

    Args:
        code: C statements, as they would appear inside a function body.

    Returns:
        The block items of the wrapper function's body.
    """
    wrapped_source = f"void __wrap__() {{{code}}}"
    translation_unit = c_parser.CParser().parse(wrapped_source)
    wrapper_function = translation_unit.ext[0]
    return wrapper_function.body.block_items
def parse_defs(code: str) -> List[c_ast.Node]:
    """Parse ``code`` as file-scope C and return its top-level nodes.

    Args:
        code: C source made of file-scope declarations/definitions.

    Returns:
        The ``ext`` list of the parsed translation unit.
    """
    parser = c_parser.CParser()
    translation_unit = parser.parse(code)
    return translation_unit.ext
def split_code_blocks(code: str):
    """Split cell source into top-level chunks by tracking brace depth.

    Lines are accumulated until the number of ``{`` seen equals the number
    of ``}`` seen; the accumulated lines then form one block. A line outside
    any braces is therefore a block of its own, while a braced construct
    (function body, loop, ...) stays together.

    Braces are counted textually, so braces inside string or character
    literals are counted as well.

    Args:
        code: The raw source of one cell.

    Returns:
        A list of non-blank block strings in source order. Lines within a
        block are separated by ``"\\n"`` so that tokens on adjacent lines
        cannot fuse together.
    """
    blocks = []
    pending = []
    depth = 0
    for line in code.split("\n"):
        pending.append(line)
        depth += line.count("{") - line.count("}")
        if depth == 0:
            block = "\n".join(pending)
            if block.strip():
                blocks.append(block)
            pending = []

    # Braces never balanced: hand the remainder on as-is so the caller's
    # parser reports the error, instead of silently discarding the code.
    leftover = "\n".join(pending)
    if leftover.strip():
        blocks.append(leftover)
    return blocks
def decl_to_assignment(decl: c_ast.Decl) -> list[c_ast.Assignment]:
    """Turn a declaration's initializer into a standalone assignment.

    Args:
        decl: A parsed declaration.

    Returns:
        A one-element list holding ``<name> = <init>`` when the declaration
        has an initializer, otherwise an empty list.
    """
    if not decl.init:
        return []
    target = c_ast.ID(decl.name)
    return [c_ast.Assignment("=", target, decl.init)]
def has_assignment(node: c_ast.Node) -> bool:
    """Report whether a visitor walk of ``node`` encounters an Assignment.

    Args:
        node: Root of the AST subtree to inspect.

    Returns:
        ``True`` if the walk reached a ``c_ast.Assignment`` node, else ``False``.
    """

    class _AssignmentFinder(c_ast.NodeVisitor):
        # Flipped to True on the instance the first time an Assignment is visited.
        found = False

        def visit_Assignment(self, _node):
            self.found = True

    finder = _AssignmentFinder()
    finder.visit(node)
    return finder.found
def make_last_statement(node: c_ast.Node) -> str:
    """Render the final statement of a cell so that its value is echoed.

    The echo always uses the ``%d`` format, i.e. the value is printed as an
    ``int`` regardless of its real type.

    Args:
        node: AST node of the last statement in the generated ``main``.

    Returns:
        C source text for the statement:

        * assignment: the assignment followed by a ``printf`` of its
          left-hand side;
        * function call other than ``printf``: the result is stored in
          ``temp__`` and printed;
        * identifier, constant or binary operation: wrapped in ``printf``;
        * anything else: emitted unchanged with a trailing ``;``.
    """
    generator = c_generator.CGenerator()
    code = generator.visit(node)

    if isinstance(node, c_ast.Assignment):
        # Render the left-hand side through the generator rather than reading
        # ``.name``: targets such as ``a[0]`` or ``*p`` are not plain
        # identifiers. NOTE: the target expression is evaluated a second time
        # by the printf.
        target = generator.visit(node.lvalue)
        return f'{code};\nprintf("%d\\n", {target});\n'

    if isinstance(node, c_ast.FuncCall):
        # The callee may be an arbitrary expression (e.g. ``(*fp)(3)``), in
        # which case it has no ``name`` attribute to compare against.
        callee = getattr(node.name, "name", None)
        if callee != "printf":
            # TODO: Implement proper type inference and fresh var name
            return f'int temp__ = {code};\nprintf("%d\\n", temp__);\n'

    if isinstance(node, (c_ast.ID, c_ast.Constant, c_ast.BinaryOp)):
        return f'printf("%d\\n", {code});\n'

    return f"{code};\n"
def make_full_code(
    header_content: str, variables: Dict[str, c_ast.Node], statements: List[c_ast.Node]
) -> str:
    """Assemble the complete C translation unit for the current session.

    Args:
        header_content: Text placed verbatim at the top of the output.
        variables: Known definitions; each value is emitted at file scope,
            followed by ``;``.
        statements: Statements that make up the body of ``main``. The last
            one is rendered through ``make_last_statement``.

    Returns:
        The generated C source. No ``main`` function is emitted when
        ``statements`` is empty.
    """
    generator = c_generator.CGenerator()
    pieces = [header_content]

    # File-scope definitions come first.
    pieces.extend(generator.visit(definition) + ";\n" for definition in variables.values())

    if statements:
        *leading, final = statements
        pieces.append("void main() {\n")
        # Everything but the last statement is emitted as-is ...
        pieces.extend(generator.visit(stmt) + ";\n" for stmt in leading)
        # ... while the last one gets the value-echoing treatment.
        pieces.append(make_last_statement(final))
        pieces.append("}\n")

    return "".join(pieces)
class CEnvironment:
    """Session state for C cells plus the compile / load / run machinery.

    Definitions and state-changing statements from earlier cells are kept so
    that every new cell is compiled together with them into one shared
    library, which is then loaded with ``ctypes`` and whose ``main`` is run.

    The C source and the shared library are written to fixed file names in
    the current working directory and removed again once they are no longer
    needed.
    """

    # Temporary artefacts, relative to the current working directory.
    _SOURCE_FILE = "temp_shared.c"
    _LIBRARY_FILE = "libtemp.so"

    def __init__(self):
        # Handle of the most recently loaded shared library (None until the first compile).
        self.shared_lib = None
        # Known definitions, keyed by declared name.
        self.variables: Dict[str, c_ast.Node] = {}
        # Statements from earlier cells that are replayed in every new main().
        self.statements: List[c_ast.Node] = []
        self.header_content = "#include <stdio.h>\n#include <stdlib.h>\n"
        # Source text of the most recent compile attempt.
        self.current_code: Optional[str] = None

    def _remove_temp_files(self) -> None:
        """Delete the generated source and library files if they exist."""
        for path in (self._SOURCE_FILE, self._LIBRARY_FILE):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    def create_shared_lib(self, code: str) -> None:
        """Compile ``code`` with gcc and load the result as ``self.shared_lib``.

        Args:
            code: C source. ``self.header_content`` is prepended unless the
                source already starts with it.

        Raises:
            Exception: If gcc exits with a non-zero status; the message
                carries gcc's stderr and the original
                ``CalledProcessError`` is chained as the cause.
        """
        # execute() passes source that already begins with the header; avoid
        # writing the include lines twice in that case.
        if code.startswith(self.header_content):
            source = code
        else:
            source = self.header_content + code
        with open(self._SOURCE_FILE, "w", encoding="utf-8") as f:
            f.write(source)

        try:
            subprocess.run(
                ["gcc", "-fPIC", "-shared", "-o", self._LIBRARY_FILE, self._SOURCE_FILE],
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            raise Exception(f"Compilation Error: {e.stderr}") from e

        if self.shared_lib:
            # Close the previous library before loading the freshly built one
            # from the same path.
            try:
                dlclose = ctypes.CDLL(None).dlclose
                dlclose.argtypes = [ctypes.c_void_p]
                dlclose(self.shared_lib._handle)
            except AttributeError:
                # Best effort: no dlclose symbol available.
                pass
            # Never keep a handle that may already be closed, even if the
            # load below fails.
            self.shared_lib = None
        self.shared_lib = ctypes.CDLL("./" + self._LIBRARY_FILE)

    def extract_block_code(
        self, code: str
    ) -> Tuple[List[Tuple[str, c_ast.Node]], List[c_ast.Node]]:
        """Classify one block of source into definitions and statements.

        The block is first parsed as file-scope C. If that fails it is parsed
        again as statements inside a function body.

        Args:
            code: One block as produced by ``split_code_blocks``.

        Returns:
            ``(defs, statements)`` where ``defs`` is a list of
            ``(name, node)`` pairs and ``statements`` a list of statement
            nodes. A declaration with an initializer contributes both a
            definition and an assignment statement.

        Raises:
            Exception: Whatever the statement parser raises when the block is
                valid neither as definitions nor as statements.
        """
        added_defs = []
        added_statements = []

        try:
            # Try parsing as definitions first
            top_level = parse_defs(code)
        except Exception:
            top_level = None

        if top_level is not None:
            for decl in top_level:
                if isinstance(decl, c_ast.FuncDef):
                    added_defs.append((decl.decl.name, decl))
                elif isinstance(decl, c_ast.Decl):
                    added_defs.append((decl.name, decl))
                    added_statements.extend(decl_to_assignment(decl))
            return added_defs, added_statements

        # Fall back to parsing as statements
        for statement in parse_statements(code) or []:
            if isinstance(statement, c_ast.Decl):
                added_defs.append((statement.name, statement))
                added_statements.extend(decl_to_assignment(statement))
            else:
                added_statements.append(statement)
        return added_defs, added_statements

    def extract_cell_code(self, code: str):
        """Split a whole cell into blocks and classify each of them.

        Args:
            code: The raw source of one cell.

        Returns:
            ``(defs, statements)`` accumulated over all blocks, in source
            order; see ``extract_block_code`` for the element shapes.
        """
        added_defs = []
        added_statements = []
        for block in split_code_blocks(code):
            defs, stmts = self.extract_block_code(block)
            added_defs.extend(defs)
            added_statements.extend(stmts)
        return added_defs, added_statements

    def run_shared_lib(self) -> None:
        """Call ``main`` in the loaded library, then remove the temp files."""
        try:
            self.shared_lib.main()
        finally:
            # Clean up temporary files
            self._remove_temp_files()

    def execute(self, code: str, verbose=False) -> None:
        """Parse, compile and run one cell on top of the accumulated state.

        Errors are reported with ``print`` rather than raised. The stored
        definitions and statements are only updated when compilation
        succeeds; of the statements, only those containing an assignment are
        kept for replay in later cells.

        Args:
            code: The raw source of the cell.
            verbose: When true, print the generated C source before compiling.
        """
        try:
            added_defs, added_statements = self.extract_cell_code(code)
        except Exception as e:
            print(f"Parse error: {e}")
            return

        # Work on copies so a failed compile leaves the environment untouched.
        new_variables = deepcopy(self.variables)
        new_variables.update(dict(added_defs))
        new_statements = deepcopy(self.statements)
        new_statements.extend(added_statements)

        # Generate and compile complete code
        full_code = make_full_code(self.header_content, new_variables, new_statements)
        self.current_code = full_code
        if verbose:
            print(full_code)

        try:
            self.create_shared_lib(full_code)
            self.variables = new_variables
            self.statements = [stmt for stmt in new_statements if has_assignment(stmt)]
        except Exception as e:
            print(f"Compilation error: {e}")
            # A failed build would otherwise leave its files behind.
            self._remove_temp_files()
            return

        if not added_statements:
            # Nothing to run for a definition-only cell, so run_shared_lib()
            # will not get a chance to clean up.
            self._remove_temp_files()
            return

        try:
            self.run_shared_lib()
        except Exception as e:
            print(f"Runtime error: {e}")
# Global C environment instance
c_env = CEnvironment()


def clear() -> None:
    """Drop every definition and replayed statement held by ``c_env``.

    Both containers are replaced with fresh empty objects rather than
    emptied in place.
    """
    c_env.variables, c_env.statements = {}, []
@register_cell_magic
@magic_arguments()
@argument("-v", "--verbose", action="store_true", help="Enable verbose output")
def c(line: str, cell: str) -> None:
    """Cell magic that runs the cell body as C in the global environment.

    Surrounding whitespace is stripped from the cell. If the remaining text
    does not end in ``;`` or ``}``, a ``;`` is appended so that a trailing
    bare expression forms a complete statement.

    Args:
        line: The magic's argument string; supports ``-v`` / ``--verbose``.
        cell: The cell body containing C source.
    """
    options = parse_argstring(c, line)
    source = cell.strip()
    needs_terminator = bool(source) and not source.endswith((";", "}"))
    if needs_terminator:
        source = source + ";"  # Add missing semicolon
    c_env.execute(source, verbose=options.verbose)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment