Created
July 22, 2025 17:38
-
-
Save paul-english/7a4c64c54521cafb05a0fa709f98dbfb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import traceback | |
import logging | |
import subprocess | |
import fnmatch | |
from datetime import datetime | |
from functools import wraps | |
import dspy | |
from prompt_toolkit import PromptSession | |
from prompt_toolkit.history import FileHistory | |
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory | |
from prompt_toolkit.completion import WordCompleter | |
from prompt_toolkit.styles import Style | |
import requests | |
from dotenv import load_dotenv | |
load_dotenv() | |
# Global debug mode state; rewritten by every call to setup_logging().
DEBUG_MODE = False

# Third-party loggers whose level is switched together by setup_logging().
_THIRD_PARTY_LOGGERS = ('LiteLLM', 'urllib3', 'requests', 'httpx')


def setup_logging(debug_mode=False):
    """Set up logging with appropriate levels based on debug mode.

    Replaces every handler on the root logger with two fresh ones: a file
    handler writing everything (DEBUG and up) to ``agent.log`` and a console
    handler showing INFO and up. The third-party loggers listed in
    ``_THIRD_PARTY_LOGGERS`` are set to INFO in debug mode and WARNING
    otherwise.

    Args:
        debug_mode: When true, let third-party INFO messages through.

    Returns:
        The logger named after this module, with its level set to INFO.
    """
    global DEBUG_MODE
    DEBUG_MODE = debug_mode

    # Clear any existing handlers. Close each one as well: removeHandler()
    # alone leaves a FileHandler's file open, so calling this function again
    # would leak one handle to agent.log per call.
    root_logger = logging.getLogger()
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
        handler.close()

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # File handler (always logs everything). Explicit UTF-8 because log
    # messages carry non-ASCII text and the platform default may not encode it.
    file_handler = logging.FileHandler('agent.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # Console handler with filtering
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    root_logger.addHandler(file_handler)
    root_logger.addHandler(console_handler)
    root_logger.setLevel(logging.DEBUG)

    # Debug mode lets third-party INFO messages through; normal mode keeps
    # only their warnings and errors.
    third_party_level = logging.INFO if debug_mode else logging.WARNING
    for name in _THIRD_PARTY_LOGGERS:
        logging.getLogger(name).setLevel(third_party_level)

    # Always show our own logs
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    return logger
# Module-wide logger, configured once at import time with debug mode off.
logger = setup_logging(debug_mode=False)

# Sent as the User-Agent header on outbound HTTP requests.
USER_AGENT = "Paul's LLM agent <[email protected]>"

# Kagi search API token read from the environment; None when it is not set.
KAGI_API_KEY = os.environ.get("KAGI_API_KEY")
# Global auto confirm state
AUTO_CONFIRM = False


def user_confirm(func):
    """Wrap a tool method so it runs only after the user approves it.

    The wrapper prints the tool name and its arguments (the first positional
    argument, ``self``, is left out). When the module-level ``AUTO_CONFIRM``
    flag is true the tool runs immediately; otherwise a line is read from
    standard input and the tool runs only on ``y``/``yes``.

    The wrapper returns the tool's own result on success. Exceptions raised
    by the tool are logged and turned into an error string, and a refusal or
    interrupt is likewise reported as a string.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__name__

        # Human-readable argument list for the prompt and the log.
        positional = ", ".join(map(str, args[1:]))  # Skip self
        if kwargs:
            keyword = ", ".join(f"{key}={value}" for key, value in kwargs.items())
            arg_str = f"{positional}, {keyword}" if positional else keyword
        else:
            arg_str = positional

        def run_tool():
            # Call the tool; a failure becomes a returned message, not a raise.
            try:
                outcome = func(*args, **kwargs)
                logger.info(f"Tool {func_name} executed successfully with args: {arg_str}")
                return outcome
            except Exception as e:
                error_msg = f"Tool {func_name} failed with error: {str(e)}"
                logger.error(f"{error_msg}\n{traceback.format_exc()}")
                return f"β {error_msg}"

        if AUTO_CONFIRM:
            print(f"\nβ Auto-confirming tool execution:")
            print(f" Function: {func_name}")
            print(f" Arguments: {arg_str}")
            print(" β Auto-confirmed (auto_confirm enabled)")
            return run_tool()

        print(f"\nπ€ Confirm tool execution:")
        print(f" Function: {func_name}")
        print(f" Arguments: {arg_str}")
        print(" (y/n/other text): ", end="", flush=True)
        try:
            user_input = input().strip().lower()
            if user_input in ['y', 'yes']:
                print("β Executing tool...")
                # Kept inside this try so Ctrl-C during the tool run is
                # reported as a cancellation.
                return run_tool()
            if user_input in ['n', 'no']:
                print("β Tool execution cancelled by user.")
                return f"Tool execution cancelled by user for {func_name}({arg_str})"
            # Anything else counts as a refusal; the text is passed back.
            print(f"β Tool execution cancelled. User input: '{user_input}'")
            return f"Tool execution cancelled. User provided: '{user_input}' for {func_name}({arg_str})"
        except (EOFError, KeyboardInterrupt):
            print("\nβ Tool execution cancelled due to interrupt.")
            return f"Tool execution cancelled due to user interrupt for {func_name}({arg_str})"

    return wrapper
# Language model configuration. The defaults are the local Ollama setup that
# was previously hard-coded; each value can be overridden through the
# environment (including a .env file, which is loaded at the top of the file).
lm = dspy.LM(
    # Another model that has been used here: 'ollama_chat/llama3.1:70b'
    os.getenv('AGENT_LM_MODEL', 'ollama_chat/gemma3:27b'),
    api_base=os.getenv('AGENT_LM_API_BASE', 'http://localhost:11434'),
    api_key=os.getenv('AGENT_LM_API_KEY', ''),
)
dspy.settings.configure(lm=lm)
#region agents | |
class RouterAgent(dspy.Module):
    """A router DSPy agent that can handle various tasks with conversation history.

    Each question is first shown to a ChainOfThought "router" predictor, whose
    choice is printed for the user. The actual dispatch is decided by keyword
    matching on the question text: weather, then file system, then web, with
    the web agent as the fallback. Every turn is appended to a shared
    ``dspy.History`` capped at ``max_history`` entries.
    """

    # Keyword groups, tested as substrings of the lower-cased question.
    _WEATHER_KEYWORDS = ('weather', 'temperature', 'forecast', 'zip', 'zipcode', 'zip code')
    _FILESYSTEM_KEYWORDS = ('file', 'files', 'directory', 'folder', 'read', 'write',
                            'search files', 'list files', 'fd', 'filesystem', 'fs')
    _WEB_KEYWORDS = ('search', 'web', 'internet', 'website', 'url', 'find', 'lookup',
                     'information')

    def __init__(self, debug_mode=False, max_history=5):
        """Create the sub-agents, the router predictor and an empty history.

        Args:
            debug_mode: Passed to every sub-agent; enables extra debug prints.
            max_history: Maximum number of conversation turns to retain.
        """
        super().__init__()
        self.debug_mode = debug_mode
        self.max_history = max_history

        # Initialize sub-agents
        self.weather_agent = WeatherAgent(debug_mode=debug_mode)
        self.web_agent = WebAgent(debug_mode=debug_mode)
        # The file system agent starts in the process's current directory.
        self.filesystem_agent = FileSystemAgent(debug_mode=debug_mode, initial_working_dir=os.getcwd())

        # Router with history using proper DSPy signature
        class RouterSignature(dspy.Signature):
            question: str = dspy.InputField()
            history: dspy.History = dspy.InputField()
            agent_choice: str = dspy.OutputField()

        self.router = dspy.ChainOfThought(RouterSignature)

        # History management using proper DSPy History
        self.history = dspy.History(messages=[])

    def _get_context_from_history(self):
        """Get conversation context from history as a formatted string.

        Returns:
            An empty string when there is no history, otherwise one numbered
            "User / Assistant" pair per turn with answers cut to 200 characters.
        """
        if not self.history.messages:
            return ""
        context_lines = ["π Previous conversation:"]
        for i, message in enumerate(self.history.messages, 1):
            question = message.get("question", "")
            answer = str(message.get("answer", ""))
            # Truncate long responses for readability
            truncated_answer = answer[:200] + "..." if len(answer) > 200 else answer
            context_lines.append(f"{i}. User: {question}")
            context_lines.append(f" Assistant: {truncated_answer}")
            context_lines.append("")  # Empty line for readability
        return "\n".join(context_lines)

    def _add_to_history(self, question, result):
        """Add a conversation turn to the history using DSPy conventions.

        Args:
            question: The user's question for this turn.
            result: What the sub-agent returned. Mapping-like results are
                merged into the stored message; anything else (the sub-agents
                return a plain error string on failure) is stored under
                ``"answer"``.
        """
        if hasattr(result, "keys"):
            new_message = {"question": question, **result}
        else:
            # Unpacking a str with ** raises TypeError, which used to turn a
            # sub-agent error into a router failure.
            new_message = {"question": question, "answer": str(result)}

        messages = self.history.messages
        messages.append(new_message)

        # Keep only the last max_history turns. Trim the list in place rather
        # than rebinding the attribute: rebinding fails if the History model
        # is immutable, and a [-0:] slice would keep everything.
        excess = len(messages) - self.max_history
        if excess > 0:
            del messages[:excess]

    def forward(self, question):
        """Route the question to the appropriate agent with conversation history.

        Args:
            question: The user's question.

        Returns:
            The selected sub-agent's result, or an error string if anything
            in the routing itself raised.
        """
        try:
            print("π§ Analyzing question to determine best agent...")
            routing_result = self.router(question=question, history=self.history)

            # Newer DSPy releases call the chain-of-thought field 'reasoning';
            # 'rationale' is kept as a fallback for older ones.
            reasoning = (getattr(routing_result, 'reasoning', None)
                         or getattr(routing_result, 'rationale', None))
            if reasoning:
                print(f"π€ Routing reasoning: {reasoning}")

            # The model's choice is shown for information only.
            agent_choice = routing_result.agent_choice.lower()
            print(f"π― Selected agent: {agent_choice}")

            if self.debug_mode:
                context = self._get_context_from_history()
                print(f"π Debug: Full routing result: {routing_result}")
                print(f"π Debug: Conversation context length: {len(context) if context else 0}")

            # Route to appropriate agent based on keywords
            lowered = question.lower()
            if any(keyword in lowered for keyword in self._WEATHER_KEYWORDS):
                print("π€οΈ Routing to WeatherAgent...")
                result = self.weather_agent(question=question, history=self.history)
            elif any(keyword in lowered for keyword in self._FILESYSTEM_KEYWORDS):
                print("πΎ Routing to FileSystemAgent...")
                result = self.filesystem_agent(question=question, history=self.history)
            elif any(keyword in lowered for keyword in self._WEB_KEYWORDS):
                print("π Routing to WebAgent...")
                result = self.web_agent(question=question, history=self.history)
            else:
                # Default to web agent for general questions
                print("π Routing to WebAgent (default)...")
                result = self.web_agent(question=question, history=self.history)

            self._add_to_history(question, result)
            return result
        except Exception as e:
            error_msg = f"RouterAgent failed to process question: {str(e)}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"
class WeatherAgent(dspy.Module):
    """A weather agent that uses ReAct with weather-related tools and conversation history.

    Tools: ``get_weather`` (ZIP code -> current conditions, via Nominatim
    geocoding and the Open-Meteo forecast API) and ``convert_to_fahrenheit``.
    """

    # Seconds to wait for each HTTP request before giving up.
    _REQUEST_TIMEOUT = 10

    def __init__(self, debug_mode=False):
        """Build the ReAct module and register the two weather tools.

        Args:
            debug_mode: When true, ``forward`` prints extra diagnostics.
        """
        super().__init__()
        self.debug_mode = debug_mode

        # Weather agent signature with history
        class WeatherSignature(dspy.Signature):
            question: str = dspy.InputField()
            history: dspy.History = dspy.InputField()
            answer: str = dspy.OutputField()

        self.react = dspy.ReAct(WeatherSignature, tools=[
            dspy.Tool(name="get_weather", func=self.get_weather, desc="Get current weather for a location"),
            dspy.Tool(name="convert_to_fahrenheit", func=self.convert_to_fahrenheit, desc="Convert Celsius to Fahrenheit"),
        ])

    @user_confirm
    def get_weather(self, zip_code):
        """Get current weather for a zip code.

        Args:
            zip_code: Postal code, looked up with ``country=USA``.

        Returns:
            The ``"current"`` section of the Open-Meteo response, a fallback
            message when that section is absent, or an error string.
        """
        try:
            logger.info(f"Getting weather for ZIP code: {zip_code}")

            # Step 1: Convert ZIP code to coordinates. Passing params lets
            # requests URL-encode the value instead of splicing it in raw.
            geocode_response = requests.get(
                "https://nominatim.openstreetmap.org/search",
                params={"postalcode": zip_code, "country": "USA", "format": "json"},
                headers={"User-Agent": USER_AGENT},
                timeout=self._REQUEST_TIMEOUT,
            )
            geocode_response.raise_for_status()
            geocode_data = geocode_response.json()
            if not geocode_data:
                error_msg = f"Could not find coordinates for ZIP code {zip_code}"
                logger.warning(error_msg)
                return error_msg

            latitude = geocode_data[0]['lat']
            longitude = geocode_data[0]['lon']
            logger.info(f"Found coordinates: lat={latitude}, lon={longitude}")

            # Step 2: Fetch weather data from Open-Meteo. The parameter name
            # is "current"; it had been corrupted to "¤t" in the URL, so the
            # response never contained current conditions.
            weather_response = requests.get(
                "https://api.open-meteo.com/v1/forecast",
                params={
                    "latitude": latitude,
                    "longitude": longitude,
                    "current": "temperature_2m,wind_speed_10m",
                },
                timeout=self._REQUEST_TIMEOUT,
            )
            weather_response.raise_for_status()
            weather_data = weather_response.json()
            logger.info(f"Weather data retrieved: {weather_data}")
            return weather_data.get("current", "No current weather data available")
        except Exception as e:
            error_msg = f"Error getting weather for ZIP {zip_code}: {str(e)}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"

    def convert_to_fahrenheit(self, celsius):
        """Convert Celsius to Fahrenheit.

        Args:
            celsius: Anything ``float()`` accepts.

        Returns:
            The temperature in Fahrenheit as a float, or an error string when
            the input cannot be converted.
        """
        try:
            celsius_float = float(celsius)
            fahrenheit = (celsius_float * 9 / 5) + 32
            logger.info(f"Converted {celsius_float}Β°C to {fahrenheit}Β°F")
            return fahrenheit
        except Exception as e:
            error_msg = f"Error converting {celsius} to Fahrenheit: {str(e)}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"

    def forward(self, question, history=None):
        """Process weather-related questions using ReAct with conversation history.

        Args:
            question: The user's question.
            history: A ``dspy.History``; an empty one is created when omitted.

        Returns:
            The ReAct result, or an error string if processing raised.
        """
        try:
            print("π€οΈ WeatherAgent processing question...")
            # Use provided history or create empty one
            if history is None:
                history = dspy.History(messages=[])

            result = self.react(question=question, history=history)

            # Newer DSPy releases call this field 'reasoning'; 'rationale' is
            # kept as a fallback for older ones.
            reasoning = getattr(result, 'reasoning', None) or getattr(result, 'rationale', None)
            if reasoning:
                print(f"π€ WeatherAgent reasoning: {reasoning}")

            if self.debug_mode:
                print("π WeatherAgent debug info:")
                print(f" - Question: {question}")
                print(f" - History length: {len(history.messages) if history else 0}")
                print(f" - Result type: {type(result)}")
                if hasattr(result, '__dict__'):
                    print(f" - Result attributes: {list(result.__dict__.keys())}")
            return result
        except Exception as e:
            error_msg = f"WeatherAgent failed to process question: {str(e)}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"
class WebAgent(dspy.Module):
    """A web agent that uses ReAct with web-related tools and conversation history.

    Tools: ``search_web`` (Kagi search API) and ``get_webpage_content``
    (fetch a URL and reduce the HTML to a short plain-text excerpt).
    """

    # Seconds to wait for each HTTP request before giving up.
    _REQUEST_TIMEOUT = 10

    def __init__(self, debug_mode=False):
        """Build the ReAct module and register the two web tools.

        Args:
            debug_mode: When true, ``forward`` prints extra diagnostics.
        """
        super().__init__()
        self.debug_mode = debug_mode

        # Web agent signature with history
        class WebSignature(dspy.Signature):
            question: str = dspy.InputField()
            history: dspy.History = dspy.InputField()
            answer: str = dspy.OutputField()

        self.react = dspy.ReAct(WebSignature, tools=[
            dspy.Tool(name="search_web", func=self.search_web, desc="Search the web for information"),
            dspy.Tool(name="get_webpage_content", func=self.get_webpage_content, desc="Get content from a webpage"),
        ])

    @user_confirm
    def search_web(self, query):
        """Search the web for information using Kagi API.

        Args:
            query: The search text.

        Returns:
            A numbered list of up to five results (title, snippet, URL), a
            "no results" message, or an error string.
        """
        try:
            logger.info(f"Searching web for query: {query}")
            print(f'π Searching web for: "{query}"')

            if not KAGI_API_KEY:
                error_msg = "KAGI_API_KEY not found in environment variables"
                logger.error(error_msg)
                return f"β Error: {error_msg}"

            response = requests.get(
                "https://kagi.com/api/v0/search",
                headers={
                    "Authorization": f"Bot {KAGI_API_KEY}",
                    "User-Agent": USER_AGENT,
                },
                params={"q": query},
                # Without a timeout a stalled connection blocks the agent forever.
                timeout=self._REQUEST_TIMEOUT,
            )
            logger.info(f"Search response status: {response.status_code}")
            response.raise_for_status()
            data = response.json()

            # Extract search results
            results = data.get("data", [])
            if not results:
                logger.warning(f"No search results found for query: {query}")
                return f"π No search results found for '{query}'"

            # Format the results
            formatted_results = []
            for i, result in enumerate(results[:5], 1):  # Limit to top 5 results
                title = result.get("title", "No title")
                snippet = result.get("snippet", "No snippet")
                result_url = result.get("url", "")
                formatted_results.append(f"{i}. {title}\n {snippet}\n URL: {result_url}\n")

            logger.info(f"Found {len(results)} search results for query: {query}")
            return f"π Search results for '{query}':\n" + "\n".join(formatted_results)
        except requests.exceptions.RequestException as e:
            error_msg = f"Error searching web: {e}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"
        except Exception as e:
            error_msg = f"Unexpected error searching web: {e}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"

    @staticmethod
    def _extract_title_and_text(content, max_chars=1000):
        """Reduce an HTML document to ``(title, plain_text)``.

        The title comes from the first ``<title>`` element (empty string when
        there is none). The text is the document with script/style blocks,
        comments and tags removed, entities decoded and whitespace collapsed,
        then cut to ``max_chars`` characters followed by ``"..."``.
        """
        import html
        import re

        title = ""
        # DOTALL so a title broken across lines still matches; [^>]* accepts
        # attributes on the tag.
        title_match = re.search(r'<title[^>]*>(.*?)</title>', content, re.IGNORECASE | re.DOTALL)
        if title_match:
            title = html.unescape(re.sub(r'\s+', ' ', title_match.group(1)).strip())

        # Script and style bodies are code, not page content; drop them whole
        # before stripping tags, otherwise they fill the excerpt.
        text = re.sub(r'<(script|style)\b[^>]*>.*?</\1\s*>', ' ', content,
                      flags=re.IGNORECASE | re.DOTALL)
        text = re.sub(r'<!--.*?-->', ' ', text, flags=re.DOTALL)
        text = re.sub(r'<[^>]+>', '', text)
        text = html.unescape(text)
        text = re.sub(r'\s+', ' ', text).strip()

        # Limit content length for readability
        if len(text) > max_chars:
            text = text[:max_chars] + "..."
        return title, text

    @user_confirm
    def get_webpage_content(self, url):
        """Get content from a webpage using requests.

        Args:
            url: Address of the page to fetch.

        Returns:
            A header line with the URL, the page title when present, and a
            plain-text excerpt of at most 1000 characters; or an error string.
        """
        try:
            logger.info(f"Fetching webpage content from: {url}")
            print(f"π Fetching content from: {url}")

            response = requests.get(url, headers={"User-Agent": USER_AGENT},
                                    timeout=self._REQUEST_TIMEOUT)
            response.raise_for_status()

            title, clean_content = self._extract_title_and_text(response.text)
            logger.info(f"Successfully fetched content from {url}, length: {len(clean_content)} chars")

            result = f"π Content from {url}"
            if title:
                result += f"\nTitle: {title}"
            result += f"\n\n{clean_content}"
            return result
        except requests.exceptions.RequestException as e:
            error_msg = f"Error fetching webpage {url}: {e}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"
        except Exception as e:
            error_msg = f"Unexpected error fetching {url}: {e}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"

    def forward(self, question, history=None):
        """Process web-related questions using ReAct with conversation history.

        Args:
            question: The user's question.
            history: A ``dspy.History``; an empty one is created when omitted.

        Returns:
            The ReAct result, or an error string if processing raised.
        """
        try:
            print("π WebAgent processing question...")
            # Use provided history or create empty one
            if history is None:
                history = dspy.History(messages=[])

            result = self.react(question=question, history=history)

            # Newer DSPy releases call this field 'reasoning'; 'rationale' is
            # kept as a fallback for older ones.
            reasoning = getattr(result, 'reasoning', None) or getattr(result, 'rationale', None)
            if reasoning:
                print(f"π€ WebAgent reasoning: {reasoning}")

            if self.debug_mode:
                print("π WebAgent debug info:")
                print(f" - Question: {question}")
                print(f" - History length: {len(history.messages) if history else 0}")
                print(f" - Result type: {type(result)}")
                if hasattr(result, '__dict__'):
                    print(f" - Result attributes: {list(result.__dict__.keys())}")
            return result
        except Exception as e:
            error_msg = f"WebAgent failed to process question: {str(e)}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            return f"β {error_msg}"
class FileSystemAgent(dspy.Module): | |
"""A file system agent that uses ReAct with file system tools and maintains working directory context.""" | |
def __init__(self, debug_mode=False, ignore_paths=None, initial_working_dir=None): | |
super().__init__() | |
self.debug_mode = debug_mode | |
self.ignore_paths = ignore_paths or [ | |
'.git', '.svn', '.hg', '.bzr', # Version control | |
'node_modules', '__pycache__', '.pytest_cache', # Dependencies and cache | |
'.venv', 'venv', 'env', '.env', # Virtual environments | |
'.DS_Store', 'Thumbs.db', # OS files | |
'.vscode', '.idea', # IDE files | |
'*.log', '*.tmp', '*.temp', # Log and temp files | |
'/proc', '/sys', '/dev', # System directories | |
'/tmp', '/var/tmp', '/var/cache' # System temp directories | |
] | |
# Initialize working directory context | |
import os | |
self.working_dir = os.path.abspath(initial_working_dir or os.getcwd()) | |
self.directory_history = [self.working_dir] # Keep track of directory navigation history | |
# FileSystem agent signature with history | |
class FileSystemSignature(dspy.Signature): | |
question: str = dspy.InputField() | |
history: dspy.History = dspy.InputField() | |
answer: str = dspy.OutputField() | |
self.react = dspy.ReAct(FileSystemSignature, tools=[ | |
dspy.Tool(name="search_files", func=self.search_files, desc="Search for files using fd command"), | |
dspy.Tool(name="list_directory", func=self.list_directory, desc="List files and directories in a path"), | |
dspy.Tool(name="read_file", func=self.read_file, desc="Read the contents of a file"), | |
dspy.Tool(name="write_file", func=self.write_file, desc="Write content to a file"), | |
dspy.Tool(name="change_directory", func=self.change_directory, desc="Change the current working directory"), | |
dspy.Tool(name="get_working_directory", func=self.get_working_directory, desc="Get the current working directory"), | |
dspy.Tool(name="go_back", func=self.go_back, desc="Go back to the previous directory in history"), | |
]) | |
def _build_fd_ignore_args(self): | |
"""Build ignore arguments for fd command.""" | |
ignore_args = [] | |
for path in self.ignore_paths: | |
if path.startswith('*'): | |
# Pattern like "*.log" becomes --glob "*.log" | |
ignore_args.extend(['--glob', path]) | |
else: | |
# Directory or file path becomes --exclude path | |
ignore_args.extend(['--exclude', path]) | |
return ignore_args | |
@user_confirm | |
def search_files(self, query, directory=None): | |
"""Search for files using fd command.""" | |
try: | |
# Use working directory if no directory specified | |
if directory is None: | |
directory = self.working_dir | |
elif directory == ".": | |
directory = self.working_dir | |
elif not os.path.isabs(directory): | |
# Make relative paths absolute relative to working directory | |
directory = os.path.join(self.working_dir, directory) | |
logger.info(f"Searching files for query: '{query}' in directory: '{directory}'") | |
print(f"π Searching files for: '{query}' in '{directory}'") | |
# Check if fd is available | |
import subprocess | |
try: | |
subprocess.run(['fd', '--version'], capture_output=True, check=True) | |
except (subprocess.CalledProcessError, FileNotFoundError): | |
error_msg = "fd command not found. Please install fd (fdfind) first." | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
# Build fd command | |
cmd = ['fd', query, directory, '--type', 'f', '--color', 'never'] | |
cmd.extend(self._build_fd_ignore_args()) | |
if self.debug_mode: | |
print(f"π Debug: Running command: {' '.join(cmd)}") | |
# Execute fd command | |
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) | |
if result.returncode != 0 and result.stderr: | |
logger.warning(f"fd command stderr: {result.stderr}") | |
files = result.stdout.strip().split('\n') if result.stdout.strip() else [] | |
if not files: | |
logger.info(f"No files found matching '{query}' in '{directory}'") | |
return f"π No files found matching '{query}' in '{directory}'" | |
# Limit results for readability | |
if len(files) > 20: | |
files = files[:20] | |
result_msg = f"π Found {len(files)} files (showing first 20):\n" | |
else: | |
result_msg = f"π Found {len(files)} files:\n" | |
for i, file_path in enumerate(files, 1): | |
result_msg += f"{i:2d}. {file_path}\n" | |
logger.info(f"Found {len(files)} files matching '{query}' in '{directory}'") | |
return result_msg | |
except subprocess.TimeoutExpired: | |
error_msg = f"Search timed out for query: '{query}'" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
except Exception as e: | |
error_msg = f"Error searching files for '{query}': {str(e)}" | |
logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
return f"β {error_msg}" | |
@user_confirm | |
def list_directory(self, path=None): | |
"""List files and directories in a path.""" | |
try: | |
import os | |
# Use working directory if no path specified | |
if path is None: | |
path = self.working_dir | |
elif path == ".": | |
path = self.working_dir | |
elif not os.path.isabs(path): | |
# Make relative paths absolute relative to working directory | |
path = os.path.join(self.working_dir, path) | |
logger.info(f"Listing directory: {path}") | |
print(f"π Listing directory: {path}") | |
if not os.path.exists(path): | |
error_msg = f"Path does not exist: {path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
if not os.path.isdir(path): | |
error_msg = f"Path is not a directory: {path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
# Get directory contents | |
try: | |
items = os.listdir(path) | |
except PermissionError: | |
error_msg = f"Permission denied accessing directory: {path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
# Filter out ignored items | |
filtered_items = [] | |
for item in items: | |
item_path = os.path.join(path, item) | |
should_ignore = False | |
for ignore_pattern in self.ignore_paths: | |
if ignore_pattern.startswith('*'): | |
# Handle glob patterns | |
import fnmatch | |
if fnmatch.fnmatch(item, ignore_pattern): | |
should_ignore = True | |
break | |
elif item == ignore_pattern or item_path.endswith(ignore_pattern): | |
should_ignore = True | |
break | |
if not should_ignore: | |
filtered_items.append(item) | |
if not filtered_items: | |
return f"π Directory '{path}' is empty or contains only ignored items" | |
# Sort items (directories first, then files) | |
directories = [] | |
files = [] | |
for item in filtered_items: | |
item_path = os.path.join(path, item) | |
if os.path.isdir(item_path): | |
directories.append(item) | |
else: | |
files.append(item) | |
directories.sort() | |
files.sort() | |
result = f"π Contents of '{path}':\n" | |
if directories: | |
result += "\nπ Directories:\n" | |
for i, directory in enumerate(directories, 1): | |
result += f"{i:2d}. {directory}/\n" | |
if files: | |
result += "\nπ Files:\n" | |
for i, file in enumerate(files, 1): | |
file_path = os.path.join(path, file) | |
try: | |
size = os.path.getsize(file_path) | |
size_str = f"{size:,} bytes" if size < 1024 else f"{size/1024:.1f} KB" | |
result += f"{i:2d}. {file} ({size_str})\n" | |
except OSError: | |
result += f"{i:2d}. {file} (size unknown)\n" | |
logger.info(f"Listed {len(directories)} directories and {len(files)} files in '{path}'") | |
return result | |
except Exception as e: | |
error_msg = f"Error listing directory '{path}': {str(e)}" | |
logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
return f"β {error_msg}" | |
@user_confirm | |
def read_file(self, file_path): | |
"""Read the contents of a file.""" | |
try: | |
import os | |
# Make relative paths absolute relative to working directory | |
if not os.path.isabs(file_path): | |
file_path = os.path.join(self.working_dir, file_path) | |
logger.info(f"Reading file: {file_path}") | |
print(f"π Reading file: {file_path}") | |
if not os.path.exists(file_path): | |
error_msg = f"File does not exist: {file_path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
if not os.path.isfile(file_path): | |
error_msg = f"Path is not a file: {file_path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
# Check file size to avoid reading huge files | |
file_size = os.path.getsize(file_path) | |
if file_size > 1024 * 1024: # 1MB limit | |
error_msg = f"File too large ({file_size:,} bytes). Maximum size is 1MB." | |
logger.warning(error_msg) | |
return f"β {error_msg}" | |
try: | |
with open(file_path, 'r', encoding='utf-8') as f: | |
content = f.read() | |
except UnicodeDecodeError: | |
# Try with different encoding | |
try: | |
with open(file_path, 'r', encoding='latin-1') as f: | |
content = f.read() | |
except Exception as e: | |
error_msg = f"Could not read file '{file_path}': {str(e)}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
except PermissionError: | |
error_msg = f"Permission denied reading file: {file_path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
# Limit content length for readability | |
if len(content) > 5000: | |
content = content[:5000] + "\n\n... (content truncated)" | |
result = f"π Content of '{file_path}' ({len(content)} characters):\n\n{content}" | |
logger.info(f"Successfully read file '{file_path}', {len(content)} characters") | |
return result | |
except Exception as e: | |
error_msg = f"Error reading file '{file_path}': {str(e)}" | |
logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
return f"β {error_msg}" | |
@user_confirm | |
def write_file(self, file_path, content): | |
"""Write content to a file.""" | |
try: | |
import os | |
# Make relative paths absolute relative to working directory | |
if not os.path.isabs(file_path): | |
file_path = os.path.join(self.working_dir, file_path) | |
logger.info(f"Writing to file: {file_path}") | |
print(f"βοΈ Writing to file: {file_path}") | |
# Check if directory exists | |
directory = os.path.dirname(file_path) | |
if directory and not os.path.exists(directory): | |
try: | |
os.makedirs(directory, exist_ok=True) | |
logger.info(f"Created directory: {directory}") | |
except Exception as e: | |
error_msg = f"Could not create directory '{directory}': {str(e)}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
# Check if file exists and is writable | |
if os.path.exists(file_path): | |
if not os.path.isfile(file_path): | |
error_msg = f"Path exists but is not a file: {file_path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
if not os.access(file_path, os.W_OK): | |
error_msg = f"File is not writable: {file_path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
try: | |
with open(file_path, 'w', encoding='utf-8') as f: | |
f.write(content) | |
except PermissionError: | |
error_msg = f"Permission denied writing to file: {file_path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
result = f"βοΈ Successfully wrote {len(content)} characters to '{file_path}'" | |
logger.info(f"Successfully wrote to file '{file_path}', {len(content)} characters") | |
return result | |
except Exception as e: | |
error_msg = f"Error writing to file '{file_path}': {str(e)}" | |
logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
return f"β {error_msg}" | |
@user_confirm | |
def change_directory(self, path): | |
"""Change the current working directory.""" | |
try: | |
import os | |
logger.info(f"Changing directory from '{self.working_dir}' to '{path}'") | |
print(f"π Changing directory to: {path}") | |
# Handle relative paths | |
if not os.path.isabs(path): | |
new_path = os.path.join(self.working_dir, path) | |
else: | |
new_path = path | |
# Resolve the path (handle . and ..) | |
new_path = os.path.abspath(new_path) | |
if not os.path.exists(new_path): | |
error_msg = f"Directory does not exist: {new_path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
if not os.path.isdir(new_path): | |
error_msg = f"Path is not a directory: {new_path}" | |
logger.error(error_msg) | |
return f"β {error_msg}" | |
# Add current directory to history before changing | |
if new_path != self.working_dir: | |
self.directory_history.append(self.working_dir) | |
# Keep only last 10 entries in history | |
if len(self.directory_history) > 10: | |
self.directory_history = self.directory_history[-10:] | |
# Update working directory | |
old_dir = self.working_dir | |
self.working_dir = new_path | |
result = f"π Changed directory from '{old_dir}' to '{self.working_dir}'" | |
logger.info(f"Successfully changed directory to '{self.working_dir}'") | |
return result | |
except Exception as e: | |
error_msg = f"Error changing directory to '{path}': {str(e)}" | |
logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
return f"β {error_msg}" | |
@user_confirm | |
def get_working_directory(self): | |
"""Get the current working directory.""" | |
try: | |
logger.info(f"Getting working directory: {self.working_dir}") | |
print(f"π Current working directory: {self.working_dir}") | |
# Also show directory history | |
history_info = "" | |
if len(self.directory_history) > 1: | |
history_info = f"\nπ Directory history (last {min(5, len(self.directory_history)-1)}):\n" | |
for i, path in enumerate(self.directory_history[-6:-1], 1): | |
history_info += f"{i}. {path}\n" | |
result = f"π Current working directory: {self.working_dir}{history_info}" | |
return result | |
except Exception as e: | |
error_msg = f"Error getting working directory: {str(e)}" | |
logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
return f"β {error_msg}" | |
@user_confirm | |
def go_back(self): | |
"""Go back to the previous directory in history.""" | |
try: | |
if len(self.directory_history) < 2: | |
error_msg = "No previous directory in history" | |
logger.warning(error_msg) | |
return f"β {error_msg}" | |
# Get the previous directory | |
previous_dir = self.directory_history.pop() | |
old_dir = self.working_dir | |
self.working_dir = previous_dir | |
logger.info(f"Went back from '{old_dir}' to '{self.working_dir}'") | |
print(f"π Went back to: {self.working_dir}") | |
result = f"π Went back from '{old_dir}' to '{self.working_dir}'" | |
return result | |
except Exception as e: | |
error_msg = f"Error going back in directory history: {str(e)}" | |
logger.error(f"{error_msg}\n{traceback.format_exc()}") | |
return f"β {error_msg}" | |
def forward(self, question, history=None):
    """Answer a file-system question through the ReAct module.

    The question is prefixed with the current working directory and a short
    summary of the available tools before being handed to ``self.react``.

    Args:
        question: The user's question text.
        history: Conversation history forwarded to ``self.react``. When
            ``None``, an empty ``dspy.History`` is created.

    Returns:
        The object returned by ``self.react``; if an exception is raised, an
        error string carrying the failure marker is returned instead.
    """
    try:
        print("πΎ FileSystemAgent processing question...")

        # Fall back to an empty conversation when the caller supplies none.
        history = dspy.History(messages=[]) if history is None else history

        # Give the model the file-system context alongside the question.
        prompt = f"""Current working directory: {self.working_dir}
Available tools:
- search_files: Search for files using fd command
- list_directory: List files and directories in current or specified path
- read_file: Read the contents of a file
- write_file: Write content to a file
- change_directory: Change the current working directory
- get_working_directory: Get the current working directory
- go_back: Go back to the previous directory in history
Question: {question}
"""
        result = self.react(question=prompt, history=history)

        # Surface the reasoning when the result carries one.
        if hasattr(result, 'rationale'):
            print(f"π€ FileSystemAgent reasoning: {result.rationale}")

        if not self.debug_mode:
            return result

        print("π FileSystemAgent debug info:")
        print(f" - Question: {question}")
        print(f" - Working directory: {self.working_dir}")
        print(f" - Directory history: {self.directory_history}")
        print(f" - History length: {len(history.messages) if history else 0}")
        print(f" - Result type: {type(result)}")
        if hasattr(result, '__dict__'):
            print(f" - Result attributes: {list(result.__dict__.keys())}")
        return result
    except Exception as e:
        error_msg = f"FileSystemAgent failed to process question: {str(e)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"β {error_msg}"
#endregion | |
class REPLAgent:
    """Interactive REPL front end for the DSPy router agent.

    Wraps a ``RouterAgent`` in a ``prompt_toolkit`` session, intercepts a small
    set of built-in commands (help, history management, debug and auto-confirm
    toggles) and forwards every other input to the agent as a question.
    """

    def __init__(self, max_history=5):
        """Create the router agent and the prompt session.

        Args:
            max_history: Value forwarded to ``RouterAgent`` as ``max_history``.
        """
        self.debug_mode = False
        self.max_history = max_history
        self.agent = RouterAgent(debug_mode=self.debug_mode, max_history=max_history)
        self.session = PromptSession(
            history=FileHistory('.agent_history'),
            auto_suggest=AutoSuggestFromHistory(),
            completer=WordCompleter([
                'help', 'quit', 'exit', 'clear', 'history', 'debug', 'debug_on', 'debug_off',
                'auto_confirm', 'auto_confirm_on', 'auto_confirm_off',
                'search files', 'list files', 'read file', 'write file',
                'change directory', 'get working directory', 'go back',
                'clear_history', 'show_history', 'max_history', 'new'
            ]),
            style=Style.from_dict({
                'prompt': 'ansicyan bold',
                'input': 'ansiwhite',
            })
        )

    def get_prompt(self):
        """Build the prompt string, including debug/auto-confirm/history markers."""
        debug_indicator = " [DEBUG]" if self.debug_mode else ""
        auto_confirm_indicator = " β " if AUTO_CONFIRM else ""
        history_indicator = f" [H:{len(self.agent.history.messages)}]" if self.agent.history.messages else ""
        return f"π€ DSPy Agent{debug_indicator}{auto_confirm_indicator}{history_indicator} > "

    def _update_agent_debug_mode(self):
        """Reconfigure logging and rebuild the agent for the current debug mode.

        The agent is re-created so that it picks up ``self.debug_mode``. State
        the user has built up on the old agent -- the conversation history and
        the filesystem agent's working directory and directory history -- is
        carried over so that toggling debug mode does not reset the session.
        """
        global logger
        # Update logging configuration
        logger = setup_logging(debug_mode=self.debug_mode)

        # Snapshot the session state held by the agent that is being replaced.
        previous_agent = self.agent
        saved_messages = list(previous_agent.history.messages) if hasattr(previous_agent, 'history') else []
        previous_fs = getattr(previous_agent, 'filesystem_agent', None)

        self.agent = RouterAgent(debug_mode=self.debug_mode, max_history=self.max_history)

        if saved_messages:
            # Mutate the new list in place instead of assigning the attribute.
            # NOTE(review): dspy.History appears to be a frozen pydantic model,
            # in which case ``history.messages = ...`` would raise -- confirm.
            self.agent.history.messages[:] = saved_messages

        new_fs = getattr(self.agent, 'filesystem_agent', None)
        if previous_fs is not None and new_fs is not None:
            # Keep the user where they were rather than resetting the cwd.
            new_fs.working_dir = previous_fs.working_dir
            new_fs.directory_history = list(previous_fs.directory_history)

    def _show_conversation_history(self):
        """Print every stored question/answer pair, numbered from 1."""
        if not self.agent.history.messages:
            print("π No conversation history yet.")
            return
        print(f"π Conversation history (last {len(self.agent.history.messages)} turns):")
        for i, message in enumerate(self.agent.history.messages, 1):
            question = message.get("question", "")
            answer = message.get("answer", "")
            print(f"\n{i}. User: {question}")
            print(f" Assistant: {answer}")

    def handle_command(self, command):
        """Handle the REPL's built-in commands.

        Args:
            command: Raw user input; it is stripped and lower-cased before
                being matched.

        Returns:
            ``False`` when the REPL should exit, ``True`` when the input was a
            built-in command (or empty) and has been handled, and ``None``
            when the input is not a command and should be processed as a
            question.
        """
        global AUTO_CONFIRM
        command = command.strip().lower()

        if command in ['quit', 'exit']:
            print("π‘ Goodbye!")
            return False
        elif command == 'help':
            print("""
π€ DSPy Agent REPL Commands:
- Type any question to get an AI response
- Weather questions (e.g., "What's the weather in 90210?")
- Web search questions (e.g., "Search for Python tutorials")
- File system questions (e.g., "Search for Python files", "List files in current directory", "Read file.txt", "Change to directory /path", "Go back to previous directory")
- General questions will be routed to the web agent
- 'help' - Show this help message
- 'quit' or 'exit' - Exit the REPL
- 'clear' - Clear the screen
- 'history' - Show command history
- 'show_history' - Show conversation history (previous Q&A pairs)
- 'clear_history' - Clear conversation history
- 'max_history <number>' - Set maximum conversation history length
- 'new' - Start fresh conversation (clear history and reset working directory)
- 'debug_on' - Enable debug mode (shows detailed reasoning and verbose logs)
- 'debug_off' - Disable debug mode (suppresses verbose logs)
- 'debug' - Toggle debug mode
- 'auto_confirm_on' - Enable auto-confirm (tools execute automatically)
- 'auto_confirm_off' - Disable auto-confirm (tools prompt for confirmation)
- 'auto_confirm' - Toggle auto-confirm mode
""")
            return True
        elif command == 'clear':
            # Relies on the module-level ``os`` import. A function-local
            # ``import os`` anywhere in this method would make ``os`` a local
            # name and turn this line into an UnboundLocalError.
            os.system('cls' if os.name == 'nt' else 'clear')
            return True
        elif command == 'history':
            print("π Command history is saved in .agent_history")
            return True
        elif command == 'show_history':
            self._show_conversation_history()
            return True
        elif command == 'clear_history':
            self.agent.history.messages.clear()
            print("ποΈ Conversation history cleared")
            return True
        elif command == 'new':
            # Clear conversation history
            self.agent.history.messages.clear()
            # Reset the filesystem agent to the process's current directory
            initial_dir = os.getcwd()
            self.agent.filesystem_agent.working_dir = initial_dir
            self.agent.filesystem_agent.directory_history = [initial_dir]
            print("π Started fresh conversation")
            print(f"π Reset working directory to: {initial_dir}")
            return True
        elif command == 'debug_on':
            self.debug_mode = True
            self._update_agent_debug_mode()
            print("π Debug mode enabled - showing detailed reasoning and verbose logs")
            return True
        elif command == 'debug_off':
            self.debug_mode = False
            self._update_agent_debug_mode()
            print("π Debug mode disabled - suppressing verbose logs")
            return True
        elif command == 'debug':
            self.debug_mode = not self.debug_mode
            self._update_agent_debug_mode()
            status = "enabled" if self.debug_mode else "disabled"
            print(f"π Debug mode {status}")
            return True
        elif command == 'auto_confirm_on':
            AUTO_CONFIRM = True
            print("β Auto-confirm enabled - tools will execute automatically")
            return True
        elif command == 'auto_confirm_off':
            AUTO_CONFIRM = False
            print("β Auto-confirm disabled - tools will prompt for confirmation")
            return True
        elif command == 'auto_confirm':
            AUTO_CONFIRM = not AUTO_CONFIRM
            status = "enabled" if AUTO_CONFIRM else "disabled"
            print(f"β Auto-confirm {status}")
            return True
        elif command.startswith('max_history'):
            try:
                # Extract number from command like "max_history 10"
                parts = command.split()
                if len(parts) == 2:
                    new_max = int(parts[1])
                    if new_max >= 0:
                        self.max_history = new_max
                        self.agent.max_history = new_max
                        print(f"π Maximum conversation history set to {new_max} turns")
                    else:
                        print("β Maximum history must be 0 or greater")
                else:
                    print(f"π Current maximum conversation history: {self.max_history} turns")
                    print("π‘ Use 'max_history <number>' to change it")
            except ValueError:
                print("β Invalid number. Use 'max_history <number>'")
            return True
        elif not command:
            return True
        else:
            return None  # Not a special command, process as question

    def process_question(self, question):
        """Send a question to the agent and print its answer.

        The current local time is prepended to the question before it is
        passed to ``self.agent``. Any exception is logged and reported to the
        user instead of propagating, so the REPL loop keeps running.

        Args:
            question: The user's question text.
        """
        try:
            print("π€ Thinking...")
            logger.info(f"Processing question: {question}")
            if self.debug_mode:
                print("π Debug mode: Showing detailed reasoning and tool calls")

            # Add current time to the question
            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            enhanced_question = f"Current time: {current_time}\n\n{question}"
            result = self.agent(question=enhanced_question)

            # Results exposing ``answer`` are shown via that field; anything
            # else (e.g. a plain error string) is shown via ``str()``.
            has_answer = hasattr(result, 'answer')
            response_text = str(result.answer) if has_answer else str(result)
            print(f"π‘ Answer: {response_text}")
            if has_answer and hasattr(result, 'rationale') and result.rationale:
                print(f"π€ Reasoning: {result.rationale}")

            # In debug mode, show additional information
            if self.debug_mode:
                print("π Debug info:")
                print(f" - Result type: {type(result)}")
                print(f" - Result attributes: {dir(result)}")
                if hasattr(result, '__dict__'):
                    print(f" - Result dict: {result.__dict__}")
                print(f" - Conversation history length: {len(self.agent.history.messages)}")
            logger.info("Question processed successfully")
        except Exception as e:
            error_msg = f"Failed to process question: {str(e)}"
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            print(f"β {error_msg}")
            print("π Check agent.log for detailed error information.")
            if self.debug_mode:
                print("π Debug: Full traceback:")
                traceback.print_exc()

    def run(self):
        """Run the read-eval-print loop until the user quits or sends EOF/Ctrl-C."""
        print("""
π€ Welcome to DSPy Agent REPL!
Type your questions and I'll help you out.
Type 'help' for available commands, 'quit' to exit.
""")
        while True:
            try:
                user_input = self.session.prompt(self.get_prompt())
                # Handle special commands
                command_result = self.handle_command(user_input)
                if command_result is False:
                    break
                elif command_result is True:
                    continue
                elif command_result is None:
                    # Process as a regular question
                    self.process_question(user_input)
            except KeyboardInterrupt:
                print("\nπ‘ Goodbye!")
                break
            except EOFError:
                print("\nπ‘ Goodbye!")
                break
def main():
    """Entry point: build a ``REPLAgent`` and run its loop until the user exits."""
    REPLAgent().run()


# Start the REPL only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment