"""
Agent Node REST + WebSocket API
Admin-managed nodes, group access control, and user-facing live streaming.

Admin endpoints (require role=admin):
  POST  /nodes/admin                    — Create node registration + generate invite_token
  GET   /nodes/admin                    — List all nodes (admin view, full detail)
  GET   /nodes/admin/{node_id}          — Full admin detail including invite_token
  PATCH /nodes/admin/{node_id}          — Update node config (description, skill_config, is_active)
  POST  /nodes/admin/{node_id}/access   — Grant group access to a node
  DELETE /nodes/admin/{node_id}/access/{group_id} — Revoke group access

User endpoints (scoped to caller's group):
  GET  /nodes/                          — List accessible nodes (user view, no sensitive data)
  GET  /nodes/{node_id}/status          — Quick online/offline probe
  POST /nodes/{node_id}/dispatch        — Dispatch a task to a node
  PATCH /nodes/preferences              — Update user's default_nodes + data_source prefs

WebSocket (real-time streaming):
  WS   /nodes/{node_id}/stream          — Single-node live execution stream
  WS   /nodes/stream/all?user_id=...    — All-nodes global bus (multi-pane UI)
"""
import asyncio
import json
import logging
import os
import queue
import secrets
import uuid

from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
from sqlalchemy.orm import Session

from app.api import schemas
from app.api.dependencies import ServiceContainer, get_db
from app.db import models

logger = logging.getLogger(__name__)

HEARTBEAT_INTERVAL_S = 5


