"""Nanocode: a small terminal coding assistant built on dspy.RLM and Modaic.

The module defines a handful of file/shell tools, wraps them in an
``RLMCodingProgram`` and provides an interactive REPL (``main``) with model
switching and MCP server registration.
"""

import glob as globlib
import json
import os
import queue
import re
import shlex
import shutil
import subprocess
import tempfile
import threading
import time
import traceback
from typing import Callable, Optional

import dspy
from modaic import PrecompiledConfig, PrecompiledProgram

# --- Modaic ---

MODAIC_REPO_PATH = "farouk1/nanocode"

# --- ANSI colors ---

RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
BLUE = "\033[34m"
CYAN = "\033[36m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"

# --- Display utilities ---

# Inputs longer than this many characters are spilled to a temp file.
LONG_PASTE_THRESHOLD = int(os.environ.get("NANOCODE_LONG_PASTE_THRESHOLD", "4000"))

# Wall-clock limit (seconds) for a single run_bash command.
RUN_BASH_TIMEOUT_SECONDS = int(os.environ.get("NANOCODE_BASH_TIMEOUT", "30"))

# Maximum number of matching lines grep_files returns.
GREP_MAX_HITS = 50

# Number of past exchanges replayed to the agent as context.
HISTORY_WINDOW = 5

ADD_MCP_USAGE = (
    "Usage: /add-mcp [name] <server> [--name <name>] [--auth <auth>|--oauth] "
    "[--headers '<json>'] [--auto-auth|--no-auto-auth]"
)


def save_long_paste(text: str) -> str:
    """Write ``text`` to a fresh temporary file and return the file's path.

    The caller is responsible for deleting the file.
    """
    fd, path = tempfile.mkstemp(prefix="nanocode_paste_", suffix=".txt")
    with os.fdopen(fd, "w") as handle:
        handle.write(text)
    return path


def separator():
    """Return a horizontal separator line that fits the terminal width."""
    # shutil's variant falls back to a default size instead of raising
    # OSError when stdout is not attached to a terminal (pipes, CI, ...).
    width = min(shutil.get_terminal_size().columns, 80)
    return f"{DIM}{'─' * width}{RESET}"


def render_markdown(text):
    """Convert basic markdown bold syntax to ANSI bold."""
    return re.sub(r"\*\*(.+?)\*\*", f"{BOLD}\\1{RESET}", text)


# --- File operations ---


def read_file(path: str, offset: int = 0, limit: Optional[int] = None) -> str:
    """Read file contents with line numbers.

    Args:
        path: Path to the file to read
        offset: Line number to start from (0-indexed)
        limit: Maximum number of lines to read (all remaining lines if omitted)

    Returns:
        File contents with line numbers
    """
    print(f"{MAGENTA}⏺ Reading file: {path}{RESET}")
    with open(path) as handle:
        lines = handle.readlines()
    if limit is None:
        limit = len(lines)
    selected = lines[offset : offset + limit]
    return "".join(
        f"{offset + idx + 1:4}| {line}" for idx, line in enumerate(selected)
    )


def write_file(path: str, content: str) -> str:
    """Write content to a file.

    Args:
        path: Path to the file to write
        content: Content to write to the file

    Returns:
        'ok' on success
    """
    print(f"{MAGENTA}⏺ Writing file: {path}{RESET}")
    with open(path, "w") as f:
        f.write(content)
    return "ok"


def edit_file(path: str, old: str, new: str, replace_all: bool = False) -> str:
    """Replace text in a file.

    Args:
        path: Path to the file to edit
        old: Text to find and replace
        new: Replacement text
        replace_all: If True, replace all occurrences; otherwise old must be unique

    Returns:
        'ok' on success, error message on failure
    """
    print(f"{MAGENTA}⏺ Editing file: {path}{RESET}")
    if not old:
        # str.replace("", new) would insert `new` between every character.
        return "error: old_string must not be empty"
    with open(path) as handle:
        text = handle.read()
    count = text.count(old)
    if count == 0:
        return "error: old_string not found"
    if not replace_all and count > 1:
        return f"error: old_string appears {count} times, must be unique (use replace_all=True)"
    replacement = text.replace(old, new) if replace_all else text.replace(old, new, 1)
    with open(path, "w") as f:
        f.write(replacement)
    return "ok"


def glob_files(pattern: str, path: str = ".") -> str:
    """Find files matching a glob pattern, sorted by modification time.

    Args:
        pattern: Glob pattern to match (e.g., '**/*.py')
        path: Base directory to search in

    Returns:
        Newline-separated list of matching files (most recently modified first)
    """
    print(f"{MAGENTA}⏺ Finding files with pattern: {pattern}{RESET}")
    full_pattern = (path + "/" + pattern).replace("//", "/")
    files = globlib.glob(full_pattern, recursive=True)
    # Non-files (directories etc.) get mtime 0 so they sort last.
    files = sorted(
        files,
        key=lambda f: os.path.getmtime(f) if os.path.isfile(f) else 0,
        reverse=True,
    )
    return "\n".join(files) or "no files found"


def grep_files(pattern: str, path: str = ".") -> str:
    """Search files for a regex pattern.

    Args:
        pattern: Regular expression pattern to search for
        path: Base directory to search in

    Returns:
        Matching lines in format 'filepath:line_num:content' (at most 50),
        or an error message if the pattern is not a valid regex
    """
    print(f"{MAGENTA}⏺ Searching for pattern: {pattern}{RESET}")
    try:
        regex = re.compile(pattern)
    except re.error as err:
        return f"error: invalid regex: {err}"

    hits = []
    for filepath in globlib.glob(path + "/**", recursive=True):
        if not os.path.isfile(filepath):
            continue
        try:
            with open(filepath) as handle:
                for line_num, line in enumerate(handle, 1):
                    if regex.search(line):
                        hits.append(f"{filepath}:{line_num}:{line.rstrip()}")
                        if len(hits) >= GREP_MAX_HITS:
                            # Cap reached: no point scanning further files.
                            return "\n".join(hits)
        except (OSError, UnicodeDecodeError):
            # Best effort: unreadable or binary files are skipped.
            continue
    return "\n".join(hits) or "no matches found"


# --- Shell operations ---


def run_bash(cmd: str) -> str:
    """Run a shell command and return output.

    Output is echoed line by line while the command runs. The command is
    killed once it has been running for RUN_BASH_TIMEOUT_SECONDS.

    Args:
        cmd: Shell command to execute

    Returns:
        Command output (stdout and stderr combined)
    """
    print(f"{MAGENTA}⏺ Running command: {cmd}{RESET}")
    # NOTE: shell=True is intentional here; the tool's contract is to run an
    # arbitrary shell command line supplied by the agent.
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors="replace",  # binary output must not abort decoding
    )

    # A blocking readline() in this thread could never be interrupted by a
    # timeout, so a daemon thread pumps lines into a queue and this thread
    # waits on the queue with a deadline. `None` marks end of stream.
    pending = queue.Queue()

    def pump_output():
        try:
            for line in proc.stdout:
                pending.put(line)
        except ValueError:
            pass  # stream was closed while reading
        finally:
            pending.put(None)

    threading.Thread(target=pump_output, daemon=True).start()

    output_lines = []
    deadline = time.monotonic() + RUN_BASH_TIMEOUT_SECONDS
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise subprocess.TimeoutExpired(cmd, RUN_BASH_TIMEOUT_SECONDS)
            try:
                line = pending.get(timeout=remaining)
            except queue.Empty:
                raise subprocess.TimeoutExpired(
                    cmd, RUN_BASH_TIMEOUT_SECONDS
                ) from None
            if line is None:
                break
            print(f" {DIM}│ {line.rstrip()}{RESET}", flush=True)
            output_lines.append(line)
        # Output ended; the process may still need a moment to exit.
        proc.wait(timeout=max(deadline - time.monotonic(), 0))
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
        output_lines.append(f"\n(timed out after {RUN_BASH_TIMEOUT_SECONDS}s)")
    return "".join(output_lines).strip() or "(empty output)"


# --- Model selection ---

AVAILABLE_MODELS = {
    "1": ("GPT-5.2 Codex", "openai/gpt-5.2-codex"),
    "2": ("GPT-5.2", "openai/gpt-5.2"),
    "3": ("Claude Opus 4.5", "anthropic/claude-opus-4.5"),
    "4": ("Claude Opus 4", "anthropic/claude-opus-4"),
    "5": ("Qwen 3 Coder", "qwen/qwen3-coder"),
    "6": ("Gemini 3 Flash Preview", "google/gemini-3-flash-preview"),
    "7": ("Kimi K2 0905", "moonshotai/kimi-k2-0905"),
    "8": ("Minimax M2.1", "minimax/minimax-m2.1"),
}


