diff --git a/ai-hub/app/api/routes/mcp.py b/ai-hub/app/api/routes/mcp.py index e2d2019..613912e 100644 --- a/ai-hub/app/api/routes/mcp.py +++ b/ai-hub/app/api/routes/mcp.py @@ -15,13 +15,13 @@ GET /.well-known/mcp/manifest.json (mounted in app.py) """ - import asyncio import json import uuid import logging import jwt -from typing import Optional, List, Annotated +import os +from typing import Optional, List, Annotated, Any from fastapi import APIRouter, HTTPException, Request, Query, Header from fastapi.responses import JSONResponse, StreamingResponse @@ -89,17 +89,11 @@ request: Request, token: Optional[str] = Query(None), ): - """ - Server-Sent Events (SSE) transport for MCP. - Supports Bearer token in Authorization header or 'token' query parameter. - """ from app.db.session import get_db_session with get_db_session() as db: user_id = await _get_authenticated_user(request, token, db) if not user_id: - # We allow the SSE connection to open even without auth, - # but actual messages/tools will be rejected. logger.info("[MCP] SSE connection opened without initial auth.") queue = asyncio.Queue() @@ -110,7 +104,6 @@ if user_id: messages_url += f"&token={user_id}" - # Origin validation per MCP 2025-11-25 origin = request.headers.get("origin") if origin: allowed = ["https://ai.jerxie.com", "http://localhost:3000", "http://localhost:8080"] @@ -144,15 +137,10 @@ request: Request, token: Optional[str] = Query(None), ): - """ - One-shot JSON-RPC over HTTP. - Supports Bearer token in Authorization header or 'token' query parameter. - """ from app.db.session import get_db_session with get_db_session() as db: user_id = await _get_authenticated_user(request, token, db) - # Origin validation per MCP 2025-11-25 origin = request.headers.get("origin") if origin: allowed = [ @@ -160,7 +148,6 @@ "http://localhost:3000", "http://localhost:8080", ] - # Also allow the server's own origin server_host = request.headers.get("host", "") allowed.append(f"https://{server_host}") allowed.append(f"http://{server_host}") @@ -176,7 +163,6 @@ except Exception: raise HTTPException(status_code=400, detail="Invalid JSON body.") - # Batch requests (JSON array) if isinstance(body, list): results = [] for item in body: @@ -186,15 +172,13 @@ headers={"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION}, ) - # Single request response = await _handle_single(body, user_id, services) - if response is None: # notification — no id + if response is None: return JSONResponse( None, status_code=202, headers={"Access-Control-Allow-Origin": "*"}, ) - # If initialize, attach a session ID (MAY per spec) headers = {"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION} if body.get("method") == "initialize": headers["Mcp-Session-Id"] = str(uuid.uuid4()) @@ -208,10 +192,6 @@ session_id: str = Query(...), token: Optional[str] = Query(None), ): - """ - Legacy SSE message handler — receives JSON-RPC 2.0 from a client that - first established a GET /sse stream, then pushes results over that stream. - """ queue = _sse_sessions.get(session_id) if not queue: raise HTTPException(status_code=404, detail="MCP session not found or expired.") @@ -241,11 +221,8 @@ return router -# ─── Single-request handler (used by Streamable HTTP) ──────────────────────── - async def _handle_single(body: dict, token: Optional[str], services: ServiceContainer): - """Process one JSON-RPC object; return response dict or None for notifications.""" - rpc_id = body.get("id") # None means it's a notification + rpc_id = body.get("id") method = body.get("method", "") params = body.get("params", {}) @@ -253,7 +230,7 @@ try: result = await _execute(method, params, token, services) if rpc_id is None: - return None # notification — no response + return None return {"jsonrpc": "2.0", "id": rpc_id, "result": result} except Exception as exc: logger.exception(f"[MCP-HTTP] Error for '{method}': {exc}") @@ -266,9 +243,6 @@ } - -# ─── Dispatcher ─────────────────────────────────────────────────────────────── - async def _dispatch( queue: asyncio.Queue, rpc_id, @@ -277,7 +251,6 @@ token: Optional[str], services: ServiceContainer, ): - """Run the method and push a JSON-RPC response onto the SSE queue.""" try: result = await _execute(method, params, token, services) await queue.put({"jsonrpc": "2.0", "id": rpc_id, "result": result}) @@ -291,9 +264,6 @@ async def _execute(method: str, params: dict, token: Optional[str], services: ServiceContainer): - """Route a JSON-RPC method to its implementation.""" - - # ── MCP Handshake ───────────────────────────────────────────────────────── if method == "initialize": return { "protocolVersion": MCP_VERSION, @@ -304,192 +274,47 @@ if method == "ping": return {} - # ── Tool Discovery ──────────────────────────────────────────────────────── if method == "tools/list": return { "tools": [ - _tool_def("list_nodes", - "List all agent nodes in the Cortex swarm mesh and their status.", - {}), - _tool_def("get_app_info", - "Get metadata about this Cortex Hub instance.", - {}), - _tool_def("get_node_details", - "Get full details for a specific agent node.", - {"node_id": {"type": "string", "description": "Unique node ID"}}, - required=["node_id"]), - _tool_def("list_agents", - "List all autonomous agents configured in the system.", - {}), - _tool_def("list_skills", - "List all skill folders (tool libraries) registered in the system.", - {}), - _tool_def("dispatch", - "Dispatch a shell command to a specific agent node.", - { - "node_id": {"type": "string", "description": "Unique node ID"}, - "command": {"type": "string", "description": "Command to execute"}, - "session_id": {"type": "string", "description": "Optional session ID"}, - }, - required=["node_id", "command"]), - _tool_def("write_file", - "Create or update a file on a specific agent node.", - { - "node_id": {"type": "string", "description": "Unique node ID"}, - "path": {"type": "string", "description": "Path to file"}, - "content": {"type": "string", "description": "Content to write (string)"}, - "is_dir": {"type": "boolean", "description": "True if creating a directory"}, - "session_id": {"type": "string", "description": "Optional session ID"}, - }, - required=["node_id", "path"]), - _tool_def("delete_file", - "Delete a file or directory on a specific agent node.", - { - "node_id": {"type": "string", "description": "Unique node ID"}, - "path": {"type": "string", "description": "Path to file or directory"}, - "session_id": {"type": "string", "description": "Optional session ID"}, - }, - required=["node_id", "path"]), - _tool_def("deploy_agent", - "Deploy a new agent instance with a template and session.", - { - "name": {"type": "string", "description": "Name of the agent"}, - "mesh_node_id": {"type": "string", "description": "Node ID to deploy to"}, - "description": {"type": "string", "description": "Optional description"}, - "system_prompt": {"type": "string", "description": "Optional system prompt override"}, - "max_loop_iterations": {"type": "integer", "description": "Max loop iterations (default 20)"}, - "initial_prompt": {"type": "string", "description": "Optional first message to kick off execution"}, - "provider_name": {"type": "string", "description": "Optional LLM provider name"}, - "trigger_type": {"type": "string", "description": "Trigger type: manual, webhook, cron, interval"}, - "cron_expression": {"type": "string", "description": "Cron expression if trigger_type is cron"}, - "interval_seconds": {"type": "integer", "description": "Interval in seconds if trigger_type is interval"}, - "default_prompt": {"type": "string", "description": "Predefined prompt for any trigger"}, - "co_worker_quality_gate": {"type": "boolean", "description": "Enable quality gate"}, - "rework_threshold": {"type": "integer", "description": "Rework threshold (0-100)"}, - "max_rework_attempts": {"type": "integer", "description": "Max rework attempts"} - }, - required=["name", "mesh_node_id"]), - _tool_def("update_agent_config", - "Update configuration for an existing agent.", - { - "agent_id": {"type": "string", "description": "Unique agent ID"}, - "name": {"type": "string", "description": "New name"}, - "system_prompt": {"type": "string", "description": "New system prompt"}, - "max_loop_iterations": {"type": "integer", "description": "New max loop iterations"}, - "mesh_node_id": {"type": "string", "description": "New mesh node ID"}, - "provider_name": {"type": "string", "description": "New provider name"}, - "model_name": {"type": "string", "description": "New model name"}, - "co_worker_quality_gate": {"type": "boolean", "description": "Enable quality gate"}, - "rework_threshold": {"type": "integer", "description": "New rework threshold"}, - "max_rework_attempts": {"type": "integer", "description": "New max rework attempts"} - }, - required=["agent_id", "mesh_node_id"]), - _tool_def("delete_agent", - "Delete an autonomous agent.", - { - "agent_id": {"type": "string", "description": "Unique agent ID to delete"} - }, - required=["agent_id"]), - _tool_def("get_agent_details", - "Get full details for a specific autonomous agent.", - { - "agent_id": {"type": "string", "description": "Unique agent ID"} - }, - required=["agent_id"]), - _tool_def("get_user_config", - "Get current user preferences and effective system configuration.", - {}), - _tool_def("update_user_config", - "Update user preferences for LLM, TTS, and STT.", - { - "llm": {"type": "object", "description": "LLM preferences"}, - "tts": {"type": "object", "description": "TTS preferences"}, - "stt": {"type": "object", "description": "STT preferences"}, - "statuses": {"type": "object", "description": "Provider health statuses"} - }), + _tool_def("list_nodes", "List all agent nodes in the Cortex swarm mesh.", {}), + _tool_def("get_app_info", "Get metadata about this Cortex Hub instance.", {}), + _tool_def("get_node_details", "Get full details for a specific agent node.", {"node_id": {"type": "string"}}, required=["node_id"]), + _tool_def("list_agents", "List all autonomous agents.", {}), + _tool_def("list_skills", "List all registered skill folders.", {}), + _tool_def("dispatch", "Dispatch a shell command to a node.", {"node_id": {"type": "string"}, "command": {"type": "string"}, "session_id": {"type": "string"}}, required=["node_id", "command"]), + _tool_def("write_file", "Create/Update a file on a node.", {"node_id": {"type": "string"}, "path": {"type": "string"}, "content": {"type": "string"}, "is_dir": {"type": "boolean"}, "session_id": {"type": "string"}}, required=["node_id", "path"]), + _tool_def("delete_file", "Delete a file/directory on a node.", {"node_id": {"type": "string"}, "path": {"type": "string"}, "session_id": {"type": "string"}}, required=["node_id", "path"]), + _tool_def("deploy_agent", "Deploy a new autonomous agent.", {"name": {"type": "string"}, "mesh_node_id": {"type": "string"}, "description": {"type": "string"}, "system_prompt": {"type": "string"}, "initial_prompt": {"type": "string"}}, required=["name", "mesh_node_id"]), + _tool_def("update_agent_config", "Update config for an agent.", {"agent_id": {"type": "string"}, "mesh_node_id": {"type": "string"}}, required=["agent_id", "mesh_node_id"]), + _tool_def("delete_agent", "Delete an autonomous agent.", {"agent_id": {"type": "string"}}, required=["agent_id"]), + _tool_def("get_agent_details", "Get details for an autonomous agent.", {"agent_id": {"type": "string"}}, required=["agent_id"]), + _tool_def("get_user_config", "Get user effective config.", {}), + _tool_def("update_user_config", "Update user preferences.", {"llm": {"type": "object"}}), _tool_def("list_groups", "List all security groups (Admin Only).", {}), - _tool_def("create_group", "Create a new security policy group (Admin Only).", { - "name": {"type": "string", "description": "Group name"}, - "description": {"type": "string", "description": "Optional description"}, - "policy": {"type": "object", "description": "Optional policy whitelists"} - }, required=["name"]), - _tool_def("update_group", "Update a security policy group (Admin Only).", { - "gid": {"type": "string", "description": "Group ID"}, - "name": {"type": "string", "description": "Group name"}, - "description": {"type": "string", "description": "Optional description"}, - "policy": {"type": "object", "description": "Optional policy whitelists"} - }, required=["gid"]), - _tool_def("delete_group", "Delete a security policy group (Admin Only).", { - "gid": {"type": "string", "description": "Group ID"} - }, required=["gid"]), + _tool_def("create_group", "Create policy group (Admin Only).", {"name": {"type": "string"}}, required=["name"]), + _tool_def("update_group", "Update policy group (Admin Only).", {"gid": {"type": "string"}}, required=["gid"]), + _tool_def("delete_group", "Delete policy group (Admin Only).", {"gid": {"type": "string"}}, required=["gid"]), _tool_def("list_users", "List all registered users (Admin Only).", {}), - _tool_def("update_user_role", "Update a user's role (Admin Only).", { - "uid": {"type": "string", "description": "User ID"}, - "role": {"type": "string", "description": "New role (admin/user)"} - }, required=["uid", "role"]), - _tool_def("assign_user_to_group", "Assign a user to a policy group (Admin Only).", { - "uid": {"type": "string", "description": "User ID"}, - "group_id": {"type": "string", "description": "Group ID"} - }, required=["uid", "group_id"]), - _tool_def("get_user_profile", "Get the current user's profile information.", {}), - _tool_def("update_user_profile", "Update the current user's profile information.", { - "username": {"type": "string"}, - "full_name": {"type": "string"}, - "avatar_url": {"type": "string"} - }), - _tool_def("create_session", "Create a new chat session.", { - "provider_name": {"type": "string"}, - "model_name": {"type": "string"}, - "feature_name": {"type": "string"}, - "stt_provider_name": {"type": "string"}, - "tts_provider_name": {"type": "string"} - }), - _tool_def("get_session", "Get full detail for a specific session.", { - "session_id": {"type": "integer"} - }, required=["session_id"]), - _tool_def("list_sessions", "List all chat sessions for the current user.", { - "feature_name": {"type": "string", "description": "Optional feature filter"} - }), - _tool_def("update_session", "Update session configuration.", { - "session_id": {"type": "integer"}, - "title": {"type": "string"}, - "provider_name": {"type": "string"}, - "model_name": {"type": "string"}, - "restrict_skills": {"type": "boolean"}, - "allowed_skill_ids": {"type": "array", "items": {"type": "integer"}}, - "allowed_skill_names": {"type": "array", "items": {"type": "string"}}, - "system_prompt_override": {"type": "string"}, - "is_locked": {"type": "boolean"} - }, required=["session_id"]), - _tool_def("delete_session", "Archive a chat session.", { - "session_id": {"type": "integer"} - }, required=["session_id"]), - _tool_def("get_session_messages", "Retrieve the chat history for a session.", { - "session_id": {"type": "integer"} - }, required=["session_id"]), - _tool_def("clear_session_history", "Delete all messages in a session.", { - "session_id": {"type": "integer"} - }, required=["session_id"]), - _tool_def("attach_nodes_to_session", "Attach agent nodes to a session.", { - "session_id": {"type": "integer"}, - "node_ids": {"type": "array", "items": {"type": "string"}}, - "config": {"type": "object"} - }, required=["session_id", "node_ids"]), - _tool_def("detach_node_from_session", "Detach an agent node from a session.", { - "session_id": {"type": "integer"}, - "node_id": {"type": "string"} - }, required=["session_id", "node_id"]), - _tool_def("get_session_nodes", "Get all nodes attached to a session.", { - "session_id": {"type": "integer"} - }, required=["session_id"]), - _tool_def("cancel_session_task", "Cancel a running Swarm task within a session.", { - "session_id": {"type": "integer"} - }, required=["session_id"]), - _tool_def("get_system_status", "Retrieve full system and TLS state.", {}) + _tool_def("update_user_role", "Update a user's role (Admin Only).", {"uid": {"type": "string"}, "role": {"type": "string"}}, required=["uid", "role"]), + _tool_def("assign_user_to_group", "Assign user to group (Admin Only).", {"uid": {"type": "string"}, "group_id": {"type": "string"}}, required=["uid", "group_id"]), + _tool_def("get_user_profile", "Get user's profile info.", {}), + _tool_def("update_user_profile", "Update user's profile info.", {"username": {"type": "string"}}), + _tool_def("create_session", "Create a new chat session.", {"provider_name": {"type": "string"}}), + _tool_def("get_session", "Get specific session details.", {"session_id": {"type": "integer"}}, required=["session_id"]), + _tool_def("list_sessions", "List all chat sessions.", {}), + _tool_def("update_session", "Update session configuration.", {"session_id": {"type": "integer"}}, required=["session_id"]), + _tool_def("delete_session", "Archive a chat session.", {"session_id": {"type": "integer"}}, required=["session_id"]), + _tool_def("get_session_messages", "Retrieve chat history.", {"session_id": {"type": "integer"}}, required=["session_id"]), + _tool_def("clear_session_history", "Delete all messages in session.", {"session_id": {"type": "integer"}}, required=["session_id"]), + _tool_def("attach_nodes_to_session", "Attach agent nodes to session.", {"session_id": {"type": "integer"}, "node_ids": {"type": "array", "items": {"type": "string"}}}, required=["session_id", "node_ids"]), + _tool_def("detach_node_from_session", "Detach agent node.", {"session_id": {"type": "integer"}, "node_id": {"type": "string"}}, required=["session_id", "node_id"]), + _tool_def("get_session_nodes", "Get all nodes in session.", {"session_id": {"type": "integer"}}, required=["session_id"]), + _tool_def("cancel_session_task", "Cancel session task.", {"session_id": {"type": "integer"}}, required=["session_id"]), + _tool_def("get_system_status", "Retrieve full system state.", {}) ] } - # ── Tool Execution ──────────────────────────────────────────────────────── if method == "tools/call": name = params.get("name", "") args = params.get("arguments", {}) @@ -505,313 +330,225 @@ return {"name": name, "description": description, "inputSchema": schema} -# ─── Tool Implementations ───────────────────────────────────────────────────── +class MCPToolDispatcher: + def __init__(self, services: ServiceContainer, loop): + self.services = services + self.loop = loop + self._handlers = { + "list_nodes": self._list_nodes, + "get_app_info": self._get_app_info, + "get_node_details": self._get_node_details, + "list_agents": self._list_agents, + "deploy_agent": self._deploy_agent, + "update_agent_config": self._update_agent_config, + "delete_agent": self._delete_agent, + "get_agent_details": self._get_agent_details, + "get_user_config": self._get_user_config, + "update_user_config": self._update_user_config, + "list_skills": self._list_skills, + "dispatch": self._dispatch, + "write_file": self._write_file, + "delete_file": self._delete_file, + "list_groups": self._list_groups, + "create_group": self._create_group, + "update_group": self._update_group, + "delete_group": self._delete_group, + "list_users": self._list_users, + "update_user_role": self._update_user_role, + "assign_user_to_group": self._assign_user_to_group, + "get_user_profile": self._get_user_profile, + "update_user_profile": self._update_user_profile, + "create_session": self._create_session, + "get_session": self._get_session, + "list_sessions": self._list_sessions, + "update_session": self._update_session, + "delete_session": self._delete_session, + "get_session_messages": self._get_session_messages, + "clear_session_history": self._clear_session_history, + "attach_nodes_to_session": self._attach_nodes_to_session, + "detach_node_from_session": self._detach_node_from_session, + "get_session_nodes": self._get_session_nodes, + "cancel_session_task": self._cancel_session_task, + "get_system_status": self._get_system_status + } -async def _call_tool(name: str, args: dict, token: Optional[str], services: ServiceContainer) -> dict: - """Execute a named tool and return a standard MCP content block.""" + async def dispatch(self, name: str, args: dict, token: Optional[str]) -> Any: + handler = self._handlers.get(name) + if not handler: + raise ValueError(f"Tool '{name}' not found.") + return await handler(args, token) - def _ok(data) -> dict: - text = json.dumps(data, indent=2, default=str) if not isinstance(data, str) else data - return {"content": [{"type": "text", "text": text}]} - - # Run DB queries in a thread pool so we don't block the event loop - loop = asyncio.get_running_loop() - - if name == "list_nodes": - if not token: - raise ValueError("Authentication required to list nodes.") + async def _list_nodes(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") def _query(): from app.db.session import get_db_session with get_db_session() as db: - # Use MeshService to filter nodes based on the authenticated user_id - nodes = services.mesh_service.list_accessible_nodes(token, db) - return { - "nodes": [ - { - "id": n.node_id, - "name": n.display_name, - "status": n.last_status, - "os": (n.capabilities or {}).get("os"), - "is_active": n.is_active, - } - for n in nodes - ] - } - return _ok(await loop.run_in_executor(None, _query)) + nodes = self.services.mesh_service.list_accessible_nodes(token, db) + return {"nodes": [{"id": n.node_id, "name": n.display_name, "status": n.last_status, "os": (n.capabilities or {}).get("os"), "is_active": n.is_active} for n in nodes]} + return await self.loop.run_in_executor(None, _query) - if name == "get_app_info": + async def _get_app_info(self, args: dict, token: Optional[str]): def _query(): from app.db.session import get_db_session with get_db_session() as db: + total, online = 0, 0 if token: - # Filtered counts if authenticated - nodes = services.mesh_service.list_accessible_nodes(token, db) + nodes = self.services.mesh_service.list_accessible_nodes(token, db) total = len(nodes) online = len([n for n in nodes if n.last_status == "online"]) - else: - # Return zero counts if not authenticated - total = 0 - online = 0 - return { - "name": "Cortex Hub", - "version": "1.0.0", - "capabilities": ["swarms", "webmcp", "mcp-sse", "voice-chat", "rag"], - "nodes": {"total": total, "online": online}, - "mcp_transport": "sse", - "sse_endpoint": f"{settings.HUB_PUBLIC_URL}/api/v1/mcp/sse", + "name": "Cortex Hub", "version": "1.0.0", "capabilities": ["swarms", "webmcp", "mcp-sse", "voice-chat", "rag"], + "nodes": {"total": total, "online": online}, "mcp_transport": "sse", "sse_endpoint": f"{settings.HUB_PUBLIC_URL}/api/v1/mcp/sse", "auth": {"oidc_enabled": settings.OIDC_ENABLED} } - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "get_node_details": - if not token: - raise ValueError("Authentication required to get node details.") + async def _get_node_details(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") node_id = args.get("node_id") - if not node_id: - raise ValueError("node_id is required.") + if not node_id: raise ValueError("node_id required.") def _query(): from app.db.session import get_db_session with get_db_session() as db: - # Enforce permission check before returning details - try: - services.mesh_service.require_node_access(token, node_id, db) - except Exception: - return None # Access denied + try: self.services.mesh_service.require_node_access(token, node_id, db) + except Exception: return None + n = self.services.mesh_service.get_node_or_404(node_id, db) + return {"node_id": n.node_id, "display_name": n.display_name, "description": n.description, "status": n.last_status, "is_active": n.is_active, "capabilities": n.capabilities, "skill_config": n.skill_config, "registered_by": n.registered_by, "last_seen_at": str(n.last_seen_at) if n.last_seen_at else None} + res = await self.loop.run_in_executor(None, _query) + if res is None: raise ValueError(f"Node '{node_id}' not found.") + return res - n = services.mesh_service.get_node_or_404(node_id, db) - return { - "node_id": n.node_id, - "display_name": n.display_name, - "description": n.description, - "status": n.last_status, - "is_active": n.is_active, - "capabilities": n.capabilities, - "skill_config": n.skill_config, - "registered_by": n.registered_by, - "last_seen_at": str(n.last_seen_at) if n.last_seen_at else None, - } - result = await loop.run_in_executor(None, _query) - if result is None: - raise ValueError(f"Node '{node_id}' not found.") - return _ok(result) - - if name == "list_agents": - if not token: - raise ValueError("Authentication required to list agents.") + async def _list_agents(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") def _query(): from app.db.session import get_db_session from app.db import models with get_db_session() as db: - # Basic hardening: Only show agents on nodes user can access - accessible_nodes = services.mesh_service.list_accessible_nodes(token, db) - node_ids = [n.node_id for n in accessible_nodes] - - rows = db.query(models.AgentInstance).filter(models.AgentInstance.mesh_node_id.in_(node_ids)).all() - return { - "agents": [ - { - "id": str(a.id), - "name": a.template.name if a.template else None, - "status": a.status, - "node": a.mesh_node_id, - "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None, - "total_runs": a.total_runs, - "quality_score": a.latest_quality_score, - } - for a in rows - ] - } - return _ok(await loop.run_in_executor(None, _query)) + acc = self.services.mesh_service.list_accessible_nodes(token, db) + nids = [n.node_id for n in acc] + rows = db.query(models.AgentInstance).filter(models.AgentInstance.mesh_node_id.in_(nids)).all() + return {"agents": [{"id": str(a.id), "name": a.template.name if a.template else None, "status": a.status, "node": a.mesh_node_id, "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None, "total_runs": a.total_runs, "quality_score": a.latest_quality_score} for a in rows]} + return await self.loop.run_in_executor(None, _query) - if name == "deploy_agent": - if not token: - raise ValueError("Authentication required to deploy agents.") + async def _deploy_agent(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") from app.api import schemas - def _execute_deploy(): + def _execute(): from app.db.session import get_db_session with get_db_session() as db: - request = schemas.DeployAgentRequest(**args) - result = services.agent_service.deploy_agent(db, token, request) - return result - result = await loop.run_in_executor(None, _execute_deploy) - initial_prompt = args.get("initial_prompt") - if initial_prompt: + req = schemas.DeployAgentRequest(**args) + return self.services.agent_service.deploy_agent(db, token, req) + res = await self.loop.run_in_executor(None, _execute) + prompt = args.get("initial_prompt") + if prompt: from app.core.orchestration.agent_loop import AgentExecutor - asyncio.create_task(AgentExecutor.run(result["instance_id"], initial_prompt, services, services.user_service)) - return _ok(result) + asyncio.create_task(AgentExecutor.run(res["instance_id"], prompt, self.services, self.services.user_service)) + return res - if name == "update_agent_config": - if not token: - raise ValueError("Authentication required to update agent config.") + async def _update_agent_config(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") agent_id = args.get("agent_id") - if not agent_id: - raise ValueError("agent_id is required.") + if not agent_id: raise ValueError("agent_id required.") from app.api import schemas - def _execute_update(): + def _execute(): from app.db.session import get_db_session with get_db_session() as db: - config_args = {k: v for k, v in args.items() if k != "agent_id"} - request = schemas.AgentConfigUpdate(**config_args) - result = services.agent_service.update_config(db, agent_id, token, request) - return { - "id": str(result.id), - "status": result.status - } - return _ok(await loop.run_in_executor(None, _execute_update)) + req_args = {k: v for k, v in args.items() if k != "agent_id"} + req = schemas.AgentConfigUpdate(**req_args) + res = self.services.agent_service.update_config(db, agent_id, token, req) + return {"id": str(res.id), "status": res.status} + return await self.loop.run_in_executor(None, _execute) - if name == "delete_agent": - if not token: - raise ValueError("Authentication required to delete agents.") + async def _delete_agent(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") agent_id = args.get("agent_id") - if not agent_id: - raise ValueError("agent_id is required.") - def _execute_delete(): + if not agent_id: raise ValueError("agent_id required.") + def _execute(): from app.db.session import get_db_session with get_db_session() as db: - services.agent_service.delete_agent(db, agent_id, token) + self.services.agent_service.delete_agent(db, agent_id, token) return {"status": "deleted", "agent_id": agent_id} - return _ok(await loop.run_in_executor(None, _execute_delete)) + return await self.loop.run_in_executor(None, _execute) - if name == "get_agent_details": - if not token: - raise ValueError("Authentication required to get agent details.") + async def _get_agent_details(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") agent_id = args.get("agent_id") - if not agent_id: - raise ValueError("agent_id is required.") - def _execute_query(): + if not agent_id: raise ValueError("agent_id required.") + def _execute(): from app.db.session import get_db_session with get_db_session() as db: - a = services.agent_service.get_agent_instance(db, agent_id, token) - return { - "id": str(a.id), - "name": a.template.name if a.template else None, - "status": a.status, - "node": a.mesh_node_id, - "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None, - "total_runs": a.total_runs, - "successful_runs": a.successful_runs, - "quality_score": a.latest_quality_score, - "workspace_jail": a.current_workspace_jail, - "last_error": a.last_error, - "evaluation_status": a.evaluation_status, - } - return _ok(await loop.run_in_executor(None, _execute_query)) + a = self.services.agent_service.get_agent_instance(db, agent_id, token) + return {"id": str(a.id), "name": a.template.name if a.template else None, "status": a.status, "node": a.mesh_node_id, "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None, "total_runs": a.total_runs, "successful_runs": a.successful_runs, "quality_score": a.latest_quality_score, "workspace_jail": a.current_workspace_jail, "last_error": a.last_error, "evaluation_status": a.evaluation_status} + return await self.loop.run_in_executor(None, _execute) - if name == "get_user_config": - if not token: - raise ValueError("Authentication required to get user config.") - def _execute_query(): + async def _get_user_config(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") + def _query(): from app.db.session import get_db_session from app.db.models import User with get_db_session() as db: user = db.query(User).filter(User.id == token).first() - if not user: - raise ValueError("User not found.") - config = services.preference_service.merge_user_config(user, db) - return config.model_dump() - return _ok(await loop.run_in_executor(None, _execute_query)) + if not user: raise ValueError("User not found.") + return self.services.preference_service.merge_user_config(user, db).model_dump() + return await self.loop.run_in_executor(None, _query) - if name == "update_user_config": - if not token: - raise ValueError("Authentication required to update user config.") + async def _update_user_config(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") from app.api import schemas - def _execute_update(): + def _execute(): from app.db.session import get_db_session from app.db.models import User with get_db_session() as db: user = db.query(User).filter(User.id == token).first() - if not user: - raise ValueError("User not found.") + if not user: raise ValueError("User not found.") prefs = schemas.UserPreferences(**args) - result = services.preference_service.update_user_config(user, prefs, db) - return result.model_dump() - return _ok(await loop.run_in_executor(None, _execute_update)) + return self.services.preference_service.update_user_config(user, prefs, db).model_dump() + return await self.loop.run_in_executor(None, _execute) - if name == "list_skills": - if not token: - raise ValueError("Authentication required to list skills.") + async def _list_skills(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") def _query(): from app.db.session import get_db_session from app.db import models with get_db_session() as db: rows = db.query(models.Skill).filter(models.Skill.is_enabled == True).all() - return { - "skills": [ - { - "id": s.id, - "name": s.name, - "description": s.description, - "type": s.skill_type, - } - for s in rows - ] - } - return _ok(await loop.run_in_executor(None, _query)) + return {"skills": [{"id": s.id, "name": s.name, "description": s.description, "type": s.skill_type} for s in rows]} + return await self.loop.run_in_executor(None, _query) - if name == "dispatch": - if not token: - raise ValueError("Authentication required to dispatch tasks.") - node_id = args.get("node_id") - command = args.get("command") - session_id = args.get("session_id", "") - - if not node_id or not command: - raise ValueError("node_id and command are required.") - - def _execute_dispatch(): + async def _dispatch(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") + node_id, command, session_id = args.get("node_id"), args.get("command"), args.get("session_id", "") + if not node_id or not command: raise ValueError("node_id and command required.") + def _execute(): from app.db.session import get_db_session with get_db_session() as db: - task_id = services.mesh_service.dispatch_task( - node_id, command, token, db, session_id=session_id - ) - return {"status": "accepted", "task_id": task_id} - - return _ok(await loop.run_in_executor(None, _execute_dispatch)) + tid = self.services.mesh_service.dispatch_task(node_id, command, token, db, session_id=session_id) + return {"status": "accepted", "task_id": tid} + return await self.loop.run_in_executor(None, _execute) - if name == "write_file": - if not token: - raise ValueError("Authentication required to write files.") - node_id = args.get("node_id") - path = args.get("path") - content = args.get("content", "") - is_dir = args.get("is_dir", False) - session_id = args.get("session_id", "__fs_explorer__") - - if not node_id or not path: - raise ValueError("node_id and path are required.") - - def _execute_write(): + async def _write_file(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") + node_id, path, content, is_dir, session_id = args.get("node_id"), args.get("path"), args.get("content", ""), args.get("is_dir", False), args.get("session_id", "__fs_explorer__") + if not node_id or not path: raise ValueError("node_id and path required.") + def _execute(): from app.db.session import get_db_session with get_db_session() as db: - services.mesh_service.require_node_access(token, node_id, db) - orchestrator = services.orchestrator - res = orchestrator.assistant.write(node_id, path, content, is_dir, session_id=session_id) - return res - - return _ok(await loop.run_in_executor(None, _execute_write)) + self.services.mesh_service.require_node_access(token, node_id, db) + return self.services.orchestrator.assistant.write(node_id, path, content, is_dir, session_id=session_id) + return await self.loop.run_in_executor(None, _execute) - if name == "delete_file": - if not token: - raise ValueError("Authentication required to delete files.") - node_id = args.get("node_id") - path = args.get("path") - session_id = args.get("session_id", "__fs_explorer__") - - if not node_id or not path: - raise ValueError("node_id and path are required.") - - def _execute_delete(): + async def _delete_file(self, args: dict, token: Optional[str]): + if not token: raise ValueError("Authentication required.") + node_id, path, session_id = args.get("node_id"), args.get("path"), args.get("session_id", "__fs_explorer__") + if not node_id or not path: raise ValueError("node_id and path required.") + def _execute(): from app.db.session import get_db_session with get_db_session() as db: - services.mesh_service.require_node_access(token, node_id, db) - orchestrator = services.orchestrator - res = orchestrator.assistant.rm(node_id, path, session_id=session_id) - return res - - return _ok(await loop.run_in_executor(None, _execute_delete)) + self.services.mesh_service.require_node_access(token, node_id, db) + return self.services.orchestrator.assistant.rm(node_id, path, session_id=session_id) + return await self.loop.run_in_executor(None, _execute) - if name == "list_groups": + async def _list_groups(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -820,11 +557,10 @@ with get_db_session() as db: u = db.query(User).filter(User.id == token).first() if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.") - groups = services.user_service.get_all_groups(db) - return [schemas.GroupInfo.model_validate(g).model_dump() for g in groups] - return _ok(await loop.run_in_executor(None, _query)) + return [schemas.GroupInfo.model_validate(g).model_dump() for g in self.services.user_service.get_all_groups(db)] + return await self.loop.run_in_executor(None, _query) - if name == "create_group": + async def _create_group(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -833,12 +569,12 @@ with get_db_session() as db: u = db.query(User).filter(User.id == token).first() if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.") - g = services.user_service.create_group(db, args.get("name"), args.get("description"), args.get("policy")) - if g is None: raise ValueError(f"Group '{args.get('name')}' already exists.") + g = self.services.user_service.create_group(db, args.get("name"), args.get("description"), args.get("policy")) + if g is None: raise ValueError(f"Group '{args.get('name')}' exists.") return schemas.GroupInfo.model_validate(g).model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "update_group": + async def _update_group(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -847,13 +583,13 @@ with get_db_session() as db: u = db.query(User).filter(User.id == token).first() if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.") - g = services.user_service.update_group(db, args.get("gid"), args.get("name"), args.get("description"), args.get("policy")) + g = self.services.user_service.update_group(db, args.get("gid"), args.get("name"), args.get("description"), args.get("policy")) if g is None: raise ValueError("Group not found.") - if g is False: raise ValueError(f"Group name conflict.") + if g is False: raise ValueError("Group name conflict.") return schemas.GroupInfo.model_validate(g).model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "delete_group": + async def _delete_group(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.db.session import get_db_session @@ -861,12 +597,11 @@ with get_db_session() as db: u = db.query(User).filter(User.id == token).first() if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.") - success = services.user_service.delete_group(db, args.get("gid")) - if not success: raise ValueError("Failed to delete group.") + if not self.services.user_service.delete_group(db, args.get("gid")): raise ValueError("Failed delete.") return {"message": "Group deleted successfully"} - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "list_users": + async def _list_users(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -875,16 +610,16 @@ with get_db_session() as db: u = db.query(User).filter(User.id == token).first() if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.") - users = services.user_service.get_all_users(db) + users = self.services.user_service.get_all_users(db) res = [] for u_item in users: p = schemas.UserProfile.model_validate(u_item) if u_item.group: p.group_name = u_item.group.name res.append(p.model_dump()) return res - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "update_user_role": + async def _update_user_role(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -893,13 +628,12 @@ with get_db_session() as db: u = db.query(User).filter(User.id == token).first() if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.") - success = services.user_service.update_user_role(db, args.get("uid"), args.get("role")) - if not success: raise ValueError("Failed to update role.") - updated = services.user_service.get_user_by_id(db, args.get("uid")) + if not self.services.user_service.update_user_role(db, args.get("uid"), args.get("role")): raise ValueError("Failed update.") + updated = self.services.user_service.get_user_by_id(db, args.get("uid")) return schemas.UserProfile.model_validate(updated).model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "assign_user_to_group": + async def _assign_user_to_group(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -908,63 +642,58 @@ with get_db_session() as db: u = db.query(User).filter(User.id == token).first() if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.") - success = services.user_service.assign_user_to_group(db, args.get("uid"), args.get("group_id")) - if not success: raise ValueError("User or group not found.") - updated = services.user_service.get_user_by_id(db, args.get("uid")) + if not self.services.user_service.assign_user_to_group(db, args.get("uid"), args.get("group_id")): raise ValueError("User or group not found.") + updated = self.services.user_service.get_user_by_id(db, args.get("uid")) return schemas.UserProfile.model_validate(updated).model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "get_user_profile": + async def _get_user_profile(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas from app.db.session import get_db_session from app.db.models import User with get_db_session() as db: - u = services.user_service.get_user_by_id(db=db, user_id=token) + u = self.services.user_service.get_user_by_id(db=db, user_id=token) if not u: raise ValueError("User not found.") p = schemas.UserProfile.model_validate(u) if u.group: p.group_name = u.group.name return p.model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "update_user_profile": + async def _update_user_profile(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas from app.db.session import get_db_session from app.db.models import User with get_db_session() as db: - u = services.user_service.get_user_by_id(db=db, user_id=token) + u = self.services.user_service.get_user_by_id(db=db, user_id=token) if not u: raise ValueError("User not found.") if args.get("username"): u.username = args.get("username") if args.get("full_name"): u.full_name = args.get("full_name") if args.get("avatar_url"): u.avatar_url = args.get("avatar_url") - db.commit() - db.refresh(u) + db.commit(); db.refresh(u) p = schemas.UserProfile.model_validate(u) if u.group: p.group_name = u.group.name return p.model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "create_session": + async def _create_session(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas from app.db.session import get_db_session with get_db_session() as db: - s = services.session_service.create_session( - db=db, user_id=token, - provider_name=args.get("provider_name", "deepseek"), - model_name=args.get("model_name"), - feature_name=args.get("feature_name", "default"), - stt_provider_name=args.get("stt_provider_name"), - tts_provider_name=args.get("tts_provider_name") + s = self.services.session_service.create_session( + db=db, user_id=token, provider_name=args.get("provider_name", "deepseek"), + model_name=args.get("model_name"), feature_name=args.get("feature_name", "default"), + stt_provider_name=args.get("stt_provider_name"), tts_provider_name=args.get("tts_provider_name") ) return schemas.Session.model_validate(s).model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "get_session": + async def _get_session(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -974,9 +703,9 @@ s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first() if not s: raise ValueError("Session not found.") return schemas.Session.model_validate(s).model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "list_sessions": + async def _list_sessions(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -984,14 +713,13 @@ from app.db.models import Session as DBSession with get_db_session() as db: sessions = db.query(DBSession).filter( - DBSession.user_id == token, - DBSession.feature_name == args.get("feature_name", "default"), + DBSession.user_id == token, DBSession.feature_name == args.get("feature_name", "default"), DBSession.is_archived == False ).order_by(DBSession.created_at.desc()).all() return [schemas.Session.model_validate(s).model_dump() for s in sessions] - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "update_session": + async def _update_session(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -1006,28 +734,26 @@ if args.get("restrict_skills") is not None: s.restrict_skills = args.get("restrict_skills") if args.get("allowed_skill_names") is not None: s.allowed_skill_names = args.get("allowed_skill_names") if args.get("allowed_skill_ids") is not None: - skills = db.query(Skill).filter(Skill.id.in_(args.get("allowed_skill_ids"))).all() - s.skills = skills + s.skills = db.query(Skill).filter(Skill.id.in_(args.get("allowed_skill_ids"))).all() if args.get("system_prompt_override") is not None: s.system_prompt_override = args.get("system_prompt_override") if args.get("is_locked") is not None: s.is_locked = args.get("is_locked") - db.commit() - db.refresh(s) + db.commit(); db.refresh(s) return schemas.Session.model_validate(s).model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "delete_session": + async def _delete_session(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.db.session import get_db_session from app.db.models import Session as DBSession with get_db_session() as db: s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first() - if not s: raise ValueError("Session not found or access denied.") - services.session_service.archive_session(db, args.get("session_id")) + if not s: raise ValueError("Session not found.") + self.services.session_service.archive_session(db, args.get("session_id")) return {"message": "Session deleted successfully."} - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "get_session_messages": + async def _get_session_messages(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -1035,19 +761,18 @@ from app.db.models import Session as DBSession with get_db_session() as db: s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first() - if not s: raise ValueError("Session not found or access denied.") - messages = services.rag_service.get_message_history(db=db, session_id=args.get("session_id")) + if not s: raise ValueError("Session access denied.") + msgs = self.services.rag_service.get_message_history(db=db, session_id=args.get("session_id")) res = [] - for m in messages: - msg_dict = schemas.Message.model_validate(m).model_dump() + for m in msgs: + m_dict = schemas.Message.model_validate(m).model_dump() if m.audio_path and os.path.exists(m.audio_path): - msg_dict["has_audio"] = True - msg_dict["audio_url"] = f"/sessions/messages/{m.id}/audio" - res.append(msg_dict) + m_dict["has_audio"], m_dict["audio_url"] = True, f"/sessions/messages/{m.id}/audio" + res.append(m_dict) return res - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "clear_session_history": + async def _clear_session_history(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.db.session import get_db_session @@ -1056,12 +781,12 @@ s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first() if not s: raise ValueError("Session not found.") if s.is_locked: raise ValueError("Session is locked.") - deleted = db.query(DBMessage).filter(DBMessage.session_id == args.get("session_id")).delete() + cnt = db.query(DBMessage).filter(DBMessage.session_id == args.get("session_id")).delete() db.commit() - return {"message": f"Cleared {deleted} messages."} - return _ok(await loop.run_in_executor(None, _query)) + return {"message": f"Cleared {cnt} messages."} + return await self.loop.run_in_executor(None, _query) - if name == "attach_nodes_to_session": + async def _attach_nodes_to_session(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -1069,13 +794,12 @@ from app.db.models import Session as DBSession with get_db_session() as db: s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first() - if not s: raise ValueError("Session not found or access denied.") + if not s: raise ValueError("Session access denied.") req = schemas.NodeAttachRequest(**args) - res = services.session_service.attach_nodes(db, args.get("session_id"), req) - return res.model_dump() - return _ok(await loop.run_in_executor(None, _query)) + return self.services.session_service.attach_nodes(db, args.get("session_id"), req).model_dump() + return await self.loop.run_in_executor(None, _query) - if name == "detach_node_from_session": + async def _detach_node_from_session(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.db.session import get_db_session @@ -1091,12 +815,12 @@ status.pop(args.get("node_id"), None) s.node_sync_status = status db.commit() - if hasattr(services, "orchestrator") and services.orchestrator: - services.orchestrator.assistant.clear_workspace(args.get("node_id"), s.sync_workspace_id) + if hasattr(self.services, "orchestrator") and self.services.orchestrator: + self.services.orchestrator.assistant.clear_workspace(args.get("node_id"), s.sync_workspace_id) return {"message": "Detached successfully."} - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "get_session_nodes": + async def _get_session_nodes(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.api import schemas @@ -1105,17 +829,17 @@ with get_db_session() as db: s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first() if not s: raise ValueError("Session not found.") - sync_status = s.node_sync_status or {} + status = s.node_sync_status or {} entries = [] for nid in (s.attached_node_ids or []): - live = services.node_registry_service.get_node(nid) - persisted = sync_status.get(nid, {}) - status_val = "connected" if live and persisted.get("status") == "pending" else persisted.get("status", "pending") - entries.append(schemas.NodeSyncStatusEntry(node_id=nid, status=status_val, last_sync=persisted.get("last_sync"), error=persisted.get("error")).model_dump()) + live = self.services.node_registry_service.get_node(nid) + p = status.get(nid, {}) + val = "connected" if live and p.get("status") == "pending" else p.get("status", "pending") + entries.append(schemas.NodeSyncStatusEntry(node_id=nid, status=val, last_sync=p.get("last_sync"), error=p.get("error")).model_dump()) return {"session_id": s.id, "sync_workspace_id": s.sync_workspace_id, "nodes": entries, "sync_config": s.sync_config or {}} - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "cancel_session_task": + async def _cancel_session_task(self, args: dict, token: Optional[str]): if not token: raise ValueError("Authentication required.") def _query(): from app.db.session import get_db_session @@ -1126,25 +850,20 @@ s.is_cancelled = True db.commit() return {"message": "Cancellation request sent."} - return _ok(await loop.run_in_executor(None, _query)) + return await self.loop.run_in_executor(None, _query) - if name == "get_system_status": + async def _get_system_status(self, args: dict, token: Optional[str]): def _query(): - return { - "status": "running", - "oidc_enabled": settings.OIDC_ENABLED, - "tls_enabled": settings.GRPC_TLS_ENABLED, - "external_endpoint": settings.GRPC_EXTERNAL_ENDPOINT, - "version": settings.VERSION - } - return _ok(await loop.run_in_executor(None, _query)) + return {"status": "running", "oidc_enabled": settings.OIDC_ENABLED, "version": "1.0.0"} + return await self.loop.run_in_executor(None, _query) - # Writable tools (future-proofing check) - writable_tools = [] - if name in writable_tools and not settings.OIDC_ENABLED: - raise HTTPException( - status_code=403, - detail="Swarm manipulation tools are disabled because OIDC is not configured." - ) +async def _call_tool(name: str, args: dict, token: Optional[str], services: ServiceContainer) -> dict: + """Execute a named tool via MCPToolDispatcher.""" + def _ok(data) -> dict: + text = json.dumps(data, indent=2, default=str) if not isinstance(data, str) else data + return {"content": [{"type": "text", "text": text}]} - raise ValueError(f"Unknown tool: '{name}'") + loop = asyncio.get_running_loop() + dispatcher = MCPToolDispatcher(services, loop) + result = await dispatcher.dispatch(name, args, token) + return _ok(result)