Files
loki/examples/langchain-sisyphus/sisyphus_langchain/tools/filesystem.py

176 lines
5.3 KiB
Python

"""
Filesystem tools for Sisyphus agents.
These are the LangChain equivalents of Loki's global tools:
- fs_read.sh → read_file
- fs_grep.sh → search_content
- fs_glob.sh → search_files
- fs_ls.sh → list_directory
- fs_write.sh → write_file
- fs_patch.sh → (omitted — write_file covers full rewrites)
Loki Concept Mapping:
Loki tools are bash scripts with @cmd annotations that Loki's compiler
turns into function-calling declarations. In LangChain, we use the @tool
decorator which serves the same purpose: it generates the JSON schema
that the LLM sees, and wraps the Python function for execution.
"""
from __future__ import annotations
import fnmatch
import os
import re
import subprocess
from langchain_core.tools import tool
@tool
def read_file(path: str, offset: int = 1, limit: int = 200) -> str:
"""Read a file's contents with optional line range.
Args:
path: Path to the file (absolute or relative to cwd).
offset: 1-based line number to start from.
limit: Maximum number of lines to return.
"""
path = os.path.expanduser(path)
if not os.path.isfile(path):
return f"Error: file not found: {path}"
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
lines = f.readlines()
except Exception as e:
return f"Error reading {path}: {e}"
total = len(lines)
start = max(0, offset - 1)
end = min(total, start + limit)
selected = lines[start:end]
result = f"File: {path} (lines {start + 1}-{end} of {total})\n\n"
for i, line in enumerate(selected, start=start + 1):
result += f"{i}: {line}"
if end < total:
result += f"\n... truncated ({total} total lines)"
return result
@tool
def write_file(path: str, content: str) -> str:
"""Write complete contents to a file, creating parent directories as needed.
Args:
path: Path for the file.
content: Complete file contents to write.
"""
path = os.path.expanduser(path)
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
try:
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return f"Wrote: {path}"
except Exception as e:
return f"Error writing {path}: {e}"
@tool
def search_content(pattern: str, directory: str = ".", file_type: str = "") -> str:
"""Search for a text/regex pattern in files under a directory.
Args:
pattern: Text or regex pattern to search for.
directory: Root directory to search in.
file_type: Optional file extension filter (e.g. "py", "rs").
"""
directory = os.path.expanduser(directory)
cmd = ["grep", "-rn"]
if file_type:
cmd += [f"--include=*.{file_type}"]
cmd += [pattern, directory]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
lines = result.stdout.strip().splitlines()
except Exception as e:
return f"Error: {e}"
# Filter noise
noise = {"/.git/", "/node_modules/", "/target/", "/dist/", "/__pycache__/"}
filtered = [l for l in lines if not any(n in l for n in noise)][:30]
if not filtered:
return "No matches found."
return "\n".join(filtered)
@tool
def search_files(pattern: str, directory: str = ".") -> str:
"""Find files matching a glob pattern.
Args:
pattern: Glob pattern (e.g. '*.py', 'config*', '*test*').
directory: Directory to search in.
"""
directory = os.path.expanduser(directory)
noise = {".git", "node_modules", "target", "dist", "__pycache__"}
matches: list[str] = []
for root, dirs, files in os.walk(directory):
dirs[:] = [d for d in dirs if d not in noise]
for name in files:
if fnmatch.fnmatch(name, pattern):
matches.append(os.path.join(root, name))
if len(matches) >= 25:
break
if len(matches) >= 25:
break
if not matches:
return "No files found."
return "\n".join(matches)
@tool
def list_directory(path: str = ".", max_depth: int = 3) -> str:
"""List directory tree structure.
Args:
path: Directory to list.
max_depth: Maximum depth to recurse.
"""
path = os.path.expanduser(path)
if not os.path.isdir(path):
return f"Error: not a directory: {path}"
noise = {".git", "node_modules", "target", "dist", "__pycache__", ".venv", "venv"}
lines: list[str] = []
def _walk(dir_path: str, prefix: str, depth: int) -> None:
if depth > max_depth:
return
try:
entries = sorted(os.listdir(dir_path))
except PermissionError:
return
dirs = [e for e in entries if os.path.isdir(os.path.join(dir_path, e)) and e not in noise]
files = [e for e in entries if os.path.isfile(os.path.join(dir_path, e))]
for f in files[:20]:
lines.append(f"{prefix}{f}")
if len(files) > 20:
lines.append(f"{prefix}... ({len(files) - 20} more files)")
for d in dirs:
lines.append(f"{prefix}{d}/")
_walk(os.path.join(dir_path, d), prefix + " ", depth + 1)
lines.append(f"{os.path.basename(path) or path}/")
_walk(path, " ", 1)
return "\n".join(lines[:200])