# cortex-hub / ai-hub / app / api / routes / mcp.py
"""
MCP (Model Context Protocol) Server Route — Streamable HTTP + Legacy SSE Transport

Supports:
  MCP spec 2025-11-25 — Streamable HTTP (primary, recommended)
  MCP spec 2024-11-05 — HTTP+SSE (legacy, backwards-compat)

Endpoints (mounted under /api/v1/mcp/*):
  POST /mcp/sse     — Streamable HTTP: JSON-RPC in, JSON response out
  POST /mcp/        — Same, aliased for clients using the base path
  GET  /mcp/sse     — Legacy SSE stream (sends endpoint event)
  POST /mcp/messages — Legacy SSE message handler

Discovery:
  GET  /.well-known/mcp/manifest.json  (mounted in app.py)
"""


import asyncio
import json
import uuid
import logging
import jwt
from typing import Optional, List, Annotated
from fastapi import APIRouter, HTTPException, Request, Query, Header
from fastapi.responses import JSONResponse, StreamingResponse

from app.api.dependencies import ServiceContainer
from app.config import settings

# Module-level logger shared by every handler in this file.
logger = logging.getLogger(__name__)

# Protocol version reported in the `initialize` result and in the
# MCP-Protocol-Version response header of the Streamable HTTP endpoint.
MCP_VERSION = "2025-11-25"  # Latest MCP specification version

# ─── In-process SSE session registry ─────────────────────────────────────────
# Maps session_id → asyncio.Queue of JSON-serializable dicts
# An entry is added when a GET /sse stream is opened and popped when that
# stream's event generator exits; POST /messages looks the queue up by
# session_id and 404s when it is missing.
# The registry is a plain module-level dict, so it is only visible inside the
# process that created the session.
_sse_sessions: dict[str, asyncio.Queue] = {}


