Last active
May 8, 2025 19:38
-
-
Save ntfargo/14f5aa4706d5dcbe0e58746180cdd087 to your computer and use it in GitHub Desktop.
Leveraging large language models (LLMs) via Ollama (codestral:22b), this prototype generates annotations for memory allocation and deallocation, which CodeQL then uses for static analysis.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
LLM-based Memory Leak Detection for C/C++ Codebases

Prototype code for automating memory leak detection in C/C++ codebases. Leveraging large language models (LLMs) via Ollama (codestral:22b), this prototype generates annotations for memory allocation and deallocation, which CodeQL then uses for static analysis.

This is a sample implementation and may require adjustments based on specific requirements.
"""
import clang.cindex | |
import ollama | |
import json | |
import os | |
import subprocess | |
from collections import defaultdict | |
def _collect_source_files(codebase_path):
    """Return the C/C++ source files for codebase_path (a single file or a directory tree)."""
    extensions = ('.c', '.cpp', '.cc', '.cxx')
    if os.path.isfile(codebase_path):
        return [codebase_path] if codebase_path.endswith(extensions) else []
    return [
        os.path.join(root, name)
        for root, _, files in os.walk(codebase_path)
        for name in files
        if name.endswith(extensions)
    ]


def _scan_translation_unit(root_cursor, file_path, source_lines, function_data, call_graph):
    """Record the function definitions of one file and the calls made inside each of them."""
    wanted = os.path.normcase(os.path.abspath(file_path))
    in_wanted_file = {}  # cache: reported file name -> "is it file_path?"
    # Explicit stack instead of recursion so deeply nested ASTs cannot hit
    # Python's recursion limit. Each entry carries the enclosing function name.
    pending = [(root_cursor, None)]
    while pending:
        node, current_function = pending.pop()

        # Skip everything that comes from another file (e.g. an #include'd
        # header): its extent would index into the wrong source_lines.
        node_file = node.location.file
        if node_file is not None:
            name = node_file.name
            if name not in in_wanted_file:
                in_wanted_file[name] = os.path.normcase(os.path.abspath(name)) == wanted
            if not in_wanted_file[name]:
                continue

        if node.kind == clang.cindex.CursorKind.FUNCTION_DECL and node.is_definition():
            current_function = node.spelling
            start = node.extent.start
            end = node.extent.end
            function_data[current_function] = {
                'code': ''.join(source_lines[start.line - 1:end.line]).strip(),
                'callees': [],
                'file': file_path,
                'start_line': start.line,
                'end_line': end.line
            }
        elif node.kind == clang.cindex.CursorKind.CALL_EXPR and current_function:
            callee_name = node.spelling
            if callee_name and callee_name not in call_graph[current_function]:
                call_graph[current_function].append(callee_name)

        # Always descend: function definitions hang below the translation-unit
        # root, and calls are nested inside statements and other expressions.
        # Children are pushed reversed so they are visited in source order.
        pending.extend(
            (child, current_function) for child in reversed(list(node.get_children()))
        )


def extract_functions_and_call_graph(codebase_path, output_dir,
                                     libclang_path='C:/Program Files/LLVM/bin'):
    """
    Extract functions and call graph from a C/C++ codebase using libclang.
    Save function source code and call graph to output_dir.

    Args:
        codebase_path (str): A single C/C++ source file or a directory that is
            searched recursively for .c/.cpp/.cc/.cxx files.
        output_dir (str): Directory that receives functions.json (created if missing).
        libclang_path (str): Directory containing the libclang shared library.
            It is only applied when it exists and libclang has not been loaded
            yet; otherwise the bindings' default library lookup is used.

    Returns:
        tuple: (function_data, functions_file). function_data maps each function
        name to a dict with 'code', 'callees' (names of called functions that
        were also extracted), 'file', 'start_line' and 'end_line'.
        functions_file is the path of the JSON dump of function_data.

    Note:
        Functions are keyed by name only, so a later definition with the same
        name replaces an earlier one.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Initialize libclang. set_library_path() raises once the library has been
    # loaded, so guard it to keep repeated calls of this function working.
    if libclang_path and os.path.isdir(libclang_path) and not clang.cindex.Config.loaded:
        clang.cindex.Config.set_library_path(libclang_path)
    index = clang.cindex.Index.create()

    function_data = {}
    call_graph = defaultdict(list)
    for file_path in _collect_source_files(codebase_path):
        # A C standard is not valid for C++ sources, so pick the flag per file.
        language_args = ['-std=c99'] if file_path.endswith('.c') else ['-std=c++17']
        try:
            translation_unit = index.parse(file_path, args=language_args)
        except clang.cindex.TranslationUnitLoadError as error:
            print(f"WARNING: could not parse {file_path}: {error}")
            continue
        # Read the file once per translation unit rather than once per function.
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            source_lines = f.readlines()
        _scan_translation_unit(
            translation_unit.cursor, file_path, source_lines, function_data, call_graph
        )

    # Keep only callees whose definition was extracted as well.
    for caller, callees in call_graph.items():
        if caller in function_data:
            function_data[caller]['callees'] = [
                callee for callee in callees if callee in function_data
            ]

    functions_file = os.path.join(output_dir, 'functions.json')
    with open(functions_file, 'w') as f:
        json.dump(function_data, f, indent=2)
    return function_data, functions_file
