diff --git a/ai-hub/app/api/routes/mcp.py b/ai-hub/app/api/routes/mcp.py index 0d961aa..dbc47f9 100644 --- a/ai-hub/app/api/routes/mcp.py +++ b/ai-hub/app/api/routes/mcp.py @@ -678,7 +678,7 @@ from app.db.session import get_db_session with get_db_session() as db: self.services.mesh_service.require_node_access(token, node_id, db) - return self.services.orchestrator.assistant.move(session_id, old_path, new_path) + return self.services.orchestrator.assistant.move(session_id, old_path, new_path, node_id=node_id) return await self.loop.run_in_executor(None, _execute) async def _copy_file(self, args: dict, token: Optional[str]): @@ -689,7 +689,7 @@ from app.db.session import get_db_session with get_db_session() as db: self.services.mesh_service.require_node_access(token, node_id, db) - return self.services.orchestrator.assistant.copy(session_id, old_path, new_path) + return self.services.orchestrator.assistant.copy(session_id, old_path, new_path, node_id=node_id) return await self.loop.run_in_executor(None, _execute) async def _get_file_stat(self, args: dict, token: Optional[str]): diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index 710f9a2..9850327 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -904,7 +904,7 @@ try: orchestrator = services.orchestrator loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, lambda: orchestrator.assistant.move(req.session_id, req.old_path, req.new_path)) + return await loop.run_in_executor(None, lambda: orchestrator.assistant.move(req.session_id, req.old_path, req.new_path, node_id=node_id)) except Exception as e: logger.error(f"[FS] Move error: {e}") raise HTTPException(status_code=500, detail=str(e)) @@ -921,7 +921,7 @@ try: orchestrator = services.orchestrator loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, lambda: orchestrator.assistant.copy(req.session_id, req.old_path, req.new_path)) + return await loop.run_in_executor(None, lambda: orchestrator.assistant.copy(req.session_id, req.old_path, req.new_path, node_id=node_id)) except Exception as e: logger.error(f"[FS] Copy error: {e}") raise HTTPException(status_code=500, detail=str(e)) diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 3b34d81..2b362ad 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -77,6 +77,18 @@ else: logger.warning(f"[📋❌] Task {tid} is NOT idempotent. Cannot safely retry automatically.") + def _read_from_mirror(self, session_id, path): + """Internal helper to read file content from the local ghost mirror.""" + if not self.mirror: return None + try: + w_path = self.mirror.get_workspace_path(session_id, create=False) + f_path = os.path.normpath(os.path.join(w_path, path.lstrip("/"))) + if os.path.exists(f_path) and os.path.isfile(f_path): + with open(f_path, "rb") as f: + return f.read() + except: pass + return None + def push_workspace(self, node_id, session_id): """Initial unidirectional push from server ghost mirror to a node.""" if not self.mirror: return @@ -489,10 +501,9 @@ content = content.encode("utf-8") node = self.registry.get_node(node_id) - # --- MESH SYNC MODE (Session-Aware) --- # If we have a session, we prioritize writing to the local mirror even if the target node is offline. # The node will reconcile its state from the Hub mirror upon reconnection. - if self.mirror and session_id != "__fs_explorer__": + if self.mirror: workspace_mirror = self.mirror.get_workspace_path(session_id) dest = os.path.normpath(os.path.join(workspace_mirror, path.lstrip("/"))) try: @@ -661,7 +672,7 @@ """Async wrapper for rm.""" return await asyncio.to_thread(self.rm, node_id, path, timeout, session_id) - def move(self, session_id: str, old_path: str, new_path: str): + def move(self, session_id: str, old_path: str, new_path: str, node_id: Optional[str] = None): """Orchestrates an atomic move/rename across the mesh.""" if not self.mirror: return {"error": "Mirror not available"} @@ -676,15 +687,23 @@ # Send latest manifest to nodes so they Pull the new path with self.membership_lock: members = self.memberships.get(session_id, []) + if not members and node_id: + members = [node_id] for nid in members: - self.push_workspace(nid, session_id) + if session_id == "__fs_explorer__": + content = self._read_from_mirror(session_id, new_path) + if content is not None: + self.write(nid, new_path, content, session_id=session_id) + self.rm(nid, old_path, session_id=session_id) + else: + self.push_workspace(nid, session_id) return {"success": True, "message": f"Moved {old_path} -> {new_path}"} except Exception as e: logger.error(f"[📁🚚] Mesh move error: {e}") return {"error": str(e)} - def copy(self, session_id: str, old_path: str, new_path: str): + def copy(self, session_id: str, old_path: str, new_path: str, node_id: Optional[str] = None): """Orchestrates a copy operation across the mesh.""" if not self.mirror: return {"error": "Mirror not available"} @@ -695,8 +714,15 @@ # 2. Re-push workspace to trigger node-side pulls for the new copy with self.membership_lock: members = self.memberships.get(session_id, []) + if not members and node_id: + members = [node_id] for nid in members: - self.push_workspace(nid, session_id) + if session_id == "__fs_explorer__": + content = self._read_from_mirror(session_id, new_path) + if content is not None: + self.write(nid, new_path, content, session_id=session_id) + else: + self.push_workspace(nid, session_id) return {"success": True, "message": f"Copied {old_path} -> {new_path}"} except Exception as e: diff --git a/ai-hub/integration_tests/test_mcp_file_ops_enhanced.py b/ai-hub/integration_tests/test_mcp_file_ops_enhanced.py deleted file mode 100644 index df9dc18..0000000 --- a/ai-hub/integration_tests/test_mcp_file_ops_enhanced.py +++ /dev/null @@ -1,152 +0,0 @@ -import pytest -import os -import httpx -import json -import time -import jwt - -# Use the same BASE_URL as other integration tests -BASE_URL = os.getenv("HUB_API_URL", "http://localhost:8000/api/v1") -SECRET_KEY = os.getenv("SECRET_KEY", "aYc2j1lYUUZXkBFFUndnleZI") - -def _get_token(user_id): - # Generate an internal HS256 JWT - payload = { - "iss": "cortex-hub-internal", - "sub": user_id, - "exp": int(time.time()) + 3600 - } - return jwt.encode(payload, SECRET_KEY, algorithm="HS256") - -def _headers(user_id): - return { - "X-User-ID": user_id, - "Authorization": f"Bearer {_get_token(user_id)}" - } - -@pytest.mark.slow -def test_mcp_file_ops_enhanced(): - """ - Test calling 'move_file', 'copy_file', and 'get_file_stat' tools via MCP. - """ - node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1") - user_id = os.getenv("SYNC_TEST_USER_ID", "admin") - - headers = _headers(user_id) - - with httpx.Client(timeout=15.0) as client: - # 0. Check initial state - r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": ".", "session_id": "__fs_explorer__"}, headers=headers) - print(f"[test] Initial LS (.): {r_ls.json().get('files', [])}") - - # 1. Prepare: Write a test file - test_file = "mcp_test_orig.txt" - test_content = "Original Content" - payload_write = { - "jsonrpc": "2.0", - "id": "write-1", - "method": "tools/call", - "params": { - "name": "write_file", - "arguments": { - "node_id": node_id, - "path": test_file, - "content": test_content, - "session_id": "__fs_explorer__" - } - } - } - r = client.post(f"{BASE_URL}/mcp/", json=payload_write, headers=headers) - assert r.status_code == 200, f"MCP write failed: {r.text}" - - # 2. Test get_file_stat - payload_stat = { - "jsonrpc": "2.0", - "id": "stat-1", - "method": "tools/call", - "params": { - "name": "get_file_stat", - "arguments": { - "node_id": node_id, - "path": test_file, - "session_id": "__fs_explorer__" - } - } - } - r = client.post(f"{BASE_URL}/mcp/", json=payload_stat, headers=headers) - assert r.status_code == 200, f"MCP stat failed: {r.text}" - res_stat = r.json() - assert "result" in res_stat, f"Result missing in stat: {res_stat}" - text_val = res_stat["result"]["content"][0]["text"] - data = json.loads(text_val) - assert data.get("exists") is True, f"File does not exist: {data}" - assert data.get("size") == len(test_content), f"Size mismatch: {data}" - print(f"[test] get_file_stat success: {data}") - - # 3. Test copy_file - copy_file = "mcp_test_copy.txt" - payload_copy = { - "jsonrpc": "2.0", - "id": "copy-1", - "method": "tools/call", - "params": { - "name": "copy_file", - "arguments": { - "node_id": node_id, - "old_path": test_file, - "new_path": copy_file, - "session_id": "__fs_explorer__" - } - } - } - r = client.post(f"{BASE_URL}/mcp/", json=payload_copy, headers=headers) - assert r.status_code == 200, f"MCP copy failed: {r.text}" - print(f"[test] copy_file call success") - - # Verify copy exists - time.sleep(1) - r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": ".", "session_id": "__fs_explorer__"}, headers=headers) - assert r_ls.status_code == 200, f"LS failed: {r_ls.text}" - files = r_ls.json().get("files", []) - assert any(f["name"] == "mcp_test_copy.txt" for f in files), f"Copy missing: {files}" - - # 4. Test move_file - move_file = "mcp_test_moved.txt" - payload_move = { - "jsonrpc": "2.0", - "id": "move-1", - "method": "tools/call", - "params": { - "name": "move_file", - "arguments": { - "node_id": node_id, - "old_path": test_file, - "new_path": move_file, - "session_id": "__fs_explorer__" - } - } - } - r = client.post(f"{BASE_URL}/mcp/", json=payload_move, headers=headers) - assert r.status_code == 200, f"MCP move failed: {r.text}" - print(f"[test] move_file call success") - - # Verify move - time.sleep(1) - r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": ".", "session_id": "__fs_explorer__"}, headers=headers) - assert r_ls.status_code == 200, f"LS failed: {r_ls.text}" - files = r_ls.json().get("files", []) - assert any(f["name"] == "mcp_test_moved.txt" for f in files), f"Moved file missing: {files}" - assert not any(f["name"] == "mcp_test_orig.txt" for f in files), f"Original file still exists: {files}" - - # Cleanup - client.post(f"{BASE_URL}/mcp/", json={ - "jsonrpc": "2.0", "id": "clean-1", "method": "tools/call", - "params": {"name": "remove_file", "arguments": {"node_id": node_id, "path": move_file, "session_id": "__fs_explorer__"}} - }, headers=headers) - client.post(f"{BASE_URL}/mcp/", json={ - "jsonrpc": "2.0", "id": "clean-2", "method": "tools/call", - "params": {"name": "remove_file", "arguments": {"node_id": node_id, "path": copy_file, "session_id": "__fs_explorer__"}} - }, headers=headers) - -if __name__ == "__main__": - test_mcp_file_ops_enhanced() diff --git a/frontend/src/features/swarm/pages/SwarmControlPage.js b/frontend/src/features/swarm/pages/SwarmControlPage.js index 8c328d3..8d5ba6f 100644 --- a/frontend/src/features/swarm/pages/SwarmControlPage.js +++ b/frontend/src/features/swarm/pages/SwarmControlPage.js @@ -67,11 +67,7 @@ return localStorage.getItem("swarm_auto_collapse") === "true"; }); - const { registerTool, unregisterTool, setIsAiProcessing } = useWebMcp(); - useEffect(() => { - setIsAiProcessing(isProcessing); - }, [isProcessing, setIsAiProcessing]); const onNewSessionCreated = useCallback(async (newSid) => { try { @@ -132,6 +128,12 @@ const [showClearChatModal, setShowClearChatModal] = useState(false); const [isClearingHistory, setIsClearingHistory] = useState(false); + const { registerTool, unregisterTool, setIsAiProcessing } = useWebMcp(); + + useEffect(() => { + setIsAiProcessing(isProcessing); + }, [isProcessing, setIsAiProcessing]); + // ── WebMCP Tools for the Swarm Control page ────────────────────────────── useEffect(() => { registerTool({