"""
MCP (Model Context Protocol) Server Route — Streamable HTTP + Legacy SSE Transport
Supports:
MCP spec 2025-11-25 — Streamable HTTP (primary, recommended)
MCP spec 2024-11-05 — HTTP+SSE (legacy, backwards-compat)
Endpoints (mounted under /api/v1/mcp/*):
POST /mcp/sse — Streamable HTTP: JSON-RPC in, JSON response out
POST /mcp/ — Same, aliased for clients using the base path
GET /mcp/sse — Legacy SSE stream (sends endpoint event)
POST /mcp/messages — Legacy SSE message handler
Discovery:
GET /.well-known/mcp/manifest.json (mounted in app.py)
"""
import asyncio
import json
import uuid
import logging
import jwt
import os
from typing import Optional, List, Annotated, Any
from fastapi import APIRouter, HTTPException, Request, Query, Header
from fastapi.responses import JSONResponse, StreamingResponse
from app.api.dependencies import ServiceContainer
from app.config import settings
logger = logging.getLogger(__name__)
MCP_VERSION = "2025-11-25" # Latest MCP specification version
# ─── In-process SSE session registry ─────────────────────────────────────────
# Maps session_id → asyncio.Queue of JSON-serializable dicts
_sse_sessions: dict[str, asyncio.Queue] = {}
def create_mcp_router(services: ServiceContainer) -> APIRouter:
router = APIRouter(tags=["MCP"])
async def _get_authenticated_user(request: Request, token: Optional[str], db) -> Optional[str]:
"""
Resolves the user_id from either the Authorization header (JWT) or the token query param.
If OIDC is enabled, this strictly requires a valid JWT.
"""
from app.config import settings
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
if not token:
return None
is_jwt = "." in token
# 1. OIDC Mode: Support both OIDC (RS256) and Internal (HS256) JWTs
if settings.OIDC_ENABLED:
if not is_jwt:
logger.warning(f"[MCP] Rejected non-JWT token in OIDC mode.")
raise HTTPException(
status_code=401,
detail="Authentication required: Provide a valid JWT."
)
try:
# Try internal HS256 first
unverified = jwt.decode(token, options={"verify_signature": False})
if unverified.get("iss") == "cortex-hub-internal":
decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
return decoded.get("sub")
# Fallback to OIDC RS256
user = await services.auth_service.verify_id_token(token, db)
return user.id
except Exception as e:
logger.error(f"[MCP] JWT verification failed: {e}")
raise HTTPException(status_code=401, detail=f"Invalid token: {str(e)}")
# 2. Legacy/Bootstrap Mode: Accept plain user_id (Identity Claim)
# This is only active when OIDC is not configured.
return token
# ─── SSE Transport — Client Connection ────────────────────────────────────
@router.get("/sse", summary="MCP SSE Transport Endpoint")
async def mcp_sse(
request: Request,
token: Optional[str] = Query(None),
):
from app.db.session import get_db_session
with get_db_session() as db:
user_id = await _get_authenticated_user(request, token, db)
if not user_id:
logger.info("[MCP] SSE connection opened without initial auth.")
# Preserve the original credential (JWT or plain id) for the messages URL.
# Using user_id here breaks OIDC mode because the messages endpoint requires a JWT.
auth_header = request.headers.get("Authorization", "")
original_token = auth_header[7:] if auth_header.startswith("Bearer ") else token
queue = asyncio.Queue()
session_id = str(uuid.uuid4())
_sse_sessions[session_id] = queue
messages_url = f"{settings.HUB_PUBLIC_URL}/api/v1/mcp/messages?session_id={session_id}"
if original_token:
messages_url += f"&token={original_token}"
origin = request.headers.get("origin")
if origin:
allowed = ["https://ai.jerxie.com", "http://localhost:3000", "http://localhost:8080"]
if not any(origin.startswith(a) for a in allowed):
logger.warning(f"[MCP] Blocked unauthorized origin: {origin}")
raise HTTPException(status_code=403, detail="Unauthorized Origin")
async def _event_generator():
try:
yield f"event: endpoint\ndata: {messages_url}\n\n"
while True:
msg = await queue.get()
yield f"event: message\ndata: {json.dumps(msg)}\n\n"
finally:
_sse_sessions.pop(session_id, None)
return StreamingResponse(
_event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": "*",
},
)
# ─── Streamable HTTP Transport (MCP 2025-11-25) ───────────────────────────
@router.post("/sse")
@router.post("/", summary="MCP Streamable HTTP Endpoint (Post-only mode)")
async def mcp_streamable_http(
request: Request,
token: Optional[str] = Query(None),
):
from app.db.session import get_db_session
with get_db_session() as db:
user_id = await _get_authenticated_user(request, token, db)
origin = request.headers.get("origin")
if origin:
allowed = [
"https://ai.jerxie.com",
"http://localhost:3000",
"http://localhost:8080",
]
server_host = request.headers.get("host", "")
allowed.append(f"https://{server_host}")
allowed.append(f"http://{server_host}")
if not any(origin.startswith(a) for a in allowed):
logger.warning(f"[MCP] Rejected request from disallowed origin: {origin}")
return JSONResponse(
{"jsonrpc": "2.0", "error": {"code": -32000, "message": "Forbidden origin"}},
status_code=403,
)
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body.")
if isinstance(body, list):
results = []
for item in body:
results.append(await _handle_single(item, user_id, services))
return JSONResponse(
[r for r in results if r is not None],
headers={"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION},
)
response = await _handle_single(body, user_id, services)
if response is None:
return JSONResponse(
None, status_code=202,
headers={"Access-Control-Allow-Origin": "*"},
)
headers = {"Access-Control-Allow-Origin": "*", "MCP-Protocol-Version": MCP_VERSION}
if body.get("method") == "initialize":
headers["Mcp-Session-Id"] = str(uuid.uuid4())
return JSONResponse(response, headers=headers)
# ─── SSE Transport — Message Handler ─────────────────────────────────────
@router.post("/messages")
async def mcp_messages(
request: Request,
session_id: str = Query(...),
token: Optional[str] = Query(None),
):
queue = _sse_sessions.get(session_id)
if not queue:
raise HTTPException(status_code=404, detail="MCP session not found or expired.")
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body.")
rpc_id = body.get("id")
method = body.get("method", "")
params = body.get("params", {})
from app.db.session import get_db_session
with get_db_session() as db:
user_id = await _get_authenticated_user(request, token, db)
logger.info(f"[MCP] [{session_id[:8]}] → {method}")
asyncio.create_task(_dispatch(queue, rpc_id, method, params, user_id, services))
return JSONResponse(
{"status": "accepted"},
status_code=202,
headers={"Access-Control-Allow-Origin": "*"},
)
return router
async def _handle_single(body: dict, token: Optional[str], services: ServiceContainer):
rpc_id = body.get("id")
method = body.get("method", "")
params = body.get("params", {})
logger.info(f"[MCP-HTTP] → {method}")
try:
result = await _execute(method, params, token, services)
if rpc_id is None:
return None
return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
except Exception as exc:
logger.exception(f"[MCP-HTTP] Error for '{method}': {exc}")
if rpc_id is None:
return None
return {
"jsonrpc": "2.0",
"id": rpc_id,
"error": {"code": -32000, "message": str(exc)},
}
async def _dispatch(
queue: asyncio.Queue,
rpc_id,
method: str,
params: dict,
token: Optional[str],
services: ServiceContainer,
):
try:
result = await _execute(method, params, token, services)
await queue.put({"jsonrpc": "2.0", "id": rpc_id, "result": result})
except Exception as exc:
logger.exception(f"[MCP] Tool error for '{method}': {exc}")
await queue.put({
"jsonrpc": "2.0",
"id": rpc_id,
"error": {"code": -32000, "message": str(exc)},
})
async def _execute(method: str, params: dict, token: Optional[str], services: ServiceContainer):
if method == "initialize":
return {
"protocolVersion": MCP_VERSION,
"capabilities": {"tools": {}},
"serverInfo": {"name": "Cortex Hub", "version": "1.0.0"},
}
if method == "ping":
return {}
if method == "tools/list":
return {
"tools": [
_tool_def("list_nodes", "List all agent nodes in the Cortex swarm mesh.", {}),
_tool_def("get_app_info", "Get metadata about this Cortex Hub instance.", {}),
_tool_def("get_node_details", "Get full details for a specific agent node.", {"node_id": {"type": "string"}}, required=["node_id"]),
_tool_def("list_agents", "List all autonomous agents.", {}),
_tool_def("list_skills", "List all registered skill folders.", {}),
_tool_def("dispatch", "Dispatch a shell command to a node.", {"node_id": {"type": "string"}, "command": {"type": "string"}, "session_id": {"type": "string"}}, required=["node_id", "command"]),
_tool_def("write_file", "Create/Update a file on a node.", {"node_id": {"type": "string"}, "path": {"type": "string"}, "content": {"type": "string"}, "is_dir": {"type": "boolean"}, "session_id": {"type": "string"}}, required=["node_id", "path"]),
_tool_def("delete_file", "Delete a file/directory on a node.", {"node_id": {"type": "string"}, "path": {"type": "string"}, "session_id": {"type": "string"}}, required=["node_id", "path"]),
_tool_def("deploy_agent", "Deploy a new autonomous agent.", {"name": {"type": "string"}, "mesh_node_id": {"type": "string"}, "description": {"type": "string"}, "system_prompt": {"type": "string"}, "initial_prompt": {"type": "string"}}, required=["name", "mesh_node_id"]),
_tool_def("update_agent_config", "Update config for an agent.", {"agent_id": {"type": "string"}, "mesh_node_id": {"type": "string"}}, required=["agent_id", "mesh_node_id"]),
_tool_def("delete_agent", "Delete an autonomous agent.", {"agent_id": {"type": "string"}}, required=["agent_id"]),
_tool_def("get_agent_details", "Get details for an autonomous agent.", {"agent_id": {"type": "string"}}, required=["agent_id"]),
_tool_def("get_user_config", "Get user effective config.", {}),
_tool_def("update_user_config", "Update user preferences.", {
"llm": {"type": "object"},
"tts": {"type": "object"},
"stt": {"type": "object"},
"statuses": {"type": "object"}
}),
_tool_def("list_groups", "List all security groups (Admin Only).", {}),
_tool_def("create_group", "Create policy group (Admin Only).", {"name": {"type": "string"}}, required=["name"]),
_tool_def("update_group", "Update policy group (Admin Only).", {"gid": {"type": "string"}}, required=["gid"]),
_tool_def("delete_group", "Delete policy group (Admin Only).", {"gid": {"type": "string"}}, required=["gid"]),
_tool_def("list_users", "List all registered users (Admin Only).", {}),
_tool_def("update_user_role", "Update a user's role (Admin Only).", {"uid": {"type": "string"}, "role": {"type": "string"}}, required=["uid", "role"]),
_tool_def("assign_user_to_group", "Assign user to group (Admin Only).", {"uid": {"type": "string"}, "group_id": {"type": "string"}}, required=["uid", "group_id"]),
_tool_def("get_user_profile", "Get user's profile info.", {}),
_tool_def("update_user_profile", "Update user's profile info.", {"username": {"type": "string"}}),
_tool_def("create_session", "Create a new chat session.", {"provider_name": {"type": "string"}}),
_tool_def("get_session", "Get specific session details.", {"session_id": {"type": "integer"}}, required=["session_id"]),
_tool_def("list_sessions", "List all chat sessions.", {}),
_tool_def("update_session", "Update session configuration.", {"session_id": {"type": "integer"}}, required=["session_id"]),
_tool_def("delete_session", "Archive a chat session.", {"session_id": {"type": "integer"}}, required=["session_id"]),
_tool_def("get_session_messages", "Retrieve chat history.", {"session_id": {"type": "integer"}}, required=["session_id"]),
_tool_def("clear_session_history", "Delete all messages in session.", {"session_id": {"type": "integer"}}, required=["session_id"]),
_tool_def("attach_nodes_to_session", "Attach agent nodes to session.", {"session_id": {"type": "integer"}, "node_ids": {"type": "array", "items": {"type": "string"}}}, required=["session_id", "node_ids"]),
_tool_def("detach_node_from_session", "Detach agent node.", {"session_id": {"type": "integer"}, "node_id": {"type": "string"}}, required=["session_id", "node_id"]),
_tool_def("get_session_nodes", "Get all nodes in session.", {"session_id": {"type": "integer"}}, required=["session_id"]),
_tool_def("cancel_session_task", "Cancel session task.", {"session_id": {"type": "integer"}}, required=["session_id"]),
_tool_def("get_system_status", "Retrieve full system state.", {}),
_tool_def("get_global_config", "Get global LLM/TTS/STT settings (Admin only).", {}),
_tool_def("update_global_config", "Update global LLM/TTS/STT settings (Admin only).", {
"llm_providers": {"type": "object"},
"active_llm_provider": {"type": "string"},
"tts_providers": {"type": "object"},
"active_tts_provider": {"type": "string"},
"stt_providers": {"type": "object"},
"active_stt_provider": {"type": "string"}
}),
_tool_def("verify_provider", "Test connection for an AI provider.", {
"section": {"type": "string", "enum": ["llm", "tts", "stt"]},
"provider_name": {"type": "string"},
"provider_type": {"type": "string"},
"api_key": {"type": "string"},
"model": {"type": "string"},
"voice": {"type": "string"}
}, required=["section", "provider_name"]),
_tool_def("list_files", "List files/directories on an agent node.", {
"node_id": {"type": "string"},
"path": {"type": "string", "description": "Directory path (default: '.')"},
"session_id": {"type": "string", "description": "Session workspace ID (default: '__fs_explorer__')"},
"recursive": {"type": "boolean"}
}, required=["node_id"]),
_tool_def("upload_file", "Upload/Update a file on a node.", {
"node_id": {"type": "string"},
"path": {"type": "string"},
"content": {"type": "string"},
"session_id": {"type": "string"}
}, required=["node_id", "path", "content"]),
_tool_def("download_file", "Read file content from a node.", {
"node_id": {"type": "string"},
"path": {"type": "string"},
"session_id": {"type": "string"}
}, required=["node_id", "path"]),
_tool_def("remove_file", "Delete a file/directory from a node.", {
"node_id": {"type": "string"},
"path": {"type": "string"},
"session_id": {"type": "string"}
}, required=["node_id", "path"]),
_tool_def("create_file", "Create a new empty file or directory.", {
"node_id": {"type": "string"},
"path": {"type": "string"},
"is_dir": {"type": "boolean"},
"session_id": {"type": "string"}
}, required=["node_id", "path"]),
_tool_def("move_file", "Move or rename a file/directory on a node.", {
"node_id": {"type": "string"},
"old_path": {"type": "string"},
"new_path": {"type": "string"},
"session_id": {"type": "string"}
}, required=["node_id", "old_path", "new_path"]),
_tool_def("copy_file", "Copy a file/directory on a node.", {
"node_id": {"type": "string"},
"old_path": {"type": "string"},
"new_path": {"type": "string"},
"session_id": {"type": "string"}
}, required=["node_id", "old_path", "new_path"]),
_tool_def("rename_file", "Rename a file/directory on a node (alias for move_file).", {
"node_id": {"type": "string"},
"old_path": {"type": "string"},
"new_path": {"type": "string"},
"session_id": {"type": "string"}
}, required=["node_id", "old_path", "new_path"]),
_tool_def("get_file_stat", "Get metadata for a file in the Hub mirror.", {
"node_id": {"type": "string"},
"path": {"type": "string"},
"session_id": {"type": "string"}
}, required=["node_id", "path"])
]
}
if method == "tools/call":
name = params.get("name", "")
args = params.get("arguments", {})
return await _call_tool(name, args, token, services)
raise ValueError(f"Unknown method: '{method}'")
def _tool_def(name: str, description: str, properties: dict, required: list = None) -> dict:
schema = {"type": "object", "properties": properties}
if required:
schema["required"] = required
return {"name": name, "description": description, "inputSchema": schema}
class MCPToolDispatcher:
def __init__(self, services: ServiceContainer, loop):
self.services = services
self.loop = loop
self._handlers = {
"list_nodes": self._list_nodes,
"get_app_info": self._get_app_info,
"get_node_details": self._get_node_details,
"list_agents": self._list_agents,
"deploy_agent": self._deploy_agent,
"update_agent_config": self._update_agent_config,
"delete_agent": self._delete_agent,
"get_agent_details": self._get_agent_details,
"get_user_config": self._get_user_config,
"update_user_config": self._update_user_config,
"list_skills": self._list_skills,
"dispatch": self._dispatch,
"write_file": self._write_file,
"delete_file": self._delete_file,
"list_groups": self._list_groups,
"create_group": self._create_group,
"update_group": self._update_group,
"delete_group": self._delete_group,
"list_users": self._list_users,
"update_user_role": self._update_user_role,
"assign_user_to_group": self._assign_user_to_group,
"get_user_profile": self._get_user_profile,
"update_user_profile": self._update_user_profile,
"create_session": self._create_session,
"get_session": self._get_session,
"list_sessions": self._list_sessions,
"update_session": self._update_session,
"delete_session": self._delete_session,
"get_session_messages": self._get_session_messages,
"clear_session_history": self._clear_session_history,
"attach_nodes_to_session": self._attach_nodes_to_session,
"detach_node_from_session": self._detach_node_from_session,
"get_session_nodes": self._get_session_nodes,
"cancel_session_task": self._cancel_session_task,
"get_system_status": self._get_system_status,
"get_global_config": self._get_global_config,
"update_global_config": self._update_global_config,
"verify_provider": self._verify_provider,
"list_files": self._list_files,
"upload_file": self._upload_file,
"download_file": self._download_file,
"remove_file": self._remove_file,
"create_file": self._create_file,
"move_file": self._move_file,
"copy_file": self._copy_file,
"rename_file": self._move_file,
"get_file_stat": self._get_file_stat
}
async def dispatch(self, name: str, args: dict, token: Optional[str]) -> Any:
handler = self._handlers.get(name)
if not handler:
raise ValueError(f"Tool '{name}' not found.")
return await handler(args, token)
async def _list_nodes(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
with get_db_session() as db:
nodes = self.services.mesh_service.list_accessible_nodes(token, db)
return {"nodes": [{"id": n.node_id, "name": n.display_name, "status": n.last_status, "os": (n.capabilities or {}).get("os"), "is_active": n.is_active} for n in nodes]}
return await self.loop.run_in_executor(None, _query)
async def _get_app_info(self, args: dict, token: Optional[str]):
def _query():
from app.db.session import get_db_session
with get_db_session() as db:
total, online = 0, 0
if token:
nodes = self.services.mesh_service.list_accessible_nodes(token, db)
total = len(nodes)
online = len([n for n in nodes if n.last_status == "online"])
return {
"name": "Cortex Hub", "version": "1.0.0", "capabilities": ["swarms", "webmcp", "mcp-sse", "voice-chat", "rag"],
"nodes": {"total": total, "online": online}, "mcp_transport": "sse", "sse_endpoint": f"{settings.HUB_PUBLIC_URL}/api/v1/mcp/sse",
"auth": {"oidc_enabled": settings.OIDC_ENABLED}
}
return await self.loop.run_in_executor(None, _query)
async def _get_node_details(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id = args.get("node_id")
if not node_id: raise ValueError("node_id required.")
def _query():
from app.db.session import get_db_session
with get_db_session() as db:
try: self.services.mesh_service.require_node_access(token, node_id, db)
except Exception: return None
n = self.services.mesh_service.get_node_or_404(node_id, db)
return {"node_id": n.node_id, "display_name": n.display_name, "description": n.description, "status": n.last_status, "is_active": n.is_active, "capabilities": n.capabilities, "skill_config": n.skill_config, "registered_by": n.registered_by, "last_seen_at": str(n.last_seen_at) if n.last_seen_at else None}
res = await self.loop.run_in_executor(None, _query)
if res is None: raise ValueError(f"Node '{node_id}' not found.")
return res
async def _list_agents(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db import models
with get_db_session() as db:
acc = self.services.mesh_service.list_accessible_nodes(token, db)
nids = [n.node_id for n in acc]
rows = db.query(models.AgentInstance).filter(models.AgentInstance.mesh_node_id.in_(nids)).all()
return {"agents": [{"id": str(a.id), "name": a.template.name if a.template else None, "status": a.status, "node": a.mesh_node_id, "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None, "total_runs": a.total_runs, "quality_score": a.latest_quality_score} for a in rows]}
return await self.loop.run_in_executor(None, _query)
async def _deploy_agent(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
from app.api import schemas
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
req = schemas.DeployAgentRequest(**args)
return self.services.agent_service.deploy_agent(db, token, req)
res = await self.loop.run_in_executor(None, _execute)
prompt = args.get("initial_prompt")
if prompt:
from app.core.orchestration.agent_loop import AgentExecutor
asyncio.create_task(AgentExecutor.run(res["instance_id"], prompt, self.services, self.services.user_service))
return res
async def _update_agent_config(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
agent_id = args.get("agent_id")
if not agent_id: raise ValueError("agent_id required.")
from app.api import schemas
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
req_args = {k: v for k, v in args.items() if k != "agent_id"}
req = schemas.AgentConfigUpdate(**req_args)
res = self.services.agent_service.update_config(db, agent_id, token, req)
return {"id": str(res.id), "status": res.status}
return await self.loop.run_in_executor(None, _execute)
async def _delete_agent(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
agent_id = args.get("agent_id")
if not agent_id: raise ValueError("agent_id required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
self.services.agent_service.delete_agent(db, agent_id, token)
return {"status": "deleted", "agent_id": agent_id}
return await self.loop.run_in_executor(None, _execute)
async def _get_agent_details(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
agent_id = args.get("agent_id")
if not agent_id: raise ValueError("agent_id required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
a = self.services.agent_service.get_agent_instance(db, agent_id, token)
return {"id": str(a.id), "name": a.template.name if a.template else None, "status": a.status, "node": a.mesh_node_id, "last_heartbeat": str(a.last_heartbeat) if a.last_heartbeat else None, "total_runs": a.total_runs, "successful_runs": a.successful_runs, "quality_score": a.latest_quality_score, "workspace_jail": a.current_workspace_jail, "last_error": a.last_error, "evaluation_status": a.evaluation_status}
return await self.loop.run_in_executor(None, _execute)
async def _get_user_config(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
user = db.query(User).filter(User.id == token).first()
if not user: raise ValueError("User not found.")
return self.services.preference_service.merge_user_config(user, db).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _update_user_config(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
from app.api import schemas
def _execute():
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
user = db.query(User).filter(User.id == token).first()
if not user: raise ValueError("User not found.")
prefs = schemas.UserPreferences(**args)
return self.services.preference_service.update_user_config(user, prefs, db).model_dump()
return await self.loop.run_in_executor(None, _execute)
async def _list_skills(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db import models
with get_db_session() as db:
rows = db.query(models.Skill).filter(models.Skill.is_enabled == True).all()
return {"skills": [{"id": s.id, "name": s.name, "description": s.description, "type": s.skill_type} for s in rows]}
return await self.loop.run_in_executor(None, _query)
async def _dispatch(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id, command, session_id = args.get("node_id"), args.get("command"), args.get("session_id", "")
if not node_id or not command: raise ValueError("node_id and command required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
tid = self.services.mesh_service.dispatch_task(node_id, command, token, db, session_id=session_id)
return {"status": "accepted", "task_id": tid}
return await self.loop.run_in_executor(None, _execute)
async def _write_file(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id, path, content, is_dir, session_id = args.get("node_id"), args.get("path"), args.get("content", ""), args.get("is_dir", False), args.get("session_id", "__fs_explorer__")
if not node_id or not path: raise ValueError("node_id and path required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
self.services.mesh_service.require_node_access(token, node_id, db)
return self.services.orchestrator.assistant.write(node_id, path, content, is_dir, session_id=session_id)
return await self.loop.run_in_executor(None, _execute)
async def _delete_file(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id, path, session_id = args.get("node_id"), args.get("path"), args.get("session_id", "__fs_explorer__")
if not node_id or not path: raise ValueError("node_id and path required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
self.services.mesh_service.require_node_access(token, node_id, db)
return self.services.orchestrator.assistant.rm(node_id, path, session_id=session_id)
return await self.loop.run_in_executor(None, _execute)
async def _list_files(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id, path, session_id = args.get("node_id"), args.get("path", "."), args.get("session_id", "__fs_explorer__")
if not node_id: raise ValueError("node_id required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
self.services.mesh_service.require_node_access(token, node_id, db)
return self.services.orchestrator.assistant.ls(node_id, path, session_id=session_id)
return await self.loop.run_in_executor(None, _execute)
async def _upload_file(self, args: dict, token: Optional[str]):
return await self._write_file(args, token)
async def _download_file(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id, path, session_id = args.get("node_id"), args.get("path"), args.get("session_id", "__fs_explorer__")
if not node_id or not path: raise ValueError("node_id and path required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
self.services.mesh_service.require_node_access(token, node_id, db)
return self.services.orchestrator.assistant.cat(node_id, path, session_id=session_id)
return await self.loop.run_in_executor(None, _execute)
async def _remove_file(self, args: dict, token: Optional[str]):
return await self._delete_file(args, token)
async def _create_file(self, args: dict, token: Optional[str]):
args["content"] = ""
return await self._write_file(args, token)
async def _move_file(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id, old_path, new_path, session_id = args.get("node_id"), args.get("old_path"), args.get("new_path"), args.get("session_id", "__fs_explorer__")
if not node_id or not old_path or not new_path: raise ValueError("node_id, old_path, and new_path required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
self.services.mesh_service.require_node_access(token, node_id, db)
return self.services.orchestrator.assistant.move(session_id, old_path, new_path, node_id=node_id)
return await self.loop.run_in_executor(None, _execute)
async def _copy_file(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id, old_path, new_path, session_id = args.get("node_id"), args.get("old_path"), args.get("new_path"), args.get("session_id", "__fs_explorer__")
if not node_id or not old_path or not new_path: raise ValueError("node_id, old_path, and new_path required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
self.services.mesh_service.require_node_access(token, node_id, db)
return self.services.orchestrator.assistant.copy(session_id, old_path, new_path, node_id=node_id)
return await self.loop.run_in_executor(None, _execute)
async def _get_file_stat(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
node_id, path, session_id = args.get("node_id"), args.get("path"), args.get("session_id", "__fs_explorer__")
if not node_id or not path: raise ValueError("node_id and path required.")
def _execute():
from app.db.session import get_db_session
with get_db_session() as db:
self.services.mesh_service.require_node_access(token, node_id, db)
return self.services.orchestrator.assistant.stat(session_id, path)
return await self.loop.run_in_executor(None, _execute)
async def _list_groups(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
return [schemas.GroupInfo.model_validate(g).model_dump() for g in self.services.user_service.get_all_groups(db)]
return await self.loop.run_in_executor(None, _query)
async def _create_group(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
g = self.services.user_service.create_group(db, args.get("name"), args.get("description"), args.get("policy"))
if g is None: raise ValueError(f"Group '{args.get('name')}' exists.")
return schemas.GroupInfo.model_validate(g).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _update_group(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
g = self.services.user_service.update_group(db, args.get("gid"), args.get("name"), args.get("description"), args.get("policy"))
if g is None: raise ValueError("Group not found.")
if g is False: raise ValueError("Group name conflict.")
return schemas.GroupInfo.model_validate(g).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _delete_group(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
if not self.services.user_service.delete_group(db, args.get("gid")): raise ValueError("Failed delete.")
return {"message": "Group deleted successfully"}
return await self.loop.run_in_executor(None, _query)
async def _list_users(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
users = self.services.user_service.get_all_users(db)
res = []
for u_item in users:
p = schemas.UserProfile.model_validate(u_item)
if u_item.group: p.group_name = u_item.group.name
res.append(p.model_dump())
return res
return await self.loop.run_in_executor(None, _query)
async def _update_user_role(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
if not self.services.user_service.update_user_role(db, args.get("uid"), args.get("role")): raise ValueError("Failed update.")
updated = self.services.user_service.get_user_by_id(db, args.get("uid"))
return schemas.UserProfile.model_validate(updated).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _assign_user_to_group(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
if not self.services.user_service.assign_user_to_group(db, args.get("uid"), args.get("group_id")): raise ValueError("User or group not found.")
updated = self.services.user_service.get_user_by_id(db, args.get("uid"))
return schemas.UserProfile.model_validate(updated).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _get_user_profile(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = self.services.user_service.get_user_by_id(db=db, user_id=token)
if not u: raise ValueError("User not found.")
p = schemas.UserProfile.model_validate(u)
if u.group: p.group_name = u.group.name
return p.model_dump()
return await self.loop.run_in_executor(None, _query)
async def _update_user_profile(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = self.services.user_service.get_user_by_id(db=db, user_id=token)
if not u: raise ValueError("User not found.")
if args.get("username"): u.username = args.get("username")
if args.get("full_name"): u.full_name = args.get("full_name")
if args.get("avatar_url"): u.avatar_url = args.get("avatar_url")
db.commit(); db.refresh(u)
p = schemas.UserProfile.model_validate(u)
if u.group: p.group_name = u.group.name
return p.model_dump()
return await self.loop.run_in_executor(None, _query)
async def _create_session(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
with get_db_session() as db:
s = self.services.session_service.create_session(
db=db, user_id=token, provider_name=args.get("provider_name", "deepseek"),
model_name=args.get("model_name"), feature_name=args.get("feature_name", "default"),
stt_provider_name=args.get("stt_provider_name"), tts_provider_name=args.get("tts_provider_name")
)
return schemas.Session.model_validate(s).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _get_session(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import Session as DBSession
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session not found.")
return schemas.Session.model_validate(s).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _list_sessions(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import Session as DBSession
with get_db_session() as db:
sessions = db.query(DBSession).filter(
DBSession.user_id == token, DBSession.feature_name == args.get("feature_name", "default"),
DBSession.is_archived == False
).order_by(DBSession.created_at.desc()).all()
return [schemas.Session.model_validate(s).model_dump() for s in sessions]
return await self.loop.run_in_executor(None, _query)
async def _update_session(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import Session as DBSession, Skill
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session not found.")
if args.get("title") is not None: s.title = args.get("title")
if args.get("provider_name") is not None: s.provider_name = args.get("provider_name")
if args.get("model_name") is not None: s.model_name = args.get("model_name")
if args.get("restrict_skills") is not None: s.restrict_skills = args.get("restrict_skills")
if args.get("allowed_skill_names") is not None: s.allowed_skill_names = args.get("allowed_skill_names")
if args.get("allowed_skill_ids") is not None:
s.skills = db.query(Skill).filter(Skill.id.in_(args.get("allowed_skill_ids"))).all()
if args.get("system_prompt_override") is not None: s.system_prompt_override = args.get("system_prompt_override")
if args.get("is_locked") is not None: s.is_locked = args.get("is_locked")
db.commit(); db.refresh(s)
return schemas.Session.model_validate(s).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _delete_session(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db.models import Session as DBSession
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session not found.")
self.services.session_service.archive_session(db, args.get("session_id"))
return {"message": "Session deleted successfully."}
return await self.loop.run_in_executor(None, _query)
async def _get_session_messages(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import Session as DBSession
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session access denied.")
msgs = self.services.rag_service.get_message_history(db=db, session_id=args.get("session_id"))
res = []
for m in msgs:
m_dict = schemas.Message.model_validate(m).model_dump()
if m.audio_path and os.path.exists(m.audio_path):
m_dict["has_audio"], m_dict["audio_url"] = True, f"/sessions/messages/{m.id}/audio"
res.append(m_dict)
return res
return await self.loop.run_in_executor(None, _query)
async def _clear_session_history(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db.models import Session as DBSession, Message as DBMessage
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session not found.")
if s.is_locked: raise ValueError("Session is locked.")
cnt = db.query(DBMessage).filter(DBMessage.session_id == args.get("session_id")).delete()
db.commit()
return {"message": f"Cleared {cnt} messages."}
return await self.loop.run_in_executor(None, _query)
async def _attach_nodes_to_session(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import Session as DBSession
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session access denied.")
req = schemas.NodeAttachRequest(**args)
return self.services.session_service.attach_nodes(db, args.get("session_id"), req).model_dump()
return await self.loop.run_in_executor(None, _query)
async def _detach_node_from_session(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db.models import Session as DBSession
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session not found.")
nodes = list(s.attached_node_ids or [])
if args.get("node_id") not in nodes: raise ValueError("Node not attached.")
nodes.remove(args.get("node_id"))
s.attached_node_ids = nodes
status = dict(s.node_sync_status or {})
status.pop(args.get("node_id"), None)
s.node_sync_status = status
db.commit()
if hasattr(self.services, "orchestrator") and self.services.orchestrator:
self.services.orchestrator.assistant.clear_workspace(args.get("node_id"), s.sync_workspace_id)
return {"message": "Detached successfully."}
return await self.loop.run_in_executor(None, _query)
async def _get_session_nodes(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import Session as DBSession
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session not found.")
status = s.node_sync_status or {}
entries = []
for nid in (s.attached_node_ids or []):
live = self.services.node_registry_service.get_node(nid)
p = status.get(nid, {})
val = "connected" if live and p.get("status") == "pending" else p.get("status", "pending")
entries.append(schemas.NodeSyncStatusEntry(node_id=nid, status=val, last_sync=p.get("last_sync"), error=p.get("error")).model_dump())
return {"session_id": s.id, "sync_workspace_id": s.sync_workspace_id, "nodes": entries, "sync_config": s.sync_config or {}}
return await self.loop.run_in_executor(None, _query)
async def _cancel_session_task(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db.models import Session as DBSession
with get_db_session() as db:
s = db.query(DBSession).filter(DBSession.id == args.get("session_id"), DBSession.user_id == token).first()
if not s: raise ValueError("Session not found.")
s.is_cancelled = True
db.commit()
return {"message": "Cancellation request sent."}
return await self.loop.run_in_executor(None, _query)
async def _get_global_config(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
def _query():
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
config = self.services.preference_service.get_global_config(db)
def mask_keys(section_name, providers_dict):
import copy
res = copy.deepcopy(providers_dict) if providers_dict else {}
for p_data in res.values():
if isinstance(p_data, dict) and p_data.get("api_key"):
p_data["api_key"] = self.services.preference_service.mask_key(p_data["api_key"])
return res
return {
"llm_providers": mask_keys("llm", config.get("llm", {}).get("providers") or settings.LLM_PROVIDERS),
"active_llm_provider": config.get("llm", {}).get("active_provider") or settings.ACTIVE_LLM_PROVIDER,
"tts_providers": mask_keys("tts", config.get("tts", {}).get("providers") or {
settings.TTS_PROVIDER: {"api_key": settings.TTS_API_KEY, "model": settings.TTS_MODEL_NAME, "voice": settings.TTS_VOICE_NAME}
}),
"active_tts_provider": config.get("tts", {}).get("active_provider") or settings.TTS_PROVIDER,
"stt_providers": mask_keys("stt", config.get("stt", {}).get("providers") or {
settings.STT_PROVIDER: {"api_key": settings.STT_API_KEY, "model": settings.STT_MODEL_NAME}
}),
"active_stt_provider": config.get("stt", {}).get("active_provider") or settings.STT_PROVIDER
}
return await self.loop.run_in_executor(None, _query)
async def _update_global_config(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
from app.api import schemas
def _execute():
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u or u.role != "admin": raise ValueError("Forbidden: Admin only.")
# Transform MCP args to UserPreferences schema
prefs = schemas.UserPreferences(
llm={
"active_provider": args.get("active_llm_provider"),
"providers": args.get("llm_providers")
} if args.get("llm_providers") or args.get("active_llm_provider") else None,
tts={
"active_provider": args.get("active_tts_provider"),
"providers": args.get("tts_providers")
} if args.get("tts_providers") or args.get("active_tts_provider") else None,
stt={
"active_provider": args.get("active_stt_provider"),
"providers": args.get("stt_providers")
} if args.get("stt_providers") or args.get("active_stt_provider") else None
)
self.services.preference_service.update_global_config(prefs, db, u)
return {"message": "Global providers updated successfully via system_config"}
return await self.loop.run_in_executor(None, _execute)
async def _verify_provider(self, args: dict, token: Optional[str]):
if not token: raise ValueError("Authentication required.")
from app.api import schemas
from app.db.session import get_db_session
from app.db.models import User
with get_db_session() as db:
u = db.query(User).filter(User.id == token).first()
if not u: raise ValueError("User not found.")
section = args.get("section")
req = schemas.VerifyProviderRequest(
provider_name=args.get("provider_name"),
provider_type=args.get("provider_type"),
api_key=args.get("api_key"),
model=args.get("model"),
voice=args.get("voice")
)
res = await self.services.preference_service.verify_provider(db, u, req, section)
return res.model_dump()
async def _get_system_status(self, args: dict, token: Optional[str]):
def _query():
return {"status": "running", "oidc_enabled": settings.OIDC_ENABLED, "version": "1.0.0"}
return await self.loop.run_in_executor(None, _query)
async def _call_tool(name: str, args: dict, token: Optional[str], services: ServiceContainer) -> dict:
"""Execute a named tool via MCPToolDispatcher."""
def _ok(data) -> dict:
text = json.dumps(data, indent=2, default=str) if not isinstance(data, str) else data
return {"content": [{"type": "text", "text": text}]}
loop = asyncio.get_running_loop()
dispatcher = MCPToolDispatcher(services, loop)
result = await dispatcher.dispatch(name, args, token)
return _ok(result)