def create_mcp_router(services: ServiceContainer) -> APIRouter:
    """
    Build the FastAPI router that exposes the MCP transports.

    Routes registered (paths are relative to wherever the router is mounted):
      GET  /sse       — legacy SSE stream; its first event advertises the messages URL
      POST /sse, /    — Streamable HTTP: one JSON-RPC request (or batch) per call
      POST /messages  — legacy SSE message handler; results are pushed onto the stream

    Args:
        services: Service container. ``services.auth_service`` is used here to
            verify OIDC tokens; the container is also forwarded to the JSON-RPC
            handlers.

    Returns:
        The configured ``APIRouter``.
    """
    router = APIRouter(tags=["MCP"])

    # Browser origins that may always call the MCP endpoints.
    base_allowed_origins = (
        "https://ai.jerxie.com",
        "http://localhost:3000",
        "http://localhost:8080",
    )

    # The event loop only keeps weak references to tasks, so fire-and-forget
    # dispatch tasks are parked here until they finish.
    background_tasks: set = set()

    def _origin_allowed(origin: str, extra: tuple = ()) -> bool:
        """
        Return True when ``origin`` exactly equals an allowed origin.

        A prefix match is not sufficient: "https://ai.jerxie.com.evil.com"
        starts with "https://ai.jerxie.com". Scheme and host are compared
        case-insensitively and a trailing slash is ignored.
        """
        candidate = origin.rstrip("/").lower()
        return any(
            candidate == allowed.lower()
            for allowed in (*base_allowed_origins, *extra)
        )

    def _invalid_request() -> dict:
        """JSON-RPC 2.0 'Invalid Request' error for payloads that are not objects."""
        return {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32600, "message": "Invalid Request"},
        }

    async def _get_authenticated_user(request: Request, token: Optional[str], db) -> Optional[str]:
        """
        Resolves the user_id from either the Authorization header (JWT) or the token query param.
        If OIDC is enabled, this strictly requires a valid JWT.

        Returns:
            The user id, or None when no credential was supplied at all.

        Raises:
            HTTPException(401): OIDC is enabled and the credential is not a
                JWT or fails verification.
        """
        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            # Take everything after the scheme so extra spaces do not yield "".
            token = auth_header[len("Bearer "):].strip()

        if not token:
            return None

        # Legacy/Bootstrap Mode: Accept plain user_id (Identity Claim).
        # This is only active when OIDC is not configured.
        if not settings.OIDC_ENABLED:
            return token

        # OIDC Mode: Support both OIDC (RS256) and Internal (HS256) JWTs.
        if "." not in token:
            logger.warning("[MCP] Rejected non-JWT token in OIDC mode.")
            raise HTTPException(
                status_code=401,
                detail="Authentication required: Provide a valid JWT.",
            )

        try:
            # The unverified peek only selects which verifier to run; the
            # claims are trusted only after one of the verifications succeeds.
            unverified = jwt.decode(token, options={"verify_signature": False})
            if unverified.get("iss") == "cortex-hub-internal":
                decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
                return decoded.get("sub")

            # Fallback to OIDC RS256
            user = await services.auth_service.verify_id_token(token, db)
            return user.id
        except Exception as e:
            logger.error(f"[MCP] JWT verification failed: {e}")
            raise HTTPException(status_code=401, detail=f"Invalid token: {str(e)}") from e

    # ─── SSE Transport — Client Connection ────────────────────────────────────
    @router.get("/sse", summary="MCP SSE Transport Endpoint")
    async def mcp_sse(
        request: Request,
        token: Optional[str] = Query(None),
    ):
        """
        Server-Sent Events (SSE) transport for MCP.
        Supports Bearer token in Authorization header or 'token' query parameter.
        """
        from urllib.parse import quote

        from app.db.session import get_db_session
        with get_db_session() as db:
            user_id = await _get_authenticated_user(request, token, db)

        if not user_id:
            # We allow the SSE connection to open even without auth,
            # but actual messages/tools will be rejected.
            logger.info("[MCP] SSE connection opened without initial auth.")

        # Origin validation per MCP 2025-11-25. This runs BEFORE the session is
        # registered so a rejected request cannot leave an orphaned queue behind.
        origin = request.headers.get("origin")
        if origin and not _origin_allowed(origin):
            logger.warning(f"[MCP] Blocked unauthorized origin: {origin}")
            raise HTTPException(status_code=403, detail="Unauthorized Origin")

        queue = asyncio.Queue()
        session_id = str(uuid.uuid4())
        _sse_sessions[session_id] = queue

        messages_url = f"{settings.HUB_PUBLIC_URL}/api/v1/mcp/messages?session_id={session_id}"
        if user_id:
            # Percent-encode so the value cannot break the query string or
            # inject extra lines into the SSE frame below.
            # NOTE(review): with OIDC enabled this value is a user id, not a
            # JWT, so POST /messages will only authenticate through the
            # Authorization header — confirm that is intended.
            messages_url += f"&token={quote(str(user_id), safe='')}"

        async def _event_generator():
            try:
                # First event tells the client where to POST its JSON-RPC messages.
                yield f"event: endpoint\ndata: {messages_url}\n\n"
                while True:
                    msg = await queue.get()
                    yield f"event: message\ndata: {json.dumps(msg)}\n\n"
            finally:
                # Runs when the stream ends (including cancellation on disconnect).
                _sse_sessions.pop(session_id, None)

        return StreamingResponse(
            _event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
                "Access-Control-Allow-Origin": "*",
            },
        )

    # ─── Streamable HTTP Transport (MCP 2025-11-25) ───────────────────────────
    @router.post("/sse")
    @router.post("/", summary="MCP Streamable HTTP Endpoint (Post-only mode)")
    async def mcp_streamable_http(
        request: Request,
        token: Optional[str] = Query(None),
    ):
        """
        One-shot JSON-RPC over HTTP.
        Supports Bearer token in Authorization header or 'token' query parameter.
        """
        from app.db.session import get_db_session
        with get_db_session() as db:
            user_id = await _get_authenticated_user(request, token, db)

        # Origin validation per MCP 2025-11-25
        origin = request.headers.get("origin")
        if origin:
            # Also allow the server's own origin. An empty Host header adds
            # nothing; otherwise the bare scheme would be put on the allow-list.
            server_host = request.headers.get("host", "")
            own_origins = (
                (f"https://{server_host}", f"http://{server_host}") if server_host else ()
            )
            if not _origin_allowed(origin, own_origins):
                logger.warning(f"[MCP] Rejected request from disallowed origin: {origin}")
                return JSONResponse(
                    {"jsonrpc": "2.0", "error": {"code": -32000, "message": "Forbidden origin"}},
                    status_code=403,
                )

        try:
            body = await request.json()
        except Exception:
            raise HTTPException(status_code=400, detail="Invalid JSON body.") from None

        # Batch requests (JSON array)
        if isinstance(body, list):
            results = []
            for item in body:
                if not isinstance(item, dict):
                    # A malformed entry must not take the whole batch down.
                    results.append(_invalid_request())
                    continue
                results.append(await _handle_single(item, user_id, services))
            return JSONResponse(
                [r for r in results if r is not None],
                headers={"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION},
            )

        # Valid JSON that is not an object (string, number, null, ...) is not JSON-RPC.
        if not isinstance(body, dict):
            return JSONResponse(
                _invalid_request(),
                status_code=400,
                headers={"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION},
            )

        # Single request
        response = await _handle_single(body, user_id, services)
        if response is None:  # notification — no id
            return JSONResponse(
                None, status_code=202,
                headers={"Access-Control-Allow-Origin": "*"},
            )

        # If initialize, attach a session ID (MAY per spec)
        headers = {"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION}
        if body.get("method") == "initialize":
            headers["Mcp-Session-Id"] = str(uuid.uuid4())

        return JSONResponse(response, headers=headers)

    # ─── SSE Transport — Message Handler ─────────────────────────────────────
    @router.post("/messages")
    async def mcp_messages(
        request: Request,
        session_id: str = Query(...),
        token: Optional[str] = Query(None),
    ):
        """
        Legacy SSE message handler — receives JSON-RPC 2.0 from a client that
        first established a GET /sse stream, then pushes results over that stream.
        """
        queue = _sse_sessions.get(session_id)
        if not queue:
            raise HTTPException(status_code=404, detail="MCP session not found or expired.")

        try:
            body = await request.json()
        except Exception:
            raise HTTPException(status_code=400, detail="Invalid JSON body.") from None

        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="JSON-RPC request must be an object.")

        rpc_id = body.get("id")
        method = body.get("method", "")
        params = body.get("params", {})

        from app.db.session import get_db_session
        with get_db_session() as db:
            user_id = await _get_authenticated_user(request, token, db)

        logger.info(f"[MCP] [{session_id[:8]}] → {method}")
        # Run in the background and answer 202 right away; the result is
        # delivered over the SSE stream. Hold a reference until completion so
        # the task cannot be garbage-collected while it is still running.
        task = asyncio.create_task(_dispatch(queue, rpc_id, method, params, user_id, services))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

        return JSONResponse(
            {"status": "accepted"},
            status_code=202,
            headers={"Access-Control-Allow-Origin": "*"},
        )

    return router


# ─── Single-request handler (used by Streamable HTTP) ────────────────────────

async def _handle_single(body: dict, token: Optional[str], services: ServiceContainer):
    """
    Process one JSON-RPC object for the Streamable HTTP transport.

    Args:
        body: The decoded JSON-RPC payload. Anything that is not a dict is
            answered with a JSON-RPC "Invalid Request" error instead of raising.
        token: Authenticated user id (or None); forwarded to ``_execute``.
        services: Service container forwarded to ``_execute``.

    Returns:
        A JSON-RPC response dict, or None when the payload is a notification
        (no ``id``), in which case nothing must be sent back.
    """
    if not isinstance(body, dict):
        # Without an object there is no id to echo, hence id = null.
        return {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32600, "message": "Invalid Request"},
        }

    rpc_id = body.get("id")      # None means it's a notification
    method = body.get("method", "")
    # An explicit `"params": null` is treated like an omitted member.
    params = body.get("params") or {}

    logger.info(f"[MCP-HTTP] → {method}")
    try:
        result = await _execute(method, params, token, services)
    except Exception as exc:
        logger.exception(f"[MCP-HTTP] Error for '{method}': {exc}")
        if rpc_id is None:
            return None   # notification — errors are not reported either
        return {
            "jsonrpc": "2.0",
            "id": rpc_id,
            "error": {"code": -32000, "message": str(exc)},
        }

    if rpc_id is None:
        return None   # notification — no response
    return {"jsonrpc": "2.0", "id": rpc_id, "result": result}



# ─── Dispatcher ───────────────────────────────────────────────────────────────

async def _dispatch(
    queue: asyncio.Queue,
    rpc_id,
    method: str,
    params: dict,
    token: Optional[str],
    services: ServiceContainer,
):
    """
    Run the method and push a JSON-RPC response onto the SSE queue.

    Args:
        queue: The SSE session queue the response is delivered through.
        rpc_id: The request's ``id``; None marks a notification.
        method: JSON-RPC method name.
        params: JSON-RPC params forwarded to ``_execute``.
        token: Authenticated user id (or None); forwarded to ``_execute``.
        services: Service container forwarded to ``_execute``.

    Notifications are executed but never answered: JSON-RPC forbids replying
    to them, and a response carrying ``"id": null`` cannot be matched to any
    request on the client side.
    """
    is_notification = rpc_id is None
    try:
        result = await _execute(method, params, token, services)
    except Exception as exc:
        logger.exception(f"[MCP] Tool error for '{method}': {exc}")
        if not is_notification:
            await queue.put({
                "jsonrpc": "2.0",
                "id": rpc_id,
                "error": {"code": -32000, "message": str(exc)},
            })
        return

    if not is_notification:
        await queue.put({"jsonrpc": "2.0", "id": rpc_id, "result": result})


async def _execute(method: str, params: dict, token: Optional[str], services: ServiceContainer):
    """
    Route a JSON-RPC method to its implementation.

    Args:
        method: JSON-RPC method name ("initialize", "ping", "tools/list",
            "tools/call", or any "notifications/*").
        params: JSON-RPC params; None is treated as an empty object.
        token: Authenticated user id (or None); forwarded to tool calls.
        services: Service container forwarded to tool calls.

    Returns:
        The JSON-RPC ``result`` payload for the method.

    Raises:
        ValueError: The method is not recognised.
    """
    # `"params": null` and an omitted member mean the same thing.
    if params is None:
        params = {}

    # ── MCP Handshake ─────────────────────────────────────────────────────────
    if method == "initialize":
        # Version negotiation: echo the client's requested version when it is
        # one this server speaks, otherwise offer the latest one. Answering a
        # legacy (2024-11-05) client with a version it does not know makes it
        # disconnect.
        supported_versions = (MCP_VERSION, "2024-11-05")
        requested = params.get("protocolVersion") if isinstance(params, dict) else None
        return {
            "protocolVersion": requested if requested in supported_versions else MCP_VERSION,
            "capabilities": {"tools": {}},
            "serverInfo": {"name": "Cortex Hub", "version": "1.0.0"},
        }

    if method == "ping":
        return {}

    # Client notifications (e.g. "notifications/initialized", which every
    # client sends after the handshake) need no action; accept them quietly
    # instead of reporting "Unknown method".
    if method.startswith("notifications/"):
        return {}

    # ── Tool Discovery ────────────────────────────────────────────────────────
    if method == "tools/list":
        return {
            "tools": [
                _tool_def("list_nodes",
                          "List all agent nodes in the Cortex swarm mesh and their status.",
                          {}),
                _tool_def("get_app_info",
                          "Get metadata about this Cortex Hub instance.",
                          {}),
                _tool_def("get_node_details",
                          "Get full details for a specific agent node.",
                          {"node_id": {"type": "string", "description": "Unique node ID"}},
                          required=["node_id"]),
                _tool_def("list_agents",
                          "List all autonomous agents configured in the system.",
                          {}),
                _tool_def("list_skills",
                          "List all skill folders (tool libraries) registered in the system.",
                          {}),
                _tool_def("dispatch",
                          "Dispatch a shell command to a specific agent node.",
                          {
                              "node_id": {"type": "string", "description": "Unique node ID"},
                              "command": {"type": "string", "description": "Command to execute"},
                              "session_id": {"type": "string", "description": "Optional session ID"},
                          },
                          required=["node_id", "command"]),
                _tool_def("write_file",
                          "Create or update a file on a specific agent node.",
                          {
                              "node_id": {"type": "string", "description": "Unique node ID"},
                              "path": {"type": "string", "description": "Path to file"},
                              "content": {"type": "string", "description": "Content to write (string)"},
                              "is_dir": {"type": "boolean", "description": "True if creating a directory"},
                              "session_id": {"type": "string", "description": "Optional session ID"},
                          },
                          required=["node_id", "path"]),
                _tool_def("delete_file",
                          "Delete a file or directory on a specific agent node.",
                          {
                              "node_id": {"type": "string", "description": "Unique node ID"},
                              "path": {"type": "string", "description": "Path to file or directory"},
                              "session_id": {"type": "string", "description": "Optional session ID"},
                          },
                          required=["node_id", "path"]),
                _tool_def("deploy_agent",
                          "Deploy a new agent instance with a template and session.",
                          {
                              "name": {"type": "string", "description": "Name of the agent"},
                              "mesh_node_id": {"type": "string", "description": "Node ID to deploy to"},
                              "description": {"type": "string", "description": "Optional description"},
                              "system_prompt": {"type": "string", "description": "Optional system prompt override"},
                              "max_loop_iterations": {"type": "integer", "description": "Max loop iterations (default 20)"},
                              "initial_prompt": {"type": "string", "description": "Optional first message to kick off execution"},
                              "provider_name": {"type": "string", "description": "Optional LLM provider name"},
                              "trigger_type": {"type": "string", "description": "Trigger type: manual, webhook, cron, interval"},
                              "cron_expression": {"type": "string", "description": "Cron expression if trigger_type is cron"},
                              "interval_seconds": {"type": "integer", "description": "Interval in seconds if trigger_type is interval"},
                              "default_prompt": {"type": "string", "description": "Predefined prompt for any trigger"},
                              "co_worker_quality_gate": {"type": "boolean", "description": "Enable quality gate"},
                              "rework_threshold": {"type": "integer", "description": "Rework threshold (0-100)"},
                              "max_rework_attempts": {"type": "integer", "description": "Max rework attempts"}
                          },
                          required=["name", "mesh_node_id"]),
                _tool_def("update_agent_config",
                          "Update configuration for an existing agent.",
                          {
                              "agent_id": {"type": "string", "description": "Unique agent ID"},
                              "name": {"type": "string", "description": "New name"},
                              "system_prompt": {"type": "string", "description": "New system prompt"},
                              "max_loop_iterations": {"type": "integer", "description": "New max loop iterations"},
                              "mesh_node_id": {"type": "string", "description": "New mesh node ID"},
                              "provider_name": {"type": "string", "description": "New provider name"},
                              "model_name": {"type": "string", "description": "New model name"},
                              "co_worker_quality_gate": {"type": "boolean", "description": "Enable quality gate"},
                              "rework_threshold": {"type": "integer", "description": "New rework threshold"},
                              "max_rework_attempts": {"type": "integer", "description": "New max rework attempts"}
                          },
                          required=["agent_id", "mesh_node_id"]),
                _tool_def("delete_agent",
                          "Delete an autonomous agent.",
                          {
                              "agent_id": {"type": "string", "description": "Unique agent ID to delete"}
                          },
                          required=["agent_id"]),
                _tool_def("get_agent_details",
                          "Get full details for a specific autonomous agent.",
                          {
                              "agent_id": {"type": "string", "description": "Unique agent ID"}
                          },
                          required=["agent_id"]),
                _tool_def("get_user_config",
                          "Get current user preferences and effective system configuration.",
                          {}),
                _tool_def("update_user_config",
                          "Update user preferences for LLM, TTS, and STT.",
                          {
                              "llm": {"type": "object", "description": "LLM preferences"},
                              "tts": {"type": "object", "description": "TTS preferences"},
                              "stt": {"type": "object", "description": "STT preferences"},
                              "statuses": {"type": "object", "description": "Provider health statuses"}
                          }),
                _tool_def("list_groups", "List all security groups (Admin Only).", {}),
                _tool_def("create_group", "Create a new security policy group (Admin Only).", {
                    "name": {"type": "string", "description": "Group name"},
                    "description": {"type": "string", "description": "Optional description"},
                    "policy": {"type": "object", "description": "Optional policy whitelists"}
                }, required=["name"]),
                _tool_def("update_group", "Update a security policy group (Admin Only).", {
                    "gid": {"type": "string", "description": "Group ID"},
                    "name": {"type": "string", "description": "Group name"},
                    "description": {"type": "string", "description": "Optional description"},
                    "policy": {"type": "object", "description": "Optional policy whitelists"}
                }, required=["gid"]),
                _tool_def("delete_group", "Delete a security policy group (Admin Only).", {
                    "gid": {"type": "string", "description": "Group ID"}
                }, required=["gid"]),
                _tool_def("list_users", "List all registered users (Admin Only).", {}),
                _tool_def("update_user_role", "Update a user's role (Admin Only).", {
                    "uid": {"type": "string", "description": "User ID"},
                    "role": {"type": "string", "description": "New role (admin/user)"}
                }, required=["uid", "role"]),
                _tool_def("assign_user_to_group", "Assign a user to a policy group (Admin Only).", {
                    "uid": {"type": "string", "description": "User ID"},
                    "group_id": {"type": "string", "description": "Group ID"}
                }, required=["uid", "group_id"]),
                _tool_def("get_user_profile", "Get the current user's profile information.", {}),
                _tool_def("update_user_profile", "Update the current user's profile information.", {
                    "username": {"type": "string"},
                    "full_name": {"type": "string"},
                    "avatar_url": {"type": "string"}
                }),
                _tool_def("create_session", "Create a new chat session.", {
                    "provider_name": {"type": "string"},
                    "model_name": {"type": "string"},
                    "feature_name": {"type": "string"},
                    "stt_provider_name": {"type": "string"},
                    "tts_provider_name": {"type": "string"}
                }),
                _tool_def("get_session", "Get full detail for a specific session.", {
                    "session_id": {"type": "integer"}
                }, required=["session_id"]),
                _tool_def("list_sessions", "List all chat sessions for the current user.", {
                    "feature_name": {"type": "string", "description": "Optional feature filter"}
                }),
                _tool_def("update_session", "Update session configuration.", {
                    "session_id": {"type": "integer"},
                    "title": {"type": "string"},
                    "provider_name": {"type": "string"},
                    "model_name": {"type": "string"},
                    "restrict_skills": {"type": "boolean"},
                    "allowed_skill_ids": {"type": "array", "items": {"type": "integer"}},
                    "allowed_skill_names": {"type": "array", "items": {"type": "string"}},
                    "system_prompt_override": {"type": "string"},
                    "is_locked": {"type": "boolean"}
                }, required=["session_id"]),
                _tool_def("delete_session", "Archive a chat session.", {
                    "session_id": {"type": "integer"}
                }, required=["session_id"]),
                _tool_def("get_session_messages", "Retrieve the chat history for a session.", {
                    "session_id": {"type": "integer"}
                }, required=["session_id"]),
                _tool_def("clear_session_history", "Delete all messages in a session.", {
                    "session_id": {"type": "integer"}
                }, required=["session_id"]),
                _tool_def("attach_nodes_to_session", "Attach agent nodes to a session.", {
                    "session_id": {"type": "integer"},
                    "node_ids": {"type": "array", "items": {"type": "string"}},
                    "config": {"type": "object"}
                }, required=["session_id", "node_ids"]),
                _tool_def("detach_node_from_session", "Detach an agent node from a session.", {
                    "session_id": {"type": "integer"},
                    "node_id": {"type": "string"}
                }, required=["session_id", "node_id"]),
                _tool_def("get_session_nodes", "Get all nodes attached to a session.", {
                    "session_id": {"type": "integer"}
                }, required=["session_id"]),
                _tool_def("cancel_session_task", "Cancel a running Swarm task within a session.", {
                    "session_id": {"type": "integer"}
                }, required=["session_id"]),
                _tool_def("get_system_status", "Retrieve full system and TLS state.", {})
            ]
        }

    # ── Tool Execution ────────────────────────────────────────────────────────
    if method == "tools/call":
        name = params.get("name", "")
        # Tools without inputs may send `"arguments": null` or omit the member.
        args = params.get("arguments") or {}
        return await _call_tool(name, args, token, services)

    raise ValueError(f"Unknown method: '{method}'")


def _tool_def(name: str, description: str, properties: dict, required: list = None) -> dict:
    """
    Assemble one MCP tool descriptor.

    Args:
        name: Tool identifier.
        description: Human-readable summary of the tool.
        properties: JSON-Schema ``properties`` mapping for the tool's input.
        required: Names of mandatory properties; the ``required`` key is left
            out of the schema when this is None or empty.

    Returns:
        A dict with ``name``, ``description`` and ``inputSchema`` keys.
    """
    # Only emit "required" when there is something to require.
    required_part = {"required": required} if required else {}
    input_schema = {"type": "object", "properties": properties, **required_part}
    return {"name": name, "description": description, "inputSchema": input_schema}


# ─── Tool Implementations ─────────────────────────────────────────────────────

async def _call_tool(name: str, args: dict, token: Optional[str], services: ServiceContainer) -> dict:
    """Execute a named tool and return a standard MCP content block."""

    def _ok(data) -> dict:
        text = json.dumps(data, indent=2, default=str) if not isinstance(data, str) else data
        return {"content": [{"type": "text", "text": text}]}

    # Run DB queries in a thread pool so we don't block the event loop
    loop = asyncio.get_running_loop()

    if name == "list_nodes":
        if not token:
            raise ValueError("Authentication required to list nodes.")
        def _query():
            from app.db.session import get_db_session
            with get_db_session() as db:
                # Use MeshService to filter nodes based on the authenticated user_id
                nodes = services.mesh_service.list_accessible_nodes(token, db)
                return {
                    "nodes": [
                        {
                            "id": n.node_id,
                            "name": n.display_name,
                            "status": n.last_status,
                            "os": (n.capabilities or {}).get("os"),
                            "is_active": n.is_active,
                        }
                        for n in nodes
                    ]
                }
        return _ok(await loop.run_in_executor(None, _query))

    if name == "get_app_info":
        def _query():
            from app.db.session import get_db_session
            with get_db_session() as db:
                if token:
                    # Filtered counts if authenticated
                    nodes = services.mesh_service.list_accessible_nodes(token, db)
                    total = len(nodes)
                    online = len([n for n in nodes if n.last_status == "online"])
                else:
                    # Return zero counts if not authenticated
                    total = 0
                    online = 0

                return {
                    "name": "Cortex Hub",
                    "version": "1.0.0",
                    "capabilities": ["swarms", "webmcp", "mcp-sse", "voice-chat", "rag"],
                    "nodes": {"total": total, "online": online},
                    "mcp_transport": "sse",
                    "sse_endpoint": f"{settings.HUB_PUBLIC_URL}/api/v1/mcp/sse",
                    "auth": {"oidc_enabled": settings.OIDC_ENABLED}
                }
        return _ok(await loop.run_in_executor(None, _query))

    if name == "get_node_details":
        if not token:
            raise ValueError("Authentication required to get node details.")
        node_id = args.get("node_id")
        if not node_id:
            raise ValueError("node_id is required.")
        def _query():
            from app.db.session import get_db_session
            with get_db_session() as db:
                # Enforce permission check before returning details
                try:
                    services.mesh_service.require_node_access(token, node_id, db)
                except Exception:
                    return None # Access denied

                n = services.mesh_service.get_node_or_404(node_id, db)
                return {
                    "node_id": n.node_id,
                    "display_name": n.display_name,
                    "description": n.description,
                    "status": n.last_status,
                    "is_active": n.is_active,
                    "capabilities": n.capabilities,
                    "skill_config": n.skill_config,
                    "registered_by": n.registered_by,
                    "last_seen_at": str(n.last_seen_at) if n.last_seen_at else None,
                }
        result = await loop.run_in_executor(None, _query)
        if result is None:
            raise ValueError(f"Node '{node_id}' not found.")
        return _ok(result)

    if name == "list_agents":
        if not token:
            raise ValueError("Authentication required to list agents.")
        def _query():
            from app.db.session import get_db_session
            from app.db import models
            with get_db_session() as db:
                # Basic hardening: Only show agents on nodes user can access
                accessible_nodes = services.mesh_service.list_accessible_nodes(token, db)
                node_ids = [n.node_id for n in accessible_nodes]
                
                rows = db.query(models.AgentInstance).filter(models.AgentInstance.mesh_node_id.in_(node_ids)).all()
                return {
                    "agents": [
                        {
                            "id": str(a.id),
                            "name": a.template.name if a.template else None,
                            "status": a.status,
                            "node": a.mesh_node_id,
                            "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None,
                            "total_runs": a.total_runs,
                            "quality_score": a.latest_quality_score,
                        }
                        for a in rows
                    ]
                }
        return _ok(await loop.run_in_executor(None, _query))

    if name == "deploy_agent":
        if not token:
            raise ValueError("Authentication required to deploy agents.")
        from app.api import schemas
        def _execute_deploy():
            from app.db.session import get_db_session
            with get_db_session() as db:
                request = schemas.DeployAgentRequest(**args)
                result = services.agent_service.deploy_agent(db, token, request)
                return result
        result = await loop.run_in_executor(None, _execute_deploy)
        initial_prompt = args.get("initial_prompt")
        if initial_prompt:
            from app.core.orchestration.agent_loop import AgentExecutor
            asyncio.create_task(AgentExecutor.run(result["instance_id"], initial_prompt, services, services.user_service))
        return _ok(result)

    if name == "update_agent_config":
        if not token:
            raise ValueError("Authentication required to update agent config.")
        agent_id = args.get("agent_id")
        if not agent_id:
            raise ValueError("agent_id is required.")
        from app.api import schemas
        def _execute_update():
            from app.db.session import get_db_session
            with get_db_session() as db:
                config_args = {k: v for k, v in args.items() if k != "agent_id"}
                request = schemas.AgentConfigUpdate(**config_args)
                result = services.agent_service.update_config(db, agent_id, token, request)
                return {
                    "id": str(result.id),
                    "status": result.status
                }
        return _ok(await loop.run_in_executor(None, _execute_update))

    if name == "delete_agent":
        if not token:
            raise ValueError("Authentication required to delete agents.")
        agent_id = args.get("agent_id")
        if not agent_id:
            raise ValueError("agent_id is required.")
        def _execute_delete():
            from app.db.session import get_db_session
            with get_db_session() as db:
                services.agent_service.delete_agent(db, agent_id, token)
                return {"status": "deleted", "agent_id": agent_id}
        return _ok(await loop.run_in_executor(None, _execute_delete))

    if name == "get_agent_details":
        if not token:
            raise ValueError("Authentication required to get agent details.")
        agent_id = args.get("agent_id")
        if not agent_id:
            raise ValueError("agent_id is required.")
        def _execute_query():
            from app.db.session import get_db_session
            with get_db_session() as db:
                a = services.agent_service.get_agent_instance(db, agent_id, token)
                return {
                    "id": str(a.id),
                    "name": a.template.name if a.template else None,
                    "status": a.status,
                    "node": a.mesh_node_id,
                    "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None,
                    "total_runs": a.total_runs,
                    "successful_runs": a.successful_runs,
                    "quality_score": a.latest_quality_score,
                    "workspace_jail": a.current_workspace_jail,
                    "last_error": a.last_error,
                    "evaluation_status": a.evaluation_status,
                }
        return _ok(await loop.run_in_executor(None, _execute_query))

    if name == "get_user_config":
        if not token:
            raise ValueError("Authentication required to get user config.")
        def _execute_query():
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                user = db.query(User).filter(User.id == token).first()
                if not user:
                    raise ValueError("User not found.")
                config = services.preference_service.merge_user_config(user, db)
                return config.model_dump()
        return _ok(await loop.run_in_executor(None, _execute_query))

    if name == "update_user_config":
        if not token:
            raise ValueError("Authentication required to update user config.")
        from app.api import schemas
        def _execute_update():
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                user = db.query(User).filter(User.id == token).first()
                if not user:
                    raise ValueError("User not found.")
                prefs = schemas.UserPreferences(**args)
                result = services.preference_service.update_user_config(user, prefs, db)
                return result.model_dump()
        return _ok(await loop.run_in_executor(None, _execute_update))

    if name == "list_skills":
        if not token:
            raise ValueError("Authentication required to list skills.")
        def _query():
            from app.db.session import get_db_session
            from app.db import models
            with get_db_session() as db:
                rows = db.query(models.Skill).filter(models.Skill.is_enabled == True).all()
                return {
                    "skills": [
                        {
                            "id": s.id,
                            "name": s.name,
                            "description": s.description,
                            "type": s.skill_type,
                        }
                        for s in rows
                    ]
                }
        return _ok(await loop.run_in_executor(None, _query))

    if name == "dispatch":
        if not token:
            raise ValueError("Authentication required to dispatch tasks.")
        node_id = args.get("node_id")
        command = args.get("command")
        session_id = args.get("session_id", "")
        
        if not node_id or not command:
            raise ValueError("node_id and command are required.")
            
        def _execute_dispatch():
            from app.db.session import get_db_session
            with get_db_session() as db:
                task_id = services.mesh_service.dispatch_task(
                    node_id, command, token, db, session_id=session_id
                )
                return {"status": "accepted", "task_id": task_id}
                
        return _ok(await loop.run_in_executor(None, _execute_dispatch))

    if name == "write_file":
        if not token:
            raise ValueError("Authentication required to write files.")
        node_id = args.get("node_id")
        path = args.get("path")
        content = args.get("content", "")
        is_dir = args.get("is_dir", False)
        session_id = args.get("session_id", "__fs_explorer__")
        
        if not node_id or not path:
            raise ValueError("node_id and path are required.")
            
        def _execute_write():
            from app.db.session import get_db_session
            with get_db_session() as db:
                services.mesh_service.require_node_access(token, node_id, db)
                orchestrator = services.orchestrator
                res = orchestrator.assistant.write(node_id, path, content, is_dir, session_id=session_id)
                return res
                
        return _ok(await loop.run_in_executor(None, _execute_write))

    if name == "delete_file":
        if not token:
            raise ValueError("Authentication required to delete files.")
        node_id = args.get("node_id")
        path = args.get("path")
        session_id = args.get("session_id", "__fs_explorer__")
        
        if not node_id or not path:
            raise ValueError("node_id and path are required.")
            
        def _execute_delete():
            from app.db.session import get_db_session
            with get_db_session() as db:
                services.mesh_service.require_node_access(token, node_id, db)
                orchestrator = services.orchestrator
                res = orchestrator.assistant.rm(node_id, path, session_id=session_id)
                return res
                
        return _ok(await loop.run_in_executor(None, _execute_delete))

    if name == "list_groups":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                groups = services.user_service.get_all_groups(db)
                return [schemas.GroupInfo.model_validate(g).model_dump() for g in groups]
        return _ok(await loop.run_in_executor(None, _query))

    if name == "create_group":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                g = services.user_service.create_group(db, args.get("name"), args.get("description"), args.get("policy"))
                if g is None: raise ValueError(f"Group '{args.get('name')}' already exists.")
                return schemas.GroupInfo.model_validate(g).model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "update_group":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                g = services.user_service.update_group(db, args.get("gid"), args.get("name"), args.get("description"), args.get("policy"))
                if g is None: raise ValueError("Group not found.")
                if g is False: raise ValueError(f"Group name conflict.")
                return schemas.GroupInfo.model_validate(g).model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "delete_group":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                success = services.user_service.delete_group(db, args.get("gid"))
                if not success: raise ValueError("Failed to delete group.")
                return {"message": "Group deleted successfully"}
        return _ok(await loop.run_in_executor(None, _query))

    if name == "list_users":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                users = services.user_service.get_all_users(db)
                res = []
                for u_item in users:
                    p = schemas.UserProfile.model_validate(u_item)
                    if u_item.group: p.group_name = u_item.group.name
                    res.append(p.model_dump())
                return res
        return _ok(await loop.run_in_executor(None, _query))

    if name == "update_user_role":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                success = services.user_service.update_user_role(db, args.get("uid"), args.get("role"))
                if not success: raise ValueError("Failed to update role.")
                updated = services.user_service.get_user_by_id(db, args.get("uid"))
                return schemas.UserProfile.model_validate(updated).model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "assign_user_to_group":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                success = services.user_service.assign_user_to_group(db, args.get("uid"), args.get("group_id"))
                if not success: raise ValueError("User or group not found.")
                updated = services.user_service.get_user_by_id(db, args.get("uid"))
                return schemas.UserProfile.model_validate(updated).model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "get_user_profile":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = services.user_service.get_user_by_id(db=db, user_id=token)
                if not u: raise ValueError("User not found.")
                p = schemas.UserProfile.model_validate(u)
                if u.group: p.group_name = u.group.name
                return p.model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "update_user_profile":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = services.user_service.get_user_by_id(db=db, user_id=token)
                if not u: raise ValueError("User not found.")
                if args.get("username"): u.username = args.get("username")
                if args.get("full_name"): u.full_name = args.get("full_name")
                if args.get("avatar_url"): u.avatar_url = args.get("avatar_url")
                db.commit()
                db.refresh(u)
                p = schemas.UserProfile.model_validate(u)
                if u.group: p.group_name = u.group.name
                return p.model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "create_session":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            with get_db_session() as db:
                s = services.session_service.create_session(
                    db=db, user_id=token,
                    provider_name=args.get("provider_name", "deepseek"),
                    model_name=args.get("model_name"),
                    feature_name=args.get("feature_name", "default"),
                    stt_provider_name=args.get("stt_provider_name"),
                    tts_provider_name=args.get("tts_provider_name")
                )
                return schemas.Session.model_validate(s).model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "get_session":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                return schemas.Session.model_validate(s).model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "list_sessions":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                sessions = db.query(DBSession).filter(
                    DBSession.user_id == token,
                    DBSession.feature_name == args.get("feature_name", "default"),
                    DBSession.is_archived == False
                ).order_by(DBSession.created_at.desc()).all()
                return [schemas.Session.model_validate(s).model_dump() for s in sessions]
        return _ok(await loop.run_in_executor(None, _query))

    if name == "update_session":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession, Skill
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                if args.get("title") is not None: s.title = args.get("title")
                if args.get("provider_name") is not None: s.provider_name = args.get("provider_name")
                if args.get("model_name") is not None: s.model_name = args.get("model_name")
                if args.get("restrict_skills") is not None: s.restrict_skills = args.get("restrict_skills")
                if args.get("allowed_skill_names") is not None: s.allowed_skill_names = args.get("allowed_skill_names")
                if args.get("allowed_skill_ids") is not None:
                    skills = db.query(Skill).filter(Skill.id.in_(args.get("allowed_skill_ids"))).all()
                    s.skills = skills
                if args.get("system_prompt_override") is not None: s.system_prompt_override = args.get("system_prompt_override")
                if args.get("is_locked") is not None: s.is_locked = args.get("is_locked")
                db.commit()
                db.refresh(s)
                return schemas.Session.model_validate(s).model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "delete_session":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found or access denied.")
                services.session_service.archive_session(db, args.get("session_id"))
                return {"message": "Session deleted successfully."}
        return _ok(await loop.run_in_executor(None, _query))

    if name == "get_session_messages":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found or access denied.")
                messages = services.rag_service.get_message_history(db=db, session_id=args.get("session_id"))
                res = []
                for m in messages:
                    msg_dict = schemas.Message.model_validate(m).model_dump()
                    if m.audio_path and os.path.exists(m.audio_path):
                        msg_dict["has_audio"] = True
                        msg_dict["audio_url"] = f"/sessions/messages/{m.id}/audio"
                    res.append(msg_dict)
                return res
        return _ok(await loop.run_in_executor(None, _query))

    if name == "clear_session_history":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession, Message as DBMessage
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                if s.is_locked: raise ValueError("Session is locked.")
                deleted = db.query(DBMessage).filter(DBMessage.session_id == args.get("session_id")).delete()
                db.commit()
                return {"message": f"Cleared {deleted} messages."}
        return _ok(await loop.run_in_executor(None, _query))

    if name == "attach_nodes_to_session":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found or access denied.")
                req = schemas.NodeAttachRequest(**args)
                res = services.session_service.attach_nodes(db, args.get("session_id"), req)
                return res.model_dump()
        return _ok(await loop.run_in_executor(None, _query))

    if name == "detach_node_from_session":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                nodes = list(s.attached_node_ids or [])
                if args.get("node_id") not in nodes: raise ValueError("Node not attached.")
                nodes.remove(args.get("node_id"))
                s.attached_node_ids = nodes
                status = dict(s.node_sync_status or {})
                status.pop(args.get("node_id"), None)
                s.node_sync_status = status
                db.commit()
                if hasattr(services, "orchestrator") and services.orchestrator:
                    services.orchestrator.assistant.clear_workspace(args.get("node_id"), s.sync_workspace_id)
                return {"message": "Detached successfully."}
        return _ok(await loop.run_in_executor(None, _query))

    if name == "get_session_nodes":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                sync_status = s.node_sync_status or {}
                entries = []
                for nid in (s.attached_node_ids or []):
                    live = services.node_registry_service.get_node(nid)
                    persisted = sync_status.get(nid, {})
                    status_val = "connected" if live and persisted.get("status") == "pending" else persisted.get("status", "pending")
                    entries.append(schemas.NodeSyncStatusEntry(node_id=nid, status=status_val, last_sync=persisted.get("last_sync"), error=persisted.get("error")).model_dump())
                return {"session_id": s.id, "sync_workspace_id": s.sync_workspace_id, "nodes": entries, "sync_config": s.sync_config or {}}
        return _ok(await loop.run_in_executor(None, _query))

    if name == "cancel_session_task":
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                s.is_cancelled = True
                db.commit()
                return {"message": "Cancellation request sent."}
        return _ok(await loop.run_in_executor(None, _query))

    if name == "get_system_status":
        def _query():
            return {
                "status": "running",
                "oidc_enabled": settings.OIDC_ENABLED,
                "tls_enabled": settings.GRPC_TLS_ENABLED,
                "external_endpoint": settings.GRPC_EXTERNAL_ENDPOINT,
                "version": settings.VERSION
            }
        return _ok(await loop.run_in_executor(None, _query))

    # Writable tools (future-proofing check)
    # The list is empty, so this guard currently never fires.
    # NOTE(review): every tool branch visible above returns before control reaches
    # this point, so a name added here would only be gated if it has no handler
    # above — presumably the check belongs ahead of the handlers; confirm.
    writable_tools = []
    if name in writable_tools and not settings.OIDC_ENABLED:
        raise HTTPException(
            status_code=403, 
            detail="Swarm manipulation tools are disabled because OIDC is not configured."
        )

    # Fallthrough: no branch matched the requested tool name.
    raise ValueError(f"Unknown tool: '{name}'")