def _print_model_options():
    """Print the numbered model menu plus the custom-model entry."""
    for key, (name, model_id) in AVAILABLE_MODELS.items():
        print(f" {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
    print(f" {BLUE}c{RESET}. Custom model (enter manually)")


def _with_openrouter_prefix(model_id: str) -> str:
    """Return ``model_id`` with an ``openrouter/`` prefix, adding it if absent."""
    if model_id.startswith("openrouter/"):
        return model_id
    return f"openrouter/{model_id}"


def select_model():
    """Interactively prompt for a model and return the chosen model ID.

    Loops until a valid choice is made. Exits the process with status 1 if
    the prompt is interrupted (Ctrl-C / EOF).
    """
    print(f"\n{BOLD}Select a model:{RESET}")
    _print_model_options()

    while True:
        try:
            choice = (
                input(f"\n{BOLD}{BLUE}❯{RESET} Enter choice (1-8 or c): ")
                .strip()
                .lower()
            )
            if choice in AVAILABLE_MODELS:
                name, model_id = AVAILABLE_MODELS[choice]
                print(f"{GREEN}⏺ Selected: {name}{RESET}")
                return model_id
            elif choice == "c":
                custom_model = input(
                    f"{BOLD}{BLUE}❯{RESET} Enter model ID (e.g., openai/gpt-4): "
                ).strip()
                if custom_model:
                    print(f"{GREEN}⏺ Selected custom model: {custom_model}{RESET}")
                    return custom_model
                else:
                    print(f"{RED}⏺ Invalid model ID{RESET}")
            else:
                print(f"{RED}⏺ Invalid choice. Please enter 1-8 or c{RESET}")
        except (KeyboardInterrupt, EOFError):
            print(f"\n{RED}⏺ Model selection cancelled{RESET}")
            # The `exit` builtin is injected by `site` and may be missing
            # (e.g. under `python -S`); SystemExit is always available.
            raise SystemExit(1)


# NOTE: the class docstring below is the signature's instruction text; keep
# it verbatim unless the prompt is meant to change.
class CodingAssistant(dspy.Signature):
    """You are a concise coding assistant. Help the user with their coding task by using the available tools to read, write, edit files, search the codebase, and run commands."""

    task: str = dspy.InputField(desc="The user's coding task or question")
    answer: str = dspy.OutputField(
        desc="Your response to the user after completing the task"
    )
    affected_files: list[str] = dspy.OutputField(
        desc="List of files that were written or modified during the task"
    )


class RLMCodingConfig(PrecompiledConfig):
    """Settings used by RLMCodingProgram to build its LMs and RLM agent."""

    max_iters: int = 50
    lm: str = "openrouter/openai/gpt-5.2-codex"
    sub_lm: str = "openrouter/openai/gpt-5-mini"
    api_base: str = "https://openrouter.ai/api/v1"
    max_tokens: int = 32000
    max_output_chars: int = 100000
    verbose: bool = False
    track_usage: bool = True


class RLMCodingProgram(PrecompiledProgram):
    """Program wrapping a dspy.RLM agent equipped with the file/shell tools."""

    config: RLMCodingConfig

    def __init__(self, config: RLMCodingConfig, **kwargs):
        """Create the main and sub LMs from ``config`` and build the agent."""
        super().__init__(config, **kwargs)
        self.config = config
        self.tools = {
            "read_file": read_file,
            "write_file": write_file,
            "edit_file": edit_file,
            "glob_files": glob_files,
            "grep_files": grep_files,
            "run_bash": run_bash,
        }
        self.lm = dspy.LM(
            model=self.config.lm,
            api_base=self.config.api_base,
            max_tokens=self.config.max_tokens,
            track_usage=self.config.track_usage,
        )
        self.sub_lm = dspy.LM(
            model=self.config.sub_lm,
            api_base=self.config.api_base,
            max_tokens=self.config.max_tokens,
            track_usage=self.config.track_usage,
        )
        self.agent = self._build_agent()
        print(f"Using model: {self.agent.get_lm().model}")
        print(f"Using sub-model: {self.agent.sub_lm.model}")

    def _build_agent(self):
        """Build a dspy.RLM over the current tool set, bound to ``self.lm``."""
        agent = dspy.RLM(
            CodingAssistant,
            sub_lm=self.sub_lm,
            tools=self.tools,
            max_output_chars=self.config.max_output_chars,
            max_iterations=self.config.max_iters,
            verbose=self.config.verbose,
        )
        agent.set_lm(self.lm)
        return agent

    def forward(self, task: str):
        """Run the agent on ``task`` and return the agent's result.

        Callers in this file read ``.answer`` from the returned object.

        Raises:
            ValueError: if ``task`` is empty.
        """
        if not task:
            # Explicit raise: `assert` is stripped when Python runs with -O.
            raise ValueError("Task cannot be empty")
        return self.agent(task=task)

    def get_tools(self):
        """Return the mapping of tool name to callable currently registered."""
        return self.tools

    def set_tool(self, name: str, tool: Callable):
        """Register (or replace) a tool under ``name`` and rebuild the agent."""
        self.tools[name] = tool
        self.reload_repl_tools()

    def remove_tool(self, name: str):
        """Unregister ``name`` if present and rebuild the agent."""
        if name in self.tools:
            del self.tools[name]
            self.reload_repl_tools()

    def reload_repl_tools(self):
        """Rebuild the agent so tool changes reach the REPL."""
        # we need to create a new instance for tool mutations to be passed
        # back into the REPL
        self.agent = self._build_agent()


def _parse_add_mcp_args(args):
    """Parse the arguments following ``/add-mcp``.

    Args:
        args: Tokens after the command name.

    Returns:
        Tuple ``(name, server_cmd, kwargs)``. ``name`` and ``server_cmd`` are
        None when not supplied; ``kwargs`` holds only the loader options that
        were explicitly given.

    Raises:
        ValueError: if a ``--headers`` value is not valid JSON
            (json.JSONDecodeError is a ValueError subclass).
    """
    name = None
    auth = None
    headers = None
    auto_auth = None
    positional = []

    i = 0
    while i < len(args):
        arg = args[i]
        has_value = i + 1 < len(args)
        if arg in ("--name", "-n") and has_value:
            name = args[i + 1]
            i += 2
        elif arg.startswith("--auth="):
            auth = arg.split("=", 1)[1]
            i += 1
        elif arg == "--auth" and has_value:
            auth = args[i + 1]
            i += 2
        elif arg == "--oauth":
            auth = "oauth"
            i += 1
        elif arg == "--auto-auth":
            auto_auth = True
            i += 1
        elif arg == "--no-auto-auth":
            auto_auth = False
            i += 1
        elif arg.startswith("--headers="):
            headers = json.loads(arg.split("=", 1)[1])
            i += 1
        elif arg == "--headers" and has_value:
            headers = json.loads(args[i + 1])
            i += 2
        else:
            positional.append(arg)
            i += 1

    server_cmd = None
    if positional:
        if name is None and len(positional) >= 2:
            # Without --name, the first positional token names the server.
            name = positional[0]
            server_cmd = " ".join(positional[1:])
        else:
            server_cmd = " ".join(positional)

    kwargs = {}
    if auth is not None:
        kwargs["auth"] = auth
    if headers:
        kwargs["headers"] = headers
    if auto_auth is not None:
        kwargs["auto_auth"] = auto_auth
    return name, server_cmd, kwargs


def main():
    """Run the interactive REPL.

    Commands: ``/q`` or ``exit`` quits, ``/c`` clears history, ``/model``
    switches model, ``/add-mcp`` registers an MCP server's tools. Anything
    else is sent to the agent as a task.
    """
    # Add openrouter/ prefix if not already present
    model = _with_openrouter_prefix(select_model())

    config = RLMCodingConfig()
    config.lm = model

    agent = RLMCodingProgram(config)
    print(
        f"{BOLD}NANOCODE DSPY{RESET} | {DIM}{agent.config.lm} | {os.getcwd()}{RESET}\n"
    )

    # Conversation history for context
    history = []

    # MCP servers registry: name -> {"server": ..., "tools": [tool names]}
    mcp_servers = {}

    def register_mcp_server(name, server):
        """Register every tool of ``server`` on the current agent."""
        tool_names = []
        for tool in server.tools:
            tool_name = f"{name}_{tool.__name__}"
            agent.set_tool(tool_name, tool)
            tool_names.append(tool_name)
        return tool_names

    def switch_model(new_model):
        """Rebuild the agent for ``new_model`` and re-attach MCP tools."""
        nonlocal agent
        config.lm = new_model
        agent = RLMCodingProgram(config)
        # A fresh program starts with only the built-in tools.
        for server_name, info in mcp_servers.items():
            info["tools"] = register_mcp_server(server_name, info["server"])

    while True:
        try:
            print(separator())
            user_input = input(f"{BOLD}{BLUE}❯{RESET} ").strip()
            print(separator())

            tmp_paste_path = None
            if len(user_input) > LONG_PASTE_THRESHOLD:
                tmp_paste_path = save_long_paste(user_input)
                print(
                    f"{YELLOW}⏺ Long paste detected ({len(user_input)} chars). Saved to {tmp_paste_path}{RESET}"
                )
                user_input = (
                    f"The user pasted a long input ({len(user_input)} chars). "
                    f"It has been saved to {tmp_paste_path}. "
                    "Use read_file to view it. The file will be deleted after this response."
                )

            if not user_input:
                continue
            if user_input in ("/q", "exit"):
                break
            if user_input == "/c":
                history = []
                print(f"{GREEN}⏺ Cleared conversation{RESET}")
                continue

            if user_input == "/model":
                print(f"\n{BOLD}Current model: {agent.config.lm}{RESET}")
                print(f"\n{BOLD}Select a new model:{RESET}")
                _print_model_options()
                print(f" {BLUE}k{RESET}. Keep current model")

                choice = input(f"\n{BOLD}{BLUE}❯{RESET} Enter choice: ").strip().lower()

                if choice == "k":
                    print(f"{GREEN}⏺ Keeping current model: {agent.config.lm}{RESET}")
                elif choice in AVAILABLE_MODELS:
                    name, model_id = AVAILABLE_MODELS[choice]
                    new_model = _with_openrouter_prefix(model_id)
                    switch_model(new_model)
                    print(f"{GREEN}⏺ Switched to: {name} ({new_model}){RESET}")
                elif choice == "c":
                    custom_model = input(
                        f"{BOLD}{BLUE}❯{RESET} Enter model ID: "
                    ).strip()
                    if custom_model:
                        new_model = _with_openrouter_prefix(custom_model)
                        switch_model(new_model)
                        print(f"{GREEN}⏺ Switched to custom model: {new_model}{RESET}")
                    else:
                        print(f"{RED}⏺ Invalid model ID, keeping current model{RESET}")
                else:
                    print(f"{RED}⏺ Invalid choice, keeping current model{RESET}")
                continue

            if user_input.startswith("/add-mcp"):
                try:
                    # shlex raises ValueError on unbalanced quotes; bad
                    # --headers JSON raises a ValueError subclass.
                    args = shlex.split(user_input)[1:]
                    name, server_cmd, kwargs = _parse_add_mcp_args(args)
                except ValueError as err:
                    print(f"{RED}⏺ Invalid /add-mcp arguments: {err}{RESET}")
                    continue

                if not server_cmd:
                    print(f"{YELLOW}⏺ {ADD_MCP_USAGE}{RESET}")
                    continue

                if not name:
                    # Derive an identifier-safe name from the command itself.
                    name = re.sub(r"[^a-zA-Z0-9_]+", "_", server_cmd).strip("_")
                    if not name:
                        name = f"mcp_{len(mcp_servers) + 1}"

                try:
                    from mcp2py import load

                    server = load(server_cmd, **kwargs)
                    # Drop a previous registration only after the replacement
                    # loaded, so a failed load leaves the old tools intact.
                    previous = mcp_servers.pop(name, None)
                    if previous is not None:
                        for tool_name in previous["tools"]:
                            agent.remove_tool(tool_name)
                    tool_names = register_mcp_server(name, server)
                    mcp_servers[name] = {"server": server, "tools": tool_names}

                    print(
                        f"{GREEN}⏺ Added MCP server '{name}' with {len(tool_names)} tools{RESET}"
                    )
                    print(f"{GREEN}⏺ Tools: {list(agent.tools.keys())}{RESET}")
                except Exception as err:
                    print(f"{RED}⏺ Failed to add MCP server: {err}{RESET}")

                continue

            # Build context from history
            context = f"Working directory: {os.getcwd()}\n"
            if history:
                context += "\nPrevious conversation:\n"
                for h in history[-HISTORY_WINDOW:]:  # Keep last 5 exchanges
                    context += f"User: {h['user']}\nAssistant: {h['assistant']}\n\n"

            task = f"{context}\nCurrent task: {user_input}"

            print(f"\n{CYAN}⏺{RESET} Thinking...", flush=True)

            # Run the RLM agent
            try:
                result = agent(task=task)
            finally:
                # The spilled paste only lives for the duration of one task.
                if tmp_paste_path:
                    try:
                        os.remove(tmp_paste_path)
                    except OSError:
                        pass

            # Display the answer
            print(f"\n{CYAN}⏺{RESET} {render_markdown(result.answer)}")

            # Dump the full prediction for debugging
            print(f"\n{MAGENTA}⏺ Debug Prediction: {result}{RESET}")

            # Save to history
            history.append({"user": user_input, "assistant": result.answer})

            print()

        except (KeyboardInterrupt, EOFError):
            break
        except Exception as err:
            # Top-level boundary: report and keep the REPL alive.
            traceback.print_exc()
            print(f"{RED}⏺ Error: {err}{RESET}")


if __name__ == "__main__":
    # Running the file currently publishes the program to the Modaic hub
    # instead of starting the REPL.
    agent = RLMCodingProgram(RLMCodingConfig())
    agent.push_to_hub(MODAIC_REPO_PATH, commit_message="debug", branch="prod")
    # main()