Skip to content

Instantly share code, notes, and snippets.

@paul-english
Created July 22, 2025 17:38
Show Gist options
  • Save paul-english/7a4c64c54521cafb05a0fa709f98dbfb to your computer and use it in GitHub Desktop.
Save paul-english/7a4c64c54521cafb05a0fa709f98dbfb to your computer and use it in GitHub Desktop.
import os
import traceback
import logging
import subprocess
import fnmatch
from datetime import datetime
from functools import wraps
import dspy
from prompt_toolkit import PromptSession
from prompt_toolkit.history import FileHistory
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.styles import Style
import requests
from dotenv import load_dotenv
load_dotenv()
# Global debug mode state
DEBUG_MODE = False
def setup_logging(debug_mode=False):
    """Configure root logging and return this module's logger.

    Every handler currently attached to the root logger is detached and
    closed, then two fresh handlers are installed: a file handler that writes
    all records (DEBUG and up) to ``agent.log`` and a console handler that
    shows INFO and up. The LiteLLM/urllib3/requests/httpx loggers are set to
    INFO when ``debug_mode`` is true and to WARNING otherwise.

    Args:
        debug_mode: Whether the third-party loggers may emit INFO records.
            The value is also stored in the module-level ``DEBUG_MODE``.

    Returns:
        The ``logging.Logger`` named after this module, set to INFO.
    """
    global DEBUG_MODE
    DEBUG_MODE = debug_mode

    root_logger = logging.getLogger()
    # Close each handler as it is detached: this function can be called more
    # than once, and a FileHandler that is only removed keeps its file open.
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
        handler.close()

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # File handler (always logs everything). Log messages in this program
    # contain non-ASCII symbols, so the encoding is pinned to UTF-8.
    file_handler = logging.FileHandler('agent.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # Console handler: INFO and above only.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    root_logger.addHandler(file_handler)
    root_logger.addHandler(console_handler)
    root_logger.setLevel(logging.DEBUG)

    # Third-party loggers are chatty; only let INFO through in debug mode.
    third_party_level = logging.INFO if debug_mode else logging.WARNING
    for name in ('LiteLLM', 'urllib3', 'requests', 'httpx'):
        logging.getLogger(name).setLevel(third_party_level)

    # Always show our own logs
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    return logger
# Initialize logging at import time (non-debug levels); `logger` is the
# module-wide logger used by the tools and agents below.
logger = setup_logging(debug_mode=False)
# Sent as the User-Agent header on outgoing HTTP requests.
USER_AGENT = "Paul's LLM agent <[email protected]>"
# API key for the Kagi search endpoint; None when the variable is not set.
KAGI_API_KEY = os.getenv("KAGI_API_KEY")
# Global auto confirm state: when True, @user_confirm runs tools without prompting.
AUTO_CONFIRM = False
def user_confirm(func):
    """Decorate a tool method so it only runs after a y/n confirmation.

    The wrapper prints the tool name and its arguments (the first positional
    argument, i.e. ``self``, is left out). When the module-level
    ``AUTO_CONFIRM`` flag is set the tool runs immediately; otherwise one line
    is read from stdin and only ``y``/``yes`` runs the tool. Exceptions raised
    by the tool are logged and turned into an error string, and every
    cancellation path returns a descriptive string instead of raising.
    """
    tool_name = func.__name__

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Printable argument summary; args[0] is the bound instance.
        arg_str = ", ".join(str(value) for value in args[1:])
        if kwargs:
            kwarg_str = ", ".join(f"{key}={value}" for key, value in kwargs.items())
            arg_str = f"{arg_str}, {kwarg_str}" if arg_str else kwarg_str

        def run_tool():
            # Shared by both the auto-confirm and the interactive path.
            try:
                outcome = func(*args, **kwargs)
                logger.info(f"Tool {tool_name} executed successfully with args: {arg_str}")
                return outcome
            except Exception as exc:
                error_msg = f"Tool {tool_name} failed with error: {str(exc)}"
                logger.error(f"{error_msg}\n{traceback.format_exc()}")
                return f"❌ {error_msg}"

        if AUTO_CONFIRM:
            print(f"\nβœ… Auto-confirming tool execution:")
            print(f" Function: {tool_name}")
            print(f" Arguments: {arg_str}")
            print(" βœ… Auto-confirmed (auto_confirm enabled)")
            return run_tool()

        # Interactive path: ask before running.
        print(f"\nπŸ€” Confirm tool execution:")
        print(f" Function: {tool_name}")
        print(f" Arguments: {arg_str}")
        print(" (y/n/other text): ", end="", flush=True)
        try:
            user_input = input().strip().lower()
            if user_input in ('y', 'yes'):
                print("βœ… Executing tool...")
                return run_tool()
            if user_input in ('n', 'no'):
                print("❌ Tool execution cancelled by user.")
                return f"Tool execution cancelled by user for {tool_name}({arg_str})"
            # Anything else counts as a refusal and is echoed back to the caller.
            print(f"❌ Tool execution cancelled. User input: '{user_input}'")
            return f"Tool execution cancelled. User provided: '{user_input}' for {tool_name}({arg_str})"
        except (EOFError, KeyboardInterrupt):
            print("\n❌ Tool execution cancelled due to interrupt.")
            return f"Tool execution cancelled due to user interrupt for {tool_name}({arg_str})"

    return wrapper
# Language model used by every DSPy module in this file: a model served
# through the `ollama_chat/` provider prefix at a localhost endpoint.
lm = dspy.LM(
# Alternative model, kept for quick switching:
#'ollama_chat/llama3.1:70b',
'ollama_chat/gemma3:27b',
api_base='http://localhost:11434',
# Empty key is passed explicitly. NOTE(review): presumably the local server needs none — confirm.
api_key=''
)
# Register the model as the DSPy default.
dspy.settings.configure(lm=lm)
#region agents
class RouterAgent(dspy.Module):
"""A router DSPy agent that can handle various tasks with conversation history."""
def __init__(self, debug_mode=False, max_history=5):
    """Create the sub-agents, the routing predictor and an empty history.

    Args:
        debug_mode: Forwarded to every sub-agent and stored on the router;
            enables extra diagnostic printing.
        max_history: Maximum number of conversation turns kept in
            ``self.history``.
    """
    super().__init__()
    self.debug_mode = debug_mode
    self.max_history = max_history

    # Initialize sub-agents; the file system agent starts in the process's
    # current working directory.
    self.weather_agent = WeatherAgent(debug_mode=debug_mode)
    self.web_agent = WebAgent(debug_mode=debug_mode)
    self.filesystem_agent = FileSystemAgent(debug_mode=debug_mode, initial_working_dir=os.getcwd())

    # Router with history using proper DSPy signature
    class RouterSignature(dspy.Signature):
        question: str = dspy.InputField()
        history: dspy.History = dspy.InputField()
        agent_choice: str = dspy.OutputField()

    self.router = dspy.ChainOfThought(RouterSignature)

    # History management using proper DSPy History
    self.history = dspy.History(messages=[])
def _get_context_from_history(self):
"""Get conversation context from history as a formatted string."""
if not self.history.messages:
return ""
context_lines = ["πŸ“š Previous conversation:"]
for i, message in enumerate(self.history.messages, 1):
question = message.get("question", "")
answer = message.get("answer", "")
# Truncate long responses for readability
truncated_answer = str(answer)[:200] + "..." if len(str(answer)) > 200 else str(answer)
context_lines.append(f"{i}. User: {question}")
context_lines.append(f" Assistant: {truncated_answer}")
context_lines.append("") # Empty line for readability
return "\n".join(context_lines)
def _add_to_history(self, question, result):
"""Add a conversation turn to the history using DSPy conventions."""
# Create new message following DSPy History pattern
new_message = {"question": question, **result}
# Add to history
self.history.messages.append(new_message)
# Keep only the last max_history turns
if len(self.history.messages) > self.max_history:
self.history.messages = self.history.messages[-self.max_history:]
def forward(self, question):
    """Answer a question by dispatching it to one of the sub-agents.

    The routing predictor is run first and its choice is printed, but the
    dispatch itself is decided by keyword matching on the question text:
    weather keywords first, then file system keywords, then web keywords,
    with the web agent as the default. The turn is recorded in the
    conversation history before returning.

    Args:
        question: The user's question.

    Returns:
        The selected sub-agent's result, or an error string if anything in
        this method raised.
    """
    weather_keywords = ['weather', 'temperature', 'forecast', 'zip', 'zipcode', 'zip code']
    filesystem_keywords = ['file', 'files', 'directory', 'folder', 'read', 'write', 'search files', 'list files', 'fd', 'filesystem', 'fs']
    web_keywords = ['search', 'web', 'internet', 'website', 'url', 'find', 'lookup', 'information']
    try:
        # First, ask the routing predictor which agent it would use.
        print("🧠 Analyzing question to determine best agent...")
        routing_result = self.router(question=question, history=self.history)
        if hasattr(routing_result, 'rationale'):
            print(f"πŸ€” Routing reasoning: {routing_result.rationale}")
        agent_choice = routing_result.agent_choice.lower()
        print(f"🎯 Selected agent: {agent_choice}")

        if self.debug_mode:
            # Build the context here: this branch previously read a name
            # that was never assigned, so debug mode always failed.
            context = self._get_context_from_history()
            print(f"πŸ” Debug: Full routing result: {routing_result}")
            print(f"πŸ” Debug: Conversation context length: {len(context)}")

        # Route to appropriate agent based on keywords (substring match on
        # the lower-cased question; the first matching group wins).
        lowered = question.lower()
        if any(keyword in lowered for keyword in weather_keywords):
            print("🌀️ Routing to WeatherAgent...")
            result = self.weather_agent(question=question, history=self.history)
        elif any(keyword in lowered for keyword in filesystem_keywords):
            print("πŸ’Ύ Routing to FileSystemAgent...")
            result = self.filesystem_agent(question=question, history=self.history)
        elif any(keyword in lowered for keyword in web_keywords):
            print("🌐 Routing to WebAgent...")
            result = self.web_agent(question=question, history=self.history)
        else:
            # Default to web agent for general questions
            print("🌐 Routing to WebAgent (default)...")
            result = self.web_agent(question=question, history=self.history)

        self._add_to_history(question, result)
        return result
    except Exception as e:
        error_msg = f"RouterAgent failed to process question: {str(e)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
class WeatherAgent(dspy.Module):
"""A weather agent that uses ReAct with weather-related tools and conversation history."""
def __init__(self, debug_mode=False):
    """Build the ReAct predictor and register the two weather tools.

    Args:
        debug_mode: When true, ``forward`` prints extra diagnostics.
    """
    super().__init__()
    self.debug_mode = debug_mode

    # Question + conversation history in, free-text answer out.
    class WeatherSignature(dspy.Signature):
        question: str = dspy.InputField()
        history: dspy.History = dspy.InputField()
        answer: str = dspy.OutputField()

    weather_tool = dspy.Tool(
        name="get_weather",
        func=self.get_weather,
        desc="Get current weather for a location",
    )
    fahrenheit_tool = dspy.Tool(
        name="convert_to_fahrenheit",
        func=self.convert_to_fahrenheit,
        desc="Convert Celsius to Fahrenheit",
    )
    self.react = dspy.ReAct(WeatherSignature, tools=[weather_tool, fahrenheit_tool])
@user_confirm
def get_weather(self, zip_code):
    """Get current weather for a US ZIP code.

    The ZIP code is geocoded through Nominatim and the first match's
    coordinates are passed to the Open-Meteo forecast endpoint, requesting
    current temperature and wind speed.

    Args:
        zip_code: The postal code to look up.

    Returns:
        The ``"current"`` entry of the Open-Meteo response, a fallback
        message when that entry is missing, or an error string when the ZIP
        code cannot be geocoded or a request fails.
    """
    try:
        logger.info(f"Getting weather for ZIP code: {zip_code}")

        # Step 1: Convert ZIP code to coordinates. Passing the query via
        # `params` lets requests URL-encode the value, and the timeout keeps
        # a stalled server from hanging the agent.
        geocode_response = requests.get(
            "https://nominatim.openstreetmap.org/search",
            params={"postalcode": zip_code, "country": "USA", "format": "json"},
            headers={"User-Agent": USER_AGENT},
            timeout=10,
        )
        geocode_response.raise_for_status()
        geocode_data = geocode_response.json()
        if not geocode_data:
            error_msg = f"Could not find coordinates for ZIP code {zip_code}"
            logger.warning(error_msg)
            return error_msg
        latitude = geocode_data[0]['lat']
        longitude = geocode_data[0]['lon']
        logger.info(f"Found coordinates: lat={latitude}, lon={longitude}")

        # Step 2: Fetch weather data from Open-Meteo
        weather_response = requests.get(
            "https://api.open-meteo.com/v1/forecast",
            params={
                "latitude": latitude,
                "longitude": longitude,
                "current": "temperature_2m,wind_speed_10m",
            },
            timeout=10,
        )
        weather_response.raise_for_status()
        weather_data = weather_response.json()
        logger.info(f"Weather data retrieved: {weather_data}")
        return weather_data.get("current", "No current weather data available")
    except Exception as e:
        error_msg = f"Error getting weather for ZIP {zip_code}: {str(e)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
def convert_to_fahrenheit(self, celsius):
    """Convert a Celsius reading to Fahrenheit.

    Args:
        celsius: Anything ``float()`` accepts (number or numeric string).

    Returns:
        The temperature in Fahrenheit as a float, or an error string when
        the input cannot be converted.
    """
    try:
        degrees_c = float(celsius)
        degrees_f = (degrees_c * 9/5) + 32
        logger.info(f"Converted {degrees_c}Β°C to {degrees_f}Β°F")
        return degrees_f
    except Exception as exc:
        error_msg = f"Error converting {celsius} to Fahrenheit: {str(exc)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
def forward(self, question, history=None):
    """Answer a weather question with the ReAct predictor.

    Args:
        question: The user's question.
        history: Conversation history to pass along; an empty
            ``dspy.History`` is created when omitted.

    Returns:
        The predictor's result, or an error string if it raised.
    """
    try:
        print("🌀️ WeatherAgent processing question...")
        history = dspy.History(messages=[]) if history is None else history
        outcome = self.react(question=question, history=history)

        # Show the reasoning if available
        if hasattr(outcome, 'rationale'):
            print(f"πŸ€” WeatherAgent reasoning: {outcome.rationale}")

        if self.debug_mode:
            history_length = len(history.messages) if history else 0
            print("πŸ” WeatherAgent debug info:")
            print(f" - Question: {question}")
            print(f" - History length: {history_length}")
            print(f" - Result type: {type(outcome)}")
            if hasattr(outcome, '__dict__'):
                print(f" - Result attributes: {list(outcome.__dict__.keys())}")
        return outcome
    except Exception as exc:
        error_msg = f"WeatherAgent failed to process question: {str(exc)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
class WebAgent(dspy.Module):
"""A web agent that uses ReAct with web-related tools and conversation history."""
def __init__(self, debug_mode=False):
    """Build the ReAct predictor and register the two web tools.

    Args:
        debug_mode: When true, ``forward`` prints extra diagnostics.
    """
    super().__init__()
    self.debug_mode = debug_mode

    # Question + conversation history in, free-text answer out.
    class WebSignature(dspy.Signature):
        question: str = dspy.InputField()
        history: dspy.History = dspy.InputField()
        answer: str = dspy.OutputField()

    search_tool = dspy.Tool(
        name="search_web",
        func=self.search_web,
        desc="Search the web for information",
    )
    fetch_tool = dspy.Tool(
        name="get_webpage_content",
        func=self.get_webpage_content,
        desc="Get content from a webpage",
    )
    self.react = dspy.ReAct(WebSignature, tools=[search_tool, fetch_tool])
@user_confirm
def search_web(self, query):
    """Search the web for information using Kagi API.

    Args:
        query: The search text.

    Returns:
        A formatted list of up to five results (title, snippet, URL), a
        "no results" message, or an error string when the API key is
        missing or the request fails.
    """
    try:
        logger.info(f"Searching web for query: {query}")
        print(f'πŸ” Searching web for: "{query}"')
        if not KAGI_API_KEY:
            error_msg = "KAGI_API_KEY not found in environment variables"
            logger.error(error_msg)
            return f"❌ Error: {error_msg}"

        endpoint = "https://kagi.com/api/v0/search"
        headers = {
            "Authorization": f"Bot {KAGI_API_KEY}",
            "User-Agent": USER_AGENT
        }
        # The timeout matches get_webpage_content and keeps a stalled
        # request from blocking the agent indefinitely.
        response = requests.get(endpoint, headers=headers, params={"q": query}, timeout=10)
        logger.info(f"Search response status: {response.status_code}")
        response.raise_for_status()
        data = response.json()

        # Keep only real search results. Kagi's documented response tags each
        # item with `t`: 0 for a search result, 1 for a related-searches
        # entry that has no title/url/snippet. Untagged items are kept.
        results = [item for item in (data.get("data") or []) if item.get("t", 0) == 0]
        if not results:
            logger.warning(f"No search results found for query: {query}")
            return f"πŸ” No search results found for '{query}'"

        # Format the results (top 5 only)
        formatted_results = []
        for i, result in enumerate(results[:5], 1):
            title = result.get("title", "No title")
            snippet = result.get("snippet", "No snippet")
            result_url = result.get("url", "")
            formatted_results.append(f"{i}. {title}\n {snippet}\n URL: {result_url}\n")
        logger.info(f"Found {len(results)} search results for query: {query}")
        return f"πŸ” Search results for '{query}':\n" + "\n".join(formatted_results)
    except requests.exceptions.RequestException as e:
        error_msg = f"Error searching web: {e}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
    except Exception as e:
        error_msg = f"Unexpected error searching web: {e}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
@user_confirm
def get_webpage_content(self, url):
    """Get content from a webpage using requests.

    The page is downloaded (10 second timeout) and reduced to plain text:
    script and style blocks are dropped, remaining tags are removed, HTML
    entities are decoded, whitespace is collapsed and the text is cut to
    1000 characters.

    Args:
        url: Address of the page to fetch.

    Returns:
        A string with the URL, the page title when one is found, and the
        extracted text; or an error string when the request fails.
    """
    import html
    import re
    try:
        logger.info(f"Fetching webpage content from: {url}")
        print(f"πŸ“„ Fetching content from: {url}")
        headers = {
            "User-Agent": USER_AGENT
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        content = response.text

        # Extract title if available; DOTALL lets the title span lines.
        title = ""
        title_match = re.search(r'<title[^>]*>(.*?)</title>', content, re.IGNORECASE | re.DOTALL)
        if title_match:
            title = html.unescape(re.sub(r'\s+', ' ', title_match.group(1)).strip())

        # Remove script/style blocks first, otherwise their source code
        # survives tag stripping and crowds out the visible text.
        visible = re.sub(
            r'<(script|style)\b[^>]*>.*?</\1\s*>', ' ', content,
            flags=re.IGNORECASE | re.DOTALL,
        )
        # Replace tags with a space so text from adjacent elements does not
        # run together, then decode entities and collapse whitespace.
        clean_content = re.sub(r'<[^>]+>', ' ', visible)
        clean_content = html.unescape(clean_content)
        clean_content = re.sub(r'\s+', ' ', clean_content).strip()

        # Limit content length for readability
        if len(clean_content) > 1000:
            clean_content = clean_content[:1000] + "..."
        logger.info(f"Successfully fetched content from {url}, length: {len(clean_content)} chars")

        result = f"πŸ“„ Content from {url}"
        if title:
            result += f"\nTitle: {title}"
        result += f"\n\n{clean_content}"
        return result
    except requests.exceptions.RequestException as e:
        error_msg = f"Error fetching webpage {url}: {e}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
    except Exception as e:
        error_msg = f"Unexpected error fetching {url}: {e}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
def forward(self, question, history=None):
    """Answer a web question with the ReAct predictor.

    Args:
        question: The user's question.
        history: Conversation history to pass along; an empty
            ``dspy.History`` is created when omitted.

    Returns:
        The predictor's result, or an error string if it raised.
    """
    try:
        print("🌐 WebAgent processing question...")
        history = dspy.History(messages=[]) if history is None else history
        outcome = self.react(question=question, history=history)

        # Show the reasoning if available
        if hasattr(outcome, 'rationale'):
            print(f"πŸ€” WebAgent reasoning: {outcome.rationale}")

        if self.debug_mode:
            history_length = len(history.messages) if history else 0
            print("πŸ” WebAgent debug info:")
            print(f" - Question: {question}")
            print(f" - History length: {history_length}")
            print(f" - Result type: {type(outcome)}")
            if hasattr(outcome, '__dict__'):
                print(f" - Result attributes: {list(outcome.__dict__.keys())}")
        return outcome
    except Exception as exc:
        error_msg = f"WebAgent failed to process question: {str(exc)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
class FileSystemAgent(dspy.Module):
"""A file system agent that uses ReAct with file system tools and maintains working directory context."""
def __init__(self, debug_mode=False, ignore_paths=None, initial_working_dir=None):
    """Set up ignore rules, the working-directory state and the tools.

    Args:
        debug_mode: When true, tools and ``forward`` print extra diagnostics.
        ignore_paths: Names, glob patterns and absolute paths to leave out
            of listings and searches; a built-in default list is used when
            omitted or empty.
        initial_working_dir: Directory the agent starts in; defaults to the
            process's current working directory.
    """
    import os

    super().__init__()
    self.debug_mode = debug_mode

    default_ignores = [
        '.git', '.svn', '.hg', '.bzr',  # Version control
        'node_modules', '__pycache__', '.pytest_cache',  # Dependencies and cache
        '.venv', 'venv', 'env', '.env',  # Virtual environments
        '.DS_Store', 'Thumbs.db',  # OS files
        '.vscode', '.idea',  # IDE files
        '*.log', '*.tmp', '*.temp',  # Log and temp files
        '/proc', '/sys', '/dev',  # System directories
        '/tmp', '/var/tmp', '/var/cache'  # System temp directories
    ]
    self.ignore_paths = ignore_paths or default_ignores

    # The agent tracks its own working directory (the process cwd is never
    # changed) plus a list of directories used by go_back.
    self.working_dir = os.path.abspath(initial_working_dir or os.getcwd())
    self.directory_history = [self.working_dir]

    # Question + conversation history in, free-text answer out.
    class FileSystemSignature(dspy.Signature):
        question: str = dspy.InputField()
        history: dspy.History = dspy.InputField()
        answer: str = dspy.OutputField()

    tool_table = (
        ("search_files", self.search_files, "Search for files using fd command"),
        ("list_directory", self.list_directory, "List files and directories in a path"),
        ("read_file", self.read_file, "Read the contents of a file"),
        ("write_file", self.write_file, "Write content to a file"),
        ("change_directory", self.change_directory, "Change the current working directory"),
        ("get_working_directory", self.get_working_directory, "Get the current working directory"),
        ("go_back", self.go_back, "Go back to the previous directory in history"),
    )
    tools = [dspy.Tool(name=name, func=func, desc=desc) for name, func, desc in tool_table]
    self.react = dspy.ReAct(FileSystemSignature, tools=tools)
def _build_fd_ignore_args(self):
"""Build ignore arguments for fd command."""
ignore_args = []
for path in self.ignore_paths:
if path.startswith('*'):
# Pattern like "*.log" becomes --glob "*.log"
ignore_args.extend(['--glob', path])
else:
# Directory or file path becomes --exclude path
ignore_args.extend(['--exclude', path])
return ignore_args
@user_confirm
def search_files(self, query, directory=None):
    """Search for files using fd command.

    Args:
        query: Pattern handed to ``fd``.
        directory: Where to search. ``None`` or ``"."`` means the agent's
            working directory; other relative paths are resolved against it.

    Returns:
        A numbered list of at most 20 matching file paths (the header
        reports the total number found), a "no files" message, or an error
        string when ``fd`` is unavailable, times out or fails.
    """
    # Imported before the try block, and `subprocess` is taken from the
    # module-level import: a function-scope `import subprocess` inside the
    # try would leave the name unbound in the except clause below whenever
    # an earlier statement raised.
    import shutil
    try:
        if directory is None or directory == ".":
            directory = self.working_dir
        elif not os.path.isabs(directory):
            # Make relative paths absolute relative to working directory
            directory = os.path.join(self.working_dir, directory)
        logger.info(f"Searching files for query: '{query}' in directory: '{directory}'")
        print(f"πŸ” Searching files for: '{query}' in '{directory}'")

        # Some distributions install the binary as `fdfind`.
        fd_binary = shutil.which('fd') or shutil.which('fdfind')
        if fd_binary is None:
            error_msg = "fd command not found. Please install fd (fdfind) first."
            logger.error(error_msg)
            return f"❌ {error_msg}"

        # Options first, then `--`, so a query starting with '-' is treated
        # as a pattern rather than as an option.
        cmd = [fd_binary, '--type', 'f', '--color', 'never']
        cmd.extend(self._build_fd_ignore_args())
        cmd.extend(['--', query, directory])
        if self.debug_mode:
            print(f"πŸ” Debug: Running command: {' '.join(cmd)}")

        completed = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if completed.returncode != 0 and completed.stderr:
            logger.warning(f"fd command stderr: {completed.stderr}")

        output = completed.stdout.strip()
        files = output.split('\n') if output else []
        if not files:
            logger.info(f"No files found matching '{query}' in '{directory}'")
            return f"πŸ” No files found matching '{query}' in '{directory}'"

        # Count before truncating so the header reports the real total.
        total = len(files)
        if total > 20:
            result_msg = f"πŸ” Found {total} files (showing first 20):\n"
        else:
            result_msg = f"πŸ” Found {total} files:\n"
        result_msg += "".join(
            f"{i:2d}. {file_path}\n" for i, file_path in enumerate(files[:20], 1)
        )
        logger.info(f"Found {total} files matching '{query}' in '{directory}'")
        return result_msg
    except subprocess.TimeoutExpired:
        error_msg = f"Search timed out for query: '{query}'"
        logger.error(error_msg)
        return f"❌ {error_msg}"
    except Exception as e:
        error_msg = f"Error searching files for '{query}': {str(e)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
@user_confirm
def list_directory(self, path=None):
    """List files and directories in a path.

    Args:
        path: Directory to list. ``None`` or ``"."`` means the agent's
            working directory; other relative paths are resolved against it.

    Returns:
        A text listing with directories first and then files with sizes
        (both sorted by name), a message when nothing is left after
        filtering, or an error string when the path is missing, is not a
        directory or cannot be read.
    """
    try:
        if path is None or path == ".":
            path = self.working_dir
        elif not os.path.isabs(path):
            # Make relative paths absolute relative to working directory
            path = os.path.join(self.working_dir, path)
        logger.info(f"Listing directory: {path}")
        print(f"πŸ“ Listing directory: {path}")

        if not os.path.exists(path):
            error_msg = f"Path does not exist: {path}"
            logger.error(error_msg)
            return f"❌ {error_msg}"
        if not os.path.isdir(path):
            error_msg = f"Path is not a directory: {path}"
            logger.error(error_msg)
            return f"❌ {error_msg}"
        try:
            items = os.listdir(path)
        except PermissionError:
            error_msg = f"Permission denied accessing directory: {path}"
            logger.error(error_msg)
            return f"❌ {error_msg}"

        def is_ignored(name):
            # Absolute ignore entries must equal the entry's full path;
            # everything else is matched against the bare name as a glob.
            # A suffix test on the path would also hide unrelated entries,
            # e.g. "myenv" for the pattern "env".
            full_path = os.path.abspath(os.path.join(path, name))
            for pattern in self.ignore_paths:
                if os.path.isabs(pattern):
                    if full_path == os.path.normpath(pattern):
                        return True
                elif fnmatch.fnmatch(name, pattern):
                    return True
            return False

        filtered_items = [name for name in items if not is_ignored(name)]
        if not filtered_items:
            return f"πŸ“ Directory '{path}' is empty or contains only ignored items"

        # Directories first, then files, each sorted by name.
        directories = sorted(
            name for name in filtered_items if os.path.isdir(os.path.join(path, name))
        )
        files = sorted(
            name for name in filtered_items if not os.path.isdir(os.path.join(path, name))
        )

        result = f"πŸ“ Contents of '{path}':\n"
        if directories:
            result += "\nπŸ“ Directories:\n"
            for i, directory in enumerate(directories, 1):
                result += f"{i:2d}. {directory}/\n"
        if files:
            result += "\nπŸ“„ Files:\n"
            for i, file_name in enumerate(files, 1):
                try:
                    size = os.path.getsize(os.path.join(path, file_name))
                    size_str = f"{size:,} bytes" if size < 1024 else f"{size/1024:.1f} KB"
                    result += f"{i:2d}. {file_name} ({size_str})\n"
                except OSError:
                    # E.g. a broken symlink: still list the entry.
                    result += f"{i:2d}. {file_name} (size unknown)\n"
        logger.info(f"Listed {len(directories)} directories and {len(files)} files in '{path}'")
        return result
    except Exception as e:
        error_msg = f"Error listing directory '{path}': {str(e)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
@user_confirm
def read_file(self, file_path):
    """Read the contents of a file.

    Relative paths are resolved against the agent's working directory. Files
    larger than 1MB are refused. The file is decoded as UTF-8, falling back
    to latin-1, and the text is cut to 5000 characters with a truncation
    marker.

    Args:
        file_path: Path of the file to read.

    Returns:
        A header line followed by the (possibly truncated) content, or an
        error string.
    """
    try:
        import os

        def load(encoding):
            with open(file_path, 'r', encoding=encoding) as handle:
                return handle.read()

        if not os.path.isabs(file_path):
            file_path = os.path.join(self.working_dir, file_path)
        logger.info(f"Reading file: {file_path}")
        print(f"πŸ“– Reading file: {file_path}")

        if not os.path.exists(file_path):
            error_msg = f"File does not exist: {file_path}"
            logger.error(error_msg)
            return f"❌ {error_msg}"
        if not os.path.isfile(file_path):
            error_msg = f"Path is not a file: {file_path}"
            logger.error(error_msg)
            return f"❌ {error_msg}"

        # Refuse anything above 1MB to avoid reading huge files.
        file_size = os.path.getsize(file_path)
        if file_size > 1024 * 1024:
            error_msg = f"File too large ({file_size:,} bytes). Maximum size is 1MB."
            logger.warning(error_msg)
            return f"❌ {error_msg}"

        try:
            content = load('utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8: retry with latin-1.
            try:
                content = load('latin-1')
            except Exception as exc:
                error_msg = f"Could not read file '{file_path}': {str(exc)}"
                logger.error(error_msg)
                return f"❌ {error_msg}"
        except PermissionError:
            error_msg = f"Permission denied reading file: {file_path}"
            logger.error(error_msg)
            return f"❌ {error_msg}"

        # Limit content length for readability
        if len(content) > 5000:
            content = content[:5000] + "\n\n... (content truncated)"
        char_count = len(content)
        result = f"πŸ“– Content of '{file_path}' ({char_count} characters):\n\n{content}"
        logger.info(f"Successfully read file '{file_path}', {char_count} characters")
        return result
    except Exception as exc:
        error_msg = f"Error reading file '{file_path}': {str(exc)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
@user_confirm
def write_file(self, file_path, content):
    """Write content to a file, replacing any existing content.

    Relative paths are resolved against the agent's working directory and
    missing parent directories are created. The file is written as UTF-8.

    Args:
        file_path: Destination path.
        content: Text to write.

    Returns:
        A success message with the number of characters written, or an
        error string.
    """
    try:
        import os

        def fail(message):
            # Log the problem and hand the caller the matching error string.
            logger.error(message)
            return f"❌ {message}"

        if not os.path.isabs(file_path):
            file_path = os.path.join(self.working_dir, file_path)
        logger.info(f"Writing to file: {file_path}")
        print(f"✍️ Writing to file: {file_path}")

        # Create the parent directory when it does not exist yet.
        parent = os.path.dirname(file_path)
        if parent and not os.path.exists(parent):
            try:
                os.makedirs(parent, exist_ok=True)
                logger.info(f"Created directory: {parent}")
            except Exception as exc:
                return fail(f"Could not create directory '{parent}': {str(exc)}")

        # An existing target must be a regular, writable file.
        if os.path.exists(file_path):
            if not os.path.isfile(file_path):
                return fail(f"Path exists but is not a file: {file_path}")
            if not os.access(file_path, os.W_OK):
                return fail(f"File is not writable: {file_path}")

        try:
            with open(file_path, 'w', encoding='utf-8') as handle:
                handle.write(content)
        except PermissionError:
            return fail(f"Permission denied writing to file: {file_path}")

        written = len(content)
        result = f"✍️ Successfully wrote {written} characters to '{file_path}'"
        logger.info(f"Successfully wrote to file '{file_path}', {written} characters")
        return result
    except Exception as exc:
        error_msg = f"Error writing to file '{file_path}': {str(exc)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
@user_confirm
def change_directory(self, path):
    """Change the agent's working directory.

    Only the agent's own ``working_dir`` is updated; the process's current
    directory is not touched. When the target differs from the current
    directory, the current one is appended to ``directory_history``, which
    is capped at its 10 most recent entries.

    Args:
        path: Target directory, absolute or relative to the working
            directory ("." and ".." are resolved).

    Returns:
        A confirmation message, or an error string when the target does not
        exist or is not a directory.
    """
    try:
        import os

        def fail(message):
            logger.error(message)
            return f"❌ {message}"

        logger.info(f"Changing directory from '{self.working_dir}' to '{path}'")
        print(f"πŸ“ Changing directory to: {path}")

        # Resolve relative paths against the working directory, then
        # normalise away "." and "..".
        candidate = path if os.path.isabs(path) else os.path.join(self.working_dir, path)
        target = os.path.abspath(candidate)

        if not os.path.exists(target):
            return fail(f"Directory does not exist: {target}")
        if not os.path.isdir(target):
            return fail(f"Path is not a directory: {target}")

        # Remember where we came from, keeping only the last 10 entries.
        if target != self.working_dir:
            self.directory_history.append(self.working_dir)
            if len(self.directory_history) > 10:
                self.directory_history = self.directory_history[-10:]

        came_from = self.working_dir
        self.working_dir = target
        result = f"πŸ“ Changed directory from '{came_from}' to '{self.working_dir}'"
        logger.info(f"Successfully changed directory to '{self.working_dir}'")
        return result
    except Exception as exc:
        error_msg = f"Error changing directory to '{path}': {str(exc)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
@user_confirm
def get_working_directory(self):
    """Get the current working directory.

    Returns:
        A message with the agent's working directory, followed by up to
        five of the most recently left directories (oldest first) when any
        exist, or an error string.
    """
    try:
        logger.info(f"Getting working directory: {self.working_dir}")
        print(f"πŸ“ Current working directory: {self.working_dir}")

        # go_back refuses to pop when fewer than two entries remain, so
        # index 0 can never be returned to; the directories that can are the
        # entries from index 1 onward. Show the five most recent of those,
        # including the last one.
        recent = self.directory_history[1:][-5:]
        history_info = ""
        if recent:
            history_info = f"\nπŸ“š Directory history (last {len(recent)}):\n"
            history_info += "".join(
                f"{i}. {path}\n" for i, path in enumerate(recent, 1)
            )
        result = f"πŸ“ Current working directory: {self.working_dir}{history_info}"
        return result
    except Exception as e:
        error_msg = f"Error getting working directory: {str(e)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
@user_confirm
def go_back(self):
    """Go back to the previous directory in history.

    Pops the newest entry of ``directory_history`` and makes it the working
    directory. Nothing happens when the history holds fewer than two entries.

    Returns:
        A message naming the directory left and the one returned to, or an
        error string.
    """
    try:
        if len(self.directory_history) < 2:
            error_msg = "No previous directory in history"
            logger.warning(error_msg)
            return f"❌ {error_msg}"

        left_dir = self.working_dir
        self.working_dir = self.directory_history.pop()
        logger.info(f"Went back from '{left_dir}' to '{self.working_dir}'")
        print(f"πŸ“ Went back to: {self.working_dir}")
        return f"πŸ“ Went back from '{left_dir}' to '{self.working_dir}'"
    except Exception as exc:
        error_msg = f"Error going back in directory history: {str(exc)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
def forward(self, question, history=None):
    """Answer a file system question with the ReAct predictor.

    The question is prefixed with the current working directory and a
    summary of the available tools before it is handed to the predictor.

    Args:
        question: The user's question.
        history: Conversation history to pass along; an empty
            ``dspy.History`` is created when omitted.

    Returns:
        The predictor's result, or an error string if it raised.
    """
    try:
        print("πŸ’Ύ FileSystemAgent processing question...")
        history = dspy.History(messages=[]) if history is None else history

        # Add file system context to the question; the prompt ends with a
        # trailing newline.
        prompt_lines = [
            f"Current working directory: {self.working_dir}",
            "Available tools:",
            "- search_files: Search for files using fd command",
            "- list_directory: List files and directories in current or specified path",
            "- read_file: Read the contents of a file",
            "- write_file: Write content to a file",
            "- change_directory: Change the current working directory",
            "- get_working_directory: Get the current working directory",
            "- go_back: Go back to the previous directory in history",
            f"Question: {question}",
        ]
        enhanced_question = "\n".join(prompt_lines) + "\n"
        outcome = self.react(question=enhanced_question, history=history)

        # Show the reasoning if available
        if hasattr(outcome, 'rationale'):
            print(f"πŸ€” FileSystemAgent reasoning: {outcome.rationale}")

        if self.debug_mode:
            print("πŸ” FileSystemAgent debug info:")
            print(f" - Question: {question}")
            print(f" - Working directory: {self.working_dir}")
            print(f" - Directory history: {self.directory_history}")
            history_length = len(history.messages) if history else 0
            print(f" - History length: {history_length}")
            print(f" - Result type: {type(outcome)}")
            if hasattr(outcome, '__dict__'):
                print(f" - Result attributes: {list(outcome.__dict__.keys())}")
        return outcome
    except Exception as exc:
        error_msg = f"FileSystemAgent failed to process question: {str(exc)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
#endregion
class REPLAgent:
"""A REPL interface for the DSPy agent with conversation history."""
def __init__(self, max_history=5):
    """Create the router agent and the interactive prompt session.

    Args:
        max_history: Maximum number of conversation turns the router keeps.
    """
    self.debug_mode = False
    self.max_history = max_history
    self.agent = RouterAgent(debug_mode=self.debug_mode, max_history=max_history)

    # Prompt input history is stored in a file in the current directory.
    input_history = FileHistory('.agent_history')
    suggester = AutoSuggestFromHistory()
    # Words offered for completion: REPL commands plus common phrases.
    completer = WordCompleter([
        'help', 'quit', 'exit', 'clear', 'history', 'debug', 'debug_on', 'debug_off',
        'auto_confirm', 'auto_confirm_on', 'auto_confirm_off',
        'search files', 'list files', 'read file', 'write file',
        'change directory', 'get working directory', 'go back',
        'clear_history', 'show_history', 'max_history', 'new'
    ])
    prompt_style = Style.from_dict({
        'prompt': 'ansicyan bold',
        'input': 'ansiwhite',
    })
    self.session = PromptSession(
        history=input_history,
        auto_suggest=suggester,
        completer=completer,
        style=prompt_style,
    )
def get_prompt(self):
    """Build the REPL prompt string.

    The prompt shows a debug marker when debug mode is on, a check mark when
    the module-level ``AUTO_CONFIRM`` flag is set, and the number of stored
    conversation turns when there are any.
    """
    pieces = ["πŸ€– DSPy Agent"]
    if self.debug_mode:
        pieces.append(" [DEBUG]")
    if AUTO_CONFIRM:
        pieces.append(" βœ…")
    if self.agent.history.messages:
        pieces.append(f" [H:{len(self.agent.history.messages)}]")
    pieces.append(" > ")
    return "".join(pieces)
def _update_agent_debug_mode(self):
"""Update the agent's debug mode and logging configuration."""
global logger
# Update logging configuration
logger = setup_logging(debug_mode=self.debug_mode)
# Update agent with current history
current_history = self.agent.history.messages if hasattr(self.agent, 'history') else []
self.agent = RouterAgent(debug_mode=self.debug_mode, max_history=self.max_history)
# Restore history if it exists
if current_history:
self.agent.history.messages = current_history
def _show_conversation_history(self):
"""Display the current conversation history."""
if not self.agent.history.messages:
print("πŸ“š No conversation history yet.")
return
print(f"πŸ“š Conversation history (last {len(self.agent.history.messages)} turns):")
for i, message in enumerate(self.agent.history.messages, 1):
question = message.get("question", "")
answer = message.get("answer", "")
print(f"\n{i}. User: {question}")
print(f" Assistant: {answer}")
def handle_command(self, command):
"""Handle special commands."""
global AUTO_CONFIRM
command = command.strip().lower()
if command in ['quit', 'exit']:
print("πŸ‘‹ Goodbye!")
return False
elif command == 'help':
print("""
πŸ€– DSPy Agent REPL Commands:
- Type any question to get an AI response
- Weather questions (e.g., "What's the weather in 90210?")
- Web search questions (e.g., "Search for Python tutorials")
- File system questions (e.g., "Search for Python files", "List files in current directory", "Read file.txt", "Change to directory /path", "Go back to previous directory")
- General questions will be routed to the web agent
- 'help' - Show this help message
- 'quit' or 'exit' - Exit the REPL
- 'clear' - Clear the screen
- 'history' - Show command history
- 'show_history' - Show conversation history (previous Q&A pairs)
- 'clear_history' - Clear conversation history
- 'max_history <number>' - Set maximum conversation history length
- 'new' - Start fresh conversation (clear history and reset working directory)
- 'debug_on' - Enable debug mode (shows detailed reasoning and verbose logs)
- 'debug_off' - Disable debug mode (suppresses verbose logs)
- 'debug' - Toggle debug mode
- 'auto_confirm_on' - Enable auto-confirm (tools execute automatically)
- 'auto_confirm_off' - Disable auto-confirm (tools prompt for confirmation)
- 'auto_confirm' - Toggle auto-confirm mode
""")
return True
elif command == 'clear':
os.system('clear')
return True
elif command == 'history':
print("πŸ“š Command history is saved in .agent_history")
return True
elif command == 'show_history':
self._show_conversation_history()
return True
elif command == 'clear_history':
self.agent.history.messages.clear()
print("πŸ—‘οΈ Conversation history cleared")
return True
elif command == 'new':
# Clear conversation history
self.agent.history.messages.clear()
# Reset FileSystemAgent working directory to initial state
import os
initial_dir = os.getcwd()
self.agent.filesystem_agent.working_dir = initial_dir
self.agent.filesystem_agent.directory_history = [initial_dir]
print("πŸ”„ Started fresh conversation")
print(f"πŸ“ Reset working directory to: {initial_dir}")
return True
elif command == 'debug_on':
self.debug_mode = True
self._update_agent_debug_mode()
print("πŸ” Debug mode enabled - showing detailed reasoning and verbose logs")
return True
elif command == 'debug_off':
self.debug_mode = False
self._update_agent_debug_mode()
print("πŸ” Debug mode disabled - suppressing verbose logs")
return True
elif command == 'debug':
self.debug_mode = not self.debug_mode
self._update_agent_debug_mode()
status = "enabled" if self.debug_mode else "disabled"
print(f"πŸ” Debug mode {status}")
return True
elif command == 'auto_confirm_on':
AUTO_CONFIRM = True
print("βœ… Auto-confirm enabled - tools will execute automatically")
return True
elif command == 'auto_confirm_off':
AUTO_CONFIRM = False
print("❌ Auto-confirm disabled - tools will prompt for confirmation")
return True
elif command == 'auto_confirm':
AUTO_CONFIRM = not AUTO_CONFIRM
status = "enabled" if AUTO_CONFIRM else "disabled"
print(f"βœ… Auto-confirm {status}")
return True
elif command.startswith('max_history'):
try:
# Extract number from command like "max_history 10"
parts = command.split()
if len(parts) == 2:
new_max = int(parts[1])
if new_max >= 0:
self.max_history = new_max
self.agent.max_history = new_max
print(f"πŸ“š Maximum conversation history set to {new_max} turns")
else:
print("❌ Maximum history must be 0 or greater")
else:
print(f"πŸ“š Current maximum conversation history: {self.max_history} turns")
print("πŸ’‘ Use 'max_history <number>' to change it")
except ValueError:
print("❌ Invalid number. Use 'max_history <number>'")
return True
elif not command:
return True
else:
return None # Not a special command, process as question
def process_question(self, question):
"""Process a question using the DSPy agent with conversation history."""
try:
print("πŸ€” Thinking...")
logger.info(f"Processing question: {question}")
if self.debug_mode:
print("πŸ” Debug mode: Showing detailed reasoning and tool calls")
# Add current time to the question
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
enhanced_question = f"Current time: {current_time}\n\n{question}"
result = self.agent(question=enhanced_question)
# Extract the response text
response_text = ""
if hasattr(result, 'answer'):
response_text = str(result.answer)
print(f"πŸ’‘ Answer: {response_text}")
if hasattr(result, 'rationale') and result.rationale:
print(f"πŸ€” Reasoning: {result.rationale}")
elif hasattr(result, 'rationale') and hasattr(result, 'answer'):
response_text = str(result.answer)
print(f"πŸ’‘ Answer: {response_text}")
if result.rationale:
print(f"πŸ€” Reasoning: {result.rationale}")
else:
# For ReAct responses, the answer might be in a different format
response_text = str(result)
print(f"πŸ’‘ Answer: {response_text}")
# In debug mode, show additional information
if self.debug_mode:
print("πŸ” Debug info:")
print(f" - Result type: {type(result)}")
print(f" - Result attributes: {dir(result)}")
if hasattr(result, '__dict__'):
print(f" - Result dict: {result.__dict__}")
print(f" - Conversation history length: {len(self.agent.history.messages)}")
logger.info("Question processed successfully")
except Exception as e:
error_msg = f"Failed to process question: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
print(f"❌ {error_msg}")
print("πŸ“ Check agent.log for detailed error information.")
if self.debug_mode:
print("πŸ” Debug: Full traceback:")
traceback.print_exc()
def run(self):
"""Run the REPL."""
print("""
πŸ€– Welcome to DSPy Agent REPL!
Type your questions and I'll help you out.
Type 'help' for available commands, 'quit' to exit.
""")
while True:
try:
user_input = self.session.prompt(self.get_prompt())
# Handle special commands
command_result = self.handle_command(user_input)
if command_result is False:
break
elif command_result is True:
continue
elif command_result is None:
# Process as a regular question
self.process_question(user_input)
except KeyboardInterrupt:
print("\nπŸ‘‹ Goodbye!")
break
except EOFError:
print("\nπŸ‘‹ Goodbye!")
break
def main():
    """Entry point: build a REPLAgent with default settings and run its loop."""
    REPLAgent().run()


# Start the REPL only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment