Files
nanocode/nanocode.py
2026-01-22 17:37:10 -08:00

303 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import glob as globlib
import os
import re
import subprocess
import threading

import dspy
from modaic import PrecompiledProgram, PrecompiledConfig
# --- Modaic ---
# Hub repository that the `__main__` entry point pushes the program to.
MODAIC_REPO_PATH = "farouk1/nanocode"
# --- ANSI colors ---
# Terminal escape sequences interpolated into the f-strings printed by this
# module. RESET is emitted after a styled span to return to the default style.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
BLUE = "\033[34m"
CYAN = "\033[36m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"
# --- File operations ---
def read_file(path: str, offset: int = 0, limit: int = None) -> str:
    """Read file contents with line numbers.

    Args:
        path: Path to the file to read
        offset: Line number to start from (0-indexed); negative values are
            treated as 0
        limit: Maximum number of lines to read; None reads through the end of
            the file, negative values are treated as 0

    Returns:
        File contents with line numbers. Each line is prefixed with its
        1-based line number, right-aligned to width 4, followed by "| ".

    Raises:
        OSError: If the file cannot be opened.
    """
    print(f"{MAGENTA}⏺ Reading file: {path}{RESET}")
    # The context manager closes the handle as soon as the lines are read
    # instead of leaving that to the garbage collector.
    with open(path) as f:
        lines = f.readlines()
    # Clamp so a negative offset/limit cannot turn into a from-the-end slice,
    # which would return the wrong lines with misleading line numbers.
    start = max(offset, 0)
    if limit is None:
        selected = lines[start:]
    else:
        selected = lines[start : start + max(limit, 0)]
    return "".join(f"{start + idx + 1:4}| {line}" for idx, line in enumerate(selected))
def write_file(path: str, content: str) -> str:
    """Write content to a file.

    Args:
        path: Path to the file to write
        content: Content to write to the file

    Returns:
        'ok' on success
    """
    print(f"{MAGENTA}⏺ Writing file: {path}{RESET}")
    # Mode "w" creates the file if needed and truncates any existing content,
    # so the file ends up holding exactly `content`.
    handle = open(path, "w")
    with handle:
        handle.write(content)
    status = "ok"
    return status
def edit_file(path: str, old: str, new: str, replace_all: bool = False) -> str:
    """Replace text in a file.

    Args:
        path: Path to the file to edit
        old: Text to find and replace (must not be empty)
        new: Replacement text
        replace_all: If True, replace all occurrences; otherwise old must be unique

    Returns:
        'ok' on success, error message on failure

    Raises:
        OSError: If the file cannot be opened for reading or writing.
    """
    print(f"{MAGENTA}⏺ Editing file: {path}{RESET}")
    if not old:
        # An empty needle "matches" between every character: str.replace("", new)
        # would splice `new` throughout the file, so refuse it up front.
        return "error: old_string must not be empty"
    # Context manager so the read handle is closed before the file is reopened
    # for writing below.
    with open(path) as f:
        text = f.read()
    # One scan yields both the "not found" and the "not unique" answers.
    count = text.count(old)
    if count == 0:
        return "error: old_string not found"
    if not replace_all and count > 1:
        return f"error: old_string appears {count} times, must be unique (use replace_all=True)"
    replacement = text.replace(old, new) if replace_all else text.replace(old, new, 1)
    with open(path, "w") as f:
        f.write(replacement)
    return "ok"
def glob_files(pattern: str, path: str = ".") -> str:
    """Find files matching a glob pattern, sorted by modification time.

    Args:
        pattern: Glob pattern to match (e.g., '**/*.py')
        path: Base directory to search in

    Returns:
        Newline-separated list of matching files
    """
    print(f"{MAGENTA}⏺ Finding files with pattern: {pattern}{RESET}")

    def _mtime_or_zero(candidate):
        # Anything that is not a regular file (e.g. a directory) gets key 0,
        # which places it after every real file in the newest-first order.
        if os.path.isfile(candidate):
            return os.path.getmtime(candidate)
        return 0

    # Join base directory and pattern, then collapse any doubled slash
    # (for instance when `path` already ends with "/").
    full_pattern = "/".join((path, pattern)).replace("//", "/")
    matches = globlib.glob(full_pattern, recursive=True)
    matches.sort(key=_mtime_or_zero, reverse=True)  # newest first
    listing = "\n".join(matches)
    if listing:
        return listing
    return "no files found"
