"""
Agent Node REST + WebSocket API
Admin-managed nodes, group access control, and user-facing live streaming.
Admin endpoints (require role=admin):
POST /nodes/ — Create node registration + generate invite_token
GET /nodes/admin — List all nodes (admin view, full detail)
GET /nodes/admin/{node_id} — Full admin detail including invite_token
PATCH /nodes/admin/{node_id} — Update node config (description, skill_config, is_active)
POST /nodes/admin/{node_id}/access — Grant group access to a node
DELETE /nodes/admin/{node_id}/access/{group_id} — Revoke group access
User endpoints (scoped to caller's group):
GET /nodes/ — List accessible nodes (user view, no sensitive data)
GET /nodes/{node_id}/status — Quick online/offline probe
POST /nodes/{node_id}/dispatch — Dispatch a task to a node
PATCH /nodes/preferences — Update user's default_nodes + data_source prefs
WebSocket (real-time streaming):
WS /nodes/{node_id}/stream — Single-node live execution stream
WS /nodes/stream/all?user_id=... — All-nodes global bus (multi-pane UI)
"""
import asyncio
import os
import json
import queue
import uuid
import secrets
import logging
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.api.dependencies import ServiceContainer, get_db
from app.api import schemas
from app.db import models
logger = logging.getLogger(__name__)
HEARTBEAT_INTERVAL_S = 5
def create_nodes_router(services: ServiceContainer) -> APIRouter:
router = APIRouter(prefix="/nodes", tags=["Agent Nodes"])

def _registry():
    # Resolve the registry lazily so a service swapped in after router
    # creation is still honored by every endpoint.
    return services.node_registry_service

def _require_admin(user_id: str, db: Session):
    """Return the user row for ``user_id``; abort with 403 unless role == admin."""
    admin = db.query(models.User).filter(models.User.id == user_id).first()
    if admin is None or admin.role != "admin":
        raise HTTPException(status_code=403, detail="Admin access required.")
    return admin
# ==================================================================
# ADMIN ENDPOINTS
# ==================================================================
@router.post("/admin", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Register New Node")
def admin_create_node(
    request: schemas.AgentNodeCreate,
    admin_id: str,
    db: Session = Depends(get_db)
):
    """
    Admin registers a new Agent Node.

    Returns the node record including a generated invite_token that must be
    placed in the node's config YAML before deployment. Rejects duplicate
    node_ids with 409.
    """
    _require_admin(admin_id, db)
    duplicate = db.query(models.AgentNode).filter(
        models.AgentNode.node_id == request.node_id
    ).first()
    if duplicate is not None:
        raise HTTPException(status_code=409, detail=f"Node '{request.node_id}' already exists.")
    new_node = models.AgentNode(
        node_id=request.node_id,
        display_name=request.display_name,
        description=request.description,
        registered_by=admin_id,
        skill_config=request.skill_config.model_dump(),
        # Cryptographically secure invite token used for node authentication.
        invite_token=secrets.token_urlsafe(32),
        last_status="offline",
    )
    db.add(new_node)
    db.commit()
    db.refresh(new_node)
    logger.info(f"[admin] Created node '{request.node_id}' by admin {admin_id}")
    return _node_to_admin_detail(new_node, _registry())
@router.get("/admin", response_model=list[schemas.AgentNodeAdminDetail], summary="[Admin] List All Nodes")
def admin_list_nodes(admin_id: str, db: Session = Depends(get_db)):
    """Full node list for admin dashboard, including invite_token and skill config."""
    _require_admin(admin_id, db)
    registry = _registry()
    return [_node_to_admin_detail(record, registry) for record in db.query(models.AgentNode).all()]
@router.get("/admin/{node_id}", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Get Node Detail")
def admin_get_node(node_id: str, admin_id: str, db: Session = Depends(get_db)):
    """Return the full admin view of one node, including its invite token."""
    _require_admin(admin_id, db)
    record = _get_node_or_404(node_id, db)
    return _node_to_admin_detail(record, _registry())
@router.patch("/admin/{node_id}", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Update Node Config")
def admin_update_node(
    node_id: str,
    update: schemas.AgentNodeUpdate,
    admin_id: str,
    db: Session = Depends(get_db)
):
    """Update display_name, description, skill_config toggles, or is_active."""
    _require_admin(admin_id, db)
    node = _get_node_or_404(node_id, db)
    # Simple scalar fields: apply only when explicitly provided.
    for field in ("display_name", "description"):
        value = getattr(update, field)
        if value is not None:
            setattr(node, field, value)
    if update.skill_config is not None:
        node.skill_config = update.skill_config.model_dump()
        # M6: Push policy live to the node if it's connected
        try:
            services.orchestrator.push_policy(node_id, node.skill_config)
        except Exception as e:
            logger.warning(f"Could not push live policy to {node_id}: {e}")
    if update.is_active is not None:
        node.is_active = update.is_active
    db.commit()
    db.refresh(node)
    return _node_to_admin_detail(node, _registry())
@router.delete("/admin/{node_id}", summary="[Admin] Deregister Node")
def admin_delete_node(
    node_id: str,
    admin_id: str,
    db: Session = Depends(get_db)
):
    """Delete a node registration and all its access grants."""
    _require_admin(admin_id, db)
    record = _get_node_or_404(node_id, db)
    # Drop any live in-memory registration first, then the DB row.
    _registry().deregister(node_id)
    db.delete(record)
    db.commit()
    return {"status": "success", "message": f"Node {node_id} deleted"}
@router.post("/admin/{node_id}/access", response_model=schemas.NodeAccessResponse, summary="[Admin] Grant Group Access")
def admin_grant_access(
    node_id: str,
    grant: schemas.NodeAccessGrant,
    admin_id: str,
    db: Session = Depends(get_db)
):
    """Grant a group access to use this node in sessions (upsert semantics)."""
    _require_admin(admin_id, db)
    _get_node_or_404(node_id, db)  # 404 before touching any access rows
    record = db.query(models.NodeGroupAccess).filter(
        models.NodeGroupAccess.node_id == node_id,
        models.NodeGroupAccess.group_id == grant.group_id
    ).first()
    if record is None:
        record = models.NodeGroupAccess(
            node_id=node_id,
            group_id=grant.group_id,
            access_level=grant.access_level,
            granted_by=admin_id,
        )
        db.add(record)
    else:
        # Existing grant: update the level and re-stamp the granting admin.
        record.access_level = grant.access_level
        record.granted_by = admin_id
    db.commit()
    db.refresh(record)
    return record
@router.delete("/admin/{node_id}/access/{group_id}", summary="[Admin] Revoke Group Access")
def admin_revoke_access(
    node_id: str,
    group_id: str,
    admin_id: str,
    db: Session = Depends(get_db)
):
    """Remove a group's access grant on a node; 404 when no such grant exists."""
    _require_admin(admin_id, db)
    grant = db.query(models.NodeGroupAccess).filter(
        models.NodeGroupAccess.node_id == node_id,
        models.NodeGroupAccess.group_id == group_id
    ).first()
    if grant is None:
        raise HTTPException(status_code=404, detail="Access grant not found.")
    db.delete(grant)
    db.commit()
    return {"message": f"Access revoked for group '{group_id}' on node '{node_id}'."}
@router.post("/admin/mesh/reset", summary="[Admin] Emergency Mesh Reset")
def admin_reset_mesh(
    admin_id: str,
    db: Session = Depends(get_db)
):
    """
    DANGEROUS: Clears ALL live node collections from memory and resets DB statuses to offline.
    Use this to resolve 'zombie' nodes or flapping connections.
    """
    _require_admin(admin_id, db)
    registry = _registry()
    # Step 1: mark every persisted node offline; step 2: flush the live cache.
    registry.reset_all_statuses()
    count = registry.clear_memory_cache()
    logger.warning(f"[Admin] Mesh Reset triggered by {admin_id}. Cleared {count} live nodes.")
    return {"status": "success", "cleared_count": count}
# ==================================================================
# USER-FACING ENDPOINTS
# ==================================================================
@router.get("/", response_model=list[schemas.AgentNodeUserView], summary="List Accessible Nodes")
def list_accessible_nodes(user_id: str, db: Session = Depends(get_db)):
    """
    Returns nodes the calling user's group has access to.
    Merges live connection state from the in-memory registry.

    Admins see every active node; other users see the union of nodes granted
    to their group relationally (NodeGroupAccess) and nodes whitelisted in the
    group's policy JSON under the "nodes" key.
    """
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found.")
    # Admin sees everything; users see only group-granted nodes
    if user.role == "admin":
        nodes = db.query(models.AgentNode).filter(models.AgentNode.is_active == True).all()
    else:
        # Nodes accessible via user's group (relational)
        accesses = db.query(models.NodeGroupAccess).filter(
            models.NodeGroupAccess.group_id == user.group_id
        ).all()
        node_ids = {a.node_id for a in accesses}
        # Nodes accessible via group policy whitelist
        if user.group and user.group.policy:
            policy_nodes = user.group.policy.get("nodes", [])
            if isinstance(policy_nodes, list):
                node_ids.update(policy_nodes)
        if node_ids:
            nodes = db.query(models.AgentNode).filter(
                models.AgentNode.node_id.in_(list(node_ids)),
                models.AgentNode.is_active == True
            ).all()
        else:
            # Short-circuit: avoid emitting an empty IN () clause.
            nodes = []
    registry = _registry()
    return [_node_to_user_view(n, registry) for n in nodes]
@router.get("/{node_id}/status", summary="Quick Node Online Check")
def get_node_status(node_id: str):
    """Lightweight liveness probe answered from the in-memory registry only."""
    live = _registry().get_node(node_id)
    if live is None:
        return {"node_id": node_id, "status": "offline"}
    return {"node_id": node_id, "status": live._compute_status(), "stats": live.stats}
@router.get("/{node_id}/terminal", summary="Read Node Terminal History (AI Use Case)")
def get_node_terminal(node_id: str):
    """
    AI-Specific: Returns the most recent 150 terminal interaction chunks for a live node.
    This provides context for the AI reasoning agent.
    """
    live = _registry().get_node(node_id)
    if live is None:
        return {"node_id": node_id, "status": "offline", "terminal": []}
    return {
        "node_id": node_id,
        "status": live._compute_status(),
        "terminal": live.terminal_history,
        "session_id": live.session_id,
    }
@router.post("/{node_id}/dispatch", response_model=schemas.NodeDispatchResponse, summary="Dispatch Task to Node")
def dispatch_to_node(node_id: str, request: schemas.NodeDispatchRequest):
    """
    Queue a shell or browser task to an online node.
    Emits task_assigned immediately so the live UI shows it.
    """
    registry = _registry()
    live_node = registry.get_node(node_id)
    if live_node is None:
        raise HTTPException(status_code=503, detail=f"Node '{node_id}' is not connected.")
    task_id = request.task_id or str(uuid.uuid4())
    # M6: Use the integrated Protobufs & Crypto from app/core/grpc
    from app.protos import agent_pb2
    from app.core.grpc.utils.crypto import sign_payload
    # Shell command takes precedence; otherwise serialize the browser action.
    payload = request.command if request.command else json.dumps(request.browser_action or {})
    registry.emit(
        node_id,
        "task_assigned",
        {"command": request.command, "session_id": request.session_id},
        task_id=task_id,
    )
    try:
        task_req = agent_pb2.TaskRequest(
            task_id=task_id,
            payload_json=payload,
            signature=sign_payload(payload),
            timeout_ms=request.timeout_ms,
            session_id=request.session_id or "",
        )
        # Push directly to the node's live gRPC outbound queue
        live_node.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req))
        registry.emit(node_id, "task_start", {"command": request.command}, task_id=task_id)
    except Exception as e:
        logger.error(f"[nodes/dispatch] Failed to put task onto queue for {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Internal Dispatch Error")
    return schemas.NodeDispatchResponse(task_id=task_id, status="accepted")
@router.post("/{node_id}/cancel", summary="Cancel/Interrupt Task on Node")
def cancel_on_node(node_id: str, task_id: str = ""):
    """
    Sends a TaskCancelRequest to the specified node.
    For shell skills, this typically translates to SIGINT (Ctrl+C).
    """
    from app.protos import agent_pb2
    registry = _registry()
    live = registry.get_node(node_id)
    if live is None:
        raise HTTPException(status_code=503, detail=f"Node '{node_id}' is not connected.")
    cancel_msg = agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=task_id))
    live.queue.put(cancel_msg)
    registry.emit(node_id, "task_cancel", {"task_id": task_id})
    return {"status": "cancel_sent", "task_id": task_id}
@router.patch("/preferences", summary="Update User Node Preferences")
def update_node_preferences(
    user_id: str,
    prefs: schemas.UserNodePreferences,
    db: Session = Depends(get_db)
):
    """
    Save the user's default_node_ids and data_source config into their preferences.
    The UI reads this to auto-attach nodes when a new session starts.
    """
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found.")
    # Copy before mutating: reassigning the *same* dict instance that was
    # loaded from the JSON column can go undetected by SQLAlchemy's change
    # tracking, silently skipping the UPDATE on commit.
    existing_prefs = dict(user.preferences or {})
    existing_prefs["nodes"] = prefs.model_dump()
    user.preferences = existing_prefs
    db.commit()
    return {"message": "Node preferences saved.", "nodes": prefs.model_dump()}
@router.get("/preferences", response_model=schemas.UserNodePreferences, summary="Get User Node Preferences")
def get_node_preferences(user_id: str, db: Session = Depends(get_db)):
    """Return the caller's saved node preferences, or empty defaults if unset."""
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found.")
    stored = (user.preferences or {}).get("nodes", {})
    if not stored:
        return schemas.UserNodePreferences()
    return schemas.UserNodePreferences(**stored)
# ==================================================================
# M4: Config YAML Download (admin only)
# ==================================================================
# ------------------------------------------------------------------
# Static assets baked into the downloadable node bundle (ZIP).
# README_CONTENT ships as README.md at the archive root.
# ------------------------------------------------------------------
README_CONTENT = """# Cortex Agent Node
This bundle contains the Cortex Agent Node, a modular software that connects to your Cortex Hub to execute tasks like browser automation, file management, and terminal control.
## Prerequisites
- **Python 3.10+**: Ensure you have Python installed.
- **Internet Access**: The node needs to reach your Hub at the endpoint specified in `agent_config.yaml`.
## Quick Start
### Linux & macOS
1. Open your terminal.
2. Navigate to this directory.
3. Make the runner script executable:
```bash
chmod +x run.sh
```
4. Run the node:
```bash
./run.sh
```
*Note for Mac users:* You can also double-click the `run_mac.command` file to start the node directly.
### Windows
1. Open the folder in File Explorer.
2. Double-click `run.bat`.
The scripts will automatically set up a Python virtual environment, install dependencies, and start the node.
## Configuration
The `agent_config.yaml` file contains the connection details and security tokens for your node. It has been pre-configured for you. Do not share this file.
## Troubleshooting
- **Permissions**: If you encounter permission errors, ensure you are running in a directory where you have write access.
- **Port 50051**: Ensure your network/firewall allows outgoing connections to the Hub's gRPC port.
"""
# run.sh — POSIX bootstrap: creates a venv, installs deps (plus Playwright
# browsers when required), then boots the node. Shipped with exec bits set.
RUN_SH_CONTENT = """#!/bin/bash
# Cortex Agent Node — Seamless Runner
set -e
# Ensure we are in the script's directory (crucial for double-clicking on Mac)
cd "$(dirname "$0")"
echo "🚀 Starting Cortex Agent Node..."
# 1. Environment Check
if ! command -v python3 &> /dev/null; then
echo "❌ Error: python3 is not installed. Please install Python 3.10+ and try again."
# Mac users often need to be pointed to python.org or brew
if [[ "$OSTYPE" == "darwin"* ]]; then
echo "💡 Tip: Visit https://www.python.org/downloads/macos/ or run 'brew install python'"
fi
sleep 5
exit 1
fi
# 2. Virtual Environment Setup
VENV=".venv"
if [ ! -d "$VENV" ]; then
echo "[*] Creating virtual environment..."
python3 -m venv "$VENV" || {
echo "❌ Error: Failed to create virtual environment."
if [[ "$OSTYPE" == "linux-gnu"* ]]; then
echo "💡 Tip: Try: sudo apt install python3-venv"
fi
sleep 5
exit 1
}
fi
# Activate venv
# shellcheck source=/dev/null
source "$VENV/bin/activate"
# 3. Dependency Installation
echo "[*] Ensuring dependencies (pip, gRPC, etc.) are installed..."
pip install --upgrade pip --quiet
pip install -r requirements.txt --quiet
# 4. Playwright Setup (for Browser Skills)
if grep -q "playwright" requirements.txt; then
if [ ! -f "$VENV/.playwright-installed" ]; then
echo "[*] Installing browser engines and system dependencies..."
# --with-deps ensures chromium actually runs on Linux (installs missing .so files)
if [[ "$OSTYPE" == "linux-gnu"* ]]; then
python3 -m playwright install --with-deps chromium
else
python3 -m playwright install chromium
fi
touch "$VENV/.playwright-installed"
fi
fi
# 5. Start the Node
echo "✅ Environment ready. Booting node..."
python3 -m agent_node.main
if [ $? -ne 0 ]; then
echo "⚠️ Node exited with error. Check the logs above."
sleep 5
fi
"""
RUN_MAC_CONTENT = RUN_SH_CONTENT # They are compatible, .command just helps macOS users
# run.bat — Windows bootstrap; mirrors the run.sh flow (venv, deps, boot).
RUN_BAT_CONTENT = """@echo off
echo 🚀 Starting Cortex Agent Node...
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo ❌ Error: python is not installed or not in PATH. Please install Python 3.10+.
pause
exit /b 1
)
if not exist .venv (
echo [*] Creating virtual environment...
python -m venv .venv
)
call .venv\\Scripts\\activate
echo [*] Ensuring dependencies are installed...
pip install --upgrade pip --quiet
pip install -r requirements.txt --quiet
findstr "playwright" requirements.txt >nul
if %errorlevel% equ 0 (
if not exist .venv\\.playwright-installed (
echo [*] Installing browser engines...
python -m playwright install chromium
echo done > .venv\\.playwright-installed
)
)
echo ✅ Environment ready. Booting node...
python -m agent_node.main
if %errorlevel% neq 0 (
echo [!] Node exited with error level %errorlevel%
pause
)
"""
@router.get(
    "/admin/{node_id}/config.yaml",
    response_model=schemas.NodeConfigYamlResponse,
    summary="[Admin] Download Node Config YAML",
)
def download_node_config_yaml(node_id: str, admin_id: str, db: Session = Depends(get_db)):
    """
    Generate and return the agent_config.yaml content an admin downloads
    and places alongside the node client software before deployment.
    """
    _require_admin(admin_id, db)
    record = _get_node_or_404(node_id, db)
    return schemas.NodeConfigYamlResponse(
        node_id=node_id,
        config_yaml=_generate_node_config_yaml(record),
    )
@router.get("/admin/{node_id}/download", summary="[Admin] Download Agent Node Bundle (ZIP)")
def admin_download_bundle(
    node_id: str,
    admin_id: str,
    db: Session = Depends(get_db)
):
    """
    Bundles the entire Agent Node source code along with a pre-configured
    agent_config.yaml into a single ZIP file for the user to download.

    The archive contains: the node source tree, the skills directory, a
    generated agent_config.yaml, a README, and executable run scripts.
    """
    import io
    import zipfile
    _require_admin(admin_id, db)
    node = _get_node_or_404(node_id, db)
    config_yaml = _generate_node_config_yaml(node)
    # Build the ZIP fully in memory. Open in "w" (the buffer is always empty,
    # so append mode was misleading) and leave Zip64 enabled (the default) so
    # bundles with many/large files don't fail to serialize.
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
        # 1. Add Agent Node source files from /app/agent-node
        source_dir = "/app/agent-node"
        if os.path.exists(source_dir):
            for root, dirs, files in os.walk(source_dir):
                # Exclude unwanted directories
                dirs[:] = [d for d in dirs if not d.startswith("sync-node-") and d not in ("__pycache__", ".git")]
                for file in files:
                    # Never ship local secrets; the generated config is added below.
                    if file in (".env", "agent_config.yaml"):
                        continue
                    file_path = os.path.join(root, file)
                    zip_file.write(file_path, os.path.relpath(file_path, source_dir))
        # 2. Add skills from /app/skills
        skills_dir = "/app/skills"
        if os.path.exists(skills_dir):
            for root, dirs, files in os.walk(skills_dir):
                dirs[:] = [d for d in dirs if d != "__pycache__"]
                for file in files:
                    file_path = os.path.join(root, file)
                    zip_file.write(file_path, os.path.join("skills", os.path.relpath(file_path, skills_dir)))
        # 3. Add the generated config YAML as 'agent_config.yaml'
        zip_file.writestr("agent_config.yaml", config_yaml)
        # 4. Add README plus run scripts; shell scripts need the exec bit set
        #    via external_attr so they are runnable after extraction.
        zip_file.writestr("README.md", README_CONTENT)
        for script_name, script_body in (("run.sh", RUN_SH_CONTENT), ("run_mac.command", RUN_MAC_CONTENT)):
            info = zipfile.ZipInfo(script_name)
            info.external_attr = 0o100755 << 16  # -rwxr-xr-x
            info.compress_type = zipfile.ZIP_DEFLATED
            zip_file.writestr(info, script_body)
        zip_file.writestr("run.bat", RUN_BAT_CONTENT)
    zip_buffer.seek(0)
    return StreamingResponse(
        zip_buffer,
        media_type="application/x-zip-compressed",
        headers={"Content-Disposition": f"attachment; filename=cortex-node-{node_id}.zip"}
    )
# ==================================================================
# M4: Invite Token Validation (called internally by gRPC server)
# ==================================================================
@router.post("/validate-token", summary="[Internal] Validate Node Invite Token")
def validate_invite_token(token: str, node_id: str, db: Session = Depends(get_db)):
    """
    Internal HTTP endpoint called by the gRPC SyncConfiguration handler
    to validate an invite_token before accepting a node connection.
    Returns the node's skill_config (sandbox policy) on success so the
    gRPC server can populate the SandboxPolicy response.
    Response (always HTTP 200; validity is carried in the body):
        { valid: true, node_id, display_name, user_id, skill_config }
        { valid: false, reason }
    """
    # Token, node id, AND active flag must all match: a deactivated node is
    # rejected even when it presents the correct token.
    node = db.query(models.AgentNode).filter(
        models.AgentNode.node_id == node_id,
        models.AgentNode.invite_token == token,
        models.AgentNode.is_active == True,
    ).first()
    if not node:
        logger.warning(f"[M4] Token validation FAILED for node_id='{node_id}'")
        return {"valid": False, "reason": "Invalid token or unknown node."}
    logger.info(f"[M4] Token validated OK for node_id='{node_id}'")
    return {
        "valid": True,
        "node_id": node.node_id,
        "display_name": node.display_name,
        "user_id": node.registered_by,  # AgentNode has registered_by, not user_id
        "skill_config": node.skill_config or {},
    }
# ==================================================================
# WEBSOCKET — Single-node live event stream
# ==================================================================
@router.websocket("/{node_id}/stream")
async def node_event_stream(websocket: WebSocket, node_id: str):
    """
    Single-node live event stream with Full-Duplex communication.
    Provides gaming-fast terminal polling, sending commands inbound over the same WS connection!

    Outbound: initial snapshot, queued registry events, periodic heartbeats.
    Inbound: {"action": "ping"} RTT probes and {"action": "dispatch"} shell
    commands, answered/queued on the same connection.
    """
    await websocket.accept()
    registry = _registry()
    live = registry.get_node(node_id)
    # Initial snapshot so the UI can render the current node state immediately,
    # even if the node is offline.
    await websocket.send_json({
        "event": "snapshot",
        "node_id": node_id,
        "timestamp": _now(),
        "data": live.to_dict() if live else {"status": "offline"},
    })
    # Per-connection queue; the registry pushes this node's events into it.
    q: queue.Queue = queue.Queue()
    registry.subscribe_node(node_id, q)

    async def send_events():
        # Outbound half: drain queued events and emit a heartbeat every
        # HEARTBEAT_INTERVAL_S seconds with the node's live status/stats.
        import time
        last_heartbeat = 0  # zero forces a heartbeat on the first loop pass
        try:
            while True:
                await _drain(q, websocket)
                now = time.time()
                if now - last_heartbeat > HEARTBEAT_INTERVAL_S:
                    # Re-fetch the node each time: it may connect/disconnect mid-stream.
                    live_node = registry.get_node(node_id)
                    await websocket.send_json({
                        "event": "heartbeat", "node_id": node_id, "timestamp": _now(),
                        "data": {"status": live_node._compute_status() if live_node else "offline",
                                 "stats": live_node.stats if live_node else {}},
                    })
                    last_heartbeat = now
                # Extremely fast poll loop for real-time latency
                await asyncio.sleep(0.02)
        except WebSocketDisconnect:
            pass
        except Exception as e:
            logger.error(f"[nodes/stream_sender] {node_id}: {e}")

    async def receive_events():
        # Inbound half: handle client pings (RTT) and shell dispatch requests.
        from app.protos import agent_pb2
        from app.core.grpc.utils.crypto import sign_payload
        import uuid
        try:
            while True:
                data = await websocket.receive_json()
                if data.get("action") == "ping":
                    await websocket.send_json({
                        "event": "pong",
                        "node_id": node_id,
                        "timestamp": _now(),
                        "client_ts": data.get("ts")  # Echo back for RTT calculation
                    })
                    continue
                if data.get("action") == "dispatch":
                    live_node = registry.get_node(node_id)
                    if not live_node:
                        await websocket.send_json({"event": "task_error", "data": {"stderr": "Node offline"}})
                        continue
                    cmd = data.get("command", "")
                    session_id = data.get("session_id", "")
                    task_id = str(uuid.uuid4())
                    # Emit task_assigned before queueing so the UI shows it instantly.
                    registry.emit(node_id, "task_assigned", {"command": cmd, "session_id": session_id}, task_id=task_id)
                    try:
                        task_req = agent_pb2.TaskRequest(
                            task_id=task_id,
                            task_type="shell",
                            payload_json=cmd,
                            signature=sign_payload(cmd),
                            timeout_ms=0,
                            session_id=session_id
                        )
                        # Push onto the node's live gRPC outbound queue.
                        live_node.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req))
                        registry.emit(node_id, "task_start", {"command": cmd}, task_id=task_id)
                    except Exception as e:
                        logger.error(f"[ws/dispatch] Error: {e}")
        except WebSocketDisconnect:
            pass
        except Exception as e:
            logger.error(f"[nodes/stream_receive] {node_id}: {e}")

    sender_task = asyncio.create_task(send_events())
    receiver_task = asyncio.create_task(receive_events())
    try:
        # Whichever half finishes first (usually a disconnect) cancels the other.
        done, pending = await asyncio.wait(
            [sender_task, receiver_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        for t in pending:
            t.cancel()
    except asyncio.CancelledError:
        sender_task.cancel()
        receiver_task.cancel()
    finally:
        # Always detach the per-connection queue from the registry.
        registry.unsubscribe_node(node_id, q)
# ==================================================================
# WEBSOCKET — Multi-node global execution bus
# ==================================================================
@router.websocket("/stream/all")
async def all_nodes_event_stream(websocket: WebSocket, user_id: str):
    """
    Multi-node global event bus for a user.
    Powers the split-window multi-pane execution UI.
    High-performance edition: streams events with millisecond latency.

    Sends an initial_snapshot of all the user's live nodes, then streams
    registry events plus a periodic mesh_heartbeat; answers client pings.
    """
    try:
        await websocket.accept()
    except Exception as e:
        logger.error(f"[📶] WebSocket accept failed for user={user_id}: {e}")
        return
    registry = _registry()
    logger.info(f"[📶] Multi-node stream connected for user={user_id}")
    try:
        # 1. Send initial snapshot immediately
        try:
            all_live = registry.list_nodes(user_id=user_id)
            logger.info(f"[📶] Sending initial snapshot for user={user_id} with {len(all_live)} nodes")
            snapshot_data = {
                "event": "initial_snapshot",
                "user_id": user_id,
                "timestamp": _now(),
                "data": {"nodes": [n.to_dict() for n in all_live], "count": len(all_live)},
            }
            await websocket.send_json(snapshot_data)
            logger.info(f"[📶] Initial snapshot sent successfully for user={user_id}")
        except Exception as e:
            logger.error(f"[📶] Failed to send initial snapshot for user={user_id}: {e}", exc_info=True)
            await websocket.close(code=1011)  # Internal Error
            return
        # Per-connection queue; the registry fans this user's node events into it.
        q: queue.Queue = queue.Queue()
        registry.subscribe_user(user_id, q)

        async def send_events():
            # Outbound half: drain queued events; emit a mesh_heartbeat with
            # per-node status/stats every HEARTBEAT_INTERVAL_S seconds.
            import time
            last_heartbeat = 0  # zero forces a heartbeat on the first pass
            try:
                while True:
                    # Drain all events from queue and send
                    await _drain(q, websocket)
                    now = time.time()
                    if now - last_heartbeat > HEARTBEAT_INTERVAL_S:
                        live_nodes = registry.list_nodes(user_id=user_id)
                        await websocket.send_json({
                            "event": "mesh_heartbeat",
                            "user_id": user_id,
                            "timestamp": _now(),
                            "data": {
                                "nodes": [{"node_id": n.node_id, "status": n._compute_status(), "stats": n.stats}
                                          for n in live_nodes]
                            },
                        })
                        last_heartbeat = now
                    # High-frequency polling (20Hz) for gaming-fast UI updates
                    await asyncio.sleep(0.05)
            except WebSocketDisconnect:
                logger.info(f"[📶] Sender disconnected for user={user_id}")
            except Exception as e:
                logger.error(f"[nodes/stream/all_sender] CRASH for user={user_id}: {e}", exc_info=True)

        async def receive_events():
            """Keep connection alive and handle client-initiated pings/close."""
            try:
                while True:
                    # Consume client messages to prevent buffer bloat
                    data = await websocket.receive_json()
                    if data.get("action") == "ping":
                        await websocket.send_json({
                            "event": "pong",
                            "user_id": user_id,
                            "timestamp": _now(),
                            "client_ts": data.get("ts")  # echoed so the client can compute RTT
                        })
            except WebSocketDisconnect:
                logger.info(f"[📶] Receiver disconnected for user={user_id}")
            except Exception as e:
                logger.error(f"[nodes/stream/all_receiver] CRASH for user={user_id}: {e}", exc_info=True)

        # Run sender and receiver concurrently
        sender_task = asyncio.create_task(send_events())
        receiver_task = asyncio.create_task(receive_events())
        try:
            # Whichever half finishes first (usually a disconnect) cancels the other.
            done, pending = await asyncio.wait(
                [sender_task, receiver_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            for t in pending:
                t.cancel()
        except asyncio.CancelledError:
            sender_task.cancel()
            receiver_task.cancel()
        finally:
            # Always detach the per-user queue from the registry.
            registry.unsubscribe_user(user_id, q)
            logger.info(f"[📶] Multi-node stream disconnected for user={user_id}")
    except Exception as e:
        logger.error(f"[nodes/stream/all] Error in stream handler for user={user_id}: {e}", exc_info=True)
        # Socket will be closed by FastAPI on uncaught exception if not already closed
# ==================================================================
# FS EXPLORER ENDPOINTS (Modular Navigator)
# ==================================================================
@router.get("/{node_id}/fs/ls", response_model=schemas.DirectoryListing, summary="List Directory Content")
def fs_ls(node_id: str, path: str = ".", session_id: str = "__fs_explorer__"):
    """
    Request a directory listing from a node.
    Returns a tree-structured list for the File Navigator.
    """
    try:
        # Defensive check for orchestrator service injection
        try:
            orchestrator = services.orchestrator
        except AttributeError:
            logger.error("[FS] Orchestrator service not found in ServiceContainer.")
            raise HTTPException(status_code=500, detail="Agent Orchestrator service is starting or unavailable.")
        res = orchestrator.assistant.ls(node_id, path, session_id=session_id)
        if not res:
            logger.error(f"[FS] Received empty response from node {node_id} for path {path}")
            raise HTTPException(status_code=500, detail="Node returned an empty response.")
        if isinstance(res, dict) and "error" in res:
            logger.warning(f"[FS] Explorer Error for {node_id}: {res['error']}")
            raise HTTPException(
                status_code=404 if res["error"] == "Offline" else 500,
                detail=res["error"],
            )
        files = res.get("files", [])
        # M6: Check sync status ONLY for real user sessions, not for the node-wide navigator
        if session_id != "__fs_explorer__":
            workspace_mirror = orchestrator.mirror.get_workspace_path(session_id)
            for entry in files:
                entry["is_synced"] = os.path.exists(os.path.join(workspace_mirror, entry["path"]))
        return schemas.DirectoryListing(node_id=node_id, path=path, files=files)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[FS] Unexpected error in fs_ls: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@router.get("/{node_id}/fs/cat", summary="Read File Content")
def fs_cat(node_id: str, path: str, session_id: str = "__fs_explorer__"):
    """
    Read the content of a file on a remote node.

    Returns the node's response (expecting {"content": "..."}); intended
    HTTPExceptions propagate unchanged instead of being re-wrapped.
    """
    try:
        orchestrator = services.orchestrator
        res = orchestrator.assistant.cat(node_id, path, session_id=session_id)
        if not res:
            raise HTTPException(status_code=500, detail="Node returned an empty response.")
        if isinstance(res, dict) and "error" in res:
            raise HTTPException(status_code=500, detail=res["error"])
        return res  # Expecting {"content": "..."}
    except AttributeError:
        raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
    except HTTPException:
        # Bug fix: the broad handler below used to swallow these and re-wrap
        # the detail as "500: ..." — propagate them unchanged (as fs_ls does).
        raise
    except Exception as e:
        logger.error(f"[FS] Cat error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/{node_id}/fs/touch", summary="Create File or Directory")
def fs_touch(node_id: str, req: schemas.FileWriteRequest):
    """
    Create a new file or directory on the node.

    Returns the node's response (expecting {"success": bool, "message": str});
    intended HTTPExceptions propagate unchanged instead of being re-wrapped.
    """
    try:
        orchestrator = services.orchestrator
        res = orchestrator.assistant.write(
            node_id,
            req.path,
            req.content.encode('utf-8'),
            req.is_dir,
            session_id=req.session_id
        )
        if not res:
            raise HTTPException(status_code=500, detail="Node returned an empty response.")
        if isinstance(res, dict) and "error" in res:
            raise HTTPException(status_code=500, detail=res["error"])
        return res  # Expecting {"success": bool, "message": str}
    except AttributeError:
        raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
    except HTTPException:
        # Bug fix: previously swallowed by the broad Exception handler below,
        # which re-wrapped the detail as "500: ..." — propagate unchanged.
        raise
    except Exception as e:
        logger.error(f"[FS] Touch error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/{node_id}/fs/rm", summary="Delete File/Directory")
def fs_rm(node_id: str, req: schemas.FileDeleteRequest):
"""
Delete a file or directory from a remote node.
"""
try:
orchestrator = services.orchestrator
res = orchestrator.assistant.rm(node_id, req.path, session_id=req.session_id)
if not res:
raise HTTPException(status_code=500, detail="Node returned an empty response.")
return res
except Exception as e:
logger.error(f"[FS] Delete error: {e}")
raise HTTPException(status_code=500, detail=str(e))
return router
# ===========================================================================
# Helpers
# ===========================================================================
def _generate_node_config_yaml(node: models.AgentNode) -> str:
"""Helper to generate the agent_config.yaml content."""
hub_url = os.getenv("HUB_PUBLIC_URL", "https://ai.jerxie.com")
hub_grpc = os.getenv("HUB_GRPC_ENDPOINT", "ai.jerxie.com:50051")
secret_key = os.getenv("SECRET_KEY", "dev-secret-key-1337")
skill_cfg = node.skill_config or {}
if isinstance(skill_cfg, str):
try:
skill_cfg = json.loads(skill_cfg)
except Exception:
skill_cfg = {}
lines = [
"# Cortex Hub — Agent Node Configuration",
f"# Generated for node '{node.node_id}' — keep this file secret.",
"",
f"node_id: \"{node.node_id}\"",
f"node_description: \"{node.display_name}\"",
"",
"# Hub connection",
f"hub_url: \"{hub_url}\"",
f"grpc_endpoint: \"{hub_grpc}\"",
"",
"# Authentication — do NOT share these secrets",
f"invite_token: \"{node.invite_token}\"",
f"auth_token: \"{node.invite_token}\"",
"",
"# HMAC signing key — must match the hub's SECRET_KEY exactly",
f"secret_key: \"{secret_key}\"",
"",
"# Skill configuration (mirrors admin settings; node respects these at startup)",
"skills:",
]
for skill, cfg in skill_cfg.items():
if not isinstance(cfg, dict):
continue
enabled = cfg.get("enabled", True)
lines.append(f" {skill}:")
lines.append(f" enabled: {str(enabled).lower()}")
for k, v in cfg.items():
if k != "enabled" and v is not None:
lines.append(f" {k}: {v}")
lines += [
"",
"# Workspace sync root — override if needed",
"sync_root: \"/tmp/cortex-workspace\"",
"",
"# TLS — set to false only in dev",
"tls: true",
]
return "\n".join(lines)
def _get_node_or_404(node_id: str, db: Session) -> models.AgentNode:
    """Look up an AgentNode registration by id; abort with 404 when absent."""
    record = db.query(models.AgentNode).filter(models.AgentNode.node_id == node_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail=f"Node '{node_id}' not found.")
    return record
def _node_to_admin_detail(node: models.AgentNode, registry) -> schemas.AgentNodeAdminDetail:
    """Project a DB node record into the full admin schema (includes invite_token).

    Live status/stats are merged from the in-memory registry when the node is
    connected; otherwise we fall back to the last persisted status.
    """
    live = registry.get_node(node.node_id)
    # Prefer live computed status; fall back to the DB snapshot, then "offline".
    status = live._compute_status() if live else node.last_status or "offline"
    stats = schemas.AgentNodeStats(**live.stats) if live else schemas.AgentNodeStats()
    return schemas.AgentNodeAdminDetail(
        node_id=node.node_id,
        display_name=node.display_name,
        description=node.description,
        skill_config=node.skill_config or {},
        capabilities=node.capabilities or {},
        invite_token=node.invite_token,  # sensitive — only exposed in the admin schema
        is_active=node.is_active,
        last_status=status,
        last_seen_at=node.last_seen_at,
        created_at=node.created_at,
        registered_by=node.registered_by,
        group_access=[
            schemas.NodeAccessResponse(
                id=a.id, node_id=a.node_id, group_id=a.group_id,
                access_level=a.access_level, granted_at=a.granted_at
            ) for a in (node.group_access or [])
        ],
        stats=stats,
    )
def _node_to_user_view(node: models.AgentNode, registry) -> schemas.AgentNodeUserView:
    """Project a DB node record into the sanitized user-facing schema.

    Never exposes invite_token or raw skill_config — only the names of
    enabled skills. Live stats come from the in-memory registry.
    """
    live = registry.get_node(node.node_id)
    # The record should only show online if it's currently connected and in the live gRPC registry map.
    # We default back to "offline" even if the DB record says "online" (zombie fix).
    status = live._compute_status() if live else "offline"
    skill_cfg = node.skill_config or {}
    if isinstance(skill_cfg, str):
        # Legacy rows may store the config as a JSON string; uses the
        # module-level json import (the old function-local one was redundant),
        # and catches only decode errors instead of a bare except.
        try:
            skill_cfg = json.loads(skill_cfg)
        except (TypeError, ValueError):
            skill_cfg = {}
    available = [skill for skill, cfg in skill_cfg.items() if isinstance(cfg, dict) and cfg.get("enabled", True)]
    stats = live.stats if live else {}
    return schemas.AgentNodeUserView(
        node_id=node.node_id,
        display_name=node.display_name,
        description=node.description,
        capabilities=node.capabilities or {},
        available_skills=available,
        last_status=status,
        last_seen_at=node.last_seen_at,
        stats=schemas.AgentNodeStats(**stats) if stats else schemas.AgentNodeStats()
    )
def _now() -> str:
from datetime import datetime
return datetime.utcnow().isoformat()
async def _drain(q: queue.Queue, websocket: WebSocket):
    """Forward every pending queue item to the websocket without blocking."""
    while True:
        try:
            pending = q.get_nowait()
        except queue.Empty:
            # Queue exhausted — hand control back to the caller's poll loop.
            return
        await websocket.send_json(pending)