(no commit message)
This commit is contained in:
214
memory_fs.py
Normal file
214
memory_fs.py
Normal file
@@ -0,0 +1,214 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from fnmatch import fnmatch
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _default_root() -> Path:
|
||||
# Keep memory local to this project folder.
|
||||
return Path(__file__).resolve().parent / ".memory"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MemoryFS:
|
||||
"""
|
||||
A tiny, sandboxed "memory filesystem" for agents.
|
||||
|
||||
This intentionally mirrors the *shape* of common filesystem MCP servers:
|
||||
list/read/write/move/search/info/tree — but is implemented locally as Python tools.
|
||||
"""
|
||||
|
||||
root: Path = _default_root()
|
||||
|
||||
def _ensure_root(self) -> None:
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def _resolve(self, rel_path: str) -> Path:
|
||||
"""
|
||||
Resolve a user-provided path against the memory root, preventing traversal.
|
||||
|
||||
The path is interpreted as relative to `root`. Leading slashes are ignored.
|
||||
"""
|
||||
self._ensure_root()
|
||||
rel = rel_path.lstrip("/").strip()
|
||||
target = (self.root / rel).resolve()
|
||||
root = self.root.resolve()
|
||||
if target == root:
|
||||
return target
|
||||
if root not in target.parents:
|
||||
raise ValueError("Path escapes memory root; refusing.")
|
||||
return target
|
||||
|
||||
|
||||
_MEM = MemoryFS()
|
||||
|
||||
|
||||
def mem_list_directory(path: str = "") -> str:
|
||||
"""List directory contents under memory root. Returns lines like: [DIR] foo, [FILE] bar.txt."""
|
||||
p = _MEM._resolve(path)
|
||||
if not p.exists():
|
||||
return f"Not found: {path}"
|
||||
if not p.is_dir():
|
||||
return f"Not a directory: {path}"
|
||||
|
||||
entries = []
|
||||
for child in sorted(p.iterdir(), key=lambda c: (not c.is_dir(), c.name.lower())):
|
||||
tag = "[DIR]" if child.is_dir() else "[FILE]"
|
||||
entries.append(f"{tag} {child.name}")
|
||||
return "\n".join(entries) if entries else "(empty)"
|
||||
|
||||
|
||||
def mem_create_directory(path: str) -> str:
|
||||
"""Create a directory under memory root (parents created)."""
|
||||
p = _MEM._resolve(path)
|
||||
p.mkdir(parents=True, exist_ok=True)
|
||||
return f"OK: created {path}"
|
||||
|
||||
|
||||
def mem_read_text_file(
|
||||
path: str, head: int | None = None, tail: int | None = None
|
||||
) -> str:
|
||||
"""Read a UTF-8 text file under memory root. Optionally return first `head` or last `tail` lines."""
|
||||
if head is not None and tail is not None:
|
||||
return "Error: cannot specify both head and tail."
|
||||
|
||||
p = _MEM._resolve(path)
|
||||
if not p.exists():
|
||||
return f"Not found: {path}"
|
||||
if not p.is_file():
|
||||
return f"Not a file: {path}"
|
||||
|
||||
text = p.read_text(encoding="utf-8", errors="replace")
|
||||
lines = text.splitlines()
|
||||
if head is not None:
|
||||
return "\n".join(lines[: max(head, 0)])
|
||||
if tail is not None:
|
||||
return "\n".join(lines[-max(tail, 0) :])
|
||||
return text
|
||||
|
||||
|
||||
def mem_write_file(path: str, content: str) -> str:
|
||||
"""Write (overwrite) a UTF-8 text file under memory root."""
|
||||
p = _MEM._resolve(path)
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
p.write_text(content, encoding="utf-8")
|
||||
return f"OK: wrote {path} ({len(content)} chars)"
|
||||
|
||||
|
||||
def mem_append_file(path: str, content: str) -> str:
|
||||
"""Append UTF-8 text to a file under memory root (creates if missing)."""
|
||||
p = _MEM._resolve(path)
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
with p.open("a", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
return f"OK: appended {path} ({len(content)} chars)"
|
||||
|
||||
|
||||
def mem_move_file(source: str, destination: str) -> str:
|
||||
"""Move/rename a file or directory under memory root. Fails if destination exists."""
|
||||
src = _MEM._resolve(source)
|
||||
dst = _MEM._resolve(destination)
|
||||
if not src.exists():
|
||||
return f"Not found: {source}"
|
||||
if dst.exists():
|
||||
return f"Error: destination exists: {destination}"
|
||||
dst.parent.mkdir(parents=True, exist_ok=True)
|
||||
os.replace(src, dst)
|
||||
return f"OK: moved {source} -> {destination}"
|
||||
|
||||
|
||||
def mem_get_file_info(path: str) -> str:
|
||||
"""Return basic metadata (json) for a path under memory root."""
|
||||
p = _MEM._resolve(path)
|
||||
if not p.exists():
|
||||
return json.dumps({"path": path, "exists": False})
|
||||
st = p.stat()
|
||||
info: dict[str, Any] = {
|
||||
"path": path,
|
||||
"exists": True,
|
||||
"type": "directory" if p.is_dir() else "file",
|
||||
"size": st.st_size,
|
||||
"mtime": st.st_mtime,
|
||||
}
|
||||
return json.dumps(info, indent=2, ensure_ascii=False)
|
||||
|
||||
|
||||
def mem_search_files(
|
||||
path: str = "", pattern: str = "*", contains: str | None = None, limit: int = 50
|
||||
) -> str:
|
||||
"""
|
||||
Recursively search for files under memory root.
|
||||
|
||||
- `pattern`: glob-style match on relative path (e.g. "*.md", "profile/*")
|
||||
- `contains`: if set, only include text files that contain this substring
|
||||
"""
|
||||
base = _MEM._resolve(path)
|
||||
if not base.exists():
|
||||
return f"Not found: {path}"
|
||||
if not base.is_dir():
|
||||
return f"Not a directory: {path}"
|
||||
|
||||
results: list[str] = []
|
||||
root = _MEM.root.resolve()
|
||||
for p in base.rglob("*"):
|
||||
if len(results) >= max(limit, 0):
|
||||
break
|
||||
if not p.is_file():
|
||||
continue
|
||||
rel = str(p.resolve().relative_to(root)).replace(os.sep, "/")
|
||||
if not fnmatch(rel, pattern):
|
||||
continue
|
||||
if contains is not None:
|
||||
try:
|
||||
text = p.read_text(encoding="utf-8", errors="ignore")
|
||||
except Exception:
|
||||
continue
|
||||
if contains not in text:
|
||||
continue
|
||||
results.append(rel)
|
||||
return "\n".join(results) if results else "(no matches)"
|
||||
|
||||
|
||||
def mem_directory_tree(path: str = "", max_depth: int = 6) -> str:
|
||||
"""Return a JSON directory tree rooted at `path`."""
|
||||
base = _MEM._resolve(path)
|
||||
if not base.exists():
|
||||
return json.dumps({"error": "not_found", "path": path})
|
||||
if not base.is_dir():
|
||||
return json.dumps({"error": "not_directory", "path": path})
|
||||
|
||||
root = _MEM.root.resolve()
|
||||
|
||||
def node(p: Path, depth: int) -> dict[str, Any]:
|
||||
rel = (
|
||||
str(p.resolve().relative_to(root)).replace(os.sep, "/")
|
||||
if p != _MEM.root
|
||||
else ""
|
||||
)
|
||||
if p.is_dir():
|
||||
if depth >= max_depth:
|
||||
return {
|
||||
"name": p.name or "/",
|
||||
"path": rel,
|
||||
"type": "directory",
|
||||
"children": ["…"],
|
||||
}
|
||||
children = [
|
||||
node(c, depth + 1)
|
||||
for c in sorted(
|
||||
p.iterdir(), key=lambda c: (not c.is_dir(), c.name.lower())
|
||||
)
|
||||
]
|
||||
return {
|
||||
"name": p.name or "/",
|
||||
"path": rel,
|
||||
"type": "directory",
|
||||
"children": children,
|
||||
}
|
||||
return {"name": p.name, "path": rel, "type": "file"}
|
||||
|
||||
return json.dumps(node(base, 0), indent=2, ensure_ascii=False)
|
||||
Reference in New Issue
Block a user