Files
nanocode/nanocode.py
2026-01-22 02:32:24 -08:00

603 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import glob as globlib
import subprocess
import shlex
import json
import tempfile
from modaic import PrecompiledProgram, PrecompiledConfig
import dspy
from dspy.utils.callback import BaseCallback
# --- Modaic ---
# Hub repository path passed to push_to_hub() in the __main__ block.
MODAIC_REPO_PATH = "farouk1/nanocode"
# --- ANSI colors ---
# SGR escape sequences interpolated into printed strings; RESET ends any
# styling started by the other codes.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
BLUE = "\033[34m"
CYAN = "\033[36m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"
# --- Display utilities ---
# User input longer than this many characters is written to a temp file by
# main() instead of being sent inline. Read once at import time from
# NANOCODE_LONG_PASTE_THRESHOLD (default 4000); a non-integer value raises
# ValueError on import.
LONG_PASTE_THRESHOLD = int(os.environ.get("NANOCODE_LONG_PASTE_THRESHOLD", "4000"))
def save_long_paste(text: str) -> str:
    """Store text in a newly created temporary file and return its path.

    Args:
        text: The text to persist.
    Returns:
        Path of the created ``nanocode_paste_*.txt`` file. The file is not
        removed here.
    """
    descriptor, paste_path = tempfile.mkstemp(
        suffix=".txt", prefix="nanocode_paste_"
    )
    # fdopen takes ownership of the descriptor, so leaving the with-block
    # closes it.
    with os.fdopen(descriptor, "w") as out:
        out.write(text)
    return paste_path
def separator():
    """Return a dim horizontal rule sized to the terminal, capped at 80 columns.

    Returns:
        A run of box-drawing dashes wrapped in the DIM and RESET ANSI codes.
    """
    # Local import: shutil is not among this file's top-level imports.
    import shutil

    # shutil.get_terminal_size() returns the fallback instead of raising
    # OSError when output is piped or otherwise not attached to a terminal.
    columns = shutil.get_terminal_size(fallback=(80, 24)).columns
    width = min(columns, 80)
    # The rule glyph must be non-empty: repeating '' yields no visible line.
    return f"{DIM}{'─' * width}{RESET}"
def render_markdown(text):
    """Render markdown ``**bold**`` spans with ANSI bold escapes.

    Args:
        text: Text that may contain ``**...**`` spans.
    Returns:
        The text with every span replaced by its inner content wrapped in
        BOLD and RESET. Spans do not cross line breaks because ``.`` does
        not match a newline.
    """
    bold_span = re.compile(r"\*\*(.+?)\*\*")
    ansi_bold = f"{BOLD}\\1{RESET}"
    return bold_span.sub(ansi_bold, text)
# --- File operations ---
def read_file(path: str, offset: int = 0, limit: int = None) -> str:
    """Read file contents with line numbers.

    Args:
        path: Path to the file to read
        offset: Line number to start from (0-indexed); negative values are
            treated as 0
        limit: Maximum number of lines to read (None reads to the end of
            the file; negative values are treated as 0)
    Returns:
        File contents with line numbers: each line is prefixed with its
        1-based number right-aligned to four columns and followed by '| '
    """
    # Context manager so the handle is closed as soon as the read completes.
    with open(path) as handle:
        lines = handle.readlines()
    # Clamp so a negative offset cannot slice from the end of the file and
    # produce wrong line numbers.
    offset = max(offset, 0)
    limit = len(lines) if limit is None else max(limit, 0)
    selected = lines[offset : offset + limit]
    return "".join(
        f"{offset + idx + 1:4}| {line}" for idx, line in enumerate(selected)
    )
def write_file(path: str, content: str) -> str:
    """Write content to a file, replacing anything already stored there.

    Args:
        path: Path to the file to write
        content: Text to store in the file
    Returns:
        'ok' once the content has been written
    """
    with open(path, "w") as out:
        out.write(content)
    return "ok"
def edit_file(path: str, old: str, new: str, replace_all: bool = False) -> str:
    """Replace text in a file.

    Args:
        path: Path to the file to edit
        old: Text to find and replace
        new: Replacement text
        replace_all: If True, replace all occurrences; otherwise old must be unique
    Returns:
        'ok' on success, error message on failure (old text missing, or
        present more than once without replace_all)
    """
    # Context manager so the read handle is closed before the file is
    # reopened for writing.
    with open(path) as handle:
        text = handle.read()
    # A single count() answers both "is it present?" and "is it unique?".
    count = text.count(old)
    if count == 0:
        return "error: old_string not found"
    if count > 1 and not replace_all:
        return f"error: old_string appears {count} times, must be unique (use replace_all=True)"
    # Here either replace_all is set or there is exactly one occurrence, so
    # an unbounded replace is correct in both cases.
    updated = text.replace(old, new)
    with open(path, "w") as handle:
        handle.write(updated)
    return "ok"
def glob_files(pattern: str, path: str = ".") -> str:
    """List files matching a glob pattern, most recently modified first.

    Args:
        pattern: Glob pattern to match (e.g., '**/*.py')
        path: Base directory to search in
    Returns:
        Newline-separated list of matching paths, or 'no files found'
    """

    def _modified_at(candidate):
        # Entries that are not regular files (e.g. directories) sort last.
        if os.path.isfile(candidate):
            return os.path.getmtime(candidate)
        return 0

    # Join base and pattern, collapsing a doubled slash (e.g. path="dir/").
    full_pattern = f"{path}/{pattern}".replace("//", "/")
    matches = globlib.glob(full_pattern, recursive=True)
    matches.sort(key=_modified_at, reverse=True)
    return "\n".join(matches) or "no files found"
def grep_files(pattern: str, path: str = ".") -> str:
    """Search files for a regex pattern.

    Args:
        pattern: Regular expression pattern to search for
        path: Base directory to search in (searched recursively)
    Returns:
        Up to 50 matching lines in format 'filepath:line_num:content',
        or 'no matches found'
    Raises:
        re.error: If pattern is not a valid regular expression
    """
    max_hits = 50
    regex = re.compile(pattern)
    hits = []
    for filepath in globlib.glob(path + "/**", recursive=True):
        # Only the first max_hits are reported, so stop scanning once full.
        if len(hits) >= max_hits:
            break
        try:
            # Context manager so each handle is closed, even when decoding
            # fails part-way through the file.
            with open(filepath) as handle:
                for line_num, line in enumerate(handle, 1):
                    if regex.search(line):
                        hits.append(f"{filepath}:{line_num}:{line.rstrip()}")
                        if len(hits) >= max_hits:
                            break
        except (OSError, UnicodeError):
            # Best effort: directories, unreadable files and undecodable
            # (binary) content are skipped rather than aborting the search.
            continue
    return "\n".join(hits) or "no matches found"
# --- Shell operations ---
def run_bash(cmd: str, timeout: float = 30) -> str:
    """Run a shell command and return output.

    Output lines are echoed to the terminal as they arrive. The command is
    killed if it is still running after ``timeout`` seconds.

    Args:
        cmd: Shell command to execute
        timeout: Seconds to wait before killing the command (default 30)
    Returns:
        Command output (stdout and stderr combined), with a
        '(timed out after Ns)' marker appended on timeout, or
        '(empty output)' when nothing was produced
    """
    # Local import: threading is not among this file's top-level imports.
    import threading

    proc = subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    output_lines = []

    def _pump_output():
        # Runs on a helper thread so a blocking read cannot stall the
        # timeout below.
        for line in proc.stdout:
            print(f" {DIM}{line.rstrip()}{RESET}", flush=True)
            output_lines.append(line)

    reader = threading.Thread(target=_pump_output, daemon=True)
    reader.start()

    timed_out = False
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        timed_out = True
        proc.kill()
        proc.wait()

    # Let the reader drain what is already buffered. The join is bounded
    # because a grandchild that outlives the shell can keep the pipe open.
    reader.join(timeout=1.0)
    if not reader.is_alive():
        proc.stdout.close()

    # Snapshot: the reader may still be appending if the join timed out.
    collected = list(output_lines)
    if timed_out:
        collected.append(f"\n(timed out after {timeout}s)")
    return "".join(collected).strip() or "(empty output)"
# --- Model selection ---
# Menu key -> (display name, model ID). Insertion order is the order in which
# the entries are listed by the model-selection menus.
AVAILABLE_MODELS = {
    "1": ("GPT-5.2 Codex", "openai/gpt-5.2-codex"),
    "2": ("GPT-5.2", "openai/gpt-5.2"),
    "3": ("Claude Opus 4.5", "anthropic/claude-opus-4.5"),
    "4": ("Claude Opus 4", "anthropic/claude-opus-4"),
    "5": ("Qwen 3 Coder", "qwen/qwen3-coder"),
    "6": ("Gemini 3 Flash Preview", "google/gemini-3-flash-preview"),
    "7": ("Kimi K2 0905", "moonshotai/kimi-k2-0905"),
    "8": ("Minimax M2.1", "minimax/minimax-m2.1"),
}
def select_model():
    """Interactive model selection or use environment variable.

    Returns:
        The value of the MODEL environment variable when it is set and
        non-empty; otherwise the model ID picked from AVAILABLE_MODELS or
        typed in as a custom ID at the prompt.
    Raises:
        SystemExit: With status 1 when the prompt is cancelled
            (Ctrl-C or end of input).
    """
    model_env = os.getenv("MODEL")
    if model_env:
        print(f"{GREEN}⏺ Using model from environment: {model_env}{RESET}")
        return model_env

    print(f"\n{BOLD}Select a model:{RESET}")
    for key, (name, model_id) in AVAILABLE_MODELS.items():
        print(f" {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
    print(f" {BLUE}c{RESET}. Custom model (enter manually)")

    # Derive the advertised range from the table so the prompt cannot drift
    # when models are added or removed.
    keys = list(AVAILABLE_MODELS)
    choice_range = f"{keys[0]}-{keys[-1]}"

    while True:
        try:
            choice = (
                input(f"\n{BOLD}{BLUE}{RESET} Enter choice ({choice_range} or c): ")
                .strip()
                .lower()
            )
            custom_model = None
            if choice == "c":
                custom_model = input(
                    f"{BOLD}{BLUE}{RESET} Enter model ID (e.g., openai/gpt-4): "
                ).strip()
        except (KeyboardInterrupt, EOFError):
            print(f"\n{RED}⏺ Model selection cancelled{RESET}")
            # SystemExit directly: the exit() builtin is injected by the
            # site module and is not guaranteed to exist.
            raise SystemExit(1) from None

        if choice in AVAILABLE_MODELS:
            name, model_id = AVAILABLE_MODELS[choice]
            print(f"{GREEN}⏺ Selected: {name}{RESET}")
            return model_id
        if choice == "c":
            if custom_model:
                print(f"{GREEN}⏺ Selected custom model: {custom_model}{RESET}")
                return custom_model
            print(f"{RED}⏺ Invalid model ID{RESET}")
        else:
            print(f"{RED}⏺ Invalid choice. Please enter {choice_range} or c{RESET}")
class CodingAssistant(dspy.Signature):
    """You are a concise coding assistant. Help the user with their coding task by using the available tools to read, write, edit files, search the codebase, and run commands."""

    # NOTE: the class docstring above is left verbatim; it belongs to the
    # signature definition rather than being maintainer documentation.

    # Input: the request text built by the caller.
    task: str = dspy.InputField(desc="The user's coding task or question")

    # Output: the reply shown to the user.
    answer: str = dspy.OutputField(desc="Your response to the user after completing the task")

    # Output: paths the run reports as written or modified.
    affected_files: list[str] = dspy.OutputField(desc="List of files that were written or modified during the task")
class ToolLoggingCallback(BaseCallback):
    """Callback that logs tool calls as they happen."""

    @staticmethod
    def _format_args(arguments):
        """Render a mapping as 'k=repr(v)' pairs, each repr cut to 50 chars."""
        return ", ".join(
            f"{key}={repr(value)[:50]}" for key, value in arguments.items()
        )

    def on_tool_start(self, call_id, instance, inputs):
        """Print the tool's name and arguments when it starts executing."""
        if hasattr(instance, "name"):
            tool_name = instance.name
        else:
            tool_name = str(instance)
        rendered = self._format_args(inputs)
        print(f" {MAGENTA}{tool_name}({rendered}){RESET}", flush=True)

    def on_tool_end(self, call_id, outputs, exception):
        """Print the exception, if any, when a tool finishes executing."""
        if exception:
            print(f" {RED}Error: {exception}{RESET}", flush=True)

    def on_module_end(self, call_id, outputs, exception):
        """Print each entry of outputs['tool_calls'], marking 'finish' specially."""
        # Only outputs that carry a tool_calls entry are of interest.
        if not outputs or "tool_calls" not in outputs:
            return
        for call in outputs["tool_calls"]:
            rendered = self._format_args(call.args)
            if call.name == "finish":
                print(f" {GREEN}⏺ finish{RESET}", flush=True)
            else:
                print(f" {MAGENTA}{call.name}({rendered}){RESET}", flush=True)
class RLMCodingConfig(PrecompiledConfig):
    # Settings consumed by RLMCodingProgram when it builds its LMs and agent.

    # Passed to dspy.RLM as max_iterations.
    max_iters: int = 50
    # Model ID for the main LM set on the agent.
    lm: str = "openrouter/openai/gpt-5.2-codex"
    # Model ID for the LM handed to dspy.RLM as sub_lm.
    sub_lm: str = "openrouter/openai/gpt-5-mini"
    # API endpoint used for both LMs.
    api_base: str = "https://openrouter.ai/api/v1"
    # max_tokens passed to both LMs.
    max_tokens: int = 32000
    # Passed to dspy.RLM as max_output_chars.
    max_output_chars: int = 100000
    # Passed to dspy.RLM as verbose.
    verbose: bool = False
    # track_usage flag passed to both LMs.
    track_usage: bool = True
class RLMCodingProgram(PrecompiledProgram):
    """Coding agent that wraps a dspy.RLM over the file and shell tools.

    The tool registry can be changed at runtime with set_tool/remove_tool;
    each change rebuilds the underlying RLM so it sees the updated tools.
    """

    config: RLMCodingConfig

    def __init__(self, config: RLMCodingConfig, **kwargs):
        """Create the LMs and the agent described by config.

        Args:
            config: Model names, limits and flags for the LMs and the RLM.
            **kwargs: Forwarded unchanged to PrecompiledProgram.__init__.
        """
        super().__init__(config, **kwargs)
        self.config = config
        # Tool name -> callable handed to the RLM.
        self.tools = {
            "read_file": read_file,
            "write_file": write_file,
            "edit_file": edit_file,
            "glob_files": glob_files,
            "grep_files": grep_files,
            "run_bash": run_bash,
        }
        self.lm = dspy.LM(
            model=self.config.lm,
            api_base=self.config.api_base,
            max_tokens=self.config.max_tokens,
            track_usage=self.config.track_usage,
        )
        self.sub_lm = dspy.LM(
            model=self.config.sub_lm,
            api_base=self.config.api_base,
            max_tokens=self.config.max_tokens,
            track_usage=self.config.track_usage,
        )
        self.agent = self._build_agent()
        print(f"Using model: {self.agent.get_lm().model}")
        print(f"Using sub-model: {self.agent.sub_lm.model}")

    def _build_agent(self):
        """Build a dspy.RLM from the current tools/config with self.lm set."""
        agent = dspy.RLM(
            CodingAssistant,
            sub_lm=self.sub_lm,
            tools=self.tools,
            max_output_chars=self.config.max_output_chars,
            max_iterations=self.config.max_iters,
            verbose=self.config.verbose,
        )
        agent.set_lm(self.lm)
        return agent

    def forward(self, task: str) -> dspy.Prediction:
        """Run the agent on task and return its result.

        Args:
            task: Non-empty task text.
        Returns:
            Whatever the agent call returns (main() reads ``.answer`` from it).
        Raises:
            ValueError: If task is empty.
        """
        # Explicit raise rather than assert: asserts are stripped under -O.
        if not task:
            raise ValueError("Task cannot be empty")
        return self.agent(task=task)

    def get_tools(self):
        """Return the live tool registry (name -> callable)."""
        return self.tools

    def set_tool(self, name: str, tool: callable):
        """Add or replace a tool, then rebuild the agent."""
        self.tools[name] = tool
        self.reload_repl_tools()

    def remove_tool(self, name: str):
        """Remove a tool if present, then rebuild the agent."""
        if name in self.tools:
            del self.tools[name]
            self.reload_repl_tools()

    def reload_repl_tools(self):
        """Replace self.agent with a freshly built RLM.

        A new instance is needed for tool mutations to be passed back into
        the REPL.
        """
        self.agent = self._build_agent()
def _with_openrouter_prefix(model_id):
    """Return model_id prefixed with 'openrouter/' unless it already is."""
    if model_id.startswith("openrouter/"):
        return model_id
    return f"openrouter/{model_id}"


def _parse_add_mcp_args(args):
    """Parse the arguments that follow '/add-mcp'.

    Args:
        args: Tokens after the command word.
    Returns:
        (name, server_cmd, load_kwargs). name and server_cmd are None when
        not supplied; load_kwargs holds only the options that were given.
    Raises:
        ValueError: If a --headers value is not valid JSON
            (json.JSONDecodeError is a ValueError).
    """
    name = None
    auth = None
    headers = None
    auto_auth = None
    positional = []
    i = 0
    while i < len(args):
        arg = args[i]
        has_value = i + 1 < len(args)
        if arg in ("--name", "-n") and has_value:
            name = args[i + 1]
            i += 2
        elif arg.startswith("--auth="):
            auth = arg.split("=", 1)[1]
            i += 1
        elif arg == "--auth" and has_value:
            auth = args[i + 1]
            i += 2
        elif arg == "--oauth":
            auth = "oauth"
            i += 1
        elif arg == "--auto-auth":
            auto_auth = True
            i += 1
        elif arg == "--no-auto-auth":
            auto_auth = False
            i += 1
        elif arg.startswith("--headers="):
            headers = json.loads(arg.split("=", 1)[1])
            i += 1
        elif arg == "--headers" and has_value:
            headers = json.loads(args[i + 1])
            i += 2
        else:
            # Anything unrecognised (including a flag missing its value) is
            # treated as part of the name / server command.
            positional.append(arg)
            i += 1

    server_cmd = None
    if positional:
        if name is None and len(positional) >= 2:
            # First positional is the name, the rest is the server command.
            name = positional[0]
            server_cmd = " ".join(positional[1:])
        else:
            server_cmd = " ".join(positional)

    load_kwargs = {}
    if auth is not None:
        load_kwargs["auth"] = auth
    if headers:
        load_kwargs["headers"] = headers
    if auto_auth is not None:
        load_kwargs["auto_auth"] = auto_auth
    return name, server_cmd, load_kwargs


def main():
    """Run the interactive prompt loop.

    Commands handled locally: '/q' or 'exit' (quit), '/c' (clear history),
    '/model' (switch model), '/add-mcp ...' (load an MCP server and register
    its tools). Any other input is sent to the agent together with the
    working directory and the last five exchanges. Ctrl-C or end of input
    leaves the loop; other errors are printed and the loop continues.
    """
    add_mcp_usage = (
        f"{YELLOW}⏺ Usage: /add-mcp <name> <server> [--auth <auth>|--oauth] [--headers '<json>'] [--auto-auth|--no-auto-auth]{RESET}"
    )

    # Add openrouter/ prefix if not already present
    model = _with_openrouter_prefix(select_model())
    config = RLMCodingConfig()
    config.lm = model
    agent = RLMCodingProgram(config)
    print(
        f"{BOLD}NANOCODE DSPY{RESET} | {DIM}{agent.config.lm} | {os.getcwd()}{RESET}\n"
    )
    # Conversation history for context
    history = []
    # MCP servers registry: name -> {"server": ..., "tools": [tool names]}
    mcp_servers = {}

    def register_mcp_server(name, server):
        """Register every tool of server on the current agent; return names."""
        tool_names = []
        for tool in server.tools:
            tool_name = f"{name}_{tool.__name__}"
            agent.set_tool(tool_name, tool)
            tool_names.append(tool_name)
        return tool_names

    def switch_model(new_model):
        """Rebuild the agent for new_model and re-register all MCP tools."""
        nonlocal agent
        config.lm = new_model
        agent = RLMCodingProgram(config)
        for server_name, info in mcp_servers.items():
            info["tools"] = register_mcp_server(server_name, info["server"])

    while True:
        try:
            print(separator())
            user_input = input(f"{BOLD}{BLUE}{RESET} ").strip()
            print(separator())

            tmp_paste_path = None
            if len(user_input) > LONG_PASTE_THRESHOLD:
                # Swap the oversized paste for a short pointer to a temp file.
                tmp_paste_path = save_long_paste(user_input)
                print(
                    f"{YELLOW}⏺ Long paste detected ({len(user_input)} chars). Saved to {tmp_paste_path}{RESET}"
                )
                user_input = (
                    f"The user pasted a long input ({len(user_input)} chars). "
                    f"It has been saved to {tmp_paste_path}. "
                    "Use read_file to view it. The file will be deleted after this response."
                )

            if not user_input:
                continue
            if user_input in ("/q", "exit"):
                break
            if user_input == "/c":
                history = []
                print(f"{GREEN}⏺ Cleared conversation{RESET}")
                continue

            if user_input == "/model":
                print(f"\n{BOLD}Current model: {agent.config.lm}{RESET}")
                print(f"\n{BOLD}Select a new model:{RESET}")
                for key, (name, model_id) in AVAILABLE_MODELS.items():
                    print(f" {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
                print(f" {BLUE}c{RESET}. Custom model (enter manually)")
                print(f" {BLUE}k{RESET}. Keep current model")
                choice = input(f"\n{BOLD}{BLUE}{RESET} Enter choice: ").strip().lower()
                if choice == "k":
                    print(f"{GREEN}⏺ Keeping current model: {agent.config.lm}{RESET}")
                elif choice in AVAILABLE_MODELS:
                    name, model_id = AVAILABLE_MODELS[choice]
                    new_model = _with_openrouter_prefix(model_id)
                    switch_model(new_model)
                    print(f"{GREEN}⏺ Switched to: {name} ({new_model}){RESET}")
                elif choice == "c":
                    custom_model = input(
                        f"{BOLD}{BLUE}{RESET} Enter model ID: "
                    ).strip()
                    if custom_model:
                        new_model = _with_openrouter_prefix(custom_model)
                        switch_model(new_model)
                        print(f"{GREEN}⏺ Switched to custom model: {new_model}{RESET}")
                    else:
                        print(f"{RED}⏺ Invalid model ID, keeping current model{RESET}")
                else:
                    print(f"{RED}⏺ Invalid choice, keeping current model{RESET}")
                continue

            if user_input.startswith("/add-mcp"):
                try:
                    name, server_cmd, load_kwargs = _parse_add_mcp_args(
                        shlex.split(user_input)[1:]
                    )
                except ValueError as err:
                    # Unbalanced quotes (shlex) or malformed --headers JSON.
                    print(f"{RED}⏺ Invalid /add-mcp arguments: {err}{RESET}")
                    continue
                if not server_cmd:
                    print(add_mcp_usage)
                    continue
                if not name:
                    # Derive a name from the command, falling back to a counter.
                    name = re.sub(r"[^a-zA-Z0-9_]+", "_", server_cmd).strip("_")
                    if not name:
                        name = f"mcp_{len(mcp_servers) + 1}"
                try:
                    from mcp2py import load

                    server = load(server_cmd, **load_kwargs)
                    # Drop the previous registration only after the
                    # replacement has loaded, so a failed load leaves the
                    # existing server and its tools intact.
                    previous = mcp_servers.pop(name, None)
                    if previous:
                        for tool_name in previous["tools"]:
                            agent.remove_tool(tool_name)
                    tool_names = register_mcp_server(name, server)
                    mcp_servers[name] = {"server": server, "tools": tool_names}
                    print(
                        f"{GREEN}⏺ Added MCP server '{name}' with {len(tool_names)} tools{RESET}"
                    )
                    print(f"{GREEN}⏺ Tools: {list(agent.tools.keys())}{RESET}")
                except Exception as err:
                    print(f"{RED}⏺ Failed to add MCP server: {err}{RESET}")
                continue

            # Build context from history
            context_parts = [f"Working directory: {os.getcwd()}\n"]
            if history:
                context_parts.append("\nPrevious conversation:\n")
                # Keep last 5 exchanges
                context_parts.extend(
                    f"User: {h['user']}\nAssistant: {h['assistant']}\n\n"
                    for h in history[-5:]
                )
            context = "".join(context_parts)
            task = f"{context}\nCurrent task: {user_input}"
            print(f"\n{CYAN}{RESET} Thinking...", flush=True)

            # Run the RLM agent
            try:
                result = agent(task=task)
            finally:
                # The pasted-text temp file is only needed for this one call.
                if tmp_paste_path:
                    try:
                        os.remove(tmp_paste_path)
                    except OSError:
                        pass

            # Display the answer
            print(f"\n{CYAN}{RESET} {render_markdown(result.answer)}")
            # Dump the full prediction for debugging
            print(f"\n{MAGENTA}⏺ Debug Prediction: {result}{RESET}")
            # Save to history
            history.append({"user": user_input, "assistant": result.answer})
            print()
        except (KeyboardInterrupt, EOFError):
            break
        except Exception as err:
            import traceback

            traceback.print_exc()
            print(f"{RED}⏺ Error: {err}{RESET}")
if __name__ == "__main__":
    # NOTE(review): running this file as a script builds the program with the
    # default config and pushes it to MODAIC_REPO_PATH on the "dev" branch;
    # the interactive loop (main) is currently disabled below. Confirm this
    # is the intended entry-point behaviour.
    agent = RLMCodingProgram(RLMCodingConfig())
    agent.push_to_hub(MODAIC_REPO_PATH, commit_message="debug", branch="dev")
    #main()