change signature

This commit is contained in:
2026-01-23 04:17:49 -08:00
parent d49904cc44
commit d9821db8ef
3 changed files with 13 additions and 331 deletions

View File

@@ -1,12 +1,8 @@
import os
import re
import glob as globlib
import subprocess
import shlex
import json
import tempfile
from modaic import PrecompiledProgram, PrecompiledConfig
import dspy
import re
# --- Modaic ---
@@ -24,29 +20,6 @@ YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"
# --- Display utilities ---
LONG_PASTE_THRESHOLD = int(os.environ.get("NANOCODE_LONG_PASTE_THRESHOLD", "4000"))
def save_long_paste(text: str) -> str:
fd, path = tempfile.mkstemp(prefix="nanocode_paste_", suffix=".txt")
with os.fdopen(fd, "w") as handle:
handle.write(text)
return path
def separator():
"""Return a horizontal separator line that fits the terminal width."""
return f"{DIM}{'' * min(os.get_terminal_size().columns, 80)}{RESET}"
def render_markdown(text):
"""Convert basic markdown bold syntax to ANSI bold."""
return re.sub(r"\*\*(.+?)\*\*", f"{BOLD}\\1{RESET}", text)
# --- File operations ---
@@ -80,7 +53,8 @@ def write_file(path: str, content: str) -> str:
Returns:
'ok' on success
"""
print(f"{MAGENTA}Writing file: {path}{RESET}")
print(f"{MAGENTA}Creating file: {path}{RESET}")
with open(path, "w") as f:
f.write(content)
return "ok"
@@ -99,6 +73,7 @@ def edit_file(path: str, old: str, new: str, replace_all: bool = False) -> str:
'ok' on success, error message on failure
"""
print(f"{MAGENTA}⏺ Editing file: {path}{RESET}")
text = open(path).read()
if old not in text:
return "error: old_string not found"
@@ -121,7 +96,8 @@ def glob_files(pattern: str, path: str = ".") -> str:
Returns:
Newline-separated list of matching files
"""
print(f"{MAGENTA}Finding files with pattern: {pattern}{RESET}")
print(f"{MAGENTA}Glob: {pattern}{RESET}")
full_pattern = (path + "/" + pattern).replace("//", "/")
files = globlib.glob(full_pattern, recursive=True)
files = sorted(
@@ -142,7 +118,7 @@ def grep_files(pattern: str, path: str = ".") -> str:
Returns:
Matching lines in format 'filepath:line_num:content'
"""
print(f"{MAGENTA}Searching for pattern: {pattern}{RESET}")
print(f"{MAGENTA}Grep: {pattern}{RESET}")
regex = re.compile(pattern)
hits = []
for filepath in globlib.glob(path + "/**", recursive=True):
@@ -167,7 +143,7 @@ def run_bash(cmd: str) -> str:
Returns:
Command output (stdout and stderr combined)
"""
print(f"{MAGENTA}Running command: {cmd}{RESET}")
print(f"{MAGENTA}Bash: {cmd}{RESET}")
proc = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)
@@ -186,56 +162,6 @@ def run_bash(cmd: str) -> str:
output_lines.append("\n(timed out after 30s)")
return "".join(output_lines).strip() or "(empty output)"
# --- Model selection ---
AVAILABLE_MODELS = {
"1": ("GPT-5.2 Codex", "openai/gpt-5.2-codex"),
"2": ("GPT-5.2", "openai/gpt-5.2"),
"3": ("Claude Opus 4.5", "anthropic/claude-opus-4.5"),
"4": ("Claude Opus 4", "anthropic/claude-opus-4"),
"5": ("Qwen 3 Coder", "qwen/qwen3-coder"),
"6": ("Gemini 3 Flash Preview", "google/gemini-3-flash-preview"),
"7": ("Kimi K2 0905", "moonshotai/kimi-k2-0905"),
"8": ("Minimax M2.1", "minimax/minimax-m2.1"),
}
def select_model():
"""Interactive model selection or use environment variable."""
print(f"\n{BOLD}Select a model:{RESET}")
for key, (name, model_id) in AVAILABLE_MODELS.items():
print(f" {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
print(f" {BLUE}c{RESET}. Custom model (enter manually)")
while True:
try:
choice = (
input(f"\n{BOLD}{BLUE}{RESET} Enter choice (1-8 or c): ")
.strip()
.lower()
)
if choice in AVAILABLE_MODELS:
name, model_id = AVAILABLE_MODELS[choice]
print(f"{GREEN}⏺ Selected: {name}{RESET}")
return model_id
elif choice == "c":
custom_model = input(
f"{BOLD}{BLUE}{RESET} Enter model ID (e.g., openai/gpt-4): "
).strip()
if custom_model:
print(f"{GREEN}⏺ Selected custom model: {custom_model}{RESET}")
return custom_model
else:
print(f"{RED}⏺ Invalid model ID{RESET}")
else:
print(f"{RED}⏺ Invalid choice. Please enter 1-8 or c{RESET}")
except (KeyboardInterrupt, EOFError):
print(f"\n{RED}⏺ Model selection cancelled{RESET}")
exit(1)
class CodingAssistant(dspy.Signature):
"""You are a concise coding assistant with access to sub agents."""
@@ -243,16 +169,13 @@ class CodingAssistant(dspy.Signature):
answer: str = dspy.OutputField(
desc="Your response to the user after completing the task"
)
affected_files: list[str] = dspy.OutputField(
desc="List of files that were written or modified during the task"
)
class RLMCodingConfig(PrecompiledConfig):
max_iters: int = 50
lm: str = "openrouter/openai/gpt-5.2-codex"
sub_lm: str = "openrouter/openai/gpt-5-mini"
api_base: str = "https://openrouter.ai/api/v1"
max_tokens: int = 32000
max_tokens: int = 50000
max_output_chars: int = 100000
verbose: bool = False
track_usage: bool = True
@@ -328,243 +251,6 @@ class RLMCodingProgram(PrecompiledProgram):
new_instance.set_lm(self.lm)
self.agent = new_instance
def main():
model = select_model()
# Add openrouter/ prefix if not already present
if not model.startswith("openrouter/"):
model = f"openrouter/{model}"
config = RLMCodingConfig()
config.lm = model
agent = RLMCodingProgram(config)
print(
f"{BOLD}NANOCODE DSPY{RESET} | {DIM}{agent.config.lm} | {os.getcwd()}{RESET}\n"
)
# Conversation history for context
history = []
# MCP servers registry
mcp_servers = {}
def register_mcp_server(name, server):
tool_names = []
for tool in server.tools:
tool_name = f"{name}_{tool.__name__}"
agent.set_tool(tool_name, tool)
tool_names.append(tool_name)
return tool_names
while True:
try:
print(separator())
user_input = input(f"{BOLD}{BLUE}{RESET} ").strip()
print(separator())
tmp_paste_path = None
if len(user_input) > LONG_PASTE_THRESHOLD:
tmp_paste_path = save_long_paste(user_input)
print(
f"{YELLOW}⏺ Long paste detected ({len(user_input)} chars). Saved to {tmp_paste_path}{RESET}"
)
user_input = (
f"The user pasted a long input ({len(user_input)} chars). "
f"It has been saved to {tmp_paste_path}. "
"Use read_file to view it. The file will be deleted after this response."
)
if not user_input:
continue
if user_input in ("/q", "exit"):
break
if user_input == "/c":
history = []
print(f"{GREEN}⏺ Cleared conversation{RESET}")
continue
if user_input == "/model":
print(f"\n{BOLD}Current model: {agent.config.lm}{RESET}")
print(f"\n{BOLD}Select a new model:{RESET}")
for key, (name, model_id) in AVAILABLE_MODELS.items():
print(f" {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
print(f" {BLUE}c{RESET}. Custom model (enter manually)")
print(f" {BLUE}k{RESET}. Keep current model")
choice = input(f"\n{BOLD}{BLUE}{RESET} Enter choice: ").strip().lower()
if choice == "k":
print(f"{GREEN}⏺ Keeping current model: {agent.config.lm}{RESET}")
continue
elif choice in AVAILABLE_MODELS:
name, model_id = AVAILABLE_MODELS[choice]
new_model = (
model_id
if model_id.startswith("openrouter/")
else f"openrouter/{model_id}"
)
config.lm = new_model
agent = RLMCodingProgram(config)
for server_name, info in mcp_servers.items():
info["tools"] = register_mcp_server(server_name, info["server"])
print(f"{GREEN}⏺ Switched to: {name} ({new_model}){RESET}")
elif choice == "c":
custom_model = input(
f"{BOLD}{BLUE}{RESET} Enter model ID: "
).strip()
if custom_model:
new_model = (
custom_model
if custom_model.startswith("openrouter/")
else f"openrouter/{custom_model}"
)
config.lm = new_model
agent = RLMCodingProgram(config)
for server_name, info in mcp_servers.items():
info["tools"] = register_mcp_server(
server_name, info["server"]
)
print(f"{GREEN}⏺ Switched to custom model: {new_model}{RESET}")
else:
print(f"{RED}⏺ Invalid model ID, keeping current model{RESET}")
else:
print(f"{RED}⏺ Invalid choice, keeping current model{RESET}")
continue
if user_input.startswith("/add-mcp"):
parts = shlex.split(user_input)
args = parts[1:]
if not args:
print(
f"{YELLOW}⏺ Usage: /add-mcp <name> <server> [--auth <auth>|--oauth] [--headers '<json>'] [--auto-auth|--no-auto-auth]{RESET}"
)
continue
name = None
auth = None
headers = None
auto_auth = None
positional = []
i = 0
while i < len(args):
if args[i] in ("--name", "-n") and i + 1 < len(args):
name = args[i + 1]
i += 2
elif args[i].startswith("--auth="):
auth = args[i].split("=", 1)[1]
i += 1
elif args[i] == "--auth" and i + 1 < len(args):
auth = args[i + 1]
i += 2
elif args[i] == "--oauth":
auth = "oauth"
i += 1
elif args[i] == "--auto-auth":
auto_auth = True
i += 1
elif args[i] == "--no-auto-auth":
auto_auth = False
i += 1
elif args[i].startswith("--headers="):
headers = json.loads(args[i].split("=", 1)[1])
i += 1
elif args[i] == "--headers" and i + 1 < len(args):
headers = json.loads(args[i + 1])
i += 2
else:
positional.append(args[i])
i += 1
server_cmd = None
if positional:
if name is None and len(positional) >= 2:
name = positional[0]
server_cmd = " ".join(positional[1:])
else:
server_cmd = " ".join(positional)
if not server_cmd:
print(
f"{YELLOW}⏺ Usage: /add-mcp <name> <server> [--auth <auth>|--oauth] [--headers '<json>'] [--auto-auth|--no-auto-auth]{RESET}"
)
continue
if not name:
name = re.sub(r"[^a-zA-Z0-9_]+", "_", server_cmd).strip("_")
if not name:
name = f"mcp_{len(mcp_servers) + 1}"
if name in mcp_servers:
for tool_name in mcp_servers[name]["tools"]:
agent.remove_tool(tool_name)
try:
from mcp2py import load
kwargs = {}
if auth is not None:
kwargs["auth"] = auth
if headers:
kwargs["headers"] = headers
if auto_auth is not None:
kwargs["auto_auth"] = auto_auth
server = load(server_cmd, **kwargs)
tool_names = register_mcp_server(name, server)
mcp_servers[name] = {"server": server, "tools": tool_names}
print(
f"{GREEN}⏺ Added MCP server '{name}' with {len(tool_names)} tools{RESET}"
)
print(f"{GREEN}⏺ Tools: {list(agent.tools.keys())}{RESET}")
except Exception as err:
print(f"{RED}⏺ Failed to add MCP server: {err}{RESET}")
continue
# Build context from history
context = f"Working directory: {os.getcwd()}\n"
if history:
context += "\nPrevious conversation:\n"
for h in history[-5:]: # Keep last 5 exchanges
context += f"User: {h['user']}\nAssistant: {h['assistant']}\n\n"
task = f"{context}\nCurrent task: {user_input}"
print(f"\n{CYAN}{RESET} Thinking...", flush=True)
# Run the RLM agent
try:
result = agent(task=task)
finally:
if tmp_paste_path:
try:
os.remove(tmp_paste_path)
except OSError:
pass
# Display the answer
print(f"\n{CYAN}{RESET} {render_markdown(result.answer)}")
# Display usage
print(f"\n{MAGENTA}⏺ Debug Prediction: {result}{RESET}")
# Save to history
history.append({"user": user_input, "assistant": result.answer})
print()
except (KeyboardInterrupt, EOFError):
break
except Exception as err:
import traceback
traceback.print_exc()
print(f"{RED}⏺ Error: {err}{RESET}")
if __name__ == "__main__":
agent = RLMCodingProgram(RLMCodingConfig())
agent.push_to_hub(MODAIC_REPO_PATH, commit_message="change signature", branch="prod")
#main()