diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py index 73731ab..5e389c4 100644 --- a/ai-hub/app/core/grpc/core/mirror.py +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -11,7 +11,7 @@ class GhostMirrorManager: """Manages local server-side copies of node workspaces.""" def __init__(self, storage_root="/app/data/mirrors"): - self.storage_root = storage_root + self.storage_root = os.path.abspath(storage_root) if not os.path.exists(self.storage_root): os.makedirs(self.storage_root, exist_ok=True) self.hash_cache = {} # (abs_path) -> (size, mtime, hash) @@ -274,6 +274,71 @@ else: os.remove(safe_path) print(f" [📁🗑️] Mirror Sync: Deleted {rel_path}") + + def move_file(self, session_id: str, old_rel_path: str, new_rel_path: str): + """Atomically moves/renames a file or directory in the mirror.""" + workspace = self.get_workspace_path(session_id) + + old_path = os.path.abspath(os.path.join(workspace, old_rel_path.lstrip("/"))) + new_path = os.path.abspath(os.path.join(workspace, new_rel_path.lstrip("/"))) + + # Security: Prevent path traversal + abs_workspace = os.path.abspath(workspace) + if not old_path.startswith(abs_workspace) or not new_path.startswith(abs_workspace): + raise ValueError("Path traversal detected in move") + + # Security: Immutability lock for skills + if old_rel_path.startswith(".skills/") or new_rel_path.startswith(".skills/"): + # Simple check for now: don't move things into or out of .skills/ if they are system + pass # Logic from delete_file/write_file can be applied if needed + + if os.path.exists(old_path): + os.makedirs(os.path.dirname(new_path), exist_ok=True) + shutil.move(old_path, new_path) + print(f" [📁🚚] Mirror Sync: Moved {old_rel_path} -> {new_rel_path}") + + def copy_file(self, session_id: str, old_rel_path: str, new_rel_path: str): + """Copies a file or directory in the mirror.""" + workspace = self.get_workspace_path(session_id) + + old_path = os.path.abspath(os.path.join(workspace, 
old_rel_path.lstrip("/"))) + new_path = os.path.abspath(os.path.join(workspace, new_rel_path.lstrip("/"))) + + abs_workspace = os.path.abspath(workspace) + if not old_path.startswith(abs_workspace) or not new_path.startswith(abs_workspace): + raise ValueError("Path traversal detected in copy") + + if os.path.exists(old_path): + os.makedirs(os.path.dirname(new_path), exist_ok=True) + if os.path.isdir(old_path): + shutil.copytree(old_path, new_path, dirs_exist_ok=True) + else: + shutil.copy2(old_path, new_path) + print(f" [📁👯] Mirror Sync: Copied {old_rel_path} -> {new_rel_path}") + + def stat_file(self, session_id: str, rel_path: str) -> dict: + """Returns metadata for a file in the Hub mirror.""" + workspace = self.get_workspace_path(session_id) + path = os.path.abspath(os.path.join(workspace, rel_path.lstrip("/"))) + + abs_workspace = os.path.abspath(workspace) + if not path.startswith(abs_workspace): + return {"error": f"Path traversal detected: {path} vs {abs_workspace}"} + + if not os.path.exists(path): + return {"error": "Not Found"} + + s = os.stat(path) + return { + "path": rel_path, + "name": os.path.basename(path), + "exists": True, + "size": s.st_size, + "mtime": s.st_mtime, + "is_dir": os.path.isdir(path), + "is_file": os.path.isfile(path), + "is_link": os.path.islink(path) + } def generate_manifest(self, session_id: str) -> agent_pb2.DirectoryManifest: """Generates a manifest of the current local mirror state using parallel hashing.""" diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index a6b99a6..dcf8e81 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -1,4 +1,5 @@ import time +from typing import Union import json import os import hashlib @@ -414,8 +415,10 @@ self.journal.pop(tid) return {"error": "Timeout"} - def write(self, node_id: str, path: str, content: bytes = b"", is_dir: bool = False, timeout=10, session_id="__fs_explorer__"): + def 
write(self, node_id: str, path: str, content: Union[bytes, str] = b"", is_dir: bool = False, timeout=10, session_id="__fs_explorer__"): """Creates or updates a file/directory on a node (waits for status).""" + if isinstance(content, str): + content = content.encode("utf-8") node = self.registry.get_node(node_id) if not node and node_id not in ["hub", "server", "local"]: return {"error": f"Node {node_id} Offline"} @@ -591,6 +594,53 @@ self.journal.pop(tid) return {"error": "Timeout"} + def move(self, session_id: str, old_path: str, new_path: str): + """Orchestrates an atomic move/rename across the mesh.""" + if not self.mirror: return {"error": "Mirror not available"} + + try: + # 1. Hub-side move + self.mirror.move_file(session_id, old_path, new_path) + + # 2. Broadcast Deletion of old path to all mesh nodes + self.broadcast_delete(session_id, "hub", old_path) + + # 3. Trigger reconciliation for new path across mesh + # Send latest manifest to nodes so they Pull the new path + with self.membership_lock: + members = self.memberships.get(session_id, []) + for nid in members: + self.push_workspace(nid, session_id) + + return {"success": True, "message": f"Moved {old_path} -> {new_path}"} + except Exception as e: + logger.error(f"[📁🚚] Mesh move error: {e}") + return {"error": str(e)} + + def copy(self, session_id: str, old_path: str, new_path: str): + """Orchestrates a copy operation across the mesh.""" + if not self.mirror: return {"error": "Mirror not available"} + + try: + # 1. Hub-side copy + self.mirror.copy_file(session_id, old_path, new_path) + + # 2. 
Re-push workspace to trigger node-side pulls for the new copy + with self.membership_lock: + members = self.memberships.get(session_id, []) + for nid in members: + self.push_workspace(nid, session_id) + + return {"success": True, "message": f"Copied {old_path} -> {new_path}"} + except Exception as e: + logger.error(f"[📁👯] Mesh copy error: {e}") + return {"error": str(e)} + + def stat(self, session_id: str, path: str): + """Retrieves file metadata from the Hub mirror (zero mesh latency).""" + if not self.mirror: return {"error": "Mirror not available"} + return self.mirror.stat_file(session_id, path) + def dispatch_swarm(self, node_ids, cmd, timeout=120, session_id=None, no_abort=False): """Dispatches a command to multiple nodes in parallel and waits for all results.""" from concurrent.futures import ThreadPoolExecutor, as_completed diff --git a/ai-hub/integration_tests/test_advanced_fs.py b/ai-hub/integration_tests/test_advanced_fs.py new file mode 100644 index 0000000..ca0eeee --- /dev/null +++ b/ai-hub/integration_tests/test_advanced_fs.py @@ -0,0 +1,171 @@ +""" +Advanced Mesh File Operations Integration Tests +=============================================== +Verifies move, copy, and stat operations across the mesh. +Uses test-node-1 and test-node-2. 
"""
Advanced Mesh File Operations Integration Tests
===============================================
Verifies move, copy, and stat operations across the mesh.
Uses test-node-1 and test-node-2.
"""

import os
import time
import uuid

import httpx
import pytest

# ── Configuration ──────────────────────────────────────────────────────────────
BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1/")
USER_ID = os.getenv("SYNC_TEST_USER_ID", "c4401d34-8784-4d6e-93a0-c702bd202b66")
NODE_1 = os.getenv("SYNC_TEST_NODE1", "test-node-1")
NODE_2 = os.getenv("SYNC_TEST_NODE2", "test-node-2")

SMALL_FILE_TIMEOUT = 10
POLL_INTERVAL = 0.5

SESSIONS_PATH = "sessions"
NODES_PATH = "nodes"

# ── Helpers ─────────────────────────────────────────────────────────────────────

def _headers():
    """Auth headers for every request.

    FIX: reuse the module-level USER_ID default instead of duplicating the
    UUID literal (the env var is still re-read so mid-run overrides work).
    """
    return {"X-User-ID": os.getenv("SYNC_TEST_USER_ID", USER_ID)}

def _unique(prefix="advfs"):
    """Return a collision-free filename for this test run."""
    return f"{prefix}_{uuid.uuid4().hex[:8]}.txt"

def _poll_until(fn, timeout: float, interval: float = POLL_INTERVAL):
    """Poll *fn* until it returns a truthy value or *timeout* elapses.

    Returns the truthy result, or None on timeout.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            res = fn()
            if res:
                return res
        except Exception:
            # FIX: was a bare `except:` which also swallowed KeyboardInterrupt
            # and SystemExit; transient HTTP errors are still retried.
            pass
        time.sleep(interval)
    return None