def _extract_json_object(text):
    """Return the first JSON object contained in an LLM reply, or None if there is none."""
    if not isinstance(text, str):
        return None
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Replies are often wrapped in markdown fences or prose; try to decode
        # an object starting at each '{' until one parses.
        parsed = None
        decoder = json.JSONDecoder()
        position = text.find('{')
        while position != -1:
            try:
                parsed, _ = decoder.raw_decode(text, position)
                break
            except json.JSONDecodeError:
                position = text.find('{', position + 1)
    return parsed if isinstance(parsed, dict) else None


def _as_name_list(value):
    """Normalize an annotation field to a list of non-empty variable-name strings."""
    if isinstance(value, str):
        value = [value]
    if not isinstance(value, (list, tuple)):
        return []
    return [item for item in value if isinstance(item, str) and item.strip()]


def generate_annotation(function_name, function_code, callees_code, model="codestral:22b"):
    """
    Generate LLM-based annotations for a function using Ollama.

    Args:
        function_name (str): Name of the function being annotated.
        function_code (str): Source code of the function.
        callees_code (list): Source code of the function's callees; it is only
            included in the follow-up validation prompt.
        model (str): Ollama model name.

    Returns:
        dict: Always contains the keys "allocated_variables" and
        "deallocated_variables", each a list of strings. Both are empty when
        the first reply contains no JSON object. "allocated_variables" is
        cleared when the validation step does not confirm a new allocation or
        its reply cannot be parsed.
    """
    prompt_template = (
        "You are a C developer. Your task is to answer the following questions about a code snippet.\n"
        "Which variables contain pointers to the memory allocated in function {func_name}?\n"
        "Put the answer in the \"allocated_variables\" field.\n"
        "Which variables contain pointers to the memory deallocated in function {func_name}?\n"
        "Put the answer in the \"deallocated_variables\" field.\n"
        "Return the final answer as a short JSON object.\n"
        "Code:\n"
        "{code}\n"
    )
    prompt = prompt_template.format(func_name=function_name, code=function_code)

    # Query Ollama
    response = ollama.chat(model=model, messages=[{"role": "user", "content": prompt}])
    reply = _extract_json_object(response['message']['content']) or {}
    # Normalize so that callers can index both keys unconditionally.
    annotation = {
        "allocated_variables": _as_name_list(reply.get("allocated_variables")),
        "deallocated_variables": _as_name_list(reply.get("deallocated_variables")),
    }

    if annotation["allocated_variables"]:
        # The prompt must name the field that is read below; otherwise the key
        # is practically never present and every allocation gets discarded.
        validation_prompt_template = (
            "You are a C developer. Your task is to answer the following questions about a code snippet.\n"
            "Does the function {func_name} allocate memory to a new variable "
            "(and not merely assign to an existing object or structure)?\n"
            "Answer with a JSON object, without any comments, that contains a boolean field "
            "\"is_new_allocation\" and a string field \"explanation\".\n"
            "Code:\n"
            "{source}\n"
        )
        source = function_code + "\n" + "\n".join(callees_code)
        validation_prompt = validation_prompt_template.format(func_name=function_name, source=source)
        validation_response = ollama.chat(model=model, messages=[{"role": "user", "content": validation_prompt}])
        validation_result = _extract_json_object(validation_response['message']['content'])

        verdict = validation_result.get("is_new_allocation", False) if validation_result else False
        if isinstance(verdict, str):
            # Models sometimes answer with "true"/"false" strings; a non-empty
            # "false" must not count as confirmation.
            verdict = verdict.strip().lower() in ("true", "yes")
        if not verdict:
            annotation["allocated_variables"] = []  # Clear if not a new allocation or on error
    return annotation