def create_nodes_router(services: ServiceContainer) -> APIRouter:
    router = APIRouter(prefix="/nodes", tags=["Agent Nodes"])

    def _registry():
        # Late-bound lookup of the node registry service from the container,
        # so every call sees the currently installed instance.
        return services.node_registry_service

    def _require_admin(user_id: str, db: Session):
        """Load the user and raise HTTP 403 unless their role is 'admin'.

        Returns the User row on success so callers can reuse it.
        """
        user = db.query(models.User).filter(models.User.id == user_id).first()
        if not user or user.role != "admin":
            raise HTTPException(status_code=403, detail="Admin access required.")
        return user

    # ==================================================================
    #  ADMIN ENDPOINTS
    # ==================================================================

    @router.post("/admin", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Register New Node")
    def admin_create_node(
        request: schemas.AgentNodeCreate,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """
        Admin registers a new Agent Node.

        Returns the node record including a generated invite_token that must be
        placed in the node's config YAML before deployment. Fails with 409 when
        the node_id is already taken.
        """
        _require_admin(admin_id, db)

        duplicate = (
            db.query(models.AgentNode)
            .filter(models.AgentNode.node_id == request.node_id)
            .first()
        )
        if duplicate:
            raise HTTPException(status_code=409, detail=f"Node '{request.node_id}' already exists.")

        new_node = models.AgentNode(
            node_id=request.node_id,
            display_name=request.display_name,
            description=request.description,
            registered_by=admin_id,
            skill_config=request.skill_config.model_dump(),
            # Cryptographically secure token the node presents on connect.
            invite_token=secrets.token_urlsafe(32),
            last_status="offline",
        )
        db.add(new_node)
        db.commit()
        db.refresh(new_node)

        logger.info(f"[admin] Created node '{request.node_id}' by admin {admin_id}")
        return _node_to_admin_detail(new_node, _registry())

    @router.get("/admin", response_model=list[schemas.AgentNodeAdminDetail], summary="[Admin] List All Nodes")
    def admin_list_nodes(admin_id: str, db: Session = Depends(get_db)):
        """Full node list for admin dashboard, including invite_token and skill config."""
        _require_admin(admin_id, db)
        registry = _registry()
        return [_node_to_admin_detail(node, registry) for node in db.query(models.AgentNode).all()]

    @router.get("/admin/{node_id}", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Get Node Detail")
    def admin_get_node(node_id: str, admin_id: str, db: Session = Depends(get_db)):
        """Return the full admin-facing record (incl. invite_token) for one node."""
        _require_admin(admin_id, db)
        return _node_to_admin_detail(_get_node_or_404(node_id, db), _registry())

    @router.patch("/admin/{node_id}", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Update Node Config")
    def admin_update_node(
        node_id: str,
        update: schemas.AgentNodeUpdate,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """Update display_name, description, skill_config toggles, or is_active."""
        _require_admin(admin_id, db)
        node = _get_node_or_404(node_id, db)

        # Apply only the fields the admin actually supplied (None means "leave as-is").
        for field in ("display_name", "description", "is_active"):
            value = getattr(update, field)
            if value is not None:
                setattr(node, field, value)
        if update.skill_config is not None:
            node.skill_config = update.skill_config.model_dump()

        db.commit()
        db.refresh(node)
        return _node_to_admin_detail(node, _registry())

    @router.delete("/admin/{node_id}", summary="[Admin] Deregister Node")
    def admin_delete_node(
        node_id: str,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """Delete a node registration and all its access grants."""
        _require_admin(admin_id, db)
        node = _get_node_or_404(node_id, db)

        # Drop the live in-memory connection record first, if one exists.
        _registry().deregister(node_id)

        db.delete(node)
        db.commit()
        return {"status": "success", "message": f"Node {node_id} deleted"}

    @router.post("/admin/{node_id}/access", response_model=schemas.NodeAccessResponse, summary="[Admin] Grant Group Access")
    def admin_grant_access(
        node_id: str,
        grant: schemas.NodeAccessGrant,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """Grant a group access to use this node in sessions (upsert semantics)."""
        _require_admin(admin_id, db)
        _get_node_or_404(node_id, db)

        row = (
            db.query(models.NodeGroupAccess)
            .filter(
                models.NodeGroupAccess.node_id == node_id,
                models.NodeGroupAccess.group_id == grant.group_id,
            )
            .first()
        )
        if row is None:
            # First grant for this (node, group) pair.
            row = models.NodeGroupAccess(
                node_id=node_id,
                group_id=grant.group_id,
                access_level=grant.access_level,
                granted_by=admin_id,
            )
            db.add(row)
        else:
            # Re-granting updates the level and records the acting admin.
            row.access_level = grant.access_level
            row.granted_by = admin_id

        db.commit()
        db.refresh(row)
        return row

    @router.delete("/admin/{node_id}/access/{group_id}", summary="[Admin] Revoke Group Access")
    def admin_revoke_access(
        node_id: str,
        group_id: str,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """Remove a group's access grant for a node; 404 when no grant exists."""
        _require_admin(admin_id, db)
        grant = (
            db.query(models.NodeGroupAccess)
            .filter(
                models.NodeGroupAccess.node_id == node_id,
                models.NodeGroupAccess.group_id == group_id,
            )
            .first()
        )
        if grant is None:
            raise HTTPException(status_code=404, detail="Access grant not found.")
        db.delete(grant)
        db.commit()
        return {"message": f"Access revoked for group '{group_id}' on node '{node_id}'."}

    # ==================================================================
    #  USER-FACING ENDPOINTS
    # ==================================================================

    @router.get("/", response_model=list[schemas.AgentNodeUserView], summary="List Accessible Nodes")
    def list_accessible_nodes(user_id: str, db: Session = Depends(get_db)):
        """
        Returns nodes the calling user's group has access to.
        Merges live connection state from the in-memory registry.

        Raises:
            HTTPException 404: unknown user_id.
        """
        user = db.query(models.User).filter(models.User.id == user_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="User not found.")

        # Admin sees everything; users see only group-granted nodes
        if user.role == "admin":
            nodes = db.query(models.AgentNode).filter(models.AgentNode.is_active == True).all()
        else:
            # Nodes accessible via user's group (relational grants)
            accesses = db.query(models.NodeGroupAccess).filter(
                models.NodeGroupAccess.group_id == user.group_id
            ).all()
            node_ids = {a.node_id for a in accesses}

            # Nodes accessible via group policy whitelist
            if user.group and user.group.policy:
                policy_nodes = user.group.policy.get("nodes", [])
                if isinstance(policy_nodes, list):
                    node_ids.update(policy_nodes)

            nodes = db.query(models.AgentNode).filter(
                models.AgentNode.node_id.in_(list(node_ids)),
                models.AgentNode.is_active == True
            ).all()

        registry = _registry()
        return [_node_to_user_view(n, registry) for n in nodes]

    @router.get("/{node_id}/status", summary="Quick Node Online Check")
    def get_node_status(node_id: str):
        """Lightweight online/offline probe backed by the in-memory registry only."""
        record = _registry().get_node(node_id)
        if not record:
            return {"node_id": node_id, "status": "offline"}
        return {"node_id": node_id, "status": record._compute_status(), "stats": record.stats}

    @router.post("/{node_id}/dispatch", response_model=schemas.NodeDispatchResponse, summary="Dispatch Task to Node")
    def dispatch_to_node(node_id: str, request: schemas.NodeDispatchRequest):
        """
        Queue a shell or browser task to an online node.
        Emits task_assigned immediately so the live UI shows it.

        Raises:
            HTTPException 503: the node has no live gRPC connection.
            HTTPException 500: the task could not be placed on the node's queue.
        """
        registry = _registry()
        live = registry.get_node(node_id)
        if not live:
            raise HTTPException(status_code=503, detail=f"Node '{node_id}' is not connected.")

        # Callers may supply their own task_id (e.g. for retries); otherwise mint one.
        task_id = request.task_id or str(uuid.uuid4())
        
        # M6: Use the integrated Protobufs & Crypto from app/core/grpc
        from app.protos import agent_pb2
        from app.core.grpc.utils.crypto import sign_payload
        
        # Payload is either the raw shell command or a JSON-encoded browser action.
        payload = request.command or json.dumps(request.browser_action or {})
        # Emitted BEFORE enqueueing so the UI reflects the assignment immediately,
        # even if the enqueue below subsequently fails.
        registry.emit(node_id, "task_assigned",
                       {"command": request.command, "session_id": request.session_id},
                       task_id=task_id)

        try:
            task_req = agent_pb2.TaskRequest(
                task_id=task_id,
                payload_json=payload,
                # Signature lets the node verify the hub authored this task
                # (keyed by SECRET_KEY per the generated config YAML).
                signature=sign_payload(payload),
                timeout_ms=request.timeout_ms,
                session_id=request.session_id or "",
            )
            # Push directly to the node's live gRPC outbound queue
            live.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req))
            registry.emit(node_id, "task_start", {"command": request.command}, task_id=task_id)
        except Exception as e:
            logger.error(f"[nodes/dispatch] Failed to put task onto queue for {node_id}: {e}")
            raise HTTPException(status_code=500, detail="Internal Dispatch Error")

        return schemas.NodeDispatchResponse(task_id=task_id, status="accepted")

    @router.post("/{node_id}/cancel", summary="Cancel/Interrupt Task on Node")
    def cancel_on_node(node_id: str, task_id: str = ""):
        """
        Sends a TaskCancelRequest to the specified node.
        For shell skills, this typically translates to SIGINT (Ctrl+C).

        An empty task_id is forwarded as-is; its interpretation is up to the
        node client (presumably "cancel the current task" — TODO confirm).

        Raises:
            HTTPException 503: the node has no live gRPC connection.
        """
        registry = _registry()
        live = registry.get_node(node_id)
        if not live:
            raise HTTPException(status_code=503, detail=f"Node '{node_id}' is not connected.")
        
        from app.protos import agent_pb2
        cancel_req = agent_pb2.TaskCancelRequest(task_id=task_id)
        # Same outbound gRPC queue that /dispatch uses.
        live.queue.put(agent_pb2.ServerTaskMessage(task_cancel=cancel_req))
        
        registry.emit(node_id, "task_cancel", {"task_id": task_id})
        return {"status": "cancel_sent", "task_id": task_id}

    @router.patch("/preferences", summary="Update User Node Preferences")
    def update_node_preferences(
        user_id: str,
        prefs: schemas.UserNodePreferences,
        db: Session = Depends(get_db)
    ):
        """
        Save the user's default_node_ids and data_source config into their preferences.
        The UI reads this to auto-attach nodes when a new session starts.

        Raises:
            HTTPException 404: unknown user_id.
        """
        user = db.query(models.User).filter(models.User.id == user_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="User not found.")
        # Copy before mutating: reassigning the *same* dict object back to a
        # plain SQLAlchemy JSON column does not mark the attribute dirty, so
        # the change could silently fail to persist. A fresh dict always does.
        existing_prefs = dict(user.preferences or {})
        existing_prefs["nodes"] = prefs.model_dump()
        user.preferences = existing_prefs
        db.commit()
        return {"message": "Node preferences saved.", "nodes": prefs.model_dump()}

    @router.get("/preferences", response_model=schemas.UserNodePreferences, summary="Get User Node Preferences")
    def get_node_preferences(user_id: str, db: Session = Depends(get_db)):
        """Return the caller's saved node preferences, or schema defaults when none saved."""
        user = db.query(models.User).filter(models.User.id == user_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="User not found.")
        saved = (user.preferences or {}).get("nodes", {})
        if not saved:
            return schemas.UserNodePreferences()
        return schemas.UserNodePreferences(**saved)

    # ==================================================================
    #  M4: Config YAML Download (admin only)
    # ==================================================================

    @router.get(
        "/admin/{node_id}/config.yaml",
        response_model=schemas.NodeConfigYamlResponse,
        summary="[Admin] Download Node Config YAML",
    )
    def download_node_config_yaml(node_id: str, admin_id: str, db: Session = Depends(get_db)):
        """
        Generate and return the agent_config.yaml content an admin downloads
        and places alongside the node client software before deployment.

        The YAML contains:
          - node_id, hub_url, invite_token (pre-signed)
          - skill enable/disable toggles matching the admin's skill_config

        Example usage:
          curl -o agent_config.yaml $HUB/nodes/admin/{node_id}/config.yaml?admin_id=...
          # Deploy the node software with this file present.
        """
        _require_admin(admin_id, db)
        node = _get_node_or_404(node_id, db)

        hub_url = os.getenv("HUB_PUBLIC_URL", "https://ai.jerxie.com")
        hub_grpc = os.getenv("HUB_GRPC_ENDPOINT", "ai.jerxie.com:50051")
        secret_key = os.getenv("SECRET_KEY", "dev-secret-key-1337")

        skill_cfg = node.skill_config or {}
        if isinstance(skill_cfg, str):
            # Some rows store skill_config as a JSON string; tolerate both forms.
            try:
                skill_cfg = json.loads(skill_cfg)
            except (ValueError, TypeError):
                skill_cfg = {}

        def esc(value) -> str:
            # Minimal escaping for YAML double-quoted scalars so embedded
            # quotes/backslashes in names cannot corrupt the generated file.
            return str(value).replace("\\", "\\\\").replace('"', '\\"')

        lines = [
            "# Cortex Hub — Agent Node Configuration",
            f"# Generated for node '{node_id}' — keep this file secret.",
            "",
            f"node_id: \"{esc(node_id)}\"",
            f"node_description: \"{esc(node.display_name)}\"",
            "",
            "# Hub connection",
            f"hub_url: \"{esc(hub_url)}\"",
            f"grpc_endpoint: \"{esc(hub_grpc)}\"",
            "",
            "# Authentication — do NOT share these secrets",
            f"invite_token: \"{esc(node.invite_token)}\"",
            f"auth_token: \"{esc(node.invite_token)}\"",
            "",
            "# HMAC signing key — must match the hub's SECRET_KEY exactly",
            f"secret_key: \"{esc(secret_key)}\"",
            "",
            "# Skill configuration (mirrors admin settings; node respects these at startup)",
            "skills:",
        ]
        for skill, cfg in skill_cfg.items():
            if not isinstance(cfg, dict):
                continue
            lines.append(f"  {skill}:")
            lines.append(f"    enabled: {str(cfg.get('enabled', True)).lower()}")
            for key, value in cfg.items():
                if key != "enabled" and value is not None:
                    lines.append(f"    {key}: {value}")

        lines += [
            "",
            "# Workspace sync root — override if needed",
            "sync_root: \"/tmp/cortex-workspace\"",
            "",
            "# TLS — set to false only in dev",
            "tls: true",
        ]

        return schemas.NodeConfigYamlResponse(node_id=node_id, config_yaml="\n".join(lines))


    # ==================================================================
    #  M4: Invite Token Validation (called internally by gRPC server)
    # ==================================================================

    @router.post("/validate-token", summary="[Internal] Validate Node Invite Token")
    def validate_invite_token(token: str, node_id: str, db: Session = Depends(get_db)):
        """
        Internal HTTP endpoint called by the gRPC SyncConfiguration handler
        to validate an invite_token before accepting a node connection.

        Returns the node's skill_config (sandbox policy) on success so the
        gRPC server can populate the SandboxPolicy response.

        Response (always HTTP 200):
          { valid: true, node_id, display_name, user_id, skill_config }
          { valid: false, reason }
        """
        node = db.query(models.AgentNode).filter(
            models.AgentNode.node_id == node_id,
            models.AgentNode.is_active == True,
        ).first()

        # Compare tokens in constant time (not via SQL equality) so prefix
        # matches can't be distinguished by response timing.
        token_ok = node is not None and secrets.compare_digest(
            str(node.invite_token or ""), str(token or "")
        )
        if not token_ok:
            logger.warning(f"[M4] Token validation FAILED for node_id='{node_id}'")
            return {"valid": False, "reason": "Invalid token or unknown node."}

        logger.info(f"[M4] Token validated OK for node_id='{node_id}'")
        return {
            "valid": True,
            "node_id": node.node_id,
            "display_name": node.display_name,
            "user_id": node.registered_by,  # AgentNode has registered_by, not user_id
            "skill_config": node.skill_config or {},
        }

    # ==================================================================
    #  WEBSOCKET — Single-node live event stream
    # ==================================================================

    @router.websocket("/{node_id}/stream")
    async def node_event_stream(websocket: WebSocket, node_id: str):
        """
        Single-node live event stream with Full-Duplex communication.
        Provides gaming-fast terminal polling, sending commands inbound over the same WS connection!

        Protocol:
          outbound: "snapshot" on connect, "heartbeat" every HEARTBEAT_INTERVAL_S,
                    plus all events drained from the registry subscription queue.
          inbound:  {"action": "ping", "ts": ...} -> answered with "pong"
                    {"action": "dispatch", "command": ..., "session_id": ...}
                    -> signed TaskRequest queued on the node's gRPC channel.
        """
        await websocket.accept()
        registry = _registry()

        # Initial snapshot so the UI can render current state before any events.
        live = registry.get_node(node_id)
        await websocket.send_json({
            "event": "snapshot",
            "node_id": node_id,
            "timestamp": _now(),
            "data": live.to_dict() if live else {"status": "offline"},
        })

        # Thread-safe queue bridges registry callbacks into this async loop.
        q: queue.Queue = queue.Queue()
        registry.subscribe_node(node_id, q)
        
        async def send_events():
            # Outbound half: drain registry events + periodic heartbeat.
            import time
            last_heartbeat = 0
            try:
                while True:
                    await _drain(q, websocket)
                    now = time.time()
                    if now - last_heartbeat > HEARTBEAT_INTERVAL_S:
                        # Re-fetch the live record each beat; the node may have
                        # (dis)connected since the previous iteration.
                        live_node = registry.get_node(node_id)
                        await websocket.send_json({
                            "event": "heartbeat", "node_id": node_id, "timestamp": _now(),
                            "data": {"status": live_node._compute_status() if live_node else "offline",
                                     "stats": live_node.stats if live_node else {}},
                        })
                        last_heartbeat = now
                    # Extremely fast poll loop for real-time latency
                    await asyncio.sleep(0.02)
            except WebSocketDisconnect:
                pass
            except Exception as e:
                logger.error(f"[nodes/stream_sender] {node_id}: {e}")

        async def receive_events():
            # Inbound half: handle ping/dispatch frames from the browser.
            from app.protos import agent_pb2
            from app.core.grpc.utils.crypto import sign_payload
            import uuid
            try:
                while True:
                    data = await websocket.receive_json()
                    if data.get("action") == "ping":
                        await websocket.send_json({
                            "event": "pong",
                            "node_id": node_id,
                            "timestamp": _now(),
                            "client_ts": data.get("ts") # Echo back for RTT calculation
                        })
                        continue

                    if data.get("action") == "dispatch":
                        live_node = registry.get_node(node_id)
                        if not live_node:
                            await websocket.send_json({"event": "task_error", "data": {"stderr": "Node offline"}})
                            continue
                        
                        cmd = data.get("command", "")
                        session_id = data.get("session_id", "")
                        task_id = str(uuid.uuid4())
                        
                        # Mirrors the REST /dispatch flow: announce, sign, enqueue.
                        registry.emit(node_id, "task_assigned", {"command": cmd, "session_id": session_id}, task_id=task_id)
                        
                        try:
                            task_req = agent_pb2.TaskRequest(
                                task_id=task_id,
                                payload_json=cmd,
                                signature=sign_payload(cmd),
                                timeout_ms=0,
                                session_id=session_id
                            )
                            live_node.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req))
                            registry.emit(node_id, "task_start", {"command": cmd}, task_id=task_id)
                        except Exception as e:
                            logger.error(f"[ws/dispatch] Error: {e}")
            except WebSocketDisconnect:
                pass
            except Exception as e:
                logger.error(f"[nodes/stream_receive] {node_id}: {e}")

        # Run both halves concurrently; whichever finishes first (normally a
        # disconnect) cancels the other.
        sender_task = asyncio.create_task(send_events())
        receiver_task = asyncio.create_task(receive_events())

        try:
            done, pending = await asyncio.wait(
                [sender_task, receiver_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            for t in pending:
                t.cancel()
        except asyncio.CancelledError:
            sender_task.cancel()
            receiver_task.cancel()
        finally:
            # Always detach the queue so the registry doesn't leak subscribers.
            registry.unsubscribe_node(node_id, q)

    # ==================================================================
    #  WEBSOCKET — Multi-node global execution bus
    # ==================================================================

    @router.websocket("/stream/all")
    async def all_nodes_event_stream(websocket: WebSocket, user_id: str):
        """
        Multi-node global event bus for a user.
        Powers the split-window multi-pane execution UI.

        On connect: sends initial_snapshot with all live nodes.
        Ongoing:    streams all events from all user's nodes (disambiguated by node_id).
        Every 5s:   sends a mesh_heartbeat summary across all nodes.

        NOTE(review): the queue is drained only once per HEARTBEAT_INTERVAL_S
        sleep, so event latency here can approach 5s — unlike the 20ms loop in
        the single-node stream. Confirm this is intended.
        """
        await websocket.accept()
        registry = _registry()

        # Snapshot of every live node for this user, sent before streaming.
        all_live = registry.list_nodes(user_id=user_id)
        await websocket.send_json({
            "event": "initial_snapshot",
            "user_id": user_id,
            "timestamp": _now(),
            "data": {"nodes": [n.to_dict() for n in all_live], "count": len(all_live)},
        })

        # Thread-safe queue bridges registry callbacks into this async loop.
        q: queue.Queue = queue.Queue()
        registry.subscribe_user(user_id, q)
        try:
            while True:
                await _drain(q, websocket)
                await asyncio.sleep(HEARTBEAT_INTERVAL_S)
                live_nodes = registry.list_nodes(user_id=user_id)
                await websocket.send_json({
                    "event": "mesh_heartbeat",
                    "user_id": user_id,
                    "timestamp": _now(),
                    "data": {
                        "nodes": [{"node_id": n.node_id, "status": n._compute_status(), "stats": n.stats}
                                  for n in live_nodes]
                    },
                })
        except WebSocketDisconnect:
            pass
        except Exception as e:
            logger.error(f"[nodes/stream/all] user={user_id}: {e}")
        finally:
            # Always detach so the registry doesn't accumulate dead queues.
            registry.unsubscribe_user(user_id, q)

    # ==================================================================
    #  FS EXPLORER ENDPOINTS (Modular Navigator)
    # ==================================================================

    @router.get("/{node_id}/fs/ls", response_model=schemas.DirectoryListing, summary="List Directory Content")
    def fs_ls(node_id: str, path: str = ".", session_id: str = "__fs_explorer__"):
        """
        Request a directory listing from a node.
        Returns a tree-structured list for the File Navigator.

        Raises:
            HTTPException 404: node reported itself "Offline".
            HTTPException 500: orchestrator unavailable, empty/error response,
                               or any unexpected failure.
        """
        try:
            # Defensive check for orchestrator service injection
            try:
                orchestrator = services.orchestrator
            except AttributeError:
                logger.error("[FS] Orchestrator service not found in ServiceContainer.")
                raise HTTPException(status_code=500, detail="Agent Orchestrator service is starting or unavailable.")

            res = orchestrator.assistant.ls(node_id, path, session_id=session_id)

            if not res:
                logger.error(f"[FS] Received empty response from node {node_id} for path {path}")
                raise HTTPException(status_code=500, detail="Node returned an empty response.")

            if isinstance(res, dict) and "error" in res:
                status_code = 404 if res["error"] == "Offline" else 500
                logger.warning(f"[FS] Explorer Error for {node_id}: {res['error']}")
                raise HTTPException(status_code=status_code, detail=res["error"])

            files = res.get("files", [])

            # M6: Check sync status ONLY for real user sessions, not for the node-wide navigator
            if session_id != "__fs_explorer__":
                workspace_mirror = orchestrator.mirror.get_workspace_path(session_id)
                for f in files:
                    # Flag entries that already exist in the session's local mirror.
                    # (Requires the module-level `os` import; previously raised
                    # NameError because `os` was never imported at module scope.)
                    mirror_item_path = os.path.join(workspace_mirror, f["path"])
                    f["is_synced"] = os.path.exists(mirror_item_path)

            return schemas.DirectoryListing(node_id=node_id, path=path, files=files)
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"[FS] Unexpected error in fs_ls: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")

    @router.get("/{node_id}/fs/cat", summary="Read File Content")
    def fs_cat(node_id: str, path: str, session_id: str = "__fs_explorer__"):
        """
        Read the content of a file on a remote node.

        Raises:
            HTTPException 500: orchestrator unavailable, empty/error response,
                               or any unexpected failure.
        """
        try:
            orchestrator = services.orchestrator
            res = orchestrator.assistant.cat(node_id, path, session_id=session_id)
            if not res:
                raise HTTPException(status_code=500, detail="Node returned an empty response.")
            if isinstance(res, dict) and "error" in res:
                raise HTTPException(status_code=500, detail=res["error"])
            return res  # Expecting {"content": "..."}
        except HTTPException:
            # Let deliberate HTTP errors through unwrapped (consistent with fs_ls);
            # previously they were swallowed by `except Exception` and re-wrapped
            # with a mangled detail message.
            raise
        except AttributeError:
            raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
        except Exception as e:
            logger.error(f"[FS] Cat error: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    @router.post("/{node_id}/fs/touch", summary="Create File or Directory")
    def fs_touch(node_id: str, req: schemas.FileWriteRequest, session_id: str = "__fs_explorer__"):
        """
        Create a new file or directory on the node.

        Raises:
            HTTPException 500: orchestrator unavailable, empty/error response,
                               or any unexpected failure.
        """
        try:
            orchestrator = services.orchestrator
            res = orchestrator.assistant.write(
                node_id,
                req.path,
                req.content.encode('utf-8'),
                req.is_dir,
                session_id=session_id
            )
            if not res:
                raise HTTPException(status_code=500, detail="Node returned an empty response.")
            if isinstance(res, dict) and "error" in res:
                raise HTTPException(status_code=500, detail=res["error"])
            return res  # Expecting {"success": bool, "message": str}
        except HTTPException:
            # Let deliberate HTTP errors through unwrapped (consistent with fs_ls);
            # previously they were swallowed by `except Exception` and re-wrapped.
            raise
        except AttributeError:
            raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
        except Exception as e:
            logger.error(f"[FS] Touch error: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    @router.delete("/{node_id}/fs/rm", summary="Delete File/Directory")
    def fs_rm(node_id: str, path: str, session_id: str = "__fs_explorer__"):
        """
        Delete a file or directory from a remote node.

        Raises:
            HTTPException 500: orchestrator unavailable, empty/error response,
                               or any unexpected failure.
        """
        try:
            orchestrator = services.orchestrator
            res = orchestrator.assistant.rm(node_id, path, session_id=session_id)
            if not res:
                raise HTTPException(status_code=500, detail="Node returned an empty response.")
            if isinstance(res, dict) and "error" in res:
                raise HTTPException(status_code=500, detail=res["error"])
            return res  # Expecting {"success": bool, "message": str}
        except HTTPException:
            # Let deliberate HTTP errors through unwrapped (consistent with fs_ls);
            # previously they were swallowed by `except Exception` and re-wrapped.
            raise
        except AttributeError:
            raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
        except Exception as e:
            logger.error(f"[FS] Rm error: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    return router


# ===========================================================================
#  Helpers
# ===========================================================================

def _get_node_or_404(node_id: str, db: Session) -> models.AgentNode:
    """Fetch the AgentNode row for node_id, raising HTTP 404 when absent."""
    node = (
        db.query(models.AgentNode)
        .filter(models.AgentNode.node_id == node_id)
        .first()
    )
    if not node:
        raise HTTPException(status_code=404, detail=f"Node '{node_id}' not found.")
    return node


def _node_to_admin_detail(node: models.AgentNode, registry) -> schemas.AgentNodeAdminDetail:
    """Build the full admin-facing schema (incl. invite_token) for a node row.

    Live status/stats come from the in-memory registry when the node is
    currently connected; otherwise the persisted last_status is used.
    """
    live = registry.get_node(node.node_id)
    status = live._compute_status() if live else node.last_status or "offline"
    stats = schemas.AgentNodeStats(**live.stats) if live else schemas.AgentNodeStats()
    return schemas.AgentNodeAdminDetail(
        node_id=node.node_id,
        display_name=node.display_name,
        description=node.description,
        skill_config=node.skill_config or {},
        capabilities=node.capabilities or {},
        invite_token=node.invite_token,
        is_active=node.is_active,
        last_status=status,
        last_seen_at=node.last_seen_at,
        created_at=node.created_at,
        registered_by=node.registered_by,
        # Flatten the relational access grants into response schemas.
        group_access=[
            schemas.NodeAccessResponse(
                id=a.id, node_id=a.node_id, group_id=a.group_id,
                access_level=a.access_level, granted_at=a.granted_at
            ) for a in (node.group_access or [])
        ],
        stats=stats,
    )


def _node_to_user_view(node: models.AgentNode, registry) -> schemas.AgentNodeUserView:
    """Build the sanitized user-facing schema for a node row (no invite_token).

    The record only shows online if the node currently holds a live connection
    in the in-memory registry; a stale DB "online" is ignored (zombie fix).
    """
    live = registry.get_node(node.node_id)
    status = live._compute_status() if live else "offline"

    # skill_config may be stored as a JSON string on some rows; tolerate both.
    # (Uses the module-level json import; narrow except instead of a bare one.)
    skill_cfg = node.skill_config or {}
    if isinstance(skill_cfg, str):
        try:
            skill_cfg = json.loads(skill_cfg)
        except (ValueError, TypeError):
            skill_cfg = {}
    available = [skill for skill, cfg in skill_cfg.items() if isinstance(cfg, dict) and cfg.get("enabled", True)]
    stats = live.stats if live else {}

    return schemas.AgentNodeUserView(
        node_id=node.node_id,
        display_name=node.display_name,
        description=node.description,
        capabilities=node.capabilities or {},
        available_skills=available,
        last_status=status,
        last_seen_at=node.last_seen_at,
        stats=schemas.AgentNodeStats(**stats) if stats else schemas.AgentNodeStats()
    )


def _now() -> str:
    from datetime import datetime
    return datetime.utcnow().isoformat()


async def _drain(q: queue.Queue, websocket: WebSocket):
    """Forward every pending queue item to the websocket without blocking.

    Pops items with get_nowait() until the queue reports empty, sending each
    as one JSON frame; returns immediately once nothing is pending.
    """
    while not q.empty():
        try:
            event = q.get_nowait()
        except queue.Empty:
            # Raced with another consumer between empty() and get_nowait().
            break
        await websocket.send_json(event)