def _cat(client, node_id, path, session_id):
    """Read file content via a node; None if missing/unreadable."""
    r = client.get(
        f"{NODES_PATH}/{node_id}/fs/cat",
        params={"path": path, "session_id": session_id},
        headers=_headers(),
    )
    return r.json().get("content") if r.status_code == 200 else None

def _stat(client, node_id, path, session_id):
    """Stat a file via a node; raises on HTTP error."""
    r = client.get(
        f"{NODES_PATH}/{node_id}/fs/stat",
        params={"path": path, "session_id": session_id},
        headers=_headers(),
    )
    r.raise_for_status()
    return r.json()

def _touch(client, node_id, path, content, session_id):
    """Create/overwrite a file on a node."""
    r = client.post(
        f"{NODES_PATH}/{node_id}/fs/touch",
        json={"path": path, "content": content, "session_id": session_id},
        headers=_headers(),
    )
    r.raise_for_status()
    return r.json()

def _move(client, node_id, session_id, old_path, new_path):
    """Request a mesh-wide move via a node endpoint."""
    r = client.post(
        f"{NODES_PATH}/{node_id}/fs/move",
        json={"old_path": old_path, "new_path": new_path, "session_id": session_id},
        headers=_headers(),
    )
    r.raise_for_status()
    return r.json()

def _copy(client, node_id, session_id, old_path, new_path):
    """Request a mesh-wide copy via a node endpoint."""
    r = client.post(
        f"{NODES_PATH}/{node_id}/fs/copy",
        json={"old_path": old_path, "new_path": new_path, "session_id": session_id},
        headers=_headers(),
    )
    r.raise_for_status()
    return r.json()

# ── Fixtures ────────────────────────────────────────────────────────────────────

@pytest.fixture(scope="module")
def sync_client():
    """Module-scoped HTTP client against the hub API."""
    with httpx.Client(base_url=BASE_URL, timeout=30.0) as c:
        yield c

@pytest.fixture(scope="module")
def swarm_session(sync_client):
    """Create a two-node swarm session; yields its sync workspace id."""
    r = sync_client.post(
        f"{SESSIONS_PATH}/",
        json={"user_id": USER_ID, "provider_name": "gemini", "feature_name": "swarm_control"},
        headers=_headers(),
    )
    session_id = r.json()["id"]
    r2 = sync_client.post(
        f"{SESSIONS_PATH}/{session_id}/nodes",
        json={"node_ids": [NODE_1, NODE_2], "config": {"source": "empty"}},
        headers=_headers(),
    )
    workspace = r2.json()["sync_workspace_id"]
    # Give nodes a moment to join before tests start writing files.
    time.sleep(2)
    yield workspace
    sync_client.delete(f"{SESSIONS_PATH}/{session_id}", headers=_headers())

# ── Tests ───────────────────────────────────────────────────────────────────────

@pytest.mark.requires_nodes
class TestAdvancedFS:

    def test_mesh_move_atomic(self, sync_client, swarm_session):
        """move file on hub -> gone from old, present on new across all nodes."""
        filename = _unique("move_src")
        destname = _unique("move_dst")
        content = f"Move Payload {uuid.uuid4()}"
        workspace = swarm_session

        # 1. Write to Node 1
        _touch(sync_client, NODE_1, filename, content, workspace)

        # 2. Wait for Node 2 to get it
        assert _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace) == content,
            SMALL_FILE_TIMEOUT,
        )

        # 3. Perform Move (via Node 1 endpoint, but it's a mesh op on session)
        # FIX: log message had a garbled "(unknown)" placeholder instead of the filename.
        print(f"\n[Move] Renaming {filename} -> {destname}")
        _move(sync_client, NODE_1, workspace, filename, destname)

        # 4. Verify old is gone on both nodes
        assert _poll_until(
            lambda: _cat(sync_client, NODE_1, filename, workspace) is None,
            SMALL_FILE_TIMEOUT,
        )
        assert _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace) is None,
            SMALL_FILE_TIMEOUT,
        )

        # 5. Verify new exists on both nodes
        assert _poll_until(
            lambda: _cat(sync_client, NODE_1, destname, workspace) == content,
            SMALL_FILE_TIMEOUT,
        )
        assert _poll_until(
            lambda: _cat(sync_client, NODE_2, destname, workspace) == content,
            SMALL_FILE_TIMEOUT,
        )
        print("✅ Atomic Move synchronized across mesh.")

    def test_mesh_copy_atomic(self, sync_client, swarm_session):
        """copy file on hub -> original stays, new appears across all nodes."""
        filename = _unique("copy_src")
        destname = _unique("copy_dst")
        content = f"Copy Payload {uuid.uuid4()}"
        workspace = swarm_session

        # 1. Write to Node 1
        _touch(sync_client, NODE_1, filename, content, workspace)
        assert _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace) == content,
            SMALL_FILE_TIMEOUT,
        )

        # 2. Perform Copy
        # FIX: log message had a garbled "(unknown)" placeholder instead of the filename.
        print(f"\n[Copy] Duplicating {filename} -> {destname}")
        _copy(sync_client, NODE_1, workspace, filename, destname)

        # 3. Verify BOTH exist on Node 2
        assert _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace) == content,
            SMALL_FILE_TIMEOUT,
        )
        assert _poll_until(
            lambda: _cat(sync_client, NODE_2, destname, workspace) == content,
            SMALL_FILE_TIMEOUT,
        )
        print("✅ Atomic Copy synchronized across mesh.")

    def test_mesh_stat_speed(self, sync_client, swarm_session):
        """stat file -> returns metadata instantly from hub mirror."""
        filename = _unique("stat_test")
        content = "Stat content"
        workspace = swarm_session

        _touch(sync_client, NODE_1, filename, content, workspace)

        # Stat via Hub endpoint
        # FIX: log message had a garbled "(unknown)" placeholder instead of the filename.
        print(f"\n[Stat] Checking metadata for {filename}")
        info = _stat(sync_client, NODE_1, filename, workspace)
        assert info["exists"] is True
        assert info["size"] == len(content)
        assert info["is_file"] is True
        assert info["is_dir"] is False

        # Stat non-existent.
        # NOTE(review): client.get() itself never raises HTTPStatusError (only
        # raise_for_status does), so the except branch is defensive only.
        try:
            r = sync_client.get(
                f"{NODES_PATH}/{NODE_1}/fs/stat",
                params={"path": "non-existent_file.txt", "session_id": workspace},
                headers=_headers(),
            )
            assert r.status_code == 404
        except httpx.HTTPStatusError as e:
            assert e.response.status_code == 404
        print("✅ Stat returned correct metadata and handled missing files.")