def process_functions(function_data, output_dir, model="codestral:22b"):
    """
    Annotate every extracted function and persist the result.

    For each entry of function_data the source of its known callees is
    gathered and passed, together with the function's own source, to
    generate_annotation. All annotations are written to
    <output_dir>/annotations.json.

    Returns:
        tuple: (annotations, annotations_file), where annotations maps function
        names to their annotation and annotations_file is the JSON file path.
    """
    annotations = {}
    for name, info in function_data.items():
        known_callees = (callee for callee in info['callees'] if callee in function_data)
        callee_sources = [function_data[callee]['code'] for callee in known_callees]
        annotations[name] = generate_annotation(name, info['code'], callee_sources, model)

    os.makedirs(output_dir, exist_ok=True)
    annotations_file = os.path.join(output_dir, 'annotations.json')
    with open(annotations_file, 'w') as handle:
        handle.write(json.dumps(annotations, indent=2))
    return annotations, annotations_file
def _copy_source_tree(src_root, dst_root):
    """Copy the directory tree src_root into dst_root, never descending into dst_root itself."""
    import shutil

    skip = os.path.normcase(os.path.abspath(dst_root))
    for root, dirs, files in os.walk(src_root):
        # Prune the destination in case the output directory lives inside the codebase.
        dirs[:] = [
            d for d in dirs
            if os.path.normcase(os.path.abspath(os.path.join(root, d))) != skip
        ]
        target_dir = os.path.normpath(os.path.join(dst_root, os.path.relpath(root, src_root)))
        os.makedirs(target_dir, exist_ok=True)
        for name in files:
            shutil.copy2(os.path.join(root, name), os.path.join(target_dir, name))


def _annotation_comment(annotation):
    """Build the '// @allocate' / '// @deallocate' comment text, or '' when there is nothing to say."""
    if not isinstance(annotation, dict):
        return ''
    comments = []
    allocated = [str(name) for name in annotation.get("allocated_variables") or []]
    deallocated = [str(name) for name in annotation.get("deallocated_variables") or []]
    if allocated:
        comments.append(f"// @allocate {', '.join(allocated)}")
    if deallocated:
        comments.append(f"// @deallocate {', '.join(deallocated)}")
    return '\n'.join(comments) + '\n' if comments else ''


def convert_to_codeql_format(function_data, annotations, codebase_path, output_dir):
    """
    Convert LLM-generated annotations to CodeQL-compatible inline comments.

    The sources are copied to <output_dir>/annotated_codebase and only the
    copies are modified; the original codebase is left untouched.

    Args:
        function_data (dict): Function name -> dict with at least 'file' and
            'start_line' (1-based line of the definition).
        annotations (dict): Function name -> dict with optional
            'allocated_variables' / 'deallocated_variables' lists.
        codebase_path (str): Either a single source file, which is copied to
            annotated_codebase/tests/<name> (CMakeLists.txt is looked up two
            directory levels above it), or a directory, which is copied as a
            whole (CMakeLists.txt is looked up in the directory itself).
        output_dir (str): Directory that receives 'annotated_codebase'.

    Returns:
        str: Path of the annotated copy of the codebase.

    Raises:
        FileNotFoundError: If codebase_path does not exist.
    """
    import shutil

    temp_codebase_dir = os.path.join(output_dir, 'annotated_codebase')
    os.makedirs(temp_codebase_dir, exist_ok=True)
    tests_dir = os.path.join(temp_codebase_dir, 'tests')
    os.makedirs(tests_dir, exist_ok=True)

    source_root = os.path.abspath(codebase_path)
    if not os.path.exists(source_root):
        raise FileNotFoundError(f"Codebase path not found: {codebase_path}")

    single_file = os.path.isfile(source_root)
    if single_file:
        dst_path = os.path.join(tests_dir, os.path.basename(source_root))
        print(f"Copying {codebase_path} to {dst_path}")
        shutil.copyfile(source_root, dst_path)
        project_dir = os.path.dirname(os.path.dirname(source_root))
    else:
        dst_path = None
        print(f"Copying {codebase_path} to {temp_codebase_dir}")
        _copy_source_tree(source_root, temp_codebase_dir)
        project_dir = source_root

    cmake_path = os.path.join(project_dir, 'CMakeLists.txt')
    if os.path.exists(cmake_path):
        print(f"Copying CMakeLists.txt from {cmake_path}")
        shutil.copyfile(cmake_path, os.path.join(temp_codebase_dir, 'CMakeLists.txt'))
        print(f"CMakeLists.txt copied to {os.path.join(temp_codebase_dir, 'CMakeLists.txt')}")
    else:
        print(f"ERROR: CMakeLists.txt not found at {cmake_path}")

    def annotated_copy_of(original_file):
        """Map an original source path to its copy, or None if it was not copied."""
        original = os.path.abspath(original_file)
        if single_file:
            same = os.path.normcase(original) == os.path.normcase(source_root)
            return dst_path if same else None
        try:
            rel_path = os.path.relpath(original, source_root)
        except ValueError:  # e.g. different drive on Windows
            return None
        if rel_path == os.pardir or rel_path.startswith(os.pardir + os.sep):
            return None  # outside the codebase
        return os.path.join(temp_codebase_dir, rel_path)

    # Collect all insertions per file first: inserting one comment shifts every
    # following line, so the recorded start lines are only valid if each file
    # is patched in a single pass from the bottom up.
    insertions_by_file = {}
    for func_name, annotation in annotations.items():
        func_data = function_data.get(func_name)
        if func_data is None:
            continue
        comment = _annotation_comment(annotation)
        if not comment:
            continue
        target = annotated_copy_of(func_data['file'])
        if target is None or not os.path.isfile(target):
            continue
        insertions_by_file.setdefault(target, []).append((func_data['start_line'], comment))

    for target, insertions in insertions_by_file.items():
        # surrogateescape + newline='' round-trips arbitrary bytes and the
        # file's own line endings unchanged.
        with open(target, 'r', encoding='utf-8', errors='surrogateescape', newline='') as f:
            lines = f.readlines()
        for start_line, comment in sorted(insertions, key=lambda item: item[0], reverse=True):
            position = max(0, min(start_line - 1, len(lines)))
            lines.insert(position, comment)
        with open(target, 'w', encoding='utf-8', errors='surrogateescape', newline='') as f:
            f.writelines(lines)
    return temp_codebase_dir
def check_codeql_installation():
    """Report whether `codeql --version` can be executed and exits with status 0.

    Returns:
        bool: True when the CodeQL CLI ran successfully, False when the
        executable could not be found or exited with a non-zero status.
    """
    command = ["codeql", "--version"]
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except FileNotFoundError:
        # The executable is not on PATH.
        return False
    else:
        return completed.returncode == 0
def run_lamded_pipeline(codebase_path, output_base_dir, model="codestral:22b"):
    """
    Run all pipeline stages for LLM-assisted memory leak detection.

    The stages run in order: CodeQL availability check, function/call-graph
    extraction, LLM annotation, and conversion of the annotations into inline
    comments on a copy of the codebase.

    Args:
        codebase_path (str): Path to the C/C++ codebase (file or directory).
        output_base_dir (str): Directory to store output files.
        model (str): Ollama model name (e.g., 'codestral:22b').

    Returns:
        dict: Paths under the keys "functions_file", "annotations_file" and
        "annotated_codebase_dir", plus "codeql_report", which is always None
        (placeholder for future CodeQL integration).

    Raises:
        RuntimeError: If the CodeQL CLI is not available.
    """
    # Fail early when CodeQL is missing.
    codeql_available = check_codeql_installation()
    if not codeql_available:
        raise RuntimeError("CodeQL CLI not found. Please install CodeQL and add it to PATH.")

    # Stage 1: functions and call graph.
    extracted = extract_functions_and_call_graph(codebase_path, output_base_dir)
    function_data, functions_file = extracted

    # Stage 2: LLM annotations.
    annotated = process_functions(function_data, output_base_dir, model)
    annotations, annotations_file = annotated

    # Stage 3: inline comments on a copy of the codebase.
    annotated_codebase_dir = convert_to_codeql_format(
        function_data, annotations, codebase_path, output_base_dir
    )

    return {
        "functions_file": functions_file,
        "annotations_file": annotations_file,
        "annotated_codebase_dir": annotated_codebase_dir,
        "codeql_report": None,  # Placeholder for future CodeQL integration
    }
""" | |
memc.cpp | |
#include <stdlib.h> | |
void* allocate() { | |
void* ptr = malloc(10); // Allocates memory | |
return ptr; // No free() | |
} | |
void deallocate(void* ptr) { | |
free(ptr); // Deallocates memory | |
} | |
queries/memory_leaks.ql | |
import cpp | |
import semmle.code.cpp.dataflow.DataFlow | |
// Custom allocation functions | |
class CustomAllocationFunction extends Function { | |
CustomAllocationFunction() { | |
exists(Comment c | c.getTarget() = this and c.getText().matches("%@allocate return%")) | |
} | |
} | |
// Custom deallocation functions | |
class CustomDeallocationFunction extends Function { | |
string deallocatedParam; | |
CustomDeallocationFunction() { | |
exists(Comment c | | |
c.getTarget() = this and | |
c.getText().regexpMatch(".*@deallocate\\s+(\\w+).*", deallocatedParam) | |
) | |
} | |
Parameter getDeallocatedParameter() { | |
result = this.getAParameter() and result.getName() = deallocatedParam | |
} | |
} | |
// Extend allocation expressions | |
class MyAllocationExpr extends Expr { | |
MyAllocationExpr() { | |
this instanceof AllocationExpr or | |
exists(CustomAllocationFunction f | this.(Call).getTarget() = f) | |
} | |
} | |
// Extend deallocation expressions | |
class MyDeallocationExpr extends Expr { | |
MyDeallocationExpr() { | |
this instanceof DeallocationExpr or | |
exists(CustomDeallocationFunction f, Call c | | |
c.getTarget() = f and | |
this = c.getArgument(f.getDeallocatedParameter().getIndex()) | |
) | |
} | |
} | |
// Detect memory leaks | |
from MyAllocationExpr alloc | |
where not exists(MyDeallocationExpr dealloc | | |
DataFlow::localFlow(DataFlow::exprNode(alloc), DataFlow::exprNode(dealloc)) | |
) | |
select alloc, "Memory allocated here is not deallocated." | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment