287 lines
8.7 KiB
Python
287 lines
8.7 KiB
Python
import os
|
|
import glob as globlib
|
|
from modaic import PrecompiledProgram, PrecompiledConfig
|
|
import dspy
|
|
import re
|
|
import subprocess
|
|
|
|
# --- Modaic ---
|
|
|
|
MODAIC_REPO_PATH = "farouk1/nanocode"
|
|
|
|
# --- ANSI colors ---
|
|
|
|
RESET = "\033[0m"
|
|
BOLD = "\033[1m"
|
|
DIM = "\033[2m"
|
|
BLUE = "\033[34m"
|
|
CYAN = "\033[36m"
|
|
GREEN = "\033[32m"
|
|
YELLOW = "\033[33m"
|
|
RED = "\033[31m"
|
|
MAGENTA = "\033[35m"
|
|
|
|
# --- File operations ---
|
|
|
|
|
|
def read_file(path: str, offset: int = 0, limit: int = None) -> str:
|
|
"""Read file contents with line numbers.
|
|
|
|
Args:
|
|
path: Path to the file to read
|
|
offset: Line number to start from (0-indexed)
|
|
limit: Maximum number of lines to read
|
|
|
|
Returns:
|
|
File contents with line numbers
|
|
"""
|
|
print(f"{MAGENTA}⏺ Reading file: {path}{RESET}")
|
|
|
|
lines = open(path).readlines()
|
|
if limit is None:
|
|
limit = len(lines)
|
|
selected = lines[offset : offset + limit]
|
|
return "".join(f"{offset + idx + 1:4}| {line}" for idx, line in enumerate(selected))
|
|
|
|
|
|
def write_file(path: str, content: str) -> str:
|
|
"""Write content to a file.
|
|
|
|
Args:
|
|
path: Path to the file to write
|
|
content: Content to write to the file
|
|
|
|
Returns:
|
|
'ok' on success
|
|
"""
|
|
print(f"{MAGENTA}⏺ Creating file: {path}{RESET}")
|
|
|
|
with open(path, "w") as f:
|
|
f.write(content)
|
|
return "ok"
|
|
|
|
|
|
def edit_file(path: str, old: str, new: str, replace_all: bool = False) -> str:
|
|
"""Replace text in a file.
|
|
|
|
Args:
|
|
path: Path to the file to edit
|
|
old: Text to find and replace
|
|
new: Replacement text
|
|
replace_all: If True, replace all occurrences; otherwise old must be unique
|
|
|
|
Returns:
|
|
'ok' on success, error message on failure
|
|
"""
|
|
print(f"{MAGENTA}⏺ Editing file: {path}{RESET}")
|
|
|
|
text = open(path).read()
|
|
if old not in text:
|
|
return "error: old_string not found"
|
|
count = text.count(old)
|
|
if not replace_all and count > 1:
|
|
return f"error: old_string appears {count} times, must be unique (use replace_all=True)"
|
|
replacement = text.replace(old, new) if replace_all else text.replace(old, new, 1)
|
|
with open(path, "w") as f:
|
|
f.write(replacement)
|
|
return "ok"
|
|
|
|
|
|
def glob_files(pattern: str, path: str = ".") -> str:
|
|
"""Find files matching a glob pattern, sorted by modification time.
|
|
|
|
Args:
|
|
pattern: Glob pattern to match (e.g., '**/*.py')
|
|
path: Base directory to search in
|
|
|
|
Returns:
|
|
Newline-separated list of matching files
|
|
"""
|
|
print(f"{MAGENTA}⏺ Glob: {pattern}{RESET}")
|
|
|
|
full_pattern = (path + "/" + pattern).replace("//", "/")
|
|
files = globlib.glob(full_pattern, recursive=True)
|
|
files = sorted(
|
|
files,
|
|
key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0,
|
|
reverse=True,
|
|
)
|
|
return "\n".join(files) or "no files found"
|
|
|
|
|
|
def grep_files(pattern: str, path: str = ".") -> str:
|
|
"""Search files for a regex pattern.
|
|
|
|
Args:
|
|
pattern: Regular expression pattern to search for
|
|
path: Base directory to search in
|
|
|
|
Returns:
|
|
Matching lines in format 'filepath:line_num:content'
|
|
"""
|
|
print(f"{MAGENTA}⏺ Grep: {pattern}{RESET}")
|
|
regex = re.compile(pattern)
|
|
hits = []
|
|
for filepath in globlib.glob(path + "/**", recursive=True):
|
|
try:
|
|
for line_num, line in enumerate(open(filepath), 1):
|
|
if regex.search(line):
|
|
hits.append(f"{filepath}:{line_num}:{line.rstrip()}")
|
|
except Exception:
|
|
pass
|
|
return "\n".join(hits[:50]) or "no matches found"
|
|
|
|
|
|
# --- Shell operations ---
|
|
|
|
|
|
def run_bash(cmd: str) -> str:
|
|
"""Run a shell command and return output.
|
|
|
|
Args:
|
|
cmd: Shell command to execute
|
|
|
|
Returns:
|
|
Command output (stdout and stderr combined)
|
|
"""
|
|
print(f"{MAGENTA}⏺ Bash: {cmd}{RESET}")
|
|
proc = subprocess.Popen(
|
|
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
|
|
)
|
|
output_lines = []
|
|
try:
|
|
while True:
|
|
line = proc.stdout.readline()
|
|
if not line and proc.poll() is not None:
|
|
break
|
|
if line:
|
|
print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True)
|
|
output_lines.append(line)
|
|
proc.wait(timeout=30)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
output_lines.append("\n(timed out after 30s)")
|
|
return "".join(output_lines).strip() or "(empty output)"
|
|
|
|
|
|
class CodingAssistant(dspy.Signature):
|
|
"""You are a concise coding assistant with access to sub agents."""
|
|
|
|
task: str = dspy.InputField(desc="The user's coding task or question")
|
|
answer: str = dspy.OutputField(
|
|
desc="Your response to the user after completing the task"
|
|
)
|
|
|
|
|
|
class RLMCodingConfig(PrecompiledConfig):
|
|
max_iters: int = 50
|
|
lm: str = "openrouter/openai/gpt-5.2-codex"
|
|
sub_lm: str = "openrouter/openai/gpt-5-mini"
|
|
api_base: str = "https://openrouter.ai/api/v1"
|
|
max_tokens: int = 50000
|
|
max_output_chars: int = 100000
|
|
verbose: bool = False
|
|
track_usage: bool = True
|
|
|
|
|
|
class RLMCodingProgram(PrecompiledProgram):
|
|
config: RLMCodingConfig
|
|
|
|
def __init__(self, config: RLMCodingConfig, **kwargs):
|
|
super().__init__(config, **kwargs)
|
|
|
|
self.config = config
|
|
self.tools = {
|
|
"read_file": read_file,
|
|
"write_file": write_file,
|
|
"edit_file": edit_file,
|
|
"glob_files": glob_files,
|
|
"grep_files": grep_files,
|
|
"run_bash": run_bash,
|
|
}
|
|
|
|
self.lm = dspy.LM(
|
|
model=self.config.lm,
|
|
api_base=self.config.api_base,
|
|
max_tokens=self.config.max_tokens,
|
|
track_usage=self.config.track_usage,
|
|
)
|
|
self.sub_lm = dspy.LM(
|
|
model=self.config.sub_lm,
|
|
api_base=self.config.api_base,
|
|
max_tokens=self.config.max_tokens,
|
|
track_usage=self.config.track_usage,
|
|
)
|
|
self.agent = dspy.RLM(
|
|
CodingAssistant,
|
|
sub_lm=self.sub_lm,
|
|
tools=self.tools,
|
|
max_output_chars=self.config.max_output_chars,
|
|
max_iterations=self.config.max_iters,
|
|
verbose=self.config.verbose,
|
|
)
|
|
self.agent.set_lm(self.lm)
|
|
self.set_lm(self.lm)
|
|
print(f"{BLUE}CONFIG WAS SET: {self.config}{RESET}")
|
|
print(f"{BLUE}LM WAS SET: {self.agent.get_lm().model}{RESET}")
|
|
print(f"{BLUE}SUB LM WAS SET: {self.agent.sub_lm.model}{RESET}")
|
|
|
|
def forward(self, task: str) -> str:
|
|
if not task:
|
|
return dspy.Prediction(answer="No Task Given.")
|
|
|
|
print(f"{YELLOW}DEBUG forward() - agent.get_lm(): {self.agent.get_lm()}{RESET}")
|
|
print(f"{YELLOW}DEBUG forward() - self.lm.model: {self.lm.model}{RESET}")
|
|
print(f"{YELLOW}DEBUG forward() - agent.sub_lm.model: {self.agent.sub_lm.model}{RESET}")
|
|
print(f"{YELLOW}DEBUG forward() - self.sub_lm.model: {self.sub_lm.model}{RESET}")
|
|
print(f"{YELLOW}DEBUG forward() - id(agent.get_lm()): {id(self.agent.get_lm())}, id(self.lm): {id(self.lm)}{RESET}")
|
|
|
|
return self.agent(task=task)
|
|
|
|
def get_tools(self):
|
|
return self.tools
|
|
|
|
def set_tool(self, name: str, tool: callable):
|
|
self.tools[name] = tool
|
|
self.reload_repl()
|
|
|
|
def remove_tool(self, name: str):
|
|
if name in self.tools:
|
|
del self.tools[name]
|
|
self.reload_repl()
|
|
|
|
def reload_repl(
|
|
self,
|
|
): # we need to create a new instance for tool mutations to be passed back into the REPL
|
|
new_instance = dspy.RLM(
|
|
CodingAssistant,
|
|
sub_lm=self.sub_lm,
|
|
tools=self.tools,
|
|
max_output_chars=self.config.max_output_chars,
|
|
max_iterations=self.config.max_iters,
|
|
verbose=self.config.verbose,
|
|
)
|
|
new_instance.set_lm(self.lm)
|
|
self.agent = new_instance
|
|
|
|
def reload_lms(self):
|
|
"""Recreate LM objects from current config. Call this after changing config.lm or config.sub_lm."""
|
|
self.lm = dspy.LM(
|
|
model=self.config.lm,
|
|
api_base=self.config.api_base,
|
|
max_tokens=self.config.max_tokens,
|
|
track_usage=self.config.track_usage,
|
|
)
|
|
self.sub_lm = dspy.LM(
|
|
model=self.config.sub_lm,
|
|
api_base=self.config.api_base,
|
|
max_tokens=self.config.max_tokens,
|
|
track_usage=self.config.track_usage,
|
|
)
|
|
self.reload_repl()
|
|
print(f"{BLUE}LMs RELOADED: {self.lm.model}, {self.sub_lm.model}{RESET}")
|
|
|
|
if __name__ == "__main__":
|
|
agent = RLMCodingProgram(RLMCodingConfig())
|
|
agent.push_to_hub(MODAIC_REPO_PATH, commit_message="Add reload_lms method and debug forward()", branch="dev")
|