Skip to content

Instantly share code, notes, and snippets.

@ntfargo
Last active May 8, 2025 19:38
Show Gist options
  • Save ntfargo/14f5aa4706d5dcbe0e58746180cdd087 to your computer and use it in GitHub Desktop.
Save ntfargo/14f5aa4706d5dcbe0e58746180cdd087 to your computer and use it in GitHub Desktop.
Leveraging large language models (LLMs) via Ollama (codestral:22b), this prototype generates annotations for memory allocation and deallocation, which are used by CodeQL for static analysis.
"""
LLM-based Memory Leak Detection for C/C++ Codebases
Prototype code for automating memory leak detection in C/C++ codebases. Leveraging large language models (LLMs) via Ollama (codestral:22b), it generates annotations for memory allocation and deallocation, which are used by CodeQL for static analysis.
This is a sample implementation and may require adjustments based on specific requirements.
"""
import json
import os
import shutil
import subprocess
from collections import defaultdict

import clang.cindex
import ollama
def _collect_functions_from_file(translation_unit, file_path, function_data, call_graph):
    """Record function definitions and their direct callees for one parsed file.

    Only declarations located in *file_path* itself are visited: the stored
    line numbers are applied to this file's text, so definitions pulled in
    from included headers would otherwise be sliced out of the wrong file.
    Results are written into *function_data* and *call_graph* in place.
    """
    # Read the source once per file instead of once per function.
    with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
        lines = f.readlines()

    main_file = os.path.normcase(os.path.abspath(file_path))
    pending = []
    for top_level in translation_unit.cursor.get_children():
        located_in = top_level.location.file
        if located_in is None:
            continue
        if os.path.normcase(os.path.abspath(located_in.name)) != main_file:
            continue
        pending.append((top_level, None))
    pending.reverse()  # pop() then yields declarations in source order

    # Explicit stack instead of recursion so deeply nested ASTs cannot hit
    # Python's recursion limit.
    while pending:
        node, current_function = pending.pop()
        if node.kind == clang.cindex.CursorKind.FUNCTION_DECL and node.is_definition():
            func_name = node.spelling
            start = node.extent.start
            end = node.extent.end
            # NOTE: entries are keyed by bare name, so a later definition with
            # the same name (e.g. a static function in another file) replaces
            # an earlier one.
            function_data[func_name] = {
                'code': ''.join(lines[start.line - 1:end.line]).strip(),
                'callees': [],
                'file': file_path,
                'start_line': start.line,
                'end_line': end.line
            }
            current_function = func_name
        elif node.kind == clang.cindex.CursorKind.CALL_EXPR and current_function:
            callee_name = node.spelling
            if callee_name and callee_name not in call_graph[current_function]:
                call_graph[current_function].append(callee_name)

        # Always descend: calls sit below compound statements, and function
        # definitions may sit below namespaces or linkage specifications.
        children = [(child, current_function) for child in node.get_children()]
        pending.extend(reversed(children))


def extract_functions_and_call_graph(codebase_path, output_dir):
    """
    Extract functions and call graph from a C/C++ codebase using libclang.
    Save function source code and call graph to output_dir.

    Args:
        codebase_path: Directory that is walked for .c/.cpp/.cc/.cxx files,
            or the path of a single source file.
        output_dir: Directory (created if missing) that receives
            'functions.json'.

    Returns:
        tuple: (function_data, functions_file). function_data maps each
        function name to a dict with 'code', 'callees', 'file', 'start_line'
        and 'end_line'; 'callees' only lists functions that were themselves
        extracted. functions_file is the path of the written JSON file.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Initialize libclang. set_library_path() raises once libclang has been
    # loaded, so configure it only on first use, and only when the hard-coded
    # install directory exists; otherwise libclang's default lookup is used.
    llvm_dir = 'C:/Program Files/LLVM/bin'  # LLVM path
    if not getattr(clang.cindex.Config, 'loaded', False) and os.path.isdir(llvm_dir):
        clang.cindex.Config.set_library_path(llvm_dir)
    index = clang.cindex.Index.create()

    # Collect all C/C++ files
    source_suffixes = ('.c', '.cpp', '.cc', '.cxx')
    if os.path.isfile(codebase_path):
        # os.walk() yields nothing for a file, so handle that case explicitly.
        c_files = [codebase_path]
    else:
        c_files = [
            os.path.join(root, name)
            for root, _, files in os.walk(codebase_path)
            for name in files
            if name.endswith(source_suffixes)
        ]

    function_data = {}
    call_graph = defaultdict(list)
    for file_path in c_files:
        # '-std=c99' is only valid for C; C++ sources use the parser default.
        parse_args = ['-std=c99'] if file_path.endswith('.c') else []
        translation_unit = index.parse(file_path, args=parse_args)
        _collect_functions_from_file(translation_unit, file_path, function_data, call_graph)

    # Keep only callees whose definitions were extracted from the codebase.
    for caller, callees in call_graph.items():
        if caller in function_data:
            function_data[caller]['callees'] = [
                callee for callee in callees if callee in function_data
            ]

    functions_file = os.path.join(output_dir, 'functions.json')
    with open(functions_file, 'w') as f:
        json.dump(function_data, f, indent=2)
    return function_data, functions_file
# Prompt asking which variables receive allocated / deallocated memory.
_ANNOTATION_PROMPT_TEMPLATE = """
You are a C developer. Your task is to answer the following questions about a code snippet.
Which variables contain pointers to the memory allocated in function {func_name}?
Put the answer in the "allocated_variables" field.
Which variables contain pointers to the memory deallocated in function {func_name}?
Put the answer in the "deallocated_variables" field.
Return the final answer as a short JSON object.
Code:
{code}
"""

# Follow-up prompt. It names the "is_new_allocation" field explicitly because
# that is the key generate_annotation() reads from the reply.
_VALIDATION_PROMPT_TEMPLATE = """
You are a C developer. Your task is to answer the following questions about a code snippet.
Does the function {func_name} allocate memory to a new variable (and not merely assign to an existing object or structure)?
Put the answer as a boolean (true or false) in the "is_new_allocation" field.
Give a detailed answer in JSON format without any comments.
Code:
{source}
"""


def _parse_json_object(text):
    """Return the JSON object contained in *text*, or None if there is none.

    Falls back to the outermost '{...}' span so replies wrapped in Markdown
    fences or surrounding prose are still understood.
    """
    try:
        parsed = json.loads(text)
    except ValueError:
        first = text.find('{')
        last = text.rfind('}')
        if first == -1 or last <= first:
            return None
        try:
            parsed = json.loads(text[first:last + 1])
        except ValueError:
            return None
    return parsed if isinstance(parsed, dict) else None


def _string_list(value):
    """Coerce an LLM-provided field to a list of non-empty strings."""
    if isinstance(value, str):
        return [value] if value.strip() else []
    if isinstance(value, (list, tuple)):
        return [item for item in value if isinstance(item, str) and item.strip()]
    return []


def _is_affirmative(value):
    """Interpret a JSON value as a yes/no answer ("false" must not be truthy)."""
    if isinstance(value, str):
        return value.strip().lower() in ("true", "yes")
    return bool(value)


def generate_annotation(function_name, function_code, callees_code, model="codestral:22b"):
    """
    Generate LLM-based annotations for a function using Ollama.

    Args:
        function_name: Name of the function being annotated.
        function_code: Source text of that function.
        callees_code: Source texts of its callees; only used as extra context
            for the validation question.
        model: Ollama model name.

    Returns:
        dict: Always contains "allocated_variables" and
        "deallocated_variables" as lists of strings (empty when the reply
        could not be parsed). Any other keys of the reply are kept as-is.
    """
    prompt = _ANNOTATION_PROMPT_TEMPLATE.format(func_name=function_name, code=function_code)
    # Query Ollama; format="json" asks the server to return a JSON document.
    response = ollama.chat(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        format="json",
    )
    annotation = dict(_parse_json_object(response['message']['content']) or {})
    # Normalize both fields so callers can index and join them safely.
    annotation["allocated_variables"] = _string_list(annotation.get("allocated_variables"))
    annotation["deallocated_variables"] = _string_list(annotation.get("deallocated_variables"))

    if annotation["allocated_variables"]:
        source = function_code + "\n" + "\n".join(callees_code)
        validation_prompt = _VALIDATION_PROMPT_TEMPLATE.format(func_name=function_name, source=source)
        validation_response = ollama.chat(
            model=model,
            messages=[{"role": "user", "content": validation_prompt}],
            format="json",
        )
        verdict = _parse_json_object(validation_response['message']['content'])
        # Clear on an unparseable reply or when this is not a new allocation.
        if verdict is None or not _is_affirmative(verdict.get("is_new_allocation", False)):
            annotation["allocated_variables"] = []
    return annotation
def process_functions(function_data, output_dir, model="codestral:22b"):
    """
    Process all functions to generate annotations.
    Save annotations to output_dir.

    Args:
        function_data: Mapping of function name to a dict with at least
            'code' and 'callees' (names of other entries in the mapping).
        output_dir: Directory (created if missing) that receives
            'annotations.json'.
        model: Ollama model name forwarded to generate_annotation().

    Returns:
        tuple: (annotations, annotations_file) - the per-function annotation
        dicts and the path of the JSON file they were written to.
    """
    # Create the output directory first so an unwritable path fails before
    # any LLM call is made rather than after all of them.
    os.makedirs(output_dir, exist_ok=True)

    annotations = {}
    for func_name, data in function_data.items():
        callees_code = [
            function_data[callee]['code']
            for callee in data['callees']
            if callee in function_data
        ]
        annotations[func_name] = generate_annotation(func_name, data['code'], callees_code, model)

    annotations_file = os.path.join(output_dir, 'annotations.json')
    with open(annotations_file, 'w') as f:
        json.dump(annotations, f, indent=2)
    return annotations, annotations_file
def _copy_into(src_path, dst_path):
    """Copy one file, creating the destination directory when needed."""
    os.makedirs(os.path.dirname(dst_path), exist_ok=True)
    shutil.copyfile(src_path, dst_path)


def _mirror_tree(src_root, dst_root):
    """Copy every file under *src_root* to the same relative path in *dst_root*.

    *dst_root* itself is skipped so an output directory located inside the
    codebase is not copied into itself.
    """
    skipped = os.path.normcase(os.path.abspath(dst_root))
    for root, dirs, files in os.walk(src_root):
        dirs[:] = [
            d for d in dirs
            if os.path.normcase(os.path.abspath(os.path.join(root, d))) != skipped
        ]
        rel_root = os.path.relpath(root, src_root)
        for name in files:
            _copy_into(
                os.path.join(root, name),
                os.path.normpath(os.path.join(dst_root, rel_root, name)),
            )


def _variable_names(value):
    """Return an annotation field as a list of strings, tolerating bad shapes."""
    if isinstance(value, str):
        return [value] if value else []
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if item]
    return []


def _annotation_comment(annotation):
    """Build the '// @allocate ...' / '// @deallocate ...' text, or '' if empty."""
    comments = []
    allocated = _variable_names(annotation.get("allocated_variables"))
    if allocated:
        comments.append(f"// @allocate {', '.join(allocated)}")
    deallocated = _variable_names(annotation.get("deallocated_variables"))
    if deallocated:
        comments.append(f"// @deallocate {', '.join(deallocated)}")
    return '\n'.join(comments) + '\n' if comments else ''


def convert_to_codeql_format(function_data, annotations, codebase_path, output_dir):
    """
    Convert LLM-generated annotations to CodeQL-compatible inline comments.
    Modify source files in a temporary directory.

    The originals are never modified; copies under
    '<output_dir>/annotated_codebase' are annotated instead:

    * If codebase_path is a file, it is copied to 'tests/<filename>' and a
      'CMakeLists.txt' found two directory levels above it is copied next to
      the 'tests' directory.
    * If codebase_path is a directory, its files are mirrored with the same
      relative paths.

    Args:
        function_data: Mapping of function name to a dict with 'file' and
            'start_line' (1-based line of the definition).
        annotations: Mapping of function name to a dict that may contain
            "allocated_variables" and "deallocated_variables".
        codebase_path: Source file or directory that was analysed.
        output_dir: Directory that receives 'annotated_codebase'.

    Returns:
        str: Path of the annotated copy of the codebase.
    """
    temp_codebase_dir = os.path.join(output_dir, 'annotated_codebase')
    os.makedirs(temp_codebase_dir, exist_ok=True)

    single_file_copy = None
    if os.path.isdir(codebase_path):
        _mirror_tree(codebase_path, temp_codebase_dir)
    else:
        tests_dir = os.path.join(temp_codebase_dir, 'tests')
        os.makedirs(tests_dir, exist_ok=True)
        single_file_copy = os.path.join(tests_dir, os.path.basename(codebase_path))
        print(f"Copying {codebase_path} to {single_file_copy}")
        shutil.copyfile(codebase_path, single_file_copy)

        project_dir = os.path.dirname(os.path.dirname(codebase_path))
        cmake_path = os.path.join(project_dir, 'CMakeLists.txt')
        if os.path.exists(cmake_path):
            cmake_copy = os.path.join(temp_codebase_dir, 'CMakeLists.txt')
            print(f"Copying CMakeLists.txt from {cmake_path}")
            shutil.copyfile(cmake_path, cmake_copy)
            print(f"CMakeLists.txt copied to {cmake_copy}")
        else:
            print(f"ERROR: CMakeLists.txt not found at {cmake_path}")

    def annotated_copy_of(source_file):
        """Map an analysed source file to its copy, or None if it has none."""
        if single_file_copy is not None:
            same_file = (
                os.path.normcase(os.path.abspath(source_file))
                == os.path.normcase(os.path.abspath(codebase_path))
            )
            return single_file_copy if same_file else None
        try:
            rel_path = os.path.relpath(source_file, codebase_path)
        except ValueError:  # e.g. different drives on Windows
            return None
        if rel_path == os.pardir or rel_path.startswith(os.pardir + os.sep):
            return None  # file lives outside the mirrored codebase
        candidate = os.path.join(temp_codebase_dir, rel_path)
        return candidate if os.path.isfile(candidate) else None

    # Group insertions per file so each file is rewritten exactly once.
    insertions_by_file = defaultdict(list)
    for func_name, annotation in annotations.items():
        func_data = function_data.get(func_name)
        if func_data is None:
            continue
        comment = _annotation_comment(annotation)
        if not comment:
            continue
        target = annotated_copy_of(func_data['file'])
        if target is None:
            continue
        insertions_by_file[target].append((func_data['start_line'], comment))

    for target, insertions in insertions_by_file.items():
        # surrogateescape + newline='' round-trip bytes and line endings that
        # are not touched by the inserted comments.
        with open(target, 'r', encoding='utf-8', errors='surrogateescape', newline='') as f:
            lines = f.readlines()
        # Insert bottom-up: start_line values refer to the unmodified file,
        # and inserting top-down would shift every later insertion point.
        for start_line, comment in sorted(insertions, key=lambda item: item[0], reverse=True):
            lines.insert(max(start_line - 1, 0), comment)
        with open(target, 'w', encoding='utf-8', errors='surrogateescape', newline='') as f:
            f.writelines(lines)

    return temp_codebase_dir
def check_codeql_installation():
    """Check if CodeQL CLI is installed and accessible.

    Returns:
        bool: True when 'codeql --version' can be started and exits with
        status 0; False when it cannot be launched or exits non-zero.
    """
    try:
        # Only the exit status matters, so the output is discarded.
        result = subprocess.run(
            ["codeql", "--version"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # Covers FileNotFoundError (not on PATH) as well as PermissionError
        # and similar launch failures (e.g. a non-executable 'codeql' file).
        return False
    return result.returncode == 0
def run_lamded_pipeline(codebase_path, output_base_dir, model="codestral:22b"):
    """
    Drive the whole LAMED memory-leak-detection pipeline end to end.

    The stages run in order: verify the CodeQL CLI is reachable, extract
    functions and the call graph, ask the LLM for annotations, then write an
    annotated copy of the codebase.

    Args:
        codebase_path (str): Path to the C/C++ codebase (file or directory).
        output_base_dir (str): Directory to store output files.
        model (str): Ollama model name (e.g., 'codestral:22b').

    Returns:
        dict: Paths to output files under the keys "functions_file",
        "annotations_file" and "annotated_codebase_dir", plus
        "codeql_report", which is currently always None.

    Raises:
        RuntimeError: If the CodeQL CLI check fails.
    """
    # Guard clause: refuse to start without a working CodeQL CLI.
    codeql_available = check_codeql_installation()
    if not codeql_available:
        raise RuntimeError("CodeQL CLI not found. Please install CodeQL and add it to PATH.")

    # Stage 1: function bodies and call graph.
    extracted, functions_path = extract_functions_and_call_graph(codebase_path, output_base_dir)

    # Stage 2: LLM-generated allocation/deallocation annotations.
    generated, annotations_path = process_functions(extracted, output_base_dir, model)

    # Stage 3: annotated copy of the sources for CodeQL.
    annotated_dir = convert_to_codeql_format(extracted, generated, codebase_path, output_base_dir)

    return {
        "functions_file": functions_path,
        "annotations_file": annotations_path,
        "annotated_codebase_dir": annotated_dir,
        "codeql_report": None  # Placeholder for future CodeQL integration
    }
# Reference material only: the string literal below is not bound to a name,
# so it has no effect at runtime. It holds two example files as plain text:
# a small C++ source (memc.cpp) and a CodeQL query (queries/memory_leaks.ql)
# that looks for "@allocate" / "@deallocate" comments.
# NOTE(review): the query matches the text "@allocate return", while
# convert_to_codeql_format() emits "// @allocate <variable names>" - confirm
# the two formats are meant to line up.
"""
memc.cpp
#include <stdlib.h>
void* allocate() {
void* ptr = malloc(10); // Allocates memory
return ptr; // No free()
}
void deallocate(void* ptr) {
free(ptr); // Deallocates memory
}
queries/memory_leaks.ql
import cpp
import semmle.code.cpp.dataflow.DataFlow
// Custom allocation functions
class CustomAllocationFunction extends Function {
CustomAllocationFunction() {
exists(Comment c | c.getTarget() = this and c.getText().matches("%@allocate return%"))
}
}
// Custom deallocation functions
class CustomDeallocationFunction extends Function {
string deallocatedParam;
CustomDeallocationFunction() {
exists(Comment c |
c.getTarget() = this and
c.getText().regexpMatch(".*@deallocate\\s+(\\w+).*", deallocatedParam)
)
}
Parameter getDeallocatedParameter() {
result = this.getAParameter() and result.getName() = deallocatedParam
}
}
// Extend allocation expressions
class MyAllocationExpr extends Expr {
MyAllocationExpr() {
this instanceof AllocationExpr or
exists(CustomAllocationFunction f | this.(Call).getTarget() = f)
}
}
// Extend deallocation expressions
class MyDeallocationExpr extends Expr {
MyDeallocationExpr() {
this instanceof DeallocationExpr or
exists(CustomDeallocationFunction f, Call c |
c.getTarget() = f and
this = c.getArgument(f.getDeallocatedParameter().getIndex())
)
}
}
// Detect memory leaks
from MyAllocationExpr alloc
where not exists(MyDeallocationExpr dealloc |
DataFlow::localFlow(DataFlow::exprNode(alloc), DataFlow::exprNode(dealloc))
)
select alloc, "Memory allocated here is not deallocated."
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment