change signature

This commit is contained in:
2026-01-23 04:19:55 -08:00
parent 96d0a034cd
commit 239ed8cf39
3 changed files with 23 additions and 369 deletions

View File

@@ -1,13 +1,9 @@
import os
import re
import glob as globlib
import subprocess
import shlex
import json
import tempfile
from modaic import PrecompiledProgram, PrecompiledConfig
import dspy
from dspy.utils.callback import BaseCallback
import re
import subprocess
# --- Modaic ---
@@ -25,29 +21,6 @@ YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"
# --- Display utilities ---
LONG_PASTE_THRESHOLD = int(os.environ.get("NANOCODE_LONG_PASTE_THRESHOLD", "4000"))
def save_long_paste(text: str) -> str:
fd, path = tempfile.mkstemp(prefix="nanocode_paste_", suffix=".txt")
with os.fdopen(fd, "w") as handle:
handle.write(text)
return path
def separator():
"""Return a horizontal separator line that fits the terminal width."""
return f"{DIM}{'' * min(os.get_terminal_size().columns, 80)}{RESET}"
def render_markdown(text):
"""Convert basic markdown bold syntax to ANSI bold."""
return re.sub(r"\*\*(.+?)\*\*", f"{BOLD}\\1{RESET}", text)
# --- File operations ---
@@ -62,6 +35,8 @@ def read_file(path: str, offset: int = 0, limit: int = None) -> str:
Returns:
File contents with line numbers
"""
print(f"{MAGENTA}⏺ Reading file: {path}{RESET}")
lines = open(path).readlines()
if limit is None:
limit = len(lines)
@@ -79,6 +54,8 @@ def write_file(path: str, content: str) -> str:
Returns:
'ok' on success
"""
print(f"{MAGENTA}⏺ Creating file: {path}{RESET}")
with open(path, "w") as f:
f.write(content)
return "ok"
@@ -96,6 +73,8 @@ def edit_file(path: str, old: str, new: str, replace_all: bool = False) -> str:
Returns:
'ok' on success, error message on failure
"""
print(f"{MAGENTA}⏺ Editing file: {path}{RESET}")
text = open(path).read()
if old not in text:
return "error: old_string not found"
@@ -118,6 +97,8 @@ def glob_files(pattern: str, path: str = ".") -> str:
Returns:
Newline-separated list of matching files
"""
print(f"{MAGENTA}⏺ Glob: {pattern}{RESET}")
full_pattern = (path + "/" + pattern).replace("//", "/")
files = globlib.glob(full_pattern, recursive=True)
files = sorted(
@@ -138,6 +119,7 @@ def grep_files(pattern: str, path: str = ".") -> str:
Returns:
Matching lines in format 'filepath:line_num:content'
"""
print(f"{MAGENTA}⏺ Grep: {pattern}{RESET}")
regex = re.compile(pattern)
hits = []
for filepath in globlib.glob(path + "/**", recursive=True):
@@ -162,6 +144,7 @@ def run_bash(cmd: str) -> str:
Returns:
Command output (stdout and stderr combined)
"""
print(f"{MAGENTA}⏺ Bash: {cmd}{RESET}")
proc = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)
@@ -180,108 +163,20 @@ def run_bash(cmd: str) -> str:
output_lines.append("\n(timed out after 30s)")
return "".join(output_lines).strip() or "(empty output)"
# --- Model selection ---
AVAILABLE_MODELS = {
"1": ("GPT-5.2 Codex", "openai/gpt-5.2-codex"),
"2": ("GPT-5.2", "openai/gpt-5.2"),
"3": ("Claude Opus 4.5", "anthropic/claude-opus-4.5"),
"4": ("Claude Opus 4", "anthropic/claude-opus-4"),
"5": ("Qwen 3 Coder", "qwen/qwen3-coder"),
"6": ("Gemini 3 Flash Preview", "google/gemini-3-flash-preview"),
"7": ("Kimi K2 0905", "moonshotai/kimi-k2-0905"),
"8": ("Minimax M2.1", "minimax/minimax-m2.1"),
}
def select_model():
"""Interactive model selection or use environment variable."""
model_env = os.getenv("MODEL")
if model_env:
print(f"{GREEN}⏺ Using model from environment: {model_env}{RESET}")
return model_env
print(f"\n{BOLD}Select a model:{RESET}")
for key, (name, model_id) in AVAILABLE_MODELS.items():
print(f" {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
print(f" {BLUE}c{RESET}. Custom model (enter manually)")
while True:
try:
choice = (
input(f"\n{BOLD}{BLUE}{RESET} Enter choice (1-8 or c): ")
.strip()
.lower()
)
if choice in AVAILABLE_MODELS:
name, model_id = AVAILABLE_MODELS[choice]
print(f"{GREEN}⏺ Selected: {name}{RESET}")
return model_id
elif choice == "c":
custom_model = input(
f"{BOLD}{BLUE}{RESET} Enter model ID (e.g., openai/gpt-4): "
).strip()
if custom_model:
print(f"{GREEN}⏺ Selected custom model: {custom_model}{RESET}")
return custom_model
else:
print(f"{RED}⏺ Invalid model ID{RESET}")
else:
print(f"{RED}⏺ Invalid choice. Please enter 1-8 or c{RESET}")
except (KeyboardInterrupt, EOFError):
print(f"\n{RED}⏺ Model selection cancelled{RESET}")
exit(1)
class CodingAssistant(dspy.Signature):
"""You are a concise coding assistant. Help the user with their coding task by using the available tools to read, write, edit files, search the codebase, and run commands."""
"""You are a concise coding assistant with access to sub agents."""
task: str = dspy.InputField(desc="The user's coding task or question")
answer: str = dspy.OutputField(
desc="Your response to the user after completing the task"
)
affected_files: list[str] = dspy.OutputField(
desc="List of files that were written or modified during the task"
)
class ToolLoggingCallback(BaseCallback):
"""Callback that logs tool calls as they happen."""
def on_tool_start(self, call_id, instance, inputs):
"""Log when a tool starts executing."""
tool_name = instance.name if hasattr(instance, "name") else str(instance)
# Format args nicely
args_str = ", ".join(f"{k}={repr(v)[:50]}" for k, v in inputs.items())
print(f" {MAGENTA}{tool_name}({args_str}){RESET}", flush=True)
def on_tool_end(self, call_id, outputs, exception):
"""Log when a tool finishes executing."""
if exception:
print(f" {RED}Error: {exception}{RESET}", flush=True)
def on_module_end(self, call_id, outputs, exception):
"""Log when the finish tool is called (ReAct completion)."""
# Check if this is a ReAct prediction with tool_calls
if outputs and "tool_calls" in outputs:
for call in outputs["tool_calls"]:
args_str = ", ".join(
f"{k}={repr(v)[:50]}" for k, v in call.args.items()
)
if call.name == "finish":
print(f" {GREEN}⏺ finish{RESET}", flush=True)
else:
print(f" {MAGENTA}{call.name}({args_str}){RESET}", flush=True)
class RLMCodingConfig(PrecompiledConfig):
max_iters: int = 50
lm: str = "openrouter/openai/gpt-5.2-codex"
sub_lm: str = "openrouter/openai/gpt-5-mini"
api_base: str = "https://openrouter.ai/api/v1"
max_tokens: int = 32000
max_tokens: int = 50000
max_output_chars: int = 100000
verbose: bool = False
track_usage: bool = True
@@ -291,8 +186,9 @@ class RLMCodingProgram(PrecompiledProgram):
config: RLMCodingConfig
def __init__(self, config: RLMCodingConfig, **kwargs):
self.config = config
super().__init__(config, **kwargs)
self.config = config
self.tools = {
"read_file": read_file,
"write_file": write_file,
@@ -322,9 +218,8 @@ class RLMCodingProgram(PrecompiledProgram):
max_iterations=self.config.max_iters,
verbose=self.config.verbose,
)
agent.set_lm(self.lm)
print(f"Using model: {self.lm.model}")
print(f"Using sub-model: {self.sub_lm.model}")
self.agent = agent
def forward(self, task: str) -> str:
@@ -357,243 +252,6 @@ class RLMCodingProgram(PrecompiledProgram):
new_instance.set_lm(self.lm)
self.agent = new_instance
def main():
model = select_model()
# Add openrouter/ prefix if not already present
if not model.startswith("openrouter/"):
model = f"openrouter/{model}"
config = RLMCodingConfig()
config.lm = model
agent = RLMCodingProgram(config)
print(
f"{BOLD}NANOCODE DSPY{RESET} | {DIM}{agent.config.lm} | {os.getcwd()}{RESET}\n"
)
# Conversation history for context
history = []
# MCP servers registry
mcp_servers = {}
def register_mcp_server(name, server):
tool_names = []
for tool in server.tools:
tool_name = f"{name}_{tool.__name__}"
agent.set_tool(tool_name, tool)
tool_names.append(tool_name)
return tool_names
while True:
try:
print(separator())
user_input = input(f"{BOLD}{BLUE}{RESET} ").strip()
print(separator())
tmp_paste_path = None
if len(user_input) > LONG_PASTE_THRESHOLD:
tmp_paste_path = save_long_paste(user_input)
print(
f"{YELLOW}⏺ Long paste detected ({len(user_input)} chars). Saved to {tmp_paste_path}{RESET}"
)
user_input = (
f"The user pasted a long input ({len(user_input)} chars). "
f"It has been saved to {tmp_paste_path}. "
"Use read_file to view it. The file will be deleted after this response."
)
if not user_input:
continue
if user_input in ("/q", "exit"):
break
if user_input == "/c":
history = []
print(f"{GREEN}⏺ Cleared conversation{RESET}")
continue
if user_input == "/model":
print(f"\n{BOLD}Current model: {agent.config.lm}{RESET}")
print(f"\n{BOLD}Select a new model:{RESET}")
for key, (name, model_id) in AVAILABLE_MODELS.items():
print(f" {BLUE}{key}{RESET}. {name} ({DIM}{model_id}{RESET})")
print(f" {BLUE}c{RESET}. Custom model (enter manually)")
print(f" {BLUE}k{RESET}. Keep current model")
choice = input(f"\n{BOLD}{BLUE}{RESET} Enter choice: ").strip().lower()
if choice == "k":
print(f"{GREEN}⏺ Keeping current model: {agent.config.lm}{RESET}")
continue
elif choice in AVAILABLE_MODELS:
name, model_id = AVAILABLE_MODELS[choice]
new_model = (
model_id
if model_id.startswith("openrouter/")
else f"openrouter/{model_id}"
)
config.lm = new_model
agent = RLMCodingProgram(config)
for server_name, info in mcp_servers.items():
info["tools"] = register_mcp_server(server_name, info["server"])
print(f"{GREEN}⏺ Switched to: {name} ({new_model}){RESET}")
elif choice == "c":
custom_model = input(
f"{BOLD}{BLUE}{RESET} Enter model ID: "
).strip()
if custom_model:
new_model = (
custom_model
if custom_model.startswith("openrouter/")
else f"openrouter/{custom_model}"
)
config.lm = new_model
agent = RLMCodingProgram(config)
for server_name, info in mcp_servers.items():
info["tools"] = register_mcp_server(
server_name, info["server"]
)
print(f"{GREEN}⏺ Switched to custom model: {new_model}{RESET}")
else:
print(f"{RED}⏺ Invalid model ID, keeping current model{RESET}")
else:
print(f"{RED}⏺ Invalid choice, keeping current model{RESET}")
continue
if user_input.startswith("/add-mcp"):
parts = shlex.split(user_input)
args = parts[1:]
if not args:
print(
f"{YELLOW}⏺ Usage: /add-mcp <name> <server> [--auth <auth>|--oauth] [--headers '<json>'] [--auto-auth|--no-auto-auth]{RESET}"
)
continue
name = None
auth = None
headers = None
auto_auth = None
positional = []
i = 0
while i < len(args):
if args[i] in ("--name", "-n") and i + 1 < len(args):
name = args[i + 1]
i += 2
elif args[i].startswith("--auth="):
auth = args[i].split("=", 1)[1]
i += 1
elif args[i] == "--auth" and i + 1 < len(args):
auth = args[i + 1]
i += 2
elif args[i] == "--oauth":
auth = "oauth"
i += 1
elif args[i] == "--auto-auth":
auto_auth = True
i += 1
elif args[i] == "--no-auto-auth":
auto_auth = False
i += 1
elif args[i].startswith("--headers="):
headers = json.loads(args[i].split("=", 1)[1])
i += 1
elif args[i] == "--headers" and i + 1 < len(args):
headers = json.loads(args[i + 1])
i += 2
else:
positional.append(args[i])
i += 1
server_cmd = None
if positional:
if name is None and len(positional) >= 2:
name = positional[0]
server_cmd = " ".join(positional[1:])
else:
server_cmd = " ".join(positional)
if not server_cmd:
print(
f"{YELLOW}⏺ Usage: /add-mcp <name> <server> [--auth <auth>|--oauth] [--headers '<json>'] [--auto-auth|--no-auto-auth]{RESET}"
)
continue
if not name:
name = re.sub(r"[^a-zA-Z0-9_]+", "_", server_cmd).strip("_")
if not name:
name = f"mcp_{len(mcp_servers) + 1}"
if name in mcp_servers:
for tool_name in mcp_servers[name]["tools"]:
agent.remove_tool(tool_name)
try:
from mcp2py import load
kwargs = {}
if auth is not None:
kwargs["auth"] = auth
if headers:
kwargs["headers"] = headers
if auto_auth is not None:
kwargs["auto_auth"] = auto_auth
server = load(server_cmd, **kwargs)
tool_names = register_mcp_server(name, server)
mcp_servers[name] = {"server": server, "tools": tool_names}
print(
f"{GREEN}⏺ Added MCP server '{name}' with {len(tool_names)} tools{RESET}"
)
print(f"{GREEN}⏺ Tools: {list(agent.tools.keys())}{RESET}")
except Exception as err:
print(f"{RED}⏺ Failed to add MCP server: {err}{RESET}")
continue
# Build context from history
context = f"Working directory: {os.getcwd()}\n"
if history:
context += "\nPrevious conversation:\n"
for h in history[-5:]: # Keep last 5 exchanges
context += f"User: {h['user']}\nAssistant: {h['assistant']}\n\n"
task = f"{context}\nCurrent task: {user_input}"
print(f"\n{CYAN}{RESET} Thinking...", flush=True)
# Run the RLM agent
try:
result = agent(task=task)
finally:
if tmp_paste_path:
try:
os.remove(tmp_paste_path)
except OSError:
pass
# Display the answer
print(f"\n{CYAN}{RESET} {render_markdown(result.answer)}")
# Display usage
print(f"\n{MAGENTA}⏺ Debug Prediction: {result}{RESET}")
# Save to history
history.append({"user": user_input, "assistant": result.answer})
print()
except (KeyboardInterrupt, EOFError):
break
except Exception as err:
import traceback
traceback.print_exc()
print(f"{RED}⏺ Error: {err}{RESET}")
if __name__ == "__main__":
agent = RLMCodingProgram(RLMCodingConfig())
agent.push_to_hub(MODAIC_REPO_PATH, commit_message="debug", tag="v0.0.5")
#main()
agent.push_to_hub(MODAIC_REPO_PATH, commit_message="change signature")