215 lines
6.7 KiB
Python
215 lines
6.7 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from dataclasses import dataclass
|
|
from fnmatch import fnmatch
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
def _default_root() -> Path:
|
|
# Keep memory local to this project folder.
|
|
return Path(__file__).resolve().parent / ".memory"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class MemoryFS:
|
|
"""
|
|
A tiny, sandboxed "memory filesystem" for agents.
|
|
|
|
This intentionally mirrors the *shape* of common filesystem MCP servers:
|
|
list/read/write/move/search/info/tree — but is implemented locally as Python tools.
|
|
"""
|
|
|
|
root: Path = _default_root()
|
|
|
|
def _ensure_root(self) -> None:
|
|
self.root.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _resolve(self, rel_path: str) -> Path:
|
|
"""
|
|
Resolve a user-provided path against the memory root, preventing traversal.
|
|
|
|
The path is interpreted as relative to `root`. Leading slashes are ignored.
|
|
"""
|
|
self._ensure_root()
|
|
rel = rel_path.lstrip("/").strip()
|
|
target = (self.root / rel).resolve()
|
|
root = self.root.resolve()
|
|
if target == root:
|
|
return target
|
|
if root not in target.parents:
|
|
raise ValueError("Path escapes memory root; refusing.")
|
|
return target
|
|
|
|
|
|
_MEM = MemoryFS()
|
|
|
|
|
|
def mem_list_directory(path: str = "") -> str:
|
|
"""List directory contents under memory root. Returns lines like: [DIR] foo, [FILE] bar.txt."""
|
|
p = _MEM._resolve(path)
|
|
if not p.exists():
|
|
return f"Not found: {path}"
|
|
if not p.is_dir():
|
|
return f"Not a directory: {path}"
|
|
|
|
entries = []
|
|
for child in sorted(p.iterdir(), key=lambda c: (not c.is_dir(), c.name.lower())):
|
|
tag = "[DIR]" if child.is_dir() else "[FILE]"
|
|
entries.append(f"{tag} {child.name}")
|
|
return "\n".join(entries) if entries else "(empty)"
|
|
|
|
|
|
def mem_create_directory(path: str) -> str:
|
|
"""Create a directory under memory root (parents created)."""
|
|
p = _MEM._resolve(path)
|
|
p.mkdir(parents=True, exist_ok=True)
|
|
return f"OK: created {path}"
|
|
|
|
|
|
def mem_read_text_file(
|
|
path: str, head: int | None = None, tail: int | None = None
|
|
) -> str:
|
|
"""Read a UTF-8 text file under memory root. Optionally return first `head` or last `tail` lines."""
|
|
if head is not None and tail is not None:
|
|
return "Error: cannot specify both head and tail."
|
|
|
|
p = _MEM._resolve(path)
|
|
if not p.exists():
|
|
return f"Not found: {path}"
|
|
if not p.is_file():
|
|
return f"Not a file: {path}"
|
|
|
|
text = p.read_text(encoding="utf-8", errors="replace")
|
|
lines = text.splitlines()
|
|
if head is not None:
|
|
return "\n".join(lines[: max(head, 0)])
|
|
if tail is not None:
|
|
return "\n".join(lines[-max(tail, 0) :])
|
|
return text
|
|
|
|
|
|
def mem_write_file(path: str, content: str) -> str:
|
|
"""Write (overwrite) a UTF-8 text file under memory root."""
|
|
p = _MEM._resolve(path)
|
|
p.parent.mkdir(parents=True, exist_ok=True)
|
|
p.write_text(content, encoding="utf-8")
|
|
return f"OK: wrote {path} ({len(content)} chars)"
|
|
|
|
|
|
def mem_append_file(path: str, content: str) -> str:
|
|
"""Append UTF-8 text to a file under memory root (creates if missing)."""
|
|
p = _MEM._resolve(path)
|
|
p.parent.mkdir(parents=True, exist_ok=True)
|
|
with p.open("a", encoding="utf-8") as f:
|
|
f.write(content)
|
|
return f"OK: appended {path} ({len(content)} chars)"
|
|
|
|
|
|
def mem_move_file(source: str, destination: str) -> str:
|
|
"""Move/rename a file or directory under memory root. Fails if destination exists."""
|
|
src = _MEM._resolve(source)
|
|
dst = _MEM._resolve(destination)
|
|
if not src.exists():
|
|
return f"Not found: {source}"
|
|
if dst.exists():
|
|
return f"Error: destination exists: {destination}"
|
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
os.replace(src, dst)
|
|
return f"OK: moved {source} -> {destination}"
|
|
|
|
|
|
def mem_get_file_info(path: str) -> str:
|
|
"""Return basic metadata (json) for a path under memory root."""
|
|
p = _MEM._resolve(path)
|
|
if not p.exists():
|
|
return json.dumps({"path": path, "exists": False})
|
|
st = p.stat()
|
|
info: dict[str, Any] = {
|
|
"path": path,
|
|
"exists": True,
|
|
"type": "directory" if p.is_dir() else "file",
|
|
"size": st.st_size,
|
|
"mtime": st.st_mtime,
|
|
}
|
|
return json.dumps(info, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def mem_search_files(
|
|
path: str = "", pattern: str = "*", contains: str | None = None, limit: int = 50
|
|
) -> str:
|
|
"""
|
|
Recursively search for files under memory root.
|
|
|
|
- `pattern`: glob-style match on relative path (e.g. "*.md", "profile/*")
|
|
- `contains`: if set, only include text files that contain this substring
|
|
"""
|
|
base = _MEM._resolve(path)
|
|
if not base.exists():
|
|
return f"Not found: {path}"
|
|
if not base.is_dir():
|
|
return f"Not a directory: {path}"
|
|
|
|
results: list[str] = []
|
|
root = _MEM.root.resolve()
|
|
for p in base.rglob("*"):
|
|
if len(results) >= max(limit, 0):
|
|
break
|
|
if not p.is_file():
|
|
continue
|
|
rel = str(p.resolve().relative_to(root)).replace(os.sep, "/")
|
|
if not fnmatch(rel, pattern):
|
|
continue
|
|
if contains is not None:
|
|
try:
|
|
text = p.read_text(encoding="utf-8", errors="ignore")
|
|
except Exception:
|
|
continue
|
|
if contains not in text:
|
|
continue
|
|
results.append(rel)
|
|
return "\n".join(results) if results else "(no matches)"
|
|
|
|
|
|
def mem_directory_tree(path: str = "", max_depth: int = 6) -> str:
|
|
"""Return a JSON directory tree rooted at `path`."""
|
|
base = _MEM._resolve(path)
|
|
if not base.exists():
|
|
return json.dumps({"error": "not_found", "path": path})
|
|
if not base.is_dir():
|
|
return json.dumps({"error": "not_directory", "path": path})
|
|
|
|
root = _MEM.root.resolve()
|
|
|
|
def node(p: Path, depth: int) -> dict[str, Any]:
|
|
rel = (
|
|
str(p.resolve().relative_to(root)).replace(os.sep, "/")
|
|
if p != _MEM.root
|
|
else ""
|
|
)
|
|
if p.is_dir():
|
|
if depth >= max_depth:
|
|
return {
|
|
"name": p.name or "/",
|
|
"path": rel,
|
|
"type": "directory",
|
|
"children": ["…"],
|
|
}
|
|
children = [
|
|
node(c, depth + 1)
|
|
for c in sorted(
|
|
p.iterdir(), key=lambda c: (not c.is_dir(), c.name.lower())
|
|
)
|
|
]
|
|
return {
|
|
"name": p.name or "/",
|
|
"path": rel,
|
|
"type": "directory",
|
|
"children": children,
|
|
}
|
|
return {"name": p.name, "path": rel, "type": "file"}
|
|
|
|
return json.dumps(node(base, 0), indent=2, ensure_ascii=False)
|