diff --git a/ai-hub/app/api/routes/mcp.py b/ai-hub/app/api/routes/mcp.py index 9921c3e..ba86fb3 100644 --- a/ai-hub/app/api/routes/mcp.py +++ b/ai-hub/app/api/routes/mcp.py @@ -338,7 +338,35 @@ "api_key": {"type": "string"}, "model": {"type": "string"}, "voice": {"type": "string"} - }, required=["section", "provider_name"]) + }, required=["section", "provider_name"]), + _tool_def("list_files", "List files/directories on an agent node.", { + "node_id": {"type": "string"}, + "path": {"type": "string", "description": "Directory path (default: '.')"}, + "session_id": {"type": "string", "description": "Session workspace ID (default: '__fs_explorer__')"}, + "recursive": {"type": "boolean"} + }, required=["node_id"]), + _tool_def("upload_file", "Upload/Update a file on a node.", { + "node_id": {"type": "string"}, + "path": {"type": "string"}, + "content": {"type": "string"}, + "session_id": {"type": "string"} + }, required=["node_id", "path", "content"]), + _tool_def("download_file", "Read file content from a node.", { + "node_id": {"type": "string"}, + "path": {"type": "string"}, + "session_id": {"type": "string"} + }, required=["node_id", "path"]), + _tool_def("remove_file", "Delete a file/directory from a node.", { + "node_id": {"type": "string"}, + "path": {"type": "string"}, + "session_id": {"type": "string"} + }, required=["node_id", "path"]), + _tool_def("create_file", "Create a new empty file or directory.", { + "node_id": {"type": "string"}, + "path": {"type": "string"}, + "is_dir": {"type": "boolean"}, + "session_id": {"type": "string"} + }, required=["node_id", "path"]) ] } @@ -399,7 +427,12 @@ "get_system_status": self._get_system_status, "get_global_config": self._get_global_config, "update_global_config": self._update_global_config, - "verify_provider": self._verify_provider + "verify_provider": self._verify_provider, + "list_files": self._list_files, + "upload_file": self._upload_file, + "download_file": self._download_file, + "remove_file": self._remove_file, + "create_file": self._create_file } async def dispatch(self, name: str, args: dict, token: Optional[str]) -> Any: @@ -578,6 +611,38 @@ return self.services.orchestrator.assistant.rm(node_id, path, session_id=session_id) return await self.loop.run_in_executor(None, _execute) + async def _list_files(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") + node_id, path, session_id = args.get("node_id"), args.get("path", "."), args.get("session_id", "__fs_explorer__") + if not node_id: raise ValueError("node_id required.") + def _execute(): + from app.db.session import get_db_session + with get_db_session() as db: + self.services.mesh_service.require_node_access(token, node_id, db) + return self.services.orchestrator.assistant.ls(node_id, path, session_id=session_id) + return await self.loop.run_in_executor(None, _execute) + + async def _upload_file(self, args: dict, token: Optional[str]): + return await self._write_file(args, token) + + async def _download_file(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") + node_id, path, session_id = args.get("node_id"), args.get("path"), args.get("session_id", "__fs_explorer__") + if not node_id or not path: raise ValueError("node_id and path required.") + def _execute(): + from app.db.session import get_db_session + with get_db_session() as db: + self.services.mesh_service.require_node_access(token, node_id, db) + return self.services.orchestrator.assistant.cat(node_id, path, session_id=session_id) + return await self.loop.run_in_executor(None, _execute) + + async def _remove_file(self, args: dict, token: Optional[str]): + return await self._delete_file(args, token) + + async def _create_file(self, args: dict, token: Optional[str]): + args["content"] = "" + return await self._write_file(args, token) + async def _list_groups(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index 9ad536b..94cbeef 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -475,6 +475,11 @@ with self.io_locks_lock: if lock_key in self.io_locks: del self.io_locks[lock_key] + + # M6 FIX: Fulfill the journal task when the final chunk arrives + # so the synchronous assistant.cat() call in the REST API can return. + if task_id and task_id.startswith("fs-cat-"): + self.journal.fulfill(task_id, {"path": fs.file_data.path, "status": "COMPLETED"}) self.assistant.broadcast_file_chunk(fs.session_id, node_id, fs.file_data) if fs.file_data.chunk_index % 10 == 0: @@ -536,7 +541,7 @@ success = fs.status.code == 0 if not success: self.journal.fulfill(task_id, {"error": fs.status.message}) - elif task_id.startswith("fs-write-") or task_id.startswith("fs-rm-"): + elif task_id.startswith("fs-write-") or task_id.startswith("fs-rm-") or task_id.startswith("fs-cat-"): self.journal.fulfill(task_id, {"success": True, "message": fs.status.message}) self.registry.emit(node_id, "sync_status", {"message": fs.status.message, "code": fs.status.code}) diff --git a/ai-hub/tests/api/routes/test_mcp_files.py b/ai-hub/tests/api/routes/test_mcp_files.py new file mode 100644 index 0000000..82fc69f --- /dev/null +++ b/ai-hub/tests/api/routes/test_mcp_files.py @@ -0,0 +1,211 @@ +import pytest +import json +from unittest.mock import MagicMock, AsyncMock, patch +from app.db import models + +def test_mcp_list_tools_discovery(client): + """Tests that the new file explorer tools are present in tools/list.""" + test_client, mock_services = client + + # Ensure all required services are mocked + mock_services.mesh_service = MagicMock() + mock_services.orchestrator = MagicMock() + mock_services.user_service = MagicMock() + mock_services.stt_service = MagicMock() + mock_services.tts_service = MagicMock() + mock_services.tool_service = MagicMock() + mock_services.browser_service = MagicMock() + + # Mocking the discovery process + # The route calls _execute for tools/list + response = test_client.post("/mcp?token=test_user", json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/list", + "params": {} + }, headers={"X-User-ID": "test_user"}) + + assert response.status_code == 200 + res_body = response.json() + assert "result" in res_body + tools = res_body["result"].get("tools", []) + tool_names = [t["name"] for t in tools] + + assert "list_files" in tool_names + assert "upload_file" in tool_names + assert "download_file" in tool_names + assert "remove_file" in tool_names + assert "create_file" in tool_names + +def test_mcp_list_files_success(client): + """Tests list_files tool via MCP dispatch.""" + test_client, mock_services = client + + # Mock services + mock_services.mesh_service = MagicMock() + mock_services.orchestrator = MagicMock() + + # Mock node access and orchestrator assistant + mock_services.mesh_service.require_node_access.return_value = True + mock_services.orchestrator.assistant.ls.return_value = { + "files": [{"path": "test.txt", "is_dir": False, "size": 123}] + } + + response = test_client.post("/mcp?token=test_user", json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "list_files", + "arguments": { + "node_id": "test-node", + "path": "." + } + } + }, headers={"X-User-ID": "test_user"}) + + assert response.status_code == 200 + res_body = response.json() + assert "result" in res_body + res_data = res_body["result"] + assert "content" in res_data + content_text = res_data["content"][0]["text"] + data = json.loads(content_text) + assert "files" in data + assert data["files"][0]["path"] == "test.txt" + + mock_services.mesh_service.require_node_access.assert_called_once() + mock_services.orchestrator.assistant.ls.assert_called_once_with("test-node", ".", session_id="__fs_explorer__") + +def test_mcp_upload_file_success(client): + """Tests upload_file tool via MCP dispatch (aliased to write_file).""" + test_client, mock_services = client + + mock_services.mesh_service = MagicMock() + mock_services.orchestrator = MagicMock() + + mock_services.mesh_service.require_node_access.return_value = True + mock_services.orchestrator.assistant.write.return_value = {"status": "success"} + + response = test_client.post("/mcp?token=test_user", json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "upload_file", + "arguments": { + "node_id": "test-node", + "path": "hello.txt", + "content": "hello world" + } + } + }, headers={"X-User-ID": "test_user"}) + + assert response.status_code == 200 + res_data = response.json()["result"] + assert "content" in res_data + content_text = res_data["content"][0]["text"] + assert json.loads(content_text) == {"status": "success"} + + mock_services.orchestrator.assistant.write.assert_called_once_with( + "test-node", "hello.txt", "hello world", False, session_id="__fs_explorer__" + ) + +def test_mcp_download_file_success(client): + """Tests download_file tool via MCP dispatch (aliased to cat).""" + test_client, mock_services = client + + mock_services.mesh_service = MagicMock() + mock_services.orchestrator = MagicMock() + + mock_services.mesh_service.require_node_access.return_value = True + mock_services.orchestrator.assistant.cat.return_value = "file content" + + response = test_client.post("/mcp?token=test_user", json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "download_file", + "arguments": { + "node_id": "test-node", + "path": "hello.txt" + } + } + }, headers={"X-User-ID": "test_user"}) + + assert response.status_code == 200 + res_data = response.json()["result"] + assert "content" in res_data + assert res_data["content"][0]["text"] == "file content" + + mock_services.orchestrator.assistant.cat.assert_called_once_with( + "test-node", "hello.txt", session_id="__fs_explorer__" + ) + +def test_mcp_remove_file_success(client): + """Tests remove_file tool via MCP dispatch (aliased to delete_file).""" + test_client, mock_services = client + + mock_services.mesh_service = MagicMock() + mock_services.orchestrator = MagicMock() + + mock_services.mesh_service.require_node_access.return_value = True + mock_services.orchestrator.assistant.rm.return_value = {"status": "success"} + + response = test_client.post("/mcp?token=test_user", json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "remove_file", + "arguments": { + "node_id": "test-node", + "path": "old.txt" + } + } + }, headers={"X-User-ID": "test_user"}) + + assert response.status_code == 200 + res_data = response.json()["result"] + assert "content" in res_data + content_text = res_data["content"][0]["text"] + assert json.loads(content_text) == {"status": "success"} + + mock_services.orchestrator.assistant.rm.assert_called_once_with( + "test-node", "old.txt", session_id="__fs_explorer__" + ) + +def test_mcp_create_file_success(client): + """Tests create_file tool via MCP dispatch (empty content).""" + test_client, mock_services = client + + mock_services.mesh_service = MagicMock() + mock_services.orchestrator = MagicMock() + + mock_services.mesh_service.require_node_access.return_value = True + mock_services.orchestrator.assistant.write.return_value = {"status": "success"} + + response = test_client.post("/mcp?token=test_user", json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "create_file", + "arguments": { + "node_id": "test-node", + "path": "new_dir", + "is_dir": True + } + } + }, headers={"X-User-ID": "test_user"}) + + assert response.status_code == 200 + res_data = response.json()["result"] + assert "content" in res_data + content_text = res_data["content"][0]["text"] + assert json.loads(content_text) == {"status": "success"} + + mock_services.orchestrator.assistant.write.assert_called_once_with( + "test-node", "new_dir", "", True, session_id="__fs_explorer__" + ) diff --git a/frontend/src/features/swarm/pages/SwarmControlPage.js b/frontend/src/features/swarm/pages/SwarmControlPage.js index 913f6e9..5e47f94 100644 --- a/frontend/src/features/swarm/pages/SwarmControlPage.js +++ b/frontend/src/features/swarm/pages/SwarmControlPage.js @@ -182,10 +182,136 @@ } }); + registerTool({ + name: 'list_files', + description: 'List files and directories on a specific agent node in the swarm.', + inputSchema: { + type: 'object', + properties: { + node_id: { type: 'string', description: 'The ID of the node to list files from.' }, + path: { type: 'string', description: 'The directory path to list (default: ".")' } + }, + required: ['node_id'] + }, + execute: async ({ node_id, path = '.' }) => { + try { + const res = await nodeFsList(node_id, path, sessionId); + return { + content: [{ type: 'text', text: JSON.stringify(res, null, 2) }] + }; + } catch (err) { + return { content: [{ type: 'text', text: `Error: ${err.message}` }], isError: true }; + } + } + }); + + registerTool({ + name: 'upload_file', + description: 'Upload or update a file on an agent node.', + inputSchema: { + type: 'object', + properties: { + node_id: { type: 'string', description: 'Target node ID.' }, + path: { type: 'string', description: 'Target file path.' }, + content: { type: 'string', description: 'The file content.' } + }, + required: ['node_id', 'path', 'content'] + }, + execute: async ({ node_id, path, content }) => { + try { + const { nodeFsTouch } = await import("../../../services/apiService"); + const res = await nodeFsTouch(node_id, path, content, false, sessionId); + return { + content: [{ type: 'text', text: JSON.stringify(res, null, 2) }] + }; + } catch (err) { + return { content: [{ type: 'text', text: `Error: ${err.message}` }], isError: true }; + } + } + }); + + registerTool({ + name: 'download_file', + description: 'Read the content of a file from an agent node.', + inputSchema: { + type: 'object', + properties: { + node_id: { type: 'string', description: 'Source node ID.' }, + path: { type: 'string', description: 'Path to the file.' } + }, + required: ['node_id', 'path'] + }, + execute: async ({ node_id, path }) => { + try { + const { nodeFsCat } = await import("../../../services/apiService"); + const res = await nodeFsCat(node_id, path, sessionId); + return { + content: [{ type: 'text', text: typeof res === 'string' ? res : JSON.stringify(res, null, 2) }] + }; + } catch (err) { + return { content: [{ type: 'text', text: `Error: ${err.message}` }], isError: true }; + } + } + }); + + registerTool({ + name: 'remove_file', + description: 'Delete a file or directory from an agent node.', + inputSchema: { + type: 'object', + properties: { + node_id: { type: 'string', description: 'Target node ID.' }, + path: { type: 'string', description: 'Path to delete.' } + }, + required: ['node_id', 'path'] + }, + execute: async ({ node_id, path }) => { + try { + const { nodeFsRm } = await import("../../../services/apiService"); + const res = await nodeFsRm(node_id, path, sessionId); + return { + content: [{ type: 'text', text: JSON.stringify(res, null, 2) }] + }; + } catch (err) { + return { content: [{ type: 'text', text: `Error: ${err.message}` }], isError: true }; + } + } + }); + + registerTool({ + name: 'create_file', + description: 'Create a new empty file or directory on an agent node.', + inputSchema: { + type: 'object', + properties: { + node_id: { type: 'string', description: 'Target node ID.' }, + path: { type: 'string', description: 'Path to create.' }, + is_dir: { type: 'boolean', description: 'Whether to create a directory.' } + }, + required: ['node_id', 'path'] + }, + execute: async ({ node_id, path, is_dir = false }) => { + try { + const { nodeFsTouch } = await import("../../../services/apiService"); + const res = await nodeFsTouch(node_id, path, "", is_dir, sessionId); + return { + content: [{ type: 'text', text: JSON.stringify(res, null, 2) }] + }; + } catch (err) { + return { content: [{ type: 'text', text: `Error: ${err.message}` }], isError: true }; + } + } + }); + return () => { unregisterTool('get_session_nodes'); unregisterTool('list_accessible_nodes'); unregisterTool('get_swarm_status'); + unregisterTool('list_files'); + unregisterTool('upload_file'); + unregisterTool('download_file'); + unregisterTool('remove_file'); + unregisterTool('create_file'); }; }, [sessionId, accessibleNodes, attachedNodeIds, workspaceId, localActiveLLM, isConfigured, registerTool, unregisterTool]); // ─────────────────────────────────────────────────────────────────────────