diff --git a/ai-hub/app/api/routes/mcp.py b/ai-hub/app/api/routes/mcp.py index 341d333..415f794 100644 --- a/ai-hub/app/api/routes/mcp.py +++ b/ai-hub/app/api/routes/mcp.py @@ -43,16 +43,14 @@ token: Optional[str] = Query(None, description="Optional user token (X-User-ID)"), ): """ - Establishes a Server-Sent Events (SSE) stream for an MCP client. - - Per the MCP spec the first event MUST be `endpoint`, whose data is the - URL the client should POST messages to. + Legacy SSE transport (MCP 2024-11-05). + Opens a persistent SSE stream; first event is `endpoint` telling the + client where to POST messages. """ session_id = str(uuid.uuid4()) queue: asyncio.Queue = asyncio.Queue() _sse_sessions[session_id] = queue - # Build the absolute messages URL from the request's base URL base = str(request.base_url).rstrip("/") messages_url = f"{base}/api/v1/mcp/messages?session_id={session_id}" if token: @@ -61,12 +59,10 @@ logger.info(f"[MCP] New SSE session: {session_id}") async def event_generator() -> AsyncIterator[str]: - # Required first event — tells the client where to POST messages yield f"event: endpoint\ndata: {messages_url}\n\n" try: while True: if await request.is_disconnected(): - logger.info(f"[MCP] Client disconnected: {session_id}") break try: msg = await asyncio.wait_for(queue.get(), timeout=25.0) @@ -75,18 +71,53 @@ yield ": keepalive\n\n" finally: _sse_sessions.pop(session_id, None) - logger.info(f"[MCP] Session cleaned up: {session_id}") return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", - "X-Accel-Buffering": "no", # Disable Nginx buffering for SSE + "X-Accel-Buffering": "no", "Access-Control-Allow-Origin": "*", }, ) + # ─── Streamable HTTP Transport (MCP 2025-03-26) ─────────────────────────── + # Modern clients (Antigravity, Cursor, VS Code) POST JSON-RPC directly to + # the serverURL and expect a synchronous JSON response back. + # Both /sse and / are registered so it works regardless of which URL + # the client is pointed at. + @router.post("/sse") + @router.post("/") + async def mcp_streamable_http( + request: Request, + token: Optional[str] = Query(None), + ): + """ + Streamable HTTP transport — MCP spec 2025-03-26. + Client sends a JSON-RPC 2.0 request via POST and receives the response + synchronously in the HTTP response body. + """ + try: + body = await request.json() + except Exception: + raise HTTPException(status_code=400, detail="Invalid JSON body.") + + # Batch requests (array) — process each and return array + if isinstance(body, list): + results = [] + for item in body: + results.append(await _handle_single(item, token, services)) + return JSONResponse(results, headers={"Access-Control-Allow-Origin": "*"}) + + # Single request + response = await _handle_single(body, token, services) + # Notifications have no id — return 202 with empty body + if response is None: + return JSONResponse(None, status_code=202, + headers={"Access-Control-Allow-Origin": "*"}) + return JSONResponse(response, headers={"Access-Control-Allow-Origin": "*"}) + # ─── SSE Transport — Message Handler ───────────────────────────────────── @router.post("/messages") async def mcp_messages( @@ -95,9 +126,8 @@ token: Optional[str] = Query(None), ): """ - Receives a JSON-RPC 2.0 message from an MCP client. - The response is pushed asynchronously back over the SSE stream. - Returns 202 Accepted immediately so the client doesn't time out. + Legacy SSE message handler — receives JSON-RPC 2.0 from a client that + first established a GET /sse stream, then pushes results over that stream. """ queue = _sse_sessions.get(session_id) if not queue: @@ -113,8 +143,6 @@ params = body.get("params", {}) logger.info(f"[MCP] [{session_id[:8]}] → {method}") - - # Dispatch asynchronously — don't block the HTTP response asyncio.create_task(_dispatch(queue, rpc_id, method, params, token, services)) return JSONResponse( @@ -126,6 +154,32 @@ return router +# ─── Single-request handler (used by Streamable HTTP) ──────────────────────── + +async def _handle_single(body: dict, token: Optional[str], services: ServiceContainer): + """Process one JSON-RPC object; return response dict or None for notifications.""" + rpc_id = body.get("id") # None means it's a notification + method = body.get("method", "") + params = body.get("params", {}) + + logger.info(f"[MCP-HTTP] → {method}") + try: + result = await _execute(method, params, token, services) + if rpc_id is None: + return None # notification — no response + return {"jsonrpc": "2.0", "id": rpc_id, "result": result} + except Exception as exc: + logger.exception(f"[MCP-HTTP] Error for '{method}': {exc}") + if rpc_id is None: + return None + return { + "jsonrpc": "2.0", + "id": rpc_id, + "error": {"code": -32000, "message": str(exc)}, + } + + + # ─── Dispatcher ─────────────────────────────────────────────────────────────── async def _dispatch(