def grep_files(pattern: str, path: str = ".") -> str:
    """Search files for a regex pattern.

    Every regular file under `path` (recursively) is scanned line by line.
    Files that cannot be opened or decoded as text are skipped. At most the
    first 50 matching lines are returned.

    Args:
        pattern: Regular expression pattern to search for
        path: Base directory to search in

    Returns:
        Matching lines in format 'filepath:line_num:content'

    Raises:
        re.error: If `pattern` is not a valid regular expression.
    """
    print(f"{MAGENTA}⏺ Searching for pattern: {pattern}{RESET}")
    max_hits = 50
    regex = re.compile(pattern)
    hits = []
    for filepath in globlib.glob(path + "/**", recursive=True):
        # The recursive glob also yields directories (and could yield FIFOs
        # or sockets); only regular files are worth opening.
        if not os.path.isfile(filepath):
            continue
        try:
            # `with` closes each handle right away; a large tree would
            # otherwise leave one open descriptor per scanned file.
            with open(filepath) as f:
                for line_num, line in enumerate(f, 1):
                    if regex.search(line):
                        hits.append(f"{filepath}:{line_num}:{line.rstrip()}")
                        if len(hits) >= max_hits:
                            break
        except (OSError, UnicodeDecodeError):
            # Best-effort search: unreadable or non-text files are skipped.
            # Hits collected from the file before the error are kept.
            continue
        if len(hits) >= max_hits:
            # The result is capped, so scanning further files is wasted work.
            break
    return "\n".join(hits) or "no matches found"
# --- Shell operations ---
def run_bash(cmd: str) -> str:
    """Run a shell command and return output.

    Output is echoed to the terminal line by line while the command runs.
    If the command has not exited after 30 seconds it is killed and a
    "(timed out after 30s)" marker is appended to the captured output.

    Args:
        cmd: Shell command to execute

    Returns:
        Command output (stdout and stderr combined), stripped of surrounding
        whitespace, or "(empty output)" if nothing was produced
    """
    print(f"{MAGENTA}⏺ Running command: {cmd}{RESET}")
    timeout_s = 30
    drain_grace_s = 2
    # NOTE: `cmd` is handed to the shell verbatim (shell=True); that is the
    # purpose of this tool, so callers must only pass commands they trust.
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        # Undecodable bytes become U+FFFD instead of raising inside the
        # reader thread, where the error would be lost.
        errors="replace",
    )
    output_lines = []

    def _pump_output():
        # Runs on a helper thread: echo and collect lines until the pipe hits
        # EOF. Reading here keeps the blocking readline off the main thread,
        # which is what allows the timeout below to actually fire.
        for line in proc.stdout:
            print(f" {DIM}{line.rstrip()}{RESET}", flush=True)
            output_lines.append(line)

    reader = threading.Thread(target=_pump_output, daemon=True)
    reader.start()
    timed_out = False
    try:
        proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        timed_out = True
        proc.kill()
        proc.wait()  # reap the killed shell so it does not linger as a zombie
    # Give the reader a moment to drain what is still buffered in the pipe.
    # The join is bounded because a background child of the shell can keep
    # the pipe open indefinitely.
    reader.join(timeout=drain_grace_s)
    if not reader.is_alive():
        proc.stdout.close()
    captured = list(output_lines)  # snapshot in case the reader is still running
    if timed_out:
        captured.append(f"\n(timed out after {timeout_s}s)")
    return "".join(captured).strip() or "(empty output)"
