"""
MCP (Model Context Protocol) Server Route — Streamable HTTP + Legacy SSE Transport

Supports:
  MCP spec 2025-11-25 — Streamable HTTP (primary, recommended)
  MCP spec 2024-11-05 — HTTP+SSE (legacy, backwards-compat)

Endpoints (mounted under /api/v1/mcp/*):
  POST /mcp/sse     — Streamable HTTP: JSON-RPC in, JSON response out
  POST /mcp/        — Same, aliased for clients using the base path
  GET  /mcp/sse     — Legacy SSE stream (sends endpoint event)
  POST /mcp/messages — Legacy SSE message handler

Discovery:
  GET  /.well-known/mcp/manifest.json  (mounted in app.py)
"""

import asyncio
import json
import uuid
import logging
import jwt
import os
from typing import Optional, List, Annotated, Any
from fastapi import APIRouter, HTTPException, Request, Query, Header
from fastapi.responses import JSONResponse, StreamingResponse

from app.api.dependencies import ServiceContainer
from app.config import settings

logger = logging.getLogger(__name__)

MCP_VERSION = "2025-11-25"  # Latest MCP specification version

# ─── In-process SSE session registry ─────────────────────────────────────────
# Maps session_id → asyncio.Queue of JSON-serializable dicts
_sse_sessions: dict[str, asyncio.Queue] = {}


def create_mcp_router(services: ServiceContainer) -> APIRouter:
    router = APIRouter(tags=["MCP"])

    async def _get_authenticated_user(request: Request, token: Optional[str], db) -> Optional[str]:
        """
        Resolves the user_id from either the Authorization header (JWT) or the token query param.
        If OIDC is enabled, this strictly requires a valid JWT.
        """
        from app.config import settings
        
        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            token = auth_header.split(" ")[1]

        if not token:
            return None

        is_jwt = "." in token
        
        # 1. OIDC Mode: Support both OIDC (RS256) and Internal (HS256) JWTs
        if settings.OIDC_ENABLED:
            if not is_jwt:
                logger.warning(f"[MCP] Rejected non-JWT token in OIDC mode.")
                raise HTTPException(
                    status_code=401, 
                    detail="Authentication required: Provide a valid JWT."
                )
            
            try:
                # Try internal HS256 first
                unverified = jwt.decode(token, options={"verify_signature": False})
                if unverified.get("iss") == "cortex-hub-internal":
                    decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
                    return decoded.get("sub")
                
                # Fallback to OIDC RS256
                user = await services.auth_service.verify_id_token(token, db)
                return user.id
            except Exception as e:
                logger.error(f"[MCP] JWT verification failed: {e}")
                raise HTTPException(status_code=401, detail=f"Invalid token: {str(e)}")
        
        # 2. Legacy/Bootstrap Mode: Accept plain user_id (Identity Claim)
        # This is only active when OIDC is not configured.
        return token

    # ─── SSE Transport — Client Connection ────────────────────────────────────
    @router.get("/sse", summary="MCP SSE Transport Endpoint")
    async def mcp_sse(
        request: Request,
        token: Optional[str] = Query(None),
    ):
        from app.db.session import get_db_session
        with get_db_session() as db:
            user_id = await _get_authenticated_user(request, token, db)
        
        if not user_id:
            logger.info("[MCP] SSE connection opened without initial auth.")

        queue = asyncio.Queue()
        session_id = str(uuid.uuid4())
        _sse_sessions[session_id] = queue
        
        messages_url = f"{settings.HUB_PUBLIC_URL}/api/v1/mcp/messages?session_id={session_id}"
        if user_id:
            messages_url += f"&token={user_id}"

        origin = request.headers.get("origin")
        if origin:
            allowed = ["https://ai.jerxie.com", "http://localhost:3000", "http://localhost:8080"]
            if not any(origin.startswith(a) for a in allowed):
                logger.warning(f"[MCP] Blocked unauthorized origin: {origin}")
                raise HTTPException(status_code=403, detail="Unauthorized Origin")

        async def _event_generator():
            try:
                yield f"event: endpoint\ndata: {messages_url}\n\n"
                while True:
                    msg = await queue.get()
                    yield f"event: message\ndata: {json.dumps(msg)}\n\n"
            finally:
                _sse_sessions.pop(session_id, None)

        return StreamingResponse(
            _event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
                "Access-Control-Allow-Origin": "*",
            },
        )

    # ─── Streamable HTTP Transport (MCP 2025-11-25) ───────────────────────────
    @router.post("/sse")
    @router.post("/", summary="MCP Streamable HTTP Endpoint (Post-only mode)")
    async def mcp_streamable_http(
        request: Request,
        token: Optional[str] = Query(None),
    ):
        from app.db.session import get_db_session
        with get_db_session() as db:
            user_id = await _get_authenticated_user(request, token, db)

        origin = request.headers.get("origin")
        if origin:
            allowed = [
                "https://ai.jerxie.com",
                "http://localhost:3000",
                "http://localhost:8080",
            ]
            server_host = request.headers.get("host", "")
            allowed.append(f"https://{server_host}")
            allowed.append(f"http://{server_host}")
            if not any(origin.startswith(a) for a in allowed):
                logger.warning(f"[MCP] Rejected request from disallowed origin: {origin}")
                return JSONResponse(
                    {"jsonrpc": "2.0", "error": {"code": -32000, "message": "Forbidden origin"}},
                    status_code=403,
                )

        try:
            body = await request.json()
        except Exception:
            raise HTTPException(status_code=400, detail="Invalid JSON body.")

        if isinstance(body, list):
            results = []
            for item in body:
                results.append(await _handle_single(item, user_id, services))
            return JSONResponse(
                [r for r in results if r is not None],
                headers={"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION},
            )

        response = await _handle_single(body, user_id, services)
        if response is None:
            return JSONResponse(
                None, status_code=202,
                headers={"Access-Control-Allow-Origin": "*"},
            )

        headers = {"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION}
        if body.get("method") == "initialize":
            headers["Mcp-Session-Id"] = str(uuid.uuid4())

        return JSONResponse(response, headers=headers)

    # ─── SSE Transport — Message Handler ─────────────────────────────────────
    @router.post("/messages")
    async def mcp_messages(
        request: Request,
        session_id: str = Query(...),
        token: Optional[str] = Query(None),
    ):
        queue = _sse_sessions.get(session_id)
        if not queue:
            raise HTTPException(status_code=404, detail="MCP session not found or expired.")

        try:
            body = await request.json()
        except Exception:
            raise HTTPException(status_code=400, detail="Invalid JSON body.")

        rpc_id = body.get("id")
        method = body.get("method", "")
        params = body.get("params", {})

        from app.db.session import get_db_session
        with get_db_session() as db:
            user_id = await _get_authenticated_user(request, token, db)

        logger.info(f"[MCP] [{session_id[:8]}] → {method}")
        asyncio.create_task(_dispatch(queue, rpc_id, method, params, user_id, services))

        return JSONResponse(
            {"status": "accepted"},
            status_code=202,
            headers={"Access-Control-Allow-Origin": "*"},
        )

    return router


async def _handle_single(body: dict, token: Optional[str], services: ServiceContainer):
    rpc_id  = body.get("id")
    method  = body.get("method", "")
    params  = body.get("params", {})

    logger.info(f"[MCP-HTTP] → {method}")
    try:
        result = await _execute(method, params, token, services)
        if rpc_id is None:
            return None
        return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
    except Exception as exc:
        logger.exception(f"[MCP-HTTP] Error for '{method}': {exc}")
        if rpc_id is None:
            return None
        return {
            "jsonrpc": "2.0",
            "id": rpc_id,
            "error": {"code": -32000, "message": str(exc)},
        }


async def _dispatch(
    queue: asyncio.Queue,
    rpc_id,
    method: str,
    params: dict,
    token: Optional[str],
    services: ServiceContainer,
):
    try:
        result = await _execute(method, params, token, services)
        await queue.put({"jsonrpc": "2.0", "id": rpc_id, "result": result})
    except Exception as exc:
        logger.exception(f"[MCP] Tool error for '{method}': {exc}")
        await queue.put({
            "jsonrpc": "2.0",
            "id": rpc_id,
            "error": {"code": -32000, "message": str(exc)},
        })


async def _execute(method: str, params: dict, token: Optional[str], services: ServiceContainer):
    if method == "initialize":
        return {
            "protocolVersion": MCP_VERSION,
            "capabilities": {"tools": {}},
            "serverInfo": {"name": "Cortex Hub", "version": "1.0.0"},
        }

    if method == "ping":
        return {}

    if method == "tools/list":
        return {
            "tools": [
                _tool_def("list_nodes", "List all agent nodes in the Cortex swarm mesh.", {}),
                _tool_def("get_app_info", "Get metadata about this Cortex Hub instance.", {}),
                _tool_def("get_node_details", "Get full details for a specific agent node.", {"node_id": {"type": "string"}}, required=["node_id"]),
                _tool_def("list_agents", "List all autonomous agents.", {}),
                _tool_def("list_skills", "List all registered skill folders.", {}),
                _tool_def("dispatch", "Dispatch a shell command to a node.", {"node_id": {"type": "string"}, "command": {"type": "string"}, "session_id": {"type": "string"}}, required=["node_id", "command"]),
                _tool_def("write_file", "Create/Update a file on a node.", {"node_id": {"type": "string"}, "path": {"type": "string"}, "content": {"type": "string"}, "is_dir": {"type": "boolean"}, "session_id": {"type": "string"}}, required=["node_id", "path"]),
                _tool_def("delete_file", "Delete a file/directory on a node.", {"node_id": {"type": "string"}, "path": {"type": "string"}, "session_id": {"type": "string"}}, required=["node_id", "path"]),
                _tool_def("deploy_agent", "Deploy a new autonomous agent.", {"name": {"type": "string"}, "mesh_node_id": {"type": "string"}, "description": {"type": "string"}, "system_prompt": {"type": "string"}, "initial_prompt": {"type": "string"}}, required=["name", "mesh_node_id"]),
                _tool_def("update_agent_config", "Update config for an agent.", {"agent_id": {"type": "string"}, "mesh_node_id": {"type": "string"}}, required=["agent_id", "mesh_node_id"]),
                _tool_def("delete_agent", "Delete an autonomous agent.", {"agent_id": {"type": "string"}}, required=["agent_id"]),
                _tool_def("get_agent_details", "Get details for an autonomous agent.", {"agent_id": {"type": "string"}}, required=["agent_id"]),
                _tool_def("get_user_config", "Get user effective config.", {}),
                _tool_def("update_user_config", "Update user preferences.", {"llm": {"type": "object"}}),
                _tool_def("list_groups", "List all security groups (Admin Only).", {}),
                _tool_def("create_group", "Create policy group (Admin Only).", {"name": {"type": "string"}}, required=["name"]),
                _tool_def("update_group", "Update policy group (Admin Only).", {"gid": {"type": "string"}}, required=["gid"]),
                _tool_def("delete_group", "Delete policy group (Admin Only).", {"gid": {"type": "string"}}, required=["gid"]),
                _tool_def("list_users", "List all registered users (Admin Only).", {}),
                _tool_def("update_user_role", "Update a user's role (Admin Only).", {"uid": {"type": "string"}, "role": {"type": "string"}}, required=["uid", "role"]),
                _tool_def("assign_user_to_group", "Assign user to group (Admin Only).", {"uid": {"type": "string"}, "group_id": {"type": "string"}}, required=["uid", "group_id"]),
                _tool_def("get_user_profile", "Get user's profile info.", {}),
                _tool_def("update_user_profile", "Update user's profile info.", {"username": {"type": "string"}}),
                _tool_def("create_session", "Create a new chat session.", {"provider_name": {"type": "string"}}),
                _tool_def("get_session", "Get specific session details.", {"session_id": {"type": "integer"}}, required=["session_id"]),
                _tool_def("list_sessions", "List all chat sessions.", {}),
                _tool_def("update_session", "Update session configuration.", {"session_id": {"type": "integer"}}, required=["session_id"]),
                _tool_def("delete_session", "Archive a chat session.", {"session_id": {"type": "integer"}}, required=["session_id"]),
                _tool_def("get_session_messages", "Retrieve chat history.", {"session_id": {"type": "integer"}}, required=["session_id"]),
                _tool_def("clear_session_history", "Delete all messages in session.", {"session_id": {"type": "integer"}}, required=["session_id"]),
                _tool_def("attach_nodes_to_session", "Attach agent nodes to session.", {"session_id": {"type": "integer"}, "node_ids": {"type": "array", "items": {"type": "string"}}}, required=["session_id", "node_ids"]),
                _tool_def("detach_node_from_session", "Detach agent node.", {"session_id": {"type": "integer"}, "node_id": {"type": "string"}}, required=["session_id", "node_id"]),
                _tool_def("get_session_nodes", "Get all nodes in session.", {"session_id": {"type": "integer"}}, required=["session_id"]),
                _tool_def("cancel_session_task", "Cancel session task.", {"session_id": {"type": "integer"}}, required=["session_id"]),
                _tool_def("get_system_status", "Retrieve full system state.", {}),
                _tool_def("get_global_config", "Get global LLM/TTS/STT settings (Admin only).", {}),
                _tool_def("update_global_config", "Update global LLM/TTS/STT settings (Admin only).", {
                    "llm_providers": {"type": "object"},
                    "active_llm_provider": {"type": "string"},
                    "tts_providers": {"type": "object"},
                    "active_tts_provider": {"type": "string"},
                    "stt_providers": {"type": "object"},
                    "active_stt_provider": {"type": "string"}
                })
            ]
        }

    if method == "tools/call":
        name = params.get("name", "")
        args = params.get("arguments", {})
        return await _call_tool(name, args, token, services)

    raise ValueError(f"Unknown method: '{method}'")


def _tool_def(name: str, description: str, properties: dict, required: list = None) -> dict:
    schema = {"type": "object", "properties": properties}
    if required:
        schema["required"] = required
    return {"name": name, "description": description, "inputSchema": schema}


class MCPToolDispatcher:
    def __init__(self, services: ServiceContainer, loop):
        self.services = services
        self.loop = loop
        self._handlers = {
            "list_nodes": self._list_nodes,
            "get_app_info": self._get_app_info,
            "get_node_details": self._get_node_details,
            "list_agents": self._list_agents,
            "deploy_agent": self._deploy_agent,
            "update_agent_config": self._update_agent_config,
            "delete_agent": self._delete_agent,
            "get_agent_details": self._get_agent_details,
            "get_user_config": self._get_user_config,
            "update_user_config": self._update_user_config,
            "list_skills": self._list_skills,
            "dispatch": self._dispatch,
            "write_file": self._write_file,
            "delete_file": self._delete_file,
            "list_groups": self._list_groups,
            "create_group": self._create_group,
            "update_group": self._update_group,
            "delete_group": self._delete_group,
            "list_users": self._list_users,
            "update_user_role": self._update_user_role,
            "assign_user_to_group": self._assign_user_to_group,
            "get_user_profile": self._get_user_profile,
            "update_user_profile": self._update_user_profile,
            "create_session": self._create_session,
            "get_session": self._get_session,
            "list_sessions": self._list_sessions,
            "update_session": self._update_session,
            "delete_session": self._delete_session,
            "get_session_messages": self._get_session_messages,
            "clear_session_history": self._clear_session_history,
            "attach_nodes_to_session": self._attach_nodes_to_session,
            "detach_node_from_session": self._detach_node_from_session,
            "get_session_nodes": self._get_session_nodes,
            "cancel_session_task": self._cancel_session_task,
            "get_system_status": self._get_system_status,
            "get_global_config": self._get_global_config,
            "update_global_config": self._update_global_config
        }

    async def dispatch(self, name: str, args: dict, token: Optional[str]) -> Any:
        handler = self._handlers.get(name)
        if not handler:
            raise ValueError(f"Tool '{name}' not found.")
        return await handler(args, token)

    async def _list_nodes(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            with get_db_session() as db:
                nodes = self.services.mesh_service.list_accessible_nodes(token, db)
                return {"nodes": [{"id": n.node_id, "name": n.display_name, "status": n.last_status, "os": (n.capabilities or {}).get("os"), "is_active": n.is_active} for n in nodes]}
        return await self.loop.run_in_executor(None, _query)

    async def _get_app_info(self, args: dict, token: Optional[str]):
        def _query():
            from app.db.session import get_db_session
            with get_db_session() as db:
                total, online = 0, 0
                if token:
                    nodes = self.services.mesh_service.list_accessible_nodes(token, db)
                    total = len(nodes)
                    online = len([n for n in nodes if n.last_status == "online"])
                return {
                    "name": "Cortex Hub", "version": "1.0.0", "capabilities": ["swarms", "webmcp", "mcp-sse", "voice-chat", "rag"],
                    "nodes": {"total": total, "online": online}, "mcp_transport": "sse", "sse_endpoint": f"{settings.HUB_PUBLIC_URL}/api/v1/mcp/sse",
                    "auth": {"oidc_enabled": settings.OIDC_ENABLED}
                }
        return await self.loop.run_in_executor(None, _query)

    async def _get_node_details(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        node_id = args.get("node_id")
        if not node_id: raise ValueError("node_id required.")
        def _query():
            from app.db.session import get_db_session
            with get_db_session() as db:
                try: self.services.mesh_service.require_node_access(token, node_id, db)
                except Exception: return None
                n = self.services.mesh_service.get_node_or_404(node_id, db)
                return {"node_id": n.node_id, "display_name": n.display_name, "description": n.description, "status": n.last_status, "is_active": n.is_active, "capabilities": n.capabilities, "skill_config": n.skill_config, "registered_by": n.registered_by, "last_seen_at": str(n.last_seen_at) if n.last_seen_at else None}
        res = await self.loop.run_in_executor(None, _query)
        if res is None: raise ValueError(f"Node '{node_id}' not found.")
        return res

    async def _list_agents(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db import models
            with get_db_session() as db:
                acc = self.services.mesh_service.list_accessible_nodes(token, db)
                nids = [n.node_id for n in acc]
                rows = db.query(models.AgentInstance).filter(models.AgentInstance.mesh_node_id.in_(nids)).all()
                return {"agents": [{"id": str(a.id), "name": a.template.name if a.template else None, "status": a.status, "node": a.mesh_node_id, "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None, "total_runs": a.total_runs, "quality_score": a.latest_quality_score} for a in rows]}
        return await self.loop.run_in_executor(None, _query)

    async def _deploy_agent(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        from app.api import schemas
        def _execute():
            from app.db.session import get_db_session
            with get_db_session() as db:
                req = schemas.DeployAgentRequest(**args)
                return self.services.agent_service.deploy_agent(db, token, req)
        res = await self.loop.run_in_executor(None, _execute)
        prompt = args.get("initial_prompt")
        if prompt:
            from app.core.orchestration.agent_loop import AgentExecutor
            asyncio.create_task(AgentExecutor.run(res["instance_id"], prompt, self.services, self.services.user_service))
        return res

    async def _update_agent_config(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        agent_id = args.get("agent_id")
        if not agent_id: raise ValueError("agent_id required.")
        from app.api import schemas
        def _execute():
            from app.db.session import get_db_session
            with get_db_session() as db:
                req_args = {k: v for k, v in args.items() if k != "agent_id"}
                req = schemas.AgentConfigUpdate(**req_args)
                res = self.services.agent_service.update_config(db, agent_id, token, req)
                return {"id": str(res.id), "status": res.status}
        return await self.loop.run_in_executor(None, _execute)

    async def _delete_agent(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        agent_id = args.get("agent_id")
        if not agent_id: raise ValueError("agent_id required.")
        def _execute():
            from app.db.session import get_db_session
            with get_db_session() as db:
                self.services.agent_service.delete_agent(db, agent_id, token)
                return {"status": "deleted", "agent_id": agent_id}
        return await self.loop.run_in_executor(None, _execute)

    async def _get_agent_details(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        agent_id = args.get("agent_id")
        if not agent_id: raise ValueError("agent_id required.")
        def _execute():
            from app.db.session import get_db_session
            with get_db_session() as db:
                a = self.services.agent_service.get_agent_instance(db, agent_id, token)
                return {"id": str(a.id), "name": a.template.name if a.template else None, "status": a.status, "node": a.mesh_node_id, "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None, "total_runs": a.total_runs, "successful_runs": a.successful_runs, "quality_score": a.latest_quality_score, "workspace_jail": a.current_workspace_jail, "last_error": a.last_error, "evaluation_status": a.evaluation_status}
        return await self.loop.run_in_executor(None, _execute)

    async def _get_user_config(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                user = db.query(User).filter(User.id == token).first()
                if not user: raise ValueError("User not found.")
                return self.services.preference_service.merge_user_config(user, db).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _update_user_config(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        from app.api import schemas
        def _execute():
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                user = db.query(User).filter(User.id == token).first()
                if not user: raise ValueError("User not found.")
                prefs = schemas.UserPreferences(**args)
                return self.services.preference_service.update_user_config(user, prefs, db).model_dump()
        return await self.loop.run_in_executor(None, _execute)

    async def _list_skills(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db import models
            with get_db_session() as db:
                rows = db.query(models.Skill).filter(models.Skill.is_enabled == True).all()
                return {"skills": [{"id": s.id, "name": s.name, "description": s.description, "type": s.skill_type} for s in rows]}
        return await self.loop.run_in_executor(None, _query)

    async def _dispatch(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        node_id, command, session_id = args.get("node_id"), args.get("command"), args.get("session_id", "")
        if not node_id or not command: raise ValueError("node_id and command required.")
        def _execute():
            from app.db.session import get_db_session
            with get_db_session() as db:
                tid = self.services.mesh_service.dispatch_task(node_id, command, token, db, session_id=session_id)
                return {"status": "accepted", "task_id": tid}
        return await self.loop.run_in_executor(None, _execute)

    async def _write_file(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        node_id, path, content, is_dir, session_id = args.get("node_id"), args.get("path"), args.get("content", ""), args.get("is_dir", False), args.get("session_id", "__fs_explorer__")
        if not node_id or not path: raise ValueError("node_id and path required.")
        def _execute():
            from app.db.session import get_db_session
            with get_db_session() as db:
                self.services.mesh_service.require_node_access(token, node_id, db)
                return self.services.orchestrator.assistant.write(node_id, path, content, is_dir, session_id=session_id)
        return await self.loop.run_in_executor(None, _execute)

    async def _delete_file(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        node_id, path, session_id = args.get("node_id"), args.get("path"), args.get("session_id", "__fs_explorer__")
        if not node_id or not path: raise ValueError("node_id and path required.")
        def _execute():
            from app.db.session import get_db_session
            with get_db_session() as db:
                self.services.mesh_service.require_node_access(token, node_id, db)
                return self.services.orchestrator.assistant.rm(node_id, path, session_id=session_id)
        return await self.loop.run_in_executor(None, _execute)

    async def _list_groups(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                return [schemas.GroupInfo.model_validate(g).model_dump() for g in self.services.user_service.get_all_groups(db)]
        return await self.loop.run_in_executor(None, _query)

    async def _create_group(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                g = self.services.user_service.create_group(db, args.get("name"), args.get("description"), args.get("policy"))
                if g is None: raise ValueError(f"Group '{args.get('name')}' exists.")
                return schemas.GroupInfo.model_validate(g).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _update_group(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                g = self.services.user_service.update_group(db, args.get("gid"), args.get("name"), args.get("description"), args.get("policy"))
                if g is None: raise ValueError("Group not found.")
                if g is False: raise ValueError("Group name conflict.")
                return schemas.GroupInfo.model_validate(g).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _delete_group(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                if not self.services.user_service.delete_group(db, args.get("gid")): raise ValueError("Failed delete.")
                return {"message": "Group deleted successfully"}
        return await self.loop.run_in_executor(None, _query)

    async def _list_users(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                users = self.services.user_service.get_all_users(db)
                res = []
                for u_item in users:
                    p = schemas.UserProfile.model_validate(u_item)
                    if u_item.group: p.group_name = u_item.group.name
                    res.append(p.model_dump())
                return res
        return await self.loop.run_in_executor(None, _query)

    async def _update_user_role(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                if not self.services.user_service.update_user_role(db, args.get("uid"), args.get("role")): raise ValueError("Failed update.")
                updated = self.services.user_service.get_user_by_id(db, args.get("uid"))
                return schemas.UserProfile.model_validate(updated).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _assign_user_to_group(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                if not self.services.user_service.assign_user_to_group(db, args.get("uid"), args.get("group_id")): raise ValueError("User or group not found.")
                updated = self.services.user_service.get_user_by_id(db, args.get("uid"))
                return schemas.UserProfile.model_validate(updated).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _get_user_profile(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = self.services.user_service.get_user_by_id(db=db, user_id=token)
                if not u: raise ValueError("User not found.")
                p = schemas.UserProfile.model_validate(u)
                if u.group: p.group_name = u.group.name
                return p.model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _update_user_profile(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = self.services.user_service.get_user_by_id(db=db, user_id=token)
                if not u: raise ValueError("User not found.")
                if args.get("username"): u.username = args.get("username")
                if args.get("full_name"): u.full_name = args.get("full_name")
                if args.get("avatar_url"): u.avatar_url = args.get("avatar_url")
                db.commit(); db.refresh(u)
                p = schemas.UserProfile.model_validate(u)
                if u.group: p.group_name = u.group.name
                return p.model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _create_session(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            with get_db_session() as db:
                s = self.services.session_service.create_session(
                    db=db, user_id=token, provider_name=args.get("provider_name", "deepseek"),
                    model_name=args.get("model_name"), feature_name=args.get("feature_name", "default"),
                    stt_provider_name=args.get("stt_provider_name"), tts_provider_name=args.get("tts_provider_name")
                )
                return schemas.Session.model_validate(s).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _get_session(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                return schemas.Session.model_validate(s).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _list_sessions(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                sessions = db.query(DBSession).filter(
                    DBSession.user_id == token, DBSession.feature_name == args.get("feature_name", "default"),
                    DBSession.is_archived == False
                ).order_by(DBSession.created_at.desc()).all()
                return [schemas.Session.model_validate(s).model_dump() for s in sessions]
        return await self.loop.run_in_executor(None, _query)

    async def _update_session(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession, Skill
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                if args.get("title") is not None: s.title = args.get("title")
                if args.get("provider_name") is not None: s.provider_name = args.get("provider_name")
                if args.get("model_name") is not None: s.model_name = args.get("model_name")
                if args.get("restrict_skills") is not None: s.restrict_skills = args.get("restrict_skills")
                if args.get("allowed_skill_names") is not None: s.allowed_skill_names = args.get("allowed_skill_names")
                if args.get("allowed_skill_ids") is not None:
                    s.skills = db.query(Skill).filter(Skill.id.in_(args.get("allowed_skill_ids"))).all()
                if args.get("system_prompt_override") is not None: s.system_prompt_override = args.get("system_prompt_override")
                if args.get("is_locked") is not None: s.is_locked = args.get("is_locked")
                db.commit(); db.refresh(s)
                return schemas.Session.model_validate(s).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _delete_session(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                self.services.session_service.archive_session(db, args.get("session_id"))
                return {"message": "Session deleted successfully."}
        return await self.loop.run_in_executor(None, _query)

    async def _get_session_messages(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session access denied.")
                msgs = self.services.rag_service.get_message_history(db=db, session_id=args.get("session_id"))
                res = []
                for m in msgs:
                    m_dict = schemas.Message.model_validate(m).model_dump()
                    if m.audio_path and os.path.exists(m.audio_path):
                        m_dict["has_audio"], m_dict["audio_url"] = True, f"/sessions/messages/{m.id}/audio"
                    res.append(m_dict)
                return res
        return await self.loop.run_in_executor(None, _query)

    async def _clear_session_history(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession, Message as DBMessage
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                if s.is_locked: raise ValueError("Session is locked.")
                cnt = db.query(DBMessage).filter(DBMessage.session_id == args.get("session_id")).delete()
                db.commit()
                return {"message": f"Cleared {cnt} messages."}
        return await self.loop.run_in_executor(None, _query)

    async def _attach_nodes_to_session(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session access denied.")
                req = schemas.NodeAttachRequest(**args)
                return self.services.session_service.attach_nodes(db, args.get("session_id"), req).model_dump()
        return await self.loop.run_in_executor(None, _query)

    async def _detach_node_from_session(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                nodes = list(s.attached_node_ids or [])
                if args.get("node_id") not in nodes: raise ValueError("Node not attached.")
                nodes.remove(args.get("node_id"))
                s.attached_node_ids = nodes
                status = dict(s.node_sync_status or {})
                status.pop(args.get("node_id"), None)
                s.node_sync_status = status
                db.commit()
                if hasattr(self.services, "orchestrator") and self.services.orchestrator:
                    self.services.orchestrator.assistant.clear_workspace(args.get("node_id"), s.sync_workspace_id)
                return {"message": "Detached successfully."}
        return await self.loop.run_in_executor(None, _query)

    async def _get_session_nodes(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.api import schemas
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                status = s.node_sync_status or {}
                entries = []
                for nid in (s.attached_node_ids or []):
                    live = self.services.node_registry_service.get_node(nid)
                    p = status.get(nid, {})
                    val = "connected" if live and p.get("status") == "pending" else p.get("status", "pending")
                    entries.append(schemas.NodeSyncStatusEntry(node_id=nid, status=val, last_sync=p.get("last_sync"), error=p.get("error")).model_dump())
                return {"session_id": s.id, "sync_workspace_id": s.sync_workspace_id, "nodes": entries, "sync_config": s.sync_config or {}}
        return await self.loop.run_in_executor(None, _query)

    async def _cancel_session_task(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import Session as DBSession
            with get_db_session() as db:
                s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
                if not s: raise ValueError("Session not found.")
                s.is_cancelled = True
                db.commit()
                return {"message": "Cancellation request sent."}
        return await self.loop.run_in_executor(None, _query)

    async def _get_global_config(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                
                def mask_keys(providers_dict):
                    import copy
                    res = copy.deepcopy(providers_dict) if providers_dict else {}
                    for p_data in res.values():
                        if isinstance(p_data, dict) and p_data.get("api_key"):
                            k = str(p_data["api_key"])
                            p_data["api_key"] = k[:4] + "****" + k[-4:] if len(k) > 8 else "****"
                    return res
                
                return {
                    "llm_providers": mask_keys(settings.LLM_PROVIDERS),
                    "active_llm_provider": settings.ACTIVE_LLM_PROVIDER,
                    "tts_providers": mask_keys(settings.TTS_PROVIDERS),
                    "active_tts_provider": settings.TTS_PROVIDER,
                    "stt_providers": mask_keys(settings.STT_PROVIDERS),
                    "active_stt_provider": settings.STT_PROVIDER
                }
        return await self.loop.run_in_executor(None, _query)

    async def _update_global_config(self, args: dict, token: Optional[str]):
        if not token: raise ValueError("Authentication required.")
        def _query():
            from app.db.session import get_db_session
            from app.db.models import User
            with get_db_session() as db:
                u = db.query(User).filter(User.id == token).first()
                if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
                
                def preserve_masked(new_dict, old_dict):
                    if not new_dict or not old_dict: return
                    for p_name, p_data in new_dict.items():
                        if isinstance(p_data, dict) and p_data.get("api_key") and "****" in str(p_data["api_key"]):
                            if p_name in old_dict and isinstance(old_dict[p_name], dict):
                                p_data["api_key"] = old_dict[p_name].get("api_key")

                if args.get("llm_providers") is not None:
                    preserve_masked(args["llm_providers"], settings.LLM_PROVIDERS)
                    settings.LLM_PROVIDERS = args["llm_providers"]
                if args.get("active_llm_provider") is not None:
                    settings.ACTIVE_LLM_PROVIDER = args["active_llm_provider"]

                if args.get("tts_providers") is not None:
                    preserve_masked(args["tts_providers"], settings.TTS_PROVIDERS)
                    settings.TTS_PROVIDERS = args["tts_providers"]
                if args.get("active_tts_provider") is not None:
                    settings.TTS_PROVIDER = args["active_tts_provider"]

                if args.get("stt_providers") is not None:
                    preserve_masked(args["stt_providers"], settings.STT_PROVIDERS)
                    settings.STT_PROVIDERS = args["stt_providers"]
                if args.get("active_stt_provider") is not None:
                    settings.STT_PROVIDER = args["active_stt_provider"]

                settings.save_to_yaml()
                return {"message": "Global providers updated successfully"}
        return await self.loop.run_in_executor(None, _query)

    async def _get_system_status(self, args: dict, token: Optional[str]):
        def _query():
            return {"status": "running", "oidc_enabled": settings.OIDC_ENABLED, "version": "1.0.0"}
        return await self.loop.run_in_executor(None, _query)

async def _call_tool(name: str, args: dict, token: Optional[str], services: ServiceContainer) -> dict:
    """Execute a named tool via MCPToolDispatcher."""
    def _ok(data) -> dict:
        text = json.dumps(data, indent=2, default=str) if not isinstance(data, str) else data
        return {"content": [{"type": "text", "text": text}]}

    loop = asyncio.get_running_loop()
    dispatcher = MCPToolDispatcher(services, loop)
    result = await dispatcher.dispatch(name, args, token)
    return _ok(result)
