# nanocode: minimal interactive coding agent CLI (dspy RLM + modaic).
import glob as globlib
import json
import os
import re
import shlex
import shutil
import subprocess
import tempfile

import dspy
from modaic import PrecompiledProgram, PrecompiledConfig
|
||
|
||
# --- Modaic ---

# Hub repository this program is published to via push_to_hub (see __main__).
MODAIC_REPO_PATH = "farouk1/nanocode"

# --- ANSI colors ---

# Terminal escape codes used throughout for CLI output styling.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
BLUE = "\033[34m"
CYAN = "\033[36m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"

# --- Display utilities ---

# Inputs longer than this many characters are spilled to a temp file and the
# agent is told to read the file instead; overridable via environment.
LONG_PASTE_THRESHOLD = int(os.environ.get("NANOCODE_LONG_PASTE_THRESHOLD", "4000"))
|
||
|
||
|
||
def save_long_paste(text: str) -> str:
    """Persist a long pasted input to a temporary file.

    Args:
        text: The raw pasted text to store.

    Returns:
        Filesystem path of the newly created temp file.
    """
    fd, path = tempfile.mkstemp(prefix="nanocode_paste_", suffix=".txt")
    handle = os.fdopen(fd, "w")
    try:
        handle.write(text)
    finally:
        handle.close()
    return path
|
||
|
||
|
||
|
||
def separator():
    """Return a horizontal separator line that fits the terminal width.

    Uses shutil.get_terminal_size, which falls back to $COLUMNS or 80x24
    instead of raising OSError when stdout is not attached to a terminal
    (os.get_terminal_size crashes under pipes/redirection).
    """
    width = min(shutil.get_terminal_size().columns, 80)
    return f"{DIM}{'─' * width}{RESET}"
|
||
|
||
|
||
def render_markdown(text):
    """Convert basic markdown bold syntax (**text**) to ANSI bold."""
    bold_pattern = re.compile(r"\*\*(.+?)\*\*")
    return bold_pattern.sub(f"{BOLD}\\1{RESET}", text)
|
||
|
||
|
||
# --- File operations ---
|
||
|
||
|
||
def read_file(path: str, offset: int = 0, limit: int | None = None) -> str:
    """Read file contents with 1-based line numbers.

    Args:
        path: Path to the file to read
        offset: Line number to start from (0-indexed)
        limit: Maximum number of lines to read (None reads to end of file)

    Returns:
        File contents with line numbers
    """
    print(f"{MAGENTA}⏺ Reading file: {path}{RESET}")

    # Context manager closes the handle; the original leaked it.
    with open(path) as f:
        lines = f.readlines()
    if limit is None:
        limit = len(lines)
    selected = lines[offset : offset + limit]
    return "".join(f"{offset + idx + 1:4}| {line}" for idx, line in enumerate(selected))
|
||
|
||
|
||
def write_file(path: str, content: str) -> str:
    """Write content to a file, overwriting any existing contents.

    Args:
        path: Path to the file to write
        content: Content to write to the file

    Returns:
        'ok' on success
    """
    print(f"{MAGENTA}⏺ Writing file: {path}{RESET}")
    with open(path, "w") as handle:
        handle.write(content)
    return "ok"
|
||
|
||
|
||
def edit_file(path: str, old: str, new: str, replace_all: bool = False) -> str:
    """Replace text in a file.

    Args:
        path: Path to the file to edit
        old: Text to find and replace
        new: Replacement text
        replace_all: If True, replace all occurrences; otherwise old must be unique

    Returns:
        'ok' on success, error message on failure
    """
    print(f"{MAGENTA}⏺ Editing file: {path}{RESET}")
    # Context manager closes the read handle; the original leaked it.
    with open(path) as f:
        text = f.read()
    if old not in text:
        return "error: old_string not found"
    count = text.count(old)
    if not replace_all and count > 1:
        return f"error: old_string appears {count} times, must be unique (use replace_all=True)"
    # Replace every occurrence, or only the first when uniqueness was required.
    replacement = text.replace(old, new) if replace_all else text.replace(old, new, 1)
    with open(path, "w") as f:
        f.write(replacement)
    return "ok"
|
||
|
||
|
||
def glob_files(pattern: str, path: str = ".") -> str:
    """Find files matching a glob pattern, newest-first by modification time.

    Args:
        pattern: Glob pattern to match (e.g., '**/*.py')
        path: Base directory to search in

    Returns:
        Newline-separated list of matching files
    """
    print(f"{MAGENTA}⏺ Finding files with pattern: {pattern}{RESET}")

    def mtime_key(entry):
        # Non-files (e.g. directories) sort as mtime 0, i.e. last.
        return os.path.getmtime(entry) if os.path.isfile(entry) else 0

    combined = (path + "/" + pattern).replace("//", "/")
    matches = globlib.glob(combined, recursive=True)
    matches.sort(key=mtime_key, reverse=True)
    return "\n".join(matches) or "no files found"
|
||
|
||
|
||
def grep_files(pattern: str, path: str = ".") -> str:
    """Search files under a directory for a regex pattern.

    Args:
        pattern: Regular expression pattern to search for
        path: Base directory to search in

    Returns:
        Up to 50 matching lines in 'filepath:line_num:content' format,
        or 'no matches found'.
    """
    print(f"{MAGENTA}⏺ Searching for pattern: {pattern}{RESET}")
    regex = re.compile(pattern)
    hits = []
    for filepath in globlib.glob(path + "/**", recursive=True):
        if not os.path.isfile(filepath):
            continue
        try:
            # Close each file deterministically; the original leaked a
            # handle per file scanned.
            with open(filepath) as handle:
                for line_num, line in enumerate(handle, 1):
                    if regex.search(line):
                        hits.append(f"{filepath}:{line_num}:{line.rstrip()}")
        except (OSError, UnicodeDecodeError):
            # Best-effort search: skip unreadable or binary files.
            continue
    return "\n".join(hits[:50]) or "no matches found"
|
||
|
||
|
||
# --- Shell operations ---
|
||
|
||
|
||
def run_bash(cmd: str) -> str:
    """Run a shell command, echoing output live, and return the output.

    Args:
        cmd: Shell command to execute

    Returns:
        Command output (stdout and stderr combined), or '(empty output)'
    """
    print(f"{MAGENTA}⏺ Running command: {cmd}{RESET}")
    # stderr is merged into stdout so a single stream can be echoed live.
    proc = subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    output_lines = []
    try:
        # Stream output line-by-line while the process runs; stop once the
        # pipe is drained AND the process has exited.
        while True:
            line = proc.stdout.readline()
            if not line and proc.poll() is not None:
                break
            if line:
                print(f"  {DIM}│ {line.rstrip()}{RESET}", flush=True)
                output_lines.append(line)
        # NOTE(review): the 30s timeout only starts after stdout reaches EOF;
        # a process that keeps its pipe open can block readline() above
        # indefinitely, so the timeout is best-effort — confirm acceptable.
        proc.wait(timeout=30)
    except subprocess.TimeoutExpired:
        proc.kill()
        output_lines.append("\n(timed out after 30s)")
    return "".join(output_lines).strip() or "(empty output)"
|
||
|
||
|
||
# --- Model selection ---
|
||
|
||
# Preset model menu: key typed by the user -> (display name, model id).
AVAILABLE_MODELS = {
    "1": ("GPT-5.2 Codex", "openai/gpt-5.2-codex"),
    "2": ("GPT-5.2", "openai/gpt-5.2"),
    "3": ("Claude Opus 4.5", "anthropic/claude-opus-4.5"),
    "4": ("Claude Opus 4", "anthropic/claude-opus-4"),
    "5": ("Qwen 3 Coder", "qwen/qwen3-coder"),
    "6": ("Gemini 3 Flash Preview", "google/gemini-3-flash-preview"),
    "7": ("Kimi K2 0905", "moonshotai/kimi-k2-0905"),
    "8": ("Minimax M2.1", "minimax/minimax-m2.1"),
}
|
||
|
||
|
||
def select_model():
    """Prompt interactively for a model and return its model id.

    Loops until a preset key or a non-empty custom id is entered;
    exits the process on Ctrl-C / EOF.
    """
    print(f"\n{BOLD}Select a model:{RESET}")
    for key, (name, model_id) in AVAILABLE_MODELS.items():
        print(f"  {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
    print(f"  {BLUE}c{RESET}. Custom model (enter manually)")

    while True:
        try:
            choice = (
                input(f"\n{BOLD}{BLUE}❯{RESET} Enter choice (1-8 or c): ")
                .strip()
                .lower()
            )

            # Preset choice: resolve and return immediately.
            if choice in AVAILABLE_MODELS:
                name, model_id = AVAILABLE_MODELS[choice]
                print(f"{GREEN}⏺ Selected: {name}{RESET}")
                return model_id

            # Free-form model id; empty input falls through and re-prompts.
            if choice == "c":
                custom_model = input(
                    f"{BOLD}{BLUE}❯{RESET} Enter model ID (e.g., openai/gpt-4): "
                ).strip()
                if not custom_model:
                    print(f"{RED}⏺ Invalid model ID{RESET}")
                    continue
                print(f"{GREEN}⏺ Selected custom model: {custom_model}{RESET}")
                return custom_model

            print(f"{RED}⏺ Invalid choice. Please enter 1-8 or c{RESET}")
        except (KeyboardInterrupt, EOFError):
            print(f"\n{RED}⏺ Model selection cancelled{RESET}")
            exit(1)
|
||
|
||
|
||
class CodingAssistant(dspy.Signature):
    """You are a concise coding assistant. Help the user with their coding task by using the available tools to read, write, edit files, search the codebase, and run commands."""

    # NOTE: the docstring above doubles as the agent's instruction prompt.
    task: str = dspy.InputField(desc="The user's coding task or question")
    answer: str = dspy.OutputField(
        desc="Your response to the user after completing the task"
    )
    affected_files: list[str] = dspy.OutputField(
        desc="List of files that were written or modified during the task"
    )
|
||
|
||
class RLMCodingConfig(PrecompiledConfig):
    """Configuration for RLMCodingProgram (see usages in __init__ below)."""

    # Forwarded to dspy.RLM(max_iterations=...): REPL iteration cap per task.
    max_iters: int = 50
    # Primary LM driving the agent.
    lm: str = "openrouter/openai/gpt-5.2-codex"
    # Cheaper LM handed to dspy.RLM as sub_lm.
    sub_lm: str = "openrouter/openai/gpt-5-mini"
    # OpenRouter-compatible API endpoint used for both LMs.
    api_base: str = "https://openrouter.ai/api/v1"
    # Completion token cap applied to both LMs.
    max_tokens: int = 32000
    # Forwarded to dspy.RLM(max_output_chars=...).
    max_output_chars: int = 100000
    verbose: bool = False
    # Enable dspy usage tracking on both LMs.
    track_usage: bool = True
|
||
|
||
|
||
class RLMCodingProgram(PrecompiledProgram):
    """Precompiled coding agent wrapping a dspy.RLM with file/shell tools.

    The tool registry is mutable at runtime (e.g. for MCP servers); every
    mutation rebuilds the inner RLM so the REPL sees the new tool set.
    """

    config: RLMCodingConfig

    def __init__(self, config: RLMCodingConfig, **kwargs):
        super().__init__(config, **kwargs)

        self.config = config
        # Built-in tools exposed to the agent.
        self.tools = {
            "read_file": read_file,
            "write_file": write_file,
            "edit_file": edit_file,
            "glob_files": glob_files,
            "grep_files": grep_files,
            "run_bash": run_bash,
        }

        # Primary LM drives the agent; sub LM serves the RLM's sub-calls.
        self.lm = dspy.LM(
            model=self.config.lm,
            api_base=self.config.api_base,
            max_tokens=self.config.max_tokens,
            track_usage=self.config.track_usage,
        )
        self.sub_lm = dspy.LM(
            model=self.config.sub_lm,
            api_base=self.config.api_base,
            max_tokens=self.config.max_tokens,
            track_usage=self.config.track_usage,
        )
        self.agent = self._build_agent()

    def _build_agent(self):
        """Construct a fresh RLM bound to the current tool registry.

        Shared by __init__ and reload_repl_tools; the original duplicated
        this construction verbatim in both places.
        """
        agent = dspy.RLM(
            CodingAssistant,
            sub_lm=self.sub_lm,
            tools=self.tools,
            max_output_chars=self.config.max_output_chars,
            max_iterations=self.config.max_iters,
            verbose=self.config.verbose,
        )
        agent.set_lm(self.lm)
        return agent

    def forward(self, task: str):
        """Run the agent on a task.

        Returns the dspy prediction (callers read .answer / .affected_files);
        the original ``-> str`` annotation was incorrect.
        """
        assert task, "Task cannot be empty"
        return self.agent(task=task)

    def get_tools(self):
        """Return the live tool registry."""
        return self.tools

    def set_tool(self, name: str, tool: callable):
        """Register (or replace) a tool, then rebuild the REPL agent."""
        self.tools[name] = tool
        self.reload_repl_tools()

    def remove_tool(self, name: str):
        """Remove a tool if present, then rebuild the REPL agent."""
        if name in self.tools:
            del self.tools[name]
            self.reload_repl_tools()

    def reload_repl_tools(self):
        """Recreate the inner RLM so tool mutations reach the REPL."""
        self.agent = self._build_agent()
|
||
|
||
|
||
def main():
    """Interactive REPL: prompt for a model, then loop on user tasks.

    Slash commands: /q or 'exit' quits, /c clears history, /model switches
    models, /add-mcp attaches an MCP server's tools to the agent.
    """
    model = select_model()

    # Add openrouter/ prefix if not already present
    if not model.startswith("openrouter/"):
        model = f"openrouter/{model}"

    config = RLMCodingConfig()
    config.lm = model

    agent = RLMCodingProgram(config)
    print(
        f"{BOLD}NANOCODE DSPY{RESET} | {DIM}{agent.config.lm} | {os.getcwd()}{RESET}\n"
    )

    # Conversation history for context
    history = []

    # MCP servers registry: name -> {"server": ..., "tools": [tool names]}
    mcp_servers = {}

    def register_mcp_server(name, server):
        # Registers each MCP tool on the agent under a namespaced name.
        # Closes over `agent`, so it always targets the current instance
        # even after /model rebuilds it.
        tool_names = []
        for tool in server.tools:
            tool_name = f"{name}_{tool.__name__}"
            agent.set_tool(tool_name, tool)
            tool_names.append(tool_name)
        return tool_names

    while True:
        try:
            print(separator())
            user_input = input(f"{BOLD}{BLUE}❯{RESET} ").strip()
            print(separator())

            # Oversized pastes are spilled to a temp file and the prompt is
            # rewritten to point the agent at that file; the file is removed
            # in the finally-block after the agent call.
            tmp_paste_path = None
            if len(user_input) > LONG_PASTE_THRESHOLD:
                tmp_paste_path = save_long_paste(user_input)
                print(
                    f"{YELLOW}⏺ Long paste detected ({len(user_input)} chars). Saved to {tmp_paste_path}{RESET}"
                )
                user_input = (
                    f"The user pasted a long input ({len(user_input)} chars). "
                    f"It has been saved to {tmp_paste_path}. "
                    "Use read_file to view it. The file will be deleted after this response."
                )

            if not user_input:
                continue
            if user_input in ("/q", "exit"):
                break
            if user_input == "/c":
                history = []
                print(f"{GREEN}⏺ Cleared conversation{RESET}")
                continue
            if user_input == "/model":
                print(f"\n{BOLD}Current model: {agent.config.lm}{RESET}")
                print(f"\n{BOLD}Select a new model:{RESET}")
                for key, (name, model_id) in AVAILABLE_MODELS.items():
                    print(f"  {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
                print(f"  {BLUE}c{RESET}. Custom model (enter manually)")
                print(f"  {BLUE}k{RESET}. Keep current model")

                choice = input(f"\n{BOLD}{BLUE}❯{RESET} Enter choice: ").strip().lower()

                if choice == "k":
                    print(f"{GREEN}⏺ Keeping current model: {agent.config.lm}{RESET}")
                    continue
                elif choice in AVAILABLE_MODELS:
                    name, model_id = AVAILABLE_MODELS[choice]
                    new_model = (
                        model_id
                        if model_id.startswith("openrouter/")
                        else f"openrouter/{model_id}"
                    )
                    config.lm = new_model
                    # Rebuilding the program drops MCP tools, so re-register
                    # every known server on the fresh agent.
                    agent = RLMCodingProgram(config)
                    for server_name, info in mcp_servers.items():
                        info["tools"] = register_mcp_server(server_name, info["server"])
                    print(f"{GREEN}⏺ Switched to: {name} ({new_model}){RESET}")
                elif choice == "c":
                    custom_model = input(
                        f"{BOLD}{BLUE}❯{RESET} Enter model ID: "
                    ).strip()
                    if custom_model:
                        new_model = (
                            custom_model
                            if custom_model.startswith("openrouter/")
                            else f"openrouter/{custom_model}"
                        )
                        config.lm = new_model
                        # Same rebuild + MCP re-registration as above.
                        agent = RLMCodingProgram(config)
                        for server_name, info in mcp_servers.items():
                            info["tools"] = register_mcp_server(
                                server_name, info["server"]
                            )
                        print(f"{GREEN}⏺ Switched to custom model: {new_model}{RESET}")
                    else:
                        print(f"{RED}⏺ Invalid model ID, keeping current model{RESET}")
                else:
                    print(f"{RED}⏺ Invalid choice, keeping current model{RESET}")
                continue

            if user_input.startswith("/add-mcp"):
                # Hand-rolled flag parsing: --name/-n, --auth[=]/--oauth,
                # --headers[=] (JSON), --auto-auth/--no-auto-auth; anything
                # else is positional ([name] server...).
                parts = shlex.split(user_input)
                args = parts[1:]
                if not args:
                    print(
                        f"{YELLOW}⏺ Usage: /add-mcp <name> <server> [--auth <auth>|--oauth] [--headers '<json>'] [--auto-auth|--no-auto-auth]{RESET}"
                    )
                    continue

                name = None
                auth = None
                headers = None
                auto_auth = None
                positional = []
                i = 0
                while i < len(args):
                    if args[i] in ("--name", "-n") and i + 1 < len(args):
                        name = args[i + 1]
                        i += 2
                    elif args[i].startswith("--auth="):
                        auth = args[i].split("=", 1)[1]
                        i += 1
                    elif args[i] == "--auth" and i + 1 < len(args):
                        auth = args[i + 1]
                        i += 2
                    elif args[i] == "--oauth":
                        auth = "oauth"
                        i += 1
                    elif args[i] == "--auto-auth":
                        auto_auth = True
                        i += 1
                    elif args[i] == "--no-auto-auth":
                        auto_auth = False
                        i += 1
                    elif args[i].startswith("--headers="):
                        headers = json.loads(args[i].split("=", 1)[1])
                        i += 1
                    elif args[i] == "--headers" and i + 1 < len(args):
                        headers = json.loads(args[i + 1])
                        i += 2
                    else:
                        positional.append(args[i])
                        i += 1

                # First positional is the name only when no --name was given
                # and at least two positionals exist; the rest is the server
                # command, rejoined with spaces.
                server_cmd = None
                if positional:
                    if name is None and len(positional) >= 2:
                        name = positional[0]
                        server_cmd = " ".join(positional[1:])
                    else:
                        server_cmd = " ".join(positional)

                if not server_cmd:
                    print(
                        f"{YELLOW}⏺ Usage: /add-mcp <name> <server> [--auth <auth>|--oauth] [--headers '<json>'] [--auto-auth|--no-auto-auth]{RESET}"
                    )
                    continue

                # Derive a name from the server command if none was supplied.
                if not name:
                    name = re.sub(r"[^a-zA-Z0-9_]+", "_", server_cmd).strip("_")
                    if not name:
                        name = f"mcp_{len(mcp_servers) + 1}"

                # Re-adding a server: unregister its previous tools first.
                if name in mcp_servers:
                    for tool_name in mcp_servers[name]["tools"]:
                        agent.remove_tool(tool_name)

                try:
                    from mcp2py import load

                    kwargs = {}
                    if auth is not None:
                        kwargs["auth"] = auth
                    if headers:
                        kwargs["headers"] = headers
                    if auto_auth is not None:
                        kwargs["auto_auth"] = auto_auth

                    server = load(server_cmd, **kwargs)
                    tool_names = register_mcp_server(name, server)
                    mcp_servers[name] = {"server": server, "tools": tool_names}

                    print(
                        f"{GREEN}⏺ Added MCP server '{name}' with {len(tool_names)} tools{RESET}"
                    )
                    print(f"{GREEN}⏺ Tools: {list(agent.tools.keys())}{RESET}")
                except Exception as err:
                    print(f"{RED}⏺ Failed to add MCP server: {err}{RESET}")

                continue

            # Build context from history (last 5 exchanges only).
            context = f"Working directory: {os.getcwd()}\n"
            if history:
                context += "\nPrevious conversation:\n"
                for h in history[-5:]:  # Keep last 5 exchanges
                    context += f"User: {h['user']}\nAssistant: {h['assistant']}\n\n"

            task = f"{context}\nCurrent task: {user_input}"

            print(f"\n{CYAN}⏺{RESET} Thinking...", flush=True)

            # Run the RLM agent; the paste temp file is cleaned up whether
            # or not the call succeeds.
            try:
                result = agent(task=task)
            finally:
                if tmp_paste_path:
                    try:
                        os.remove(tmp_paste_path)
                    except OSError:
                        pass

            # Display the answer
            print(f"\n{CYAN}⏺{RESET} {render_markdown(result.answer)}")

            # Display usage
            print(f"\n{MAGENTA}⏺ Debug Prediction: {result}{RESET}")

            # Save to history
            history.append({"user": user_input, "assistant": result.answer})

            print()

        except (KeyboardInterrupt, EOFError):
            break
        except Exception as err:
            import traceback

            traceback.print_exc()
            print(f"{RED}⏺ Error: {err}{RESET}")
|
||
|
||
|
||
if __name__ == "__main__":
    # NOTE(review): the interactive CLI entry point (main) is commented out;
    # running this file currently publishes the program to the Modaic hub
    # instead. Presumably intentional for a release — confirm before use.
    agent = RLMCodingProgram(RLMCodingConfig())
    agent.push_to_hub(MODAIC_REPO_PATH, commit_message="Add tool logs", branch="prod")
    #main()
|