# --- Model selection ---
# Preset models offered by select_model(): menu key -> (display name, model ID).
# select_model() returns the model ID of the chosen entry.
AVAILABLE_MODELS = {
    "1": ("GPT-5.2 Codex", "openai/gpt-5.2-codex"),
    "2": ("GPT-5.2", "openai/gpt-5.2"),
    "3": ("Claude Opus 4.5", "anthropic/claude-opus-4.5"),
    "4": ("Claude Opus 4", "anthropic/claude-opus-4"),
    "5": ("Qwen 3 Coder", "qwen/qwen3-coder"),
    "6": ("Gemini 3 Flash Preview", "google/gemini-3-flash-preview"),
    "7": ("Kimi K2 0905", "moonshotai/kimi-k2-0905"),
    "8": ("Minimax M2.1", "minimax/minimax-m2.1"),
}
def select_model() -> str:
    """Interactively prompt for a model and return its model ID.

    Prints the presets from AVAILABLE_MODELS plus a "c" (custom) option and
    keeps prompting until a valid choice is entered.

    Returns:
        The model ID of the selected preset, or the non-empty custom model ID
        typed by the user.

    Raises:
        SystemExit: With status 1 if the prompt is interrupted (Ctrl-C) or
            stdin reaches EOF.
    """
    # Derived from the table so the hint stays correct when presets are added
    # or removed (keys are expected to be "1".."N").
    valid_hint = f"1-{len(AVAILABLE_MODELS)} or c"
    print(f"\n{BOLD}Select a model:{RESET}")
    for key, (name, model_id) in AVAILABLE_MODELS.items():
        print(f" {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
    print(f" {BLUE}c{RESET}. Custom model (enter manually)")
    while True:
        try:
            choice = (
                input(f"\n{BOLD}{BLUE}{RESET} Enter choice ({valid_hint}): ")
                .strip()
                .lower()
            )
            if choice in AVAILABLE_MODELS:
                name, model_id = AVAILABLE_MODELS[choice]
                print(f"{GREEN}⏺ Selected: {name}{RESET}")
                return model_id
            if choice != "c":
                print(f"{RED}⏺ Invalid choice. Please enter {valid_hint}{RESET}")
                continue
            custom_model = input(
                f"{BOLD}{BLUE}{RESET} Enter model ID (e.g., openai/gpt-4): "
            ).strip()
            if custom_model:
                print(f"{GREEN}⏺ Selected custom model: {custom_model}{RESET}")
                return custom_model
            # An empty custom ID falls through to another round of prompting.
            print(f"{RED}⏺ Invalid model ID{RESET}")
        except (KeyboardInterrupt, EOFError):
            print(f"\n{RED}⏺ Model selection cancelled{RESET}")
            # Raise SystemExit directly: the `exit` helper is injected by the
            # `site` module and is absent under `python -S` or frozen builds.
            raise SystemExit(1) from None
# DSPy signature for the agent: one `task` string in, one `answer` string out.
# NOTE(review): DSPy is expected to use the class docstring and the `desc`
# strings below as prompt text for the model, so treat them as runtime
# strings rather than ordinary documentation; confirm against the DSPy docs.
class CodingAssistant(dspy.Signature):
    """You are a concise coding assistant with access to sub agents."""

    task: str = dspy.InputField(desc="The user's coding task or question")
    answer: str = dspy.OutputField(
        desc="Your response to the user after completing the task"
    )
# Settings consumed by RLMCodingProgram when it builds its language models
# and its dspy.RLM agent.
class RLMCodingConfig(PrecompiledConfig):
    max_iters: int = 50  # passed to dspy.RLM as max_iterations
    lm: str = "openrouter/openai/gpt-5.2-codex"  # model for the main dspy.LM
    sub_lm: str = "openrouter/openai/gpt-5-mini"  # model for the dspy.LM given to dspy.RLM as sub_lm
    api_base: str = "https://openrouter.ai/api/v1"  # api_base for both LMs
    max_tokens: int = 50000  # max_tokens for both LMs
    max_output_chars: int = 100000  # passed to dspy.RLM as max_output_chars
    verbose: bool = False  # passed to dspy.RLM as verbose
    track_usage: bool = True  # track_usage for both LMs
class RLMCodingProgram(PrecompiledProgram):
    """Coding agent that wraps a `dspy.RLM` over the `CodingAssistant` signature.

    The program owns a mutable tool table (`self.tools`) and two language
    models: `self.lm`, set on the agent via `set_lm`, and `self.sub_lm`,
    passed to `dspy.RLM` as `sub_lm`. Whenever the tool table changes through
    `set_tool` / `remove_tool`, the agent is rebuilt so the new table is used.
    """

    config: RLMCodingConfig

    def __init__(self, config: RLMCodingConfig, **kwargs):
        """Create the tool table, both language models, and the agent.

        Args:
            config: Settings for the language models and the RLM agent.
            **kwargs: Forwarded unchanged to `PrecompiledProgram.__init__`.
        """
        super().__init__(config, **kwargs)
        self.config = config
        self.tools = {
            "read_file": read_file,
            "write_file": write_file,
            "edit_file": edit_file,
            "glob_files": glob_files,
            "grep_files": grep_files,
            "run_bash": run_bash,
        }
        self.lm = self._make_lm(self.config.lm)
        self.sub_lm = self._make_lm(self.config.sub_lm)
        self.agent = self._build_agent()

    def _make_lm(self, model: str):
        """Build a `dspy.LM` for `model` using the shared settings in `self.config`."""
        return dspy.LM(
            model=model,
            api_base=self.config.api_base,
            max_tokens=self.config.max_tokens,
            track_usage=self.config.track_usage,
        )

    def _build_agent(self):
        """Build a `dspy.RLM` from the current tools and config, with `self.lm` set on it.

        Single construction site shared by `__init__` and `reload_repl_tools`
        so the two can never drift apart.
        """
        agent = dspy.RLM(
            CodingAssistant,
            sub_lm=self.sub_lm,
            tools=self.tools,
            max_output_chars=self.config.max_output_chars,
            max_iterations=self.config.max_iters,
            verbose=self.config.verbose,
        )
        agent.set_lm(self.lm)
        return agent

    def forward(self, task: str) -> str:
        """Run the agent on `task` and return whatever the agent call returns.

        NOTE(review): the annotation says `str`, but the value is the result of
        calling the `dspy.RLM` module; confirm the actual return type.

        Raises:
            AssertionError: If `task` is empty.
        """
        if not task:
            # Explicit raise instead of `assert` so the check survives
            # `python -O`; the exception type is unchanged for callers.
            raise AssertionError("Task cannot be empty")
        return self.agent(task=task)

    def get_tools(self):
        """Return the live tool table (name -> callable), not a copy."""
        return self.tools

    def set_tool(self, name: str, tool: callable):
        """Add or replace the tool registered under `name`, then rebuild the agent."""
        self.tools[name] = tool
        self.reload_repl_tools()

    def remove_tool(self, name: str):
        """Remove the tool registered under `name` (if any), then rebuild the agent."""
        self.tools.pop(name, None)
        self.reload_repl_tools()

    def reload_repl_tools(self):
        """Replace `self.agent` with a freshly built `dspy.RLM`.

        We need to create a new instance for tool mutations to be passed back
        into the REPL.
        """
        self.agent = self._build_agent()
if __name__ == "__main__":
    # Script entry point: build the program with default settings and publish
    # it to the "dev" branch of the Modaic hub repository.
    default_config = RLMCodingConfig()
    agent = RLMCodingProgram(default_config)
    agent.push_to_hub(
        MODAIC_REPO_PATH,
        branch="dev",
        commit_message="change signature",
    )