"""
Agent Node REST + WebSocket API
Admin-managed nodes, group access control, and user-facing live streaming.
Admin endpoints (require role=admin):
POST /nodes/admin — Create node registration + generate invite_token
GET /nodes/admin — List all nodes (admin view, full detail)
GET /nodes/admin/{node_id} — Full admin detail including invite_token
PATCH /nodes/admin/{node_id} — Update node config (description, skill_config, is_active)
POST /nodes/admin/{node_id}/access — Grant group access to a node
DELETE /nodes/admin/{node_id}/access/{group_id} — Revoke group access
User endpoints (scoped to caller's group):
GET /nodes/ — List accessible nodes (user view, no sensitive data)
GET /nodes/{node_id}/status — Quick online/offline probe
POST /nodes/{node_id}/dispatch — Dispatch a task to a node
PATCH /nodes/preferences — Update user's default_nodes + data_source prefs
WebSocket (real-time streaming):
WS /nodes/{node_id}/stream — Single-node live execution stream
WS /nodes/stream/all?user_id=... — All-nodes global bus (multi-pane UI)
"""
import asyncio
import os
import json
import queue
import uuid
import secrets
from typing import Optional, Annotated
import logging
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends, Query, Header, Request, UploadFile, File
from fastapi.responses import StreamingResponse, FileResponse
from sqlalchemy.orm import Session
from app.api.dependencies import ServiceContainer, get_db
from app.config import settings
from app.api import schemas
from app.db import models
logger = logging.getLogger(__name__)
HEARTBEAT_INTERVAL_S = 5
def create_nodes_router(services: ServiceContainer) -> APIRouter:
router = APIRouter(prefix="/nodes", tags=["Agent Nodes"])
    def _registry():
        # Shorthand accessor for the live-node registry service on the container.
        return services.node_registry_service
def _require_admin(user_id: str, db: Session):
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user or user.role != "admin":
raise HTTPException(status_code=403, detail="Admin access required.")
return user
    def _require_node_access(user_id: str, node_id: str, db: Session):
        """
        Ensures the user has permission to interact with a specific node.
        Delegated to MeshService.
        """
        # NOTE(review): presumably raises HTTPException on denial like
        # _require_admin does — confirm in MeshService.require_node_access.
        return services.mesh_service.require_node_access(user_id, node_id, db)
# ==================================================================
# ADMIN ENDPOINTS
# ==================================================================
@router.post("/admin", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Register New Node")
def admin_create_node(request: schemas.AgentNodeCreate, admin_id: str = Query(...), db: Session = Depends(get_db)):
_require_admin(admin_id, db)
node = services.mesh_service.register_node(request, admin_id, db)
logger.info(f"[admin] Created node '{request.node_id}' by admin {admin_id}")
return services.mesh_service.node_to_admin_detail(node)
@router.get("/admin", response_model=list[schemas.AgentNodeAdminDetail], summary="[Admin] List All Nodes")
def admin_list_nodes(admin_id: str = Query(...), db: Session = Depends(get_db)):
_require_admin(admin_id, db)
return [services.mesh_service.node_to_admin_detail(n) for n in db.query(models.AgentNode).all()]
@router.get("/admin/{node_id}", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Get Node Detail")
def admin_get_node(node_id: str, admin_id: str = Query(...), db: Session = Depends(get_db)):
_require_admin(admin_id, db)
node = services.mesh_service.get_node_or_404(node_id, db)
return services.mesh_service.node_to_admin_detail(node)
@router.patch("/admin/{node_id}", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Update Node Config")
def admin_update_node(node_id: str, update: schemas.AgentNodeUpdate, admin_id: str = Query(...), db: Session = Depends(get_db)):
_require_admin(admin_id, db)
node = services.mesh_service.update_node(node_id, update, db)
return services.mesh_service.node_to_admin_detail(node)
@router.delete("/admin/{node_id}", summary="[Admin] Deregister Node")
def admin_delete_node(node_id: str, admin_id: str = Query(...), db: Session = Depends(get_db)):
_require_admin(admin_id, db)
services.mesh_service.delete_node(node_id, db)
return {"status": "success", "message": f"Node {node_id} deleted"}
@router.post("/admin/{node_id}/access", response_model=schemas.NodeAccessResponse, summary="[Admin] Grant Group Access")
def admin_grant_access(node_id: str, grant: schemas.NodeAccessGrant, admin_id: str = Query(...), db: Session = Depends(get_db)):
_require_admin(admin_id, db)
services.mesh_service.grant_access(node_id, grant, admin_id, db)
return db.query(models.NodeGroupAccess).filter(
models.NodeGroupAccess.node_id == node_id,
models.NodeGroupAccess.group_id == grant.group_id
).first()
@router.delete("/admin/{node_id}/access/{group_id}", summary="[Admin] Revoke Group Access")
def admin_revoke_access(
node_id: str,
group_id: str,
admin_id: str = Query(...),
db: Session = Depends(get_db)
):
_require_admin(admin_id, db)
access = db.query(models.NodeGroupAccess).filter(
models.NodeGroupAccess.node_id == node_id,
models.NodeGroupAccess.group_id == group_id
).first()
if not access:
raise HTTPException(status_code=404, detail="Access grant not found.")
db.delete(access)
db.commit()
return {"message": f"Access revoked for group '{group_id}' on node '{node_id}'."}
@router.post("/admin/mesh/reset", summary="[Admin] Emergency Mesh Reset")
def admin_reset_mesh(
admin_id: str = Query(...),
db: Session = Depends(get_db)
):
"""
DANGEROUS: Clears ALL live node collections from memory and resets DB statuses to offline.
Use this to resolve 'zombie' nodes or flapping connections.
"""
_require_admin(admin_id, db)
count = _registry().emergency_reset()
logger.warning(f"[Admin] Mesh Reset triggered by {admin_id}. Cleared {count} live nodes.")
return {"status": "success", "cleared_count": count}
@router.post("/purge", summary="Node Self-Purge")
def node_self_purge(
node_id: str = Query(...),
token: str = Query(...),
db: Session = Depends(get_db)
):
"""
Allows a node to deregister itself using its invite_token.
Called by the purge.py script during uninstallation.
"""
node = db.query(models.AgentNode).filter(
models.AgentNode.node_id == node_id,
models.AgentNode.invite_token == token
).first()
if not node:
raise HTTPException(status_code=401, detail="Invalid node or token.")
node_id = node.node_id
services.mesh_service.delete_node(node_id, db)
logger.info(f"[Mesh] Node '{node_id}' successfully purged itself.")
return {"status": "success", "message": f"Node {node_id} deregistered."}
# ==================================================================
# USER-FACING ENDPOINTS
# ==================================================================
@router.get("/", response_model=list[schemas.AgentNodeUserView], summary="List Accessible Nodes")
def list_accessible_nodes(user_id: str = Query(...), db: Session = Depends(get_db)):
nodes = services.mesh_service.list_accessible_nodes(user_id, db)
registry = _registry()
return [services.mesh_service.node_to_user_view(n, registry) for n in nodes]
@router.get("/{node_id}/status", summary="Quick Node Online Check")
def get_node_status(
node_id: str,
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
_require_node_access(user_id, node_id, db)
live = _registry().get_node(node_id)
if not live:
return {"node_id": node_id, "status": "offline"}
return {"node_id": node_id, "status": live._compute_status(), "stats": live.stats}
@router.get("/{node_id}/terminal", summary="Read Node Terminal History (AI Use Case)")
def get_node_terminal(
node_id: str,
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""
AI-Specific: Returns the most recent 150 terminal interaction chunks for a live node.
This provides context for the AI reasoning agent.
"""
_require_node_access(user_id, node_id, db)
live = _registry().get_node(node_id)
if not live:
return {"node_id": node_id, "status": "offline", "terminal": []}
return {
"node_id": node_id,
"status": live._compute_status(),
"terminal": live.terminal_history,
"session_id": live.session_id
}
    @router.post("/{node_id}/dispatch", response_model=schemas.NodeDispatchResponse, summary="Dispatch Task to Node")
    def dispatch_to_node(node_id: str, request: schemas.NodeDispatchRequest, user_id: str = Query(...), db: Session = Depends(get_db)):
        """Dispatch a command to a node; returns the accepted task_id immediately."""
        # NOTE(review): unlike the other user endpoints there is no explicit
        # _require_node_access() call here — presumably dispatch_task() enforces
        # access itself (it receives user_id and db). Confirm in MeshService.
        task_id = services.mesh_service.dispatch_task(
            node_id, request.command, user_id, db,
            session_id=request.session_id, task_id=request.task_id, timeout_ms=request.timeout_ms
        )
        return schemas.NodeDispatchResponse(task_id=task_id, status="accepted")
@router.post("/{node_id}/cancel", summary="Cancel/Interrupt Task on Node")
def cancel_on_node(
node_id: str,
task_id: str = "",
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""
Sends a TaskCancelRequest to the specified node.
For shell skills, this typically translates to SIGINT (Ctrl+C).
"""
_require_node_access(user_id, node_id, db)
registry = _registry()
live = registry.get_node(node_id)
if not live:
raise HTTPException(status_code=503, detail=f"Node '{node_id}' is not connected.")
from app.protos import agent_pb2
cancel_req = agent_pb2.TaskCancelRequest(task_id=task_id)
live.send_message(agent_pb2.ServerTaskMessage(task_cancel=cancel_req), priority=0)
registry.emit(node_id, "task_cancel", {"task_id": task_id})
return {"status": "cancel_sent", "task_id": task_id}
@router.patch("/preferences", summary="Update User Node Preferences")
def update_node_preferences(
user_id: str = Query(...),
prefs: schemas.UserNodePreferences = None,
db: Session = Depends(get_db)
):
"""
Save the user's default_node_ids and data_source config into their preferences.
The UI reads this to auto-attach nodes when a new session starts.
"""
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found.")
# Create a new dictionary to ensure SQLAlchemy detects the change to the JSON column
current_prefs = dict(user.preferences or {})
current_prefs["nodes"] = prefs.model_dump()
user.preferences = current_prefs
db.commit()
return {"message": "Node preferences saved.", "nodes": prefs.model_dump()}
@router.get("/preferences", response_model=schemas.UserNodePreferences, summary="Get User Node Preferences")
def get_node_preferences(user_id: str, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found.")
node_prefs = (user.preferences or {}).get("nodes", {})
return schemas.UserNodePreferences(**node_prefs) if node_prefs else schemas.UserNodePreferences()
@router.get("/admin/{node_id}/config.yaml", response_model=schemas.NodeConfigYamlResponse, summary="[Admin] Download Node Config YAML")
def download_node_config_yaml(node_id: str, admin_id: str = Query(...), db: Session = Depends(get_db)):
_require_admin(admin_id, db)
node = services.mesh_service.get_node_or_404(node_id, db)
config_yaml = services.mesh_service.generate_node_config_yaml(node)
return schemas.NodeConfigYamlResponse(node_id=node_id, config_yaml=config_yaml)
@router.get("/provision/{node_id}", summary="Headless Provisioning Script (Python)")
def provision_node(node_id: str, token: str, request: Request, db: Session = Depends(get_db)):
from fastapi.responses import PlainTextResponse
node = db.query(models.AgentNode).filter(models.AgentNode.node_id == node_id).first()
if not node or node.invite_token != token: raise HTTPException(status_code=403, detail="Invalid node or token.")
config_yaml = services.mesh_service.generate_node_config_yaml(node)
base_url = f"{request.url.scheme}://{request.url.netloc}"
return PlainTextResponse(services.mesh_service.generate_provisioning_script(node, config_yaml, base_url))
@router.get("/provision/sh/{node_id}", summary="Headless Provisioning Script (Bash Binary)")
def provision_node_sh(node_id: str, token: str, request: Request, db: Session = Depends(get_db)):
"""
Returns a Bash script that curls and executes the compiled standalone binary.
Usage: curl -sSL https://.../provision/sh/{node_id}?token={token} | bash
"""
from fastapi.responses import PlainTextResponse
node = db.query(models.AgentNode).filter(models.AgentNode.node_id == node_id).first()
if not node or node.invite_token != token:
raise HTTPException(status_code=403, detail="Invalid node or token.")
config_yaml = services.mesh_service.generate_node_config_yaml(node)
base_url = settings.HUB_PUBLIC_URL or f"{request.url.scheme}://{request.url.netloc}"
script = services.mesh_service.generate_provisioning_sh(node, config_yaml, base_url)
return PlainTextResponse(script)
@router.get("/provision/ps1/{node_id}", summary="Headless Provisioning Script (PowerShell)")
def provision_node_ps1(node_id: str, token: str, request: Request, db: Session = Depends(get_db)):
"""
Returns a PowerShell script that can be piped into IEX to automatically
install and start the agent node on Windows.
Usage: powershell -Command "irm http://.../provision/ps1/{node_id}?token={token} | iex"
"""
from fastapi.responses import PlainTextResponse
node = db.query(models.AgentNode).filter(models.AgentNode.node_id == node_id).first()
if not node or node.invite_token != token:
raise HTTPException(status_code=403, detail="Invalid node or token.")
config_yaml = services.mesh_service.generate_node_config_yaml(node, is_windows=True)
base_url = settings.HUB_PUBLIC_URL or f"{request.url.scheme}://{request.url.netloc}"
grpc_url = settings.GRPC_TARGET_ORIGIN or f"{request.url.hostname}:{settings.GRPC_PORT}"
script = services.mesh_service.generate_provisioning_ps1(node, config_yaml, base_url, grpc_url=grpc_url)
return PlainTextResponse(script)
    @router.get("/provision/binary/{node_id}/{arch}", summary="Download Self-Contained Binary ZIP Bundle")
    def provision_node_binary_bundle(node_id: str, arch: str, token: str, db: Session = Depends(get_db)):
        # Token auth and file streaming are handled entirely inside MeshService.
        return services.mesh_service.download_binary_bundle(node_id, arch, token, db)
@router.get("/provision/binaries/status", summary="Check binary availability status")
def get_binaries_status(db: Session = Depends(get_db)):
from app.api.routes.agent_update import _AGENT_NODE_DIR
dist_dir = os.path.join(_AGENT_NODE_DIR, "dist")
available = []
if os.path.exists(dist_dir):
for arch in os.listdir(dist_dir):
if os.path.exists(os.path.join(dist_dir, arch, "cortex-agent")):
available.append(arch)
return {"available_architectures": available}
    @router.get("/admin/{node_id}/download", summary="[Admin] Download Agent Node Bundle (ZIP)")
    def admin_download_bundle(node_id: str, admin_id: str = Query(...), db: Session = Depends(get_db)):
        # Admin check and ZIP assembly are handled entirely inside MeshService.
        return services.mesh_service.download_admin_bundle(node_id, admin_id, db)
# ==================================================================
# M4: Invite Token Validation (called internally by gRPC server)
# ==================================================================
@router.post("/validate-token", summary="[Internal] Validate Node Invite Token")
def validate_invite_token(token: str, node_id: str, db: Session = Depends(get_db)):
result = services.mesh_service.validate_invite_token(token, node_id, db)
if not result["valid"]:
logger.warning(f"[M4] Token validation FAILED for node_id='{node_id}': {result.get('reason')}")
else:
logger.info(f"[M4] Token validated OK for node_id='{node_id}'")
return result
# ==================================================================
# WEBSOCKET — Single-node live event stream
# ==================================================================
    @router.websocket("/{node_id}/stream")
    async def node_event_stream(
        websocket: WebSocket,
        node_id: str,
        user_id: str = Query(...)
    ):
        """
        Single-node live event stream with Full-Duplex communication.
        Provides gaming-fast terminal polling, sending commands inbound over the same WS connection!

        Outbound: one "snapshot" on connect, "heartbeat" every HEARTBEAT_INTERVAL_S,
        plus whatever the registry pushes onto this connection's queue.
        Inbound actions: "ping", "terminal_in", "resize", "dispatch".
        """
        from app.db.session import get_db_session
        # Auth against a short-lived DB session. On failure we still accept the
        # socket so the client receives a structured error before close(4003).
        with get_db_session() as db:
            try:
                _require_node_access(user_id, node_id, db)
            except HTTPException as e:
                await websocket.accept()
                await websocket.send_json({"event": "error", "message": e.detail})
                await websocket.close(code=4003)
                return
        await websocket.accept()
        registry = _registry()
        live = registry.get_node(node_id)
        # Initial snapshot: full node state if live, otherwise an offline marker.
        await websocket.send_json({
            "event": "snapshot",
            "node_id": node_id,
            "timestamp": _now(),
            "data": live.to_dict() if live else {"status": "offline"},
        })
        q: queue.Queue = queue.Queue(maxsize=500)  # Capped to prevent memory leak
        registry.subscribe_node(node_id, q)
        async def send_events():
            # Outbound pump: drain queued registry events and emit periodic heartbeats.
            import time
            last_heartbeat = 0
            try:
                while True:
                    await _drain(q, websocket)
                    now = time.time()
                    if now - last_heartbeat > HEARTBEAT_INTERVAL_S:
                        live_node = registry.get_node(node_id)
                        await websocket.send_json({
                            "event": "heartbeat", "node_id": node_id, "timestamp": _now(),
                            "data": {"status": live_node._compute_status() if live_node else "offline",
                                     "stats": live_node.stats if live_node else {}},
                        })
                        last_heartbeat = now
                    # Extremely fast poll loop for real-time latency
                    await asyncio.sleep(0.02)
            except WebSocketDisconnect:
                pass
            except Exception as e:
                logger.error(f"[nodes/stream_sender] {node_id}: {e}")
        async def receive_events():
            # Inbound pump: translate client actions into signed task messages for the node.
            from app.protos import agent_pb2
            from app.core.grpc.utils.crypto import sign_payload
            import uuid
            try:
                while True:
                    data = await websocket.receive_json()
                    if data.get("action") == "ping":
                        await websocket.send_json({
                            "event": "pong",
                            "node_id": node_id,
                            "timestamp": _now(),
                            "client_ts": data.get("ts")  # Echo back for RTT calculation
                        })
                        continue
                    if data.get("action") == "terminal_in":
                        # Raw keystrokes; silently dropped when the node is offline.
                        live_node = registry.get_node(node_id)
                        if not live_node: continue
                        cmd = json.dumps({"tty": data.get("data", "")})
                        # Wrap keystrokes in a high-priority TaskRequest
                        task_req = agent_pb2.TaskRequest(
                            task_id=f"tty-{id(data)}",
                            payload_json=cmd,
                            signature=sign_payload(cmd),
                            timeout_ms=0,
                            session_id=data.get("session_id", "")
                        )
                        live_node.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1)
                        continue
                    if data.get("action") == "resize":
                        # Terminal geometry change, forwarded as a zero-timeout task.
                        live_node = registry.get_node(node_id)
                        if not live_node: continue
                        cmd = json.dumps({
                            "action": "resize",
                            "cols": data.get("cols", 80),
                            "rows": data.get("rows", 24)
                        })
                        task_req = agent_pb2.TaskRequest(
                            task_id=f"resize-{id(data)}",
                            payload_json=cmd,
                            signature=sign_payload(cmd),
                            timeout_ms=0,
                            session_id=data.get("session_id", "")
                        )
                        live_node.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1)
                        continue
                    if data.get("action") == "dispatch":
                        # Full shell-command dispatch over the socket (WS twin of POST /dispatch).
                        live_node = registry.get_node(node_id)
                        if not live_node:
                            await websocket.send_json({"event": "task_error", "data": {"stderr": "Node offline"}})
                            continue
                        cmd = data.get("command", "")
                        session_id = data.get("session_id", "")
                        task_id = str(uuid.uuid4())
                        # Announce the assignment to subscribers before sending to the node.
                        registry.emit(node_id, "task_assigned", {"command": cmd, "session_id": session_id}, task_id=task_id)
                        try:
                            task_req = agent_pb2.TaskRequest(
                                task_id=task_id,
                                task_type="shell",
                                payload_json=cmd,
                                signature=sign_payload(cmd),
                                timeout_ms=0,
                                session_id=session_id
                            )
                            live_node.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1)
                            registry.emit(node_id, "task_start", {"command": cmd}, task_id=task_id)
                        except Exception as e:
                            logger.error(f"[ws/dispatch] Error: {e}")
            except WebSocketDisconnect:
                pass
            except Exception as e:
                logger.error(f"[nodes/stream_receive] {node_id}: {e}")
        # Run both pumps concurrently; whichever ends first tears down the other.
        sender_task = asyncio.create_task(send_events())
        receiver_task = asyncio.create_task(receive_events())
        try:
            done, pending = await asyncio.wait(
                [sender_task, receiver_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            for t in pending:
                t.cancel()
        except asyncio.CancelledError:
            sender_task.cancel()
            receiver_task.cancel()
        finally:
            # Always detach this connection's queue from the registry.
            registry.unsubscribe_node(node_id, q)
# ==================================================================
# WEBSOCKET — Multi-node global execution bus
# ==================================================================
    @router.websocket("/stream/all")
    async def all_nodes_event_stream(websocket: WebSocket, user_id: str):
        """
        Multi-node global event bus for a user.
        Powers the split-window multi-pane execution UI.
        High-performance edition: streams events with millisecond latency.

        Admins see every active node; other users see the union of their
        group's explicit grants and the group policy's "nodes" list.
        """
        from app.db.session import get_db_session
        # 1. Identify accessible nodes for this user based on group policy
        accessible_ids = []
        with get_db_session() as db:
            user = db.query(models.User).filter(models.User.id == user_id).first()
            if not user:
                # No accept() happened yet, so the client sees a handshake failure.
                logger.warning(f"[📶] User {user_id} not found for global stream.")
                return
            if user.role == "admin":
                accessible_ids = [n.node_id for n in db.query(models.AgentNode).filter(models.AgentNode.is_active == True).all()]
            else:
                # Nodes accessible via user's group
                accesses = db.query(models.NodeGroupAccess).filter(
                    models.NodeGroupAccess.group_id == user.group_id
                ).all()
                accessible_ids = [a.node_id for a in accesses]
                # Nodes in group policy
                if user.group and user.group.policy:
                    policy_nodes = user.group.policy.get("nodes", [])
                    if isinstance(policy_nodes, list):
                        accessible_ids.extend(policy_nodes)
        # De-duplicate: explicit grants and policy entries may overlap.
        accessible_ids = list(set(accessible_ids))
        try:
            await websocket.accept()
        except Exception as e:
            logger.error(f"[📶] WebSocket accept failed for user={user_id}: {e}")
            return
        registry = _registry()
        logger.info(f"[📶] Multi-node stream connected for user={user_id}. Accessible nodes: {len(accessible_ids)}")
        try:
            # 2. Send initial snapshot of only accessible live nodes
            try:
                all_live = [registry.get_node(nid) for nid in accessible_ids if registry.get_node(nid)]
                snapshot_data = {
                    "event": "initial_snapshot",
                    "user_id": user_id,
                    "timestamp": _now(),
                    "data": {"nodes": [n.to_dict() for n in all_live], "count": len(all_live)},
                }
                await websocket.send_json(snapshot_data)
            except Exception as e:
                logger.error(f"[📶] Failed to send initial snapshot for user={user_id}: {e}", exc_info=True)
                await websocket.close(code=1011)
                return
            q: queue.Queue = queue.Queue(maxsize=500)
            # Subscribe to each accessible node individually
            for nid in accessible_ids:
                registry.subscribe_node(nid, q)
            async def send_events():
                # Outbound pump: drain the shared queue and emit mesh-wide heartbeats.
                import time
                last_heartbeat = 0
                try:
                    while True:
                        await _drain(q, websocket)
                        now = time.time()
                        if now - last_heartbeat > HEARTBEAT_INTERVAL_S:
                            live_nodes = [registry.get_node(nid) for nid in accessible_ids if registry.get_node(nid)]
                            await websocket.send_json({
                                "event": "mesh_heartbeat",
                                "user_id": user_id,
                                "timestamp": _now(),
                                "data": {
                                    "nodes": [{"node_id": n.node_id, "status": n._compute_status(), "stats": n.stats}
                                              for n in live_nodes]
                                },
                            })
                            last_heartbeat = now
                        await asyncio.sleep(0.05)
                except WebSocketDisconnect:
                    logger.info(f"[📶] Sender disconnected for user={user_id}")
                except Exception as e:
                    logger.error(f"[nodes/stream/all_sender] CRASH for user={user_id}: {e}", exc_info=True)
            async def receive_events():
                # Inbound pump: only ping/pong is supported on the global bus.
                try:
                    while True:
                        data = await websocket.receive_json()
                        if data.get("action") == "ping":
                            await websocket.send_json({
                                "event": "pong",
                                "user_id": user_id,
                                "timestamp": _now(),
                                "client_ts": data.get("ts")
                            })
                except WebSocketDisconnect:
                    pass
                except Exception as e:
                    logger.error(f"[nodes/stream/all_receiver] CRASH for user={user_id}: {e}", exc_info=True)
            # Run both pumps; whichever finishes first cancels the other.
            sender_task = asyncio.create_task(send_events())
            receiver_task = asyncio.create_task(receive_events())
            try:
                done, pending = await asyncio.wait(
                    [sender_task, receiver_task],
                    return_when=asyncio.FIRST_COMPLETED
                )
                for t in pending:
                    t.cancel()
            except asyncio.CancelledError:
                sender_task.cancel()
                receiver_task.cancel()
            finally:
                # Always detach the shared queue from every subscribed node.
                for nid in accessible_ids:
                    registry.unsubscribe_node(nid, q)
                logger.info(f"[📶] Multi-node stream disconnected for user={user_id}")
        except Exception as e:
            logger.error(f"[nodes/stream/all] Error in stream handler for user={user_id}: {e}", exc_info=True)
# ==================================================================
# FS EXPLORER ENDPOINTS (Modular Navigator)
# ==================================================================
@router.get("/{node_id}/fs/ls", response_model=schemas.DirectoryListing, summary="List Directory Content")
async def fs_ls(
node_id: str,
path: str = ".",
session_id: str = "__fs_explorer__",
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""
Request a directory listing from a node.
Returns a tree-structured list for the File Navigator.
"""
_require_node_access(user_id, node_id, db)
try:
try:
orchestrator = services.orchestrator
except AttributeError:
logger.error("[FS] Orchestrator service not found in ServiceContainer.")
raise HTTPException(status_code=500, detail="Agent Orchestrator service is starting or unavailable.")
loop = asyncio.get_event_loop()
res = await loop.run_in_executor(None, lambda: orchestrator.assistant.ls(node_id, path, session_id=session_id))
if not res:
logger.error(f"[FS] Received empty response from node {node_id} for path {path}")
raise HTTPException(status_code=500, detail="Node returned an empty response.")
if isinstance(res, dict) and "error" in res:
status_code = 404 if "not found" in res["error"].lower() or res["error"] == "Offline" else 500
logger.warning(f"[FS] Explorer Error for {node_id}: {res['error']}")
raise HTTPException(status_code=status_code, detail=res["error"])
files = res.get("files", [])
if session_id != "__fs_explorer__":
workspace_mirror = orchestrator.mirror.get_workspace_path(session_id)
for f in files:
mirror_item_path = os.path.join(workspace_mirror, f["path"])
f["is_synced"] = os.path.exists(mirror_item_path)
return schemas.DirectoryListing(node_id=node_id, path=path, files=files)
except HTTPException:
raise
except Exception as e:
logger.error(f"[FS] Unexpected error in fs_ls: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@router.get("/{node_id}/fs/cat", summary="Read File Content")
async def fs_cat(
node_id: str,
path: str,
session_id: str = "__fs_explorer__",
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""Read the content of a file on a remote node."""
_require_node_access(user_id, node_id, db)
try:
orchestrator = services.orchestrator
loop = asyncio.get_event_loop()
res = await loop.run_in_executor(None, lambda: orchestrator.assistant.cat(node_id, path, session_id=session_id))
if not res:
raise HTTPException(status_code=500, detail="Node returned an empty response.")
if isinstance(res, dict) and "error" in res:
status_code = 404 if "not found" in res["error"].lower() else 500
raise HTTPException(status_code=status_code, detail=res["error"])
return res
except HTTPException:
raise
except AttributeError:
raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
except Exception as e:
logger.error(f"[FS] Cat error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/{node_id}/fs/touch", summary="Create File or Directory")
async def fs_touch(
node_id: str,
req: schemas.FileWriteRequest,
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""Create a new file or directory on the node."""
_require_node_access(user_id, node_id, db)
try:
orchestrator = services.orchestrator
loop = asyncio.get_event_loop()
content = req.content.encode('utf-8') if isinstance(req.content, str) else req.content
res = await loop.run_in_executor(None, lambda: orchestrator.assistant.write(node_id, req.path, content, req.is_dir, session_id=req.session_id))
if not res:
raise HTTPException(status_code=500, detail="Node returned an empty response.")
if isinstance(res, dict) and "error" in res:
raise HTTPException(status_code=500, detail=res["error"])
return res
except HTTPException:
raise
except AttributeError:
raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
except Exception as e:
logger.error(f"[FS] Touch error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/{node_id}/fs/download", summary="Download File")
def fs_download(
node_id: str,
path: str = Query(...),
session_id: str = "__fs_explorer__",
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""
Download a file from an agent node.
Triggers a fetch to the hub's mirror, then serves it.
"""
_require_node_access(user_id, node_id, db)
try:
orchestrator = services.orchestrator
# First, trigger the cat to get it into the mirror
res = orchestrator.assistant.cat(node_id, path, session_id=session_id)
if isinstance(res, dict) and res.get("error"):
# Mirror won't get the file if the node couldn't provide it.
raise HTTPException(status_code=404, detail=f"File not found: {res.get('error')}")
# Now, serve from mirror
workspace = orchestrator.mirror.get_workspace_path(session_id)
abs_path = os.path.normpath(os.path.join(workspace, path.lstrip("/")))
# Wait a moment for it to land on disk if it's large
import time
max_wait = 5.0
start = time.time()
while not os.path.exists(abs_path) and (time.time() - start) < max_wait:
time.sleep(0.2)
if not os.path.exists(abs_path):
raise HTTPException(status_code=404, detail="File did not reach mirror in time.")
return FileResponse(abs_path, filename=os.path.basename(path))
except HTTPException:
raise
except Exception as e:
logger.error(f"[FS] Download error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/{node_id}/fs/upload", summary="Upload File")
async def fs_upload(
node_id: str,
path: str = Query(...),
file: UploadFile = File(...),
session_id: str = "__fs_explorer__",
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""
Upload a file to an agent node.
"""
_require_node_access(user_id, node_id, db)
try:
orchestrator = services.orchestrator
content = await file.read()
# If path ends in /, treat as parent directory
full_path = os.path.join(path, file.filename) if path.endswith("/") or path == "." else path
res = orchestrator.assistant.write(
node_id,
full_path,
content,
is_dir=False,
session_id=session_id
)
return res
except HTTPException:
raise
except Exception as e:
logger.error(f"[FS] Upload error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/{node_id}/fs/rm", summary="Delete File/Directory")
async def fs_rm(
node_id: str,
req: schemas.FileDeleteRequest,
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""Delete a file or directory from a remote node."""
_require_node_access(user_id, node_id, db)
try:
orchestrator = services.orchestrator
loop = asyncio.get_event_loop()
res = await loop.run_in_executor(None, lambda: orchestrator.assistant.rm(node_id, req.path, session_id=req.session_id))
if not res:
raise HTTPException(status_code=500, detail="Node returned an empty response.")
return res
except HTTPException:
raise
except Exception as e:
logger.error(f"[FS] Delete error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/{node_id}/fs/move", summary="Move/Rename File or Directory")
async def fs_move(
node_id: str,
req: schemas.FileMoveRequest,
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""Atomic move/rename within the mesh workspace."""
_require_node_access(user_id, node_id, db)
try:
orchestrator = services.orchestrator
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: orchestrator.assistant.move(req.session_id, req.old_path, req.new_path))
except Exception as e:
logger.error(f"[FS] Move error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/{node_id}/fs/copy", summary="Copy File or Directory")
async def fs_copy(
node_id: str,
req: schemas.FileCopyRequest,
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""Atomic copy within the mesh workspace."""
_require_node_access(user_id, node_id, db)
try:
orchestrator = services.orchestrator
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: orchestrator.assistant.copy(req.session_id, req.old_path, req.new_path))
except Exception as e:
logger.error(f"[FS] Copy error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/{node_id}/fs/stat", response_model=schemas.FileStatResponse, summary="Get File Metadata (Stat)")
def fs_stat(
node_id: str,
path: str = Query(...),
session_id: str = "__fs_explorer__",
user_id: str = Header(..., alias="X-User-ID"),
db: Session = Depends(get_db)
):
"""Get file metadata directly from the Hub mirror."""
_require_node_access(user_id, node_id, db)
try:
orchestrator = services.orchestrator
res = orchestrator.assistant.stat(session_id, path)
if "error" in res:
raise HTTPException(status_code=404, detail=res["error"])
return res
except HTTPException:
raise
except Exception as e:
logger.error(f"[FS] Stat error: {e}")
raise HTTPException(status_code=500, detail=str(e))
return router
return router
async def _drain(q: queue.Queue, websocket: WebSocket):
    """Drain all pending queue items and send to websocket (non-blocking)."""
    # Pull items until get_nowait() signals the queue is empty; send errors
    # (e.g. a closed socket) propagate to the caller exactly as before.
    try:
        while True:
            event = q.get_nowait()
            await websocket.send_json(event)
    except queue.Empty:
        return
def _now() -> str:
    """Current UTC time as a naive ISO-8601 string (e.g. '2024-01-01T12:00:00.123456')."""
    from datetime import datetime, timezone
    # Fix: datetime.utcnow() is deprecated (Python 3.12+). Produce the
    # byte-identical naive-UTC value from an aware now() instead.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()