"""
Agent Node REST + WebSocket API
Admin-managed nodes, group access control, and user-facing live streaming.

Admin endpoints (require role=admin):
  POST  /nodes/admin                    — Create node registration + generate invite_token
  GET   /nodes/admin                    — List all nodes (admin view, full detail)
  GET   /nodes/admin/{node_id}          — Full admin detail including invite_token
  PATCH /nodes/admin/{node_id}          — Update node config (description, skill_config, is_active)
  POST  /nodes/admin/{node_id}/access   — Grant group access to a node
  DELETE /nodes/admin/{node_id}/access/{group_id} — Revoke group access

User endpoints (scoped to caller's group):
  GET  /nodes/                          — List accessible nodes (user view, no sensitive data)
  GET  /nodes/{node_id}/status          — Quick online/offline probe
  POST /nodes/{node_id}/dispatch        — Dispatch a task to a node
  PATCH /nodes/preferences              — Update user's default_nodes + data_source prefs

WebSocket (real-time streaming):
  WS   /nodes/{node_id}/stream          — Single-node live execution stream
  WS   /nodes/stream/all?user_id=...    — All-nodes global bus (multi-pane UI)
"""
import asyncio
import os
import json
import queue
import uuid
import secrets
from typing import Optional, Annotated
import logging
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends, Query, Header
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session


from app.api.dependencies import ServiceContainer, get_db
from app.api import schemas
from app.db import models

logger = logging.getLogger(__name__)

# Seconds that must elapse before the single-node WebSocket stream sends its next "heartbeat" frame.
HEARTBEAT_INTERVAL_S = 5


def create_nodes_router(services: ServiceContainer) -> APIRouter:
    router = APIRouter(prefix="/nodes", tags=["Agent Nodes"])

    def _registry():
        """Return the node registry service held by the injected service container."""
        # Looked up on every call rather than captured once at router-creation time.
        registry_service = services.node_registry_service
        return registry_service

    def _require_admin(user_id: str, db: Session):
        """
        Load the user with the given id and return it only when its role is "admin".

        Raises:
            HTTPException: 403 when the user does not exist or is not an admin.
        """
        account = db.query(models.User).filter(models.User.id == user_id).first()
        if account and account.role == "admin":
            return account
        raise HTTPException(status_code=403, detail="Admin access required.")

    def _require_node_access(user_id: str, node_id: str, db: Session):
        """
        Authorize a user against a single node and return the user record.

        Access is granted when any of these holds (checked in this order):
        - the user's role is "admin";
        - a NodeGroupAccess row links the node to the user's group;
        - the user's group policy has a "nodes" list containing the node id.

        Raises:
            HTTPException: 404 when the user does not exist,
                403 when none of the rules above grants access.
        """
        caller = db.query(models.User).filter(models.User.id == user_id).first()
        if not caller:
            raise HTTPException(status_code=404, detail="User not found.")

        def _granted_by_group_row() -> bool:
            # Explicit relational grant between the caller's group and this node.
            grant = db.query(models.NodeGroupAccess).filter(
                models.NodeGroupAccess.node_id == node_id,
                models.NodeGroupAccess.group_id == caller.group_id,
            ).first()
            return bool(grant)

        def _granted_by_policy() -> bool:
            # Whitelist stored under the "nodes" key of the group's policy.
            if not (caller.group and caller.group.policy):
                return False
            whitelist = caller.group.policy.get("nodes", [])
            return isinstance(whitelist, list) and node_id in whitelist

        # Short-circuit keeps the original order: admin, then group row, then policy.
        if caller.role == "admin" or _granted_by_group_row() or _granted_by_policy():
            return caller

        raise HTTPException(status_code=403, detail=f"Access Denied: You do not have permission to access node '{node_id}'.")

    # ==================================================================
    #  ADMIN ENDPOINTS
    # ==================================================================

    @router.post("/admin", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Register New Node")
    def admin_create_node(
        request: schemas.AgentNodeCreate,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """
        Register a new Agent Node on behalf of an admin.

        A fresh invite_token is generated and stored with the node; the returned
        admin detail includes it so it can be placed in the node's config YAML
        before deployment.

        Raises:
            HTTPException: 403 if the caller is not an admin,
                409 if a node with the same node_id already exists.
        """
        _require_admin(admin_id, db)

        duplicate = (
            db.query(models.AgentNode)
            .filter(models.AgentNode.node_id == request.node_id)
            .first()
        )
        if duplicate:
            raise HTTPException(status_code=409, detail=f"Node '{request.node_id}' already exists.")

        # Cryptographically secure, URL-safe token generated from 32 random bytes.
        fresh_token = secrets.token_urlsafe(32)

        record = models.AgentNode(
            node_id=request.node_id,
            display_name=request.display_name,
            description=request.description,
            registered_by=admin_id,
            skill_config=request.skill_config.model_dump(),
            invite_token=fresh_token,
            last_status="offline",  # a node starts offline until it connects
        )
        db.add(record)
        db.commit()
        db.refresh(record)

        logger.info(f"[admin] Created node '{request.node_id}' by admin {admin_id}")
        return _node_to_admin_detail(record, _registry())

    @router.get("/admin", response_model=list[schemas.AgentNodeAdminDetail], summary="[Admin] List All Nodes")
    def admin_list_nodes(admin_id: str, db: Session = Depends(get_db)):
        """Return the admin detail view of every registered node (admin callers only)."""
        _require_admin(admin_id, db)
        details = []
        for record in db.query(models.AgentNode).all():
            details.append(_node_to_admin_detail(record, _registry()))
        return details

    @router.get("/admin/{node_id}", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Get Node Detail")
    def admin_get_node(node_id: str, admin_id: str, db: Session = Depends(get_db)):
        """Return the admin detail view of one node; the lookup helper handles a missing node."""
        _require_admin(admin_id, db)
        return _node_to_admin_detail(_get_node_or_404(node_id, db), _registry())

    @router.patch("/admin/{node_id}", response_model=schemas.AgentNodeAdminDetail, summary="[Admin] Update Node Config")
    def admin_update_node(
        node_id: str,
        update: schemas.AgentNodeUpdate,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """
        Apply a partial update to a node and return its refreshed admin detail.

        Only fields that are not None on ``update`` are written: display_name,
        description, skill_config and is_active.

        When skill_config changes, the new policy is also pushed to the node via
        the orchestrator. The push happens only AFTER the database commit
        succeeds, so a failed commit can never leave a connected node running a
        policy that was not persisted. A failed push is logged as a warning and
        does not fail the request.

        Raises:
            HTTPException: 403 if the caller is not an admin (via _require_admin).
        """
        _require_admin(admin_id, db)
        node = _get_node_or_404(node_id, db)

        if update.display_name is not None:
            node.display_name = update.display_name
        if update.description is not None:
            node.description = update.description

        # Keep the dumped policy so exactly what was stored is what gets pushed.
        new_policy = None
        if update.skill_config is not None:
            new_policy = update.skill_config.model_dump()
            node.skill_config = new_policy

        if update.is_active is not None:
            node.is_active = update.is_active

        db.commit()
        db.refresh(node)

        if new_policy is not None:
            # M6: Push policy live to the node if it's connected (best effort).
            try:
                services.orchestrator.push_policy(node_id, new_policy)
            except Exception as e:
                logger.warning(f"Could not push live policy to {node_id}: {e}")

        return _node_to_admin_detail(node, _registry())

    @router.delete("/admin/{node_id}", summary="[Admin] Deregister Node")
    def admin_delete_node(
        node_id: str,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """
        Remove a node registration.

        The node is first deregistered from the in-memory registry, then its
        database row is deleted.
        NOTE(review): whether related access grants are removed too depends on
        cascade rules on the model, which are not visible here — confirm.
        """
        _require_admin(admin_id, db)
        record = _get_node_or_404(node_id, db)

        # Drop the live (in-memory) entry before touching the database row.
        _registry().deregister(node_id)

        db.delete(record)
        db.commit()

        outcome = {"status": "success", "message": f"Node {node_id} deleted"}
        return outcome

    @router.post("/admin/{node_id}/access", response_model=schemas.NodeAccessResponse, summary="[Admin] Grant Group Access")
    def admin_grant_access(
        node_id: str,
        grant: schemas.NodeAccessGrant,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """
        Create or update a group's access grant for a node.

        If a grant for (node_id, group_id) already exists, its access_level and
        granted_by are overwritten; otherwise a new grant row is inserted.
        Returns the persisted grant row.
        """
        _require_admin(admin_id, db)
        # Called only for its existence check; the node object itself is not needed.
        _get_node_or_404(node_id, db)

        grant_row = (
            db.query(models.NodeGroupAccess)
            .filter(
                models.NodeGroupAccess.node_id == node_id,
                models.NodeGroupAccess.group_id == grant.group_id,
            )
            .first()
        )

        if grant_row:
            grant_row.access_level = grant.access_level
            grant_row.granted_by = admin_id
        else:
            grant_row = models.NodeGroupAccess(
                node_id=node_id,
                group_id=grant.group_id,
                access_level=grant.access_level,
                granted_by=admin_id,
            )
            db.add(grant_row)

        db.commit()
        db.refresh(grant_row)
        return grant_row

    @router.delete("/admin/{node_id}/access/{group_id}", summary="[Admin] Revoke Group Access")
    def admin_revoke_access(
        node_id: str,
        group_id: str,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """
        Delete the access grant linking a group to a node.

        Raises:
            HTTPException: 403 if the caller is not an admin,
                404 if no such grant exists.
        """
        _require_admin(admin_id, db)
        grant_row = (
            db.query(models.NodeGroupAccess)
            .filter(
                models.NodeGroupAccess.node_id == node_id,
                models.NodeGroupAccess.group_id == group_id,
            )
            .first()
        )
        if grant_row:
            db.delete(grant_row)
            db.commit()
            return {"message": f"Access revoked for group '{group_id}' on node '{node_id}'."}
        raise HTTPException(status_code=404, detail="Access grant not found.")

    @router.post("/admin/mesh/reset", summary="[Admin] Emergency Mesh Reset")
    def admin_reset_mesh(
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """
        DANGEROUS: emergency reset of the node mesh.

        Asks the registry to reset all node statuses, then to clear its
        in-memory cache, and reports how many live nodes were cleared.
        Intended for resolving 'zombie' nodes or flapping connections.
        """
        _require_admin(admin_id, db)

        node_registry = _registry()
        # Order matters: statuses are reset first, then the live cache is cleared.
        node_registry.reset_all_statuses()
        cleared = node_registry.clear_memory_cache()

        logger.warning(f"[Admin] Mesh Reset triggered by {admin_id}. Cleared {cleared} live nodes.")
        return {"status": "success", "cleared_count": cleared}

    # ==================================================================
    #  USER-FACING ENDPOINTS
    # ==================================================================

    @router.get("/", response_model=list[schemas.AgentNodeUserView], summary="List Accessible Nodes")
    def list_accessible_nodes(
        user_id: str = Query(...),
        db: Session = Depends(get_db)
    ):
        """
        List the active nodes the caller's group may use, as user views.

        Node ids are collected from NodeGroupAccess rows for the user's group
        and from the "nodes" list in the group's policy. Admins get no special
        treatment here: only explicitly granted nodes are listed.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        caller = db.query(models.User).filter(models.User.id == user_id).first()
        if not caller:
            raise HTTPException(status_code=404, detail="User not found.")

        # Relational grants for the caller's group.
        group_rows = db.query(models.NodeGroupAccess).filter(
            models.NodeGroupAccess.group_id == caller.group_id
        ).all()
        visible_ids = {row.node_id for row in group_rows}

        # Policy whitelist, merged in only when it really is a list.
        if caller.group and caller.group.policy:
            whitelist = caller.group.policy.get("nodes", [])
            if isinstance(whitelist, list):
                visible_ids.update(whitelist)

        active_nodes = db.query(models.AgentNode).filter(
            models.AgentNode.node_id.in_(list(visible_ids)),
            models.AgentNode.is_active == True
        ).all()

        live_registry = _registry()
        views = []
        for record in active_nodes:
            views.append(_node_to_user_view(record, live_registry))
        return views

    @router.get("/{node_id}/status", summary="Quick Node Online Check")
    def get_node_status(
        node_id: str,
        user_id: str = Header(..., alias="X-User-ID"),
        db: Session = Depends(get_db)
    ):
        """
        Report whether a node is currently present in the live registry.

        Returns "offline" when the registry has no entry for the node; otherwise
        the node's computed status plus its stats.
        """
        _require_node_access(user_id, node_id, db)
        live_node = _registry().get_node(node_id)
        if live_node:
            return {"node_id": node_id, "status": live_node._compute_status(), "stats": live_node.stats}
        return {"node_id": node_id, "status": "offline"}
    
    @router.get("/{node_id}/terminal", summary="Read Node Terminal History (AI Use Case)")
    def get_node_terminal(
        node_id: str,
        user_id: str = Header(..., alias="X-User-ID"),
        db: Session = Depends(get_db)
    ):
        """
        AI-Specific: return the terminal history kept for a live node.

        Gives the AI reasoning agent context about recent terminal activity.
        NOTE(review): any cap on history length (e.g. the most recent 150
        chunks) is enforced wherever terminal_history is maintained, not here.
        """
        _require_node_access(user_id, node_id, db)
        live_node = _registry().get_node(node_id)
        if live_node:
            return {
                "node_id": node_id,
                "status": live_node._compute_status(),
                "terminal": live_node.terminal_history,
                "session_id": live_node.session_id
            }
        # No registry entry: report offline with an empty history.
        return {"node_id": node_id, "status": "offline", "terminal": []}

    @router.post("/{node_id}/dispatch", response_model=schemas.NodeDispatchResponse, summary="Dispatch Task to Node")
    def dispatch_to_node(
        node_id: str,
        request: schemas.NodeDispatchRequest,
        user_id: str = Query(...),
        db: Session = Depends(get_db)
    ):
        """
        Queue a shell or browser task on a connected node.

        A "task_assigned" event is emitted before the request is built so the
        live UI shows the task immediately; "task_start" follows once the signed
        request has been placed on the node's outbound queue.

        Raises:
            HTTPException: 503 when the node is not in the live registry,
                500 when building, signing or queueing the request fails.
        """
        _require_node_access(user_id, node_id, db)
        node_registry = _registry()
        target = node_registry.get_node(node_id)
        if not target:
            raise HTTPException(status_code=503, detail=f"Node '{node_id}' is not connected.")

        # Use the caller-supplied task id when given, otherwise mint a new one.
        task_id = request.task_id
        if not task_id:
            task_id = str(uuid.uuid4())

        # M6: Use the integrated Protobufs & Crypto from app/core/grpc
        from app.protos import agent_pb2
        from app.core.grpc.utils.crypto import sign_payload

        # A command is sent verbatim; otherwise the browser action is sent as JSON.
        payload = request.command
        if not payload:
            payload = json.dumps(request.browser_action or {})

        assigned_details = {"command": request.command, "session_id": request.session_id}
        node_registry.emit(node_id, "task_assigned", assigned_details, task_id=task_id)

        try:
            signed_request = agent_pb2.TaskRequest(
                task_id=task_id,
                payload_json=payload,
                signature=sign_payload(payload),
                timeout_ms=request.timeout_ms,
                session_id=request.session_id or "",
            )
            # Push directly to the node's live gRPC outbound queue
            outbound = agent_pb2.ServerTaskMessage(task_request=signed_request)
            target.queue.put(outbound)
            node_registry.emit(node_id, "task_start", {"command": request.command}, task_id=task_id)
        except Exception as e:
            logger.error(f"[nodes/dispatch] Failed to put task onto queue for {node_id}: {e}")
            raise HTTPException(status_code=500, detail="Internal Dispatch Error")

        return schemas.NodeDispatchResponse(task_id=task_id, status="accepted")

    @router.post("/{node_id}/cancel", summary="Cancel/Interrupt Task on Node")
    def cancel_on_node(
        node_id: str,
        task_id: str = "",
        user_id: str = Header(..., alias="X-User-ID"),
        db: Session = Depends(get_db)
    ):
        """
        Send a TaskCancelRequest for ``task_id`` to a connected node.

        For shell skills, this typically translates to SIGINT (Ctrl+C).

        Raises:
            HTTPException: 503 when the node is not in the live registry.
        """
        _require_node_access(user_id, node_id, db)
        node_registry = _registry()
        target = node_registry.get_node(node_id)
        if not target:
            raise HTTPException(status_code=503, detail=f"Node '{node_id}' is not connected.")

        from app.protos import agent_pb2
        cancel_message = agent_pb2.ServerTaskMessage(
            task_cancel=agent_pb2.TaskCancelRequest(task_id=task_id)
        )
        target.queue.put(cancel_message)

        # Let stream subscribers know a cancel was issued.
        node_registry.emit(node_id, "task_cancel", {"task_id": task_id})
        return {"status": "cancel_sent", "task_id": task_id}

    @router.patch("/preferences", summary="Update User Node Preferences")
    def update_node_preferences(
        user_id: str,
        prefs: schemas.UserNodePreferences,
        db: Session = Depends(get_db)
    ):
        """
        Store the submitted node preferences under the "nodes" key of the user's preferences.

        The UI reads this to auto-attach nodes when a new session starts.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        account = db.query(models.User).filter(models.User.id == user_id).first()
        if not account:
            raise HTTPException(status_code=404, detail="User not found.")

        # Assign a brand-new dict so SQLAlchemy detects the change to the JSON column.
        account.preferences = {**(account.preferences or {}), "nodes": prefs.model_dump()}
        db.commit()

        return {"message": "Node preferences saved.", "nodes": prefs.model_dump()}

    @router.get("/preferences", response_model=schemas.UserNodePreferences, summary="Get User Node Preferences")
    def get_node_preferences(user_id: str, db: Session = Depends(get_db)):
        """
        Return the user's saved node preferences, or schema defaults when none are stored.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        account = db.query(models.User).filter(models.User.id == user_id).first()
        if not account:
            raise HTTPException(status_code=404, detail="User not found.")

        stored = (account.preferences or {}).get("nodes", {})
        if stored:
            return schemas.UserNodePreferences(**stored)
        return schemas.UserNodePreferences()

    # ==================================================================
    #  M4: Config YAML Download (admin only)
    # ==================================================================

    # Markdown text that admin_download_bundle writes into the node ZIP bundle as README.md.
    README_CONTENT = """# Cortex Agent Node

This bundle contains the Cortex Agent Node, a modular software that connects to your Cortex Hub.

## Running the Node

### Linux & macOS
1. Open your terminal in this directory.
2. Make the runner script executable: `chmod +x run.sh`
3. Run: `./run.sh`

### Windows
1. Double-click `run.bat`.

The scripts will automatically detect if the node is provided as a binary executable or source code, and handle the environment setup accordingly.

## Configuration

The `agent_config.yaml` file has been pre-configured with your node's identity and security tokens. Do not share this file.
"""

    # Bash launcher that admin_download_bundle writes into the node ZIP bundle as run.sh.
    # It runs ./agent-node when that file exists; otherwise, if an agent_node/ directory exists,
    # it creates/activates .venv, installs requirements.txt when present, and runs agent_node.main.
    RUN_SH_CONTENT = """#!/bin/bash
# Cortex Agent Node — Seamless Runner
set -e

# Ensure we are in the script's directory
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &> /dev/null && pwd)"
cd "$SCRIPT_DIR" || { echo "❌ Error: Could not change directory to $SCRIPT_DIR"; exit 1; }

echo "🚀 Starting Cortex Agent Node..."

# 1. Future Binary Check
if [ -f "./agent-node" ]; then
    echo "[*] Binary executable detected. Launching..."
    chmod +x ./agent-node
    ./agent-node
    exit $?
fi

# 2. Source Code Fallback
if [ -d "./agent_node" ]; then
    echo "[*] Source code detected. Setting up Python environment..."
    if ! command -v python3 &> /dev/null; then
        echo "❌ Error: python3 not found. Please install Python 3.10+."
        exit 1
    fi

    VENV=".venv"
    if [ ! -d "$VENV" ]; then
        echo "[*] Creating virtual environment..."
        python3 -m venv "$VENV"
    fi
    source "$VENV/bin/activate"

    if [ -f "requirements.txt" ]; then
        echo "[*] Syncing dependencies..."
        pip install --upgrade pip --quiet
        pip install -r requirements.txt --quiet
    fi

    echo "✅ Environment ready. Booting node..."
    python3 -m agent_node.main
else
    echo "❌ Error: No executable ('agent-node') or source code ('agent_node/') found in this bundle."
    exit 1
fi
"""

    # macOS launcher: the same script text as run.sh, written into the bundle as run_mac.command.
    RUN_MAC_CONTENT = RUN_SH_CONTENT # They are compatible, .command just helps macOS users

    RUN_BAT_CONTENT = """@echo off
echo 🚀 Starting Cortex Agent Node...

if exist agent-node.exe (
    echo [*] Binary executable detected. Launching...
    agent-node.exe
    exit /b %errorlevel%
)

if exist agent_node (
    echo [*] Source code detected. Checking environment...
    python --version >nul 2>&1
    if %errorlevel% neq 0 (
        echo ❌ Error: python not found. Please install Python 3.10+.
        pause
        exit /b 1
    )
    if not exist .venv (
        echo [*] Creating virtual environment...
        python -m venv .venv
    )
    call .venv\\Scripts\\activate
    if exist requirements.txt (
        echo [*] Syncing dependencies...
        pip install --upgrade pip --quiet
        pip install -r requirements.txt --quiet
    )
    echo ✅ Environment ready. Booting node...
    python -m agent_node.main
) else (
    echo ❌ Error: No executable ('agent-node.exe') or source code ('agent_node/') found.
    pause
    exit /b 1
)
"""

    @router.get(
        "/admin/{node_id}/config.yaml",
        response_model=schemas.NodeConfigYamlResponse,
        summary="[Admin] Download Node Config YAML",
    )
    def download_node_config_yaml(node_id: str, admin_id: str, db: Session = Depends(get_db)):
        """
        Build the agent_config.yaml text for a node and return it in a response model.

        The admin downloads this content and places it alongside the node client
        software before deployment.
        """
        _require_admin(admin_id, db)
        rendered = _generate_node_config_yaml(_get_node_or_404(node_id, db))
        return schemas.NodeConfigYamlResponse(node_id=node_id, config_yaml=rendered)

    @router.get("/admin/{node_id}/download", summary="[Admin] Download Agent Node Bundle (ZIP)")
    def admin_download_bundle(
        node_id: str,
        admin_id: str,
        db: Session = Depends(get_db)
    ):
        """
        Build an in-memory ZIP bundle for a node and stream it back as a download.

        Bundle contents, in order:
        1. Files from the first existing agent-node directory, minus excluded
           directories and files (no .py sources, requirements, .env or config).
        2. Everything under /app/skills (except __pycache__), placed under "skills/".
        3. The generated agent_config.yaml for this node.
        4. README.md plus the run.sh / run_mac.command / run.bat launchers.
        """
        import io
        import zipfile

        _require_admin(admin_id, db)
        node = _get_node_or_404(node_id, db)
        config_yaml = _generate_node_config_yaml(node)

        # Directories never descended into, and file names never copied, from the node source tree.
        skipped_dirs = ["agent_node", "shared_core", "protos", "__pycache__", ".git", ".venv"]
        skipped_files = ("requirements.txt", ".env", "agent_config.yaml")

        def _executable_entry(archive_name):
            # ZIP entry flagged as a regular file with mode -rwxr-xr-x.
            entry = zipfile.ZipInfo(archive_name)
            entry.external_attr = 0o100755 << 16
            entry.compress_type = zipfile.ZIP_DEFLATED
            return entry

        archive_bytes = io.BytesIO()
        with zipfile.ZipFile(archive_bytes, "a", zipfile.ZIP_DEFLATED, False) as bundle:
            # 1. Agent Node files: production mount first, then the local fallback.
            base_dir = next(
                (
                    candidate
                    for candidate in ["/app/agent-node-source", "/app/agent-node"]
                    if os.path.exists(candidate)
                ),
                None,
            )
            if base_dir:
                for current, subdirs, filenames in os.walk(base_dir):
                    # Prune in place so os.walk does not descend into excluded folders.
                    subdirs[:] = [d for d in subdirs if d not in skipped_dirs]
                    for filename in filenames:
                        if filename.endswith(".py") or filename in skipped_files:
                            continue
                        absolute = os.path.join(current, filename)
                        bundle.write(absolute, os.path.relpath(absolute, base_dir))

            # 2. Skills from /app/skills, stored under a "skills" prefix.
            skills_dir = "/app/skills"
            if os.path.exists(skills_dir):
                for current, subdirs, filenames in os.walk(skills_dir):
                    subdirs[:] = [d for d in subdirs if d != "__pycache__"]
                    for filename in filenames:
                        absolute = os.path.join(current, filename)
                        archive_name = os.path.join("skills", os.path.relpath(absolute, skills_dir))
                        bundle.write(absolute, archive_name)

            # 3. Generated configuration.
            bundle.writestr("agent_config.yaml", config_yaml)

            # 4. README and launchers; the two shell launchers carry the executable bit.
            bundle.writestr("README.md", README_CONTENT)
            bundle.writestr(_executable_entry("run.sh"), RUN_SH_CONTENT)
            bundle.writestr(_executable_entry("run_mac.command"), RUN_MAC_CONTENT)
            bundle.writestr("run.bat", RUN_BAT_CONTENT)

        # Rewind so the response streams from the start of the archive.
        archive_bytes.seek(0)
        return StreamingResponse(
            archive_bytes,
            media_type="application/x-zip-compressed",
            headers={"Content-Disposition": f"attachment; filename=cortex-node-{node_id}.zip"}
        )


    # ==================================================================
    #  M4: Invite Token Validation (called internally by gRPC server)
    # ==================================================================

    @router.post("/validate-token", summary="[Internal] Validate Node Invite Token")
    def validate_invite_token(token: str, node_id: str, db: Session = Depends(get_db)):
        """
        Check a node's invite_token before a node connection is accepted.

        Described as an internal endpoint for the gRPC SyncConfiguration handler.
        A match requires the node_id, the token and is_active all to agree.

        Both outcomes are returned as a normal response body (no HTTPException is raised):
          success: { valid: true, node_id, display_name, user_id, skill_config }
          failure: { valid: false, reason }

        On success, skill_config (the sandbox policy) is included so the gRPC
        server can populate its SandboxPolicy response.
        """
        matched = (
            db.query(models.AgentNode)
            .filter(
                models.AgentNode.node_id == node_id,
                models.AgentNode.invite_token == token,
                models.AgentNode.is_active == True,
            )
            .first()
        )

        if matched:
            logger.info(f"[M4] Token validated OK for node_id='{node_id}'")
            return {
                "valid": True,
                "node_id": matched.node_id,
                "display_name": matched.display_name,
                "user_id": matched.registered_by,  # AgentNode has registered_by, not user_id
                "skill_config": matched.skill_config or {},
            }

        logger.warning(f"[M4] Token validation FAILED for node_id='{node_id}'")
        return {"valid": False, "reason": "Invalid token or unknown node."}

    # ==================================================================
    #  WEBSOCKET — Single-node live event stream
    # ==================================================================

    @router.websocket("/{node_id}/stream")
    async def node_event_stream(
        websocket: WebSocket, 
        node_id: str, 
        user_id: str = Query(...)
    ):
        """
        Single-node live event stream with full-duplex communication.

        Flow:
        1. Check node access for ``user_id``; on failure the socket is accepted,
           an "error" event is sent, and the socket is closed with code 4003.
        2. Send a "snapshot" event describing the node (or an offline placeholder).
        3. Subscribe a queue to the registry for this node and run two tasks:
           - a sender that forwards queued events and emits periodic heartbeats;
           - a receiver that answers "ping" and turns "dispatch" messages into
             shell task requests on the node's outbound queue.
        4. When either task finishes, cancel the other and unsubscribe the queue.
        """
        from app.db.session import get_db_session
        # The DB session is only needed for the access check and is released before streaming.
        with get_db_session() as db:
            try:
                _require_node_access(user_id, node_id, db)
            except HTTPException as e:
                # Accept first so the error can be delivered as a message before closing.
                await websocket.accept()
                await websocket.send_json({"event": "error", "message": e.detail})
                await websocket.close(code=4003)
                return

        await websocket.accept()
        registry = _registry()

        # Initial state so the client can render before any live event arrives.
        live = registry.get_node(node_id)
        await websocket.send_json({
            "event": "snapshot",
            "node_id": node_id,
            "timestamp": _now(),
            "data": live.to_dict() if live else {"status": "offline"},
        })

        # Thread-style queue.Queue (not asyncio.Queue); the sender task polls it.
        q: queue.Queue = queue.Queue()
        registry.subscribe_node(node_id, q)
        
        async def send_events():
            # Outbound half: forward queued registry events and send heartbeats.
            import time
            last_heartbeat = 0
            try:
                while True:
                    # _drain is defined elsewhere in this file; presumably it forwards
                    # everything currently in q to the websocket — confirm.
                    await _drain(q, websocket)
                    now = time.time()
                    if now - last_heartbeat > HEARTBEAT_INTERVAL_S:
                        # Re-read the node each time so the heartbeat reflects connect/disconnect.
                        live_node = registry.get_node(node_id)
                        await websocket.send_json({
                            "event": "heartbeat", "node_id": node_id, "timestamp": _now(),
                            "data": {"status": live_node._compute_status() if live_node else "offline",
                                     "stats": live_node.stats if live_node else {}},
                        })
                        last_heartbeat = now
                    # Extremely fast poll loop for real-time latency
                    await asyncio.sleep(0.02)
            except WebSocketDisconnect:
                pass
            except Exception as e:
                logger.error(f"[nodes/stream_sender] {node_id}: {e}")

        async def receive_events():
            # Inbound half: handle JSON messages sent by the client.
            from app.protos import agent_pb2
            from app.core.grpc.utils.crypto import sign_payload
            import uuid
            try:
                while True:
                    data = await websocket.receive_json()
                    if data.get("action") == "ping":
                        await websocket.send_json({
                            "event": "pong",
                            "node_id": node_id,
                            "timestamp": _now(),
                            "client_ts": data.get("ts") # Echo back for RTT calculation
                        })
                        continue

                    if data.get("action") == "dispatch":
                        live_node = registry.get_node(node_id)
                        if not live_node:
                            await websocket.send_json({"event": "task_error", "data": {"stderr": "Node offline"}})
                            continue
                        
                        cmd = data.get("command", "")
                        session_id = data.get("session_id", "")
                        # The task id is always generated here; the client cannot supply one.
                        task_id = str(uuid.uuid4())
                        
                        registry.emit(node_id, "task_assigned", {"command": cmd, "session_id": session_id}, task_id=task_id)
                        
                        try:
                            task_req = agent_pb2.TaskRequest(
                                task_id=task_id,
                                task_type="shell",
                                payload_json=cmd,
                                signature=sign_payload(cmd),
                                timeout_ms=0,
                                session_id=session_id
                            )
                            live_node.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req))
                            registry.emit(node_id, "task_start", {"command": cmd}, task_id=task_id)
                        except Exception as e:
                            # NOTE(review): the failure is only logged; the client gets no
                            # "task_error" event after "task_assigned" was already emitted.
                            logger.error(f"[ws/dispatch] Error: {e}")
                    # Messages with any other "action" value are ignored.
            except WebSocketDisconnect:
                pass
            except Exception as e:
                # Also reached for malformed JSON or non-object payloads, which ends the receiver.
                logger.error(f"[nodes/stream_receive] {node_id}: {e}")

        sender_task = asyncio.create_task(send_events())
        receiver_task = asyncio.create_task(receive_events())

        try:
            # Whichever half ends first (disconnect or error) ends the whole stream.
            done, pending = await asyncio.wait(
                [sender_task, receiver_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            for t in pending:
                t.cancel()
        except asyncio.CancelledError:
            # NOTE(review): cancellation of this handler is absorbed here and not re-raised.
            sender_task.cancel()
            receiver_task.cancel()
        finally:
            # Always detach the queue so the registry stops publishing into it.
            registry.unsubscribe_node(node_id, q)

    # ==================================================================
    #  WEBSOCKET — Multi-node global execution bus
    # ==================================================================

    @router.websocket("/stream/all")
    async def all_nodes_event_stream(websocket: WebSocket, user_id: str):
        """
        Multi-node global event bus for a user.
        Powers the split-window multi-pane execution UI.

        Flow:
          1. Resolve the node ids the user may see: NodeGroupAccess rows for the
             user's group, plus any ids listed under ``nodes`` in the group policy.
          2. Accept the socket and send an ``initial_snapshot`` of the accessible
             nodes that are currently present in the live registry.
          3. Subscribe one shared queue to every accessible node and run two
             tasks: a sender (drains the queue every 50 ms and emits a
             ``mesh_heartbeat`` every HEARTBEAT_INTERVAL_S seconds) and a receiver
             (answers ``{"action": "ping"}`` with a ``pong`` event).
             Whichever task finishes first ends the stream; the queue is always
             unsubscribed on the way out.

        Args:
            websocket: The client connection.
            user_id: Query parameter naming the user whose nodes are streamed.
        """
        from app.db.session import get_db_session

        # 1. Identify accessible nodes for this user based on group policy
        accessible_ids = []
        user_found = False
        with get_db_session() as db:
            user = db.query(models.User).filter(models.User.id == user_id).first()
            if user:
                user_found = True

                # Nodes accessible via user's group
                accesses = db.query(models.NodeGroupAccess).filter(
                    models.NodeGroupAccess.group_id == user.group_id
                ).all()
                accessible_ids = [a.node_id for a in accesses]

                # Nodes in group policy
                if user.group and user.group.policy:
                    policy_nodes = user.group.policy.get("nodes", [])
                    if isinstance(policy_nodes, list):
                        accessible_ids.extend(policy_nodes)

                accessible_ids = list(set(accessible_ids))

        if not user_found:
            logger.warning(f"[📶] User {user_id} not found for global stream.")
            # Reject the handshake explicitly (policy violation) instead of
            # returning with the connection neither accepted nor closed.
            await websocket.close(code=1008)
            return

        try:
            await websocket.accept()
        except Exception as e:
            logger.error(f"[📶] WebSocket accept failed for user={user_id}: {e}")
            return

        registry = _registry()
        logger.info(f"[📶] Multi-node stream connected for user={user_id}. Accessible nodes: {len(accessible_ids)}")

        def _live_nodes():
            """Return the accessible nodes currently in the live registry.

            Each id is looked up exactly once: a second lookup could return None
            if the node dropped out in between, which would then crash callers
            that dereference the entry.
            """
            found = []
            for nid in accessible_ids:
                node = registry.get_node(nid)
                if node:
                    found.append(node)
            return found

        try:
            # 2. Send initial snapshot of only accessible live nodes
            try:
                all_live = _live_nodes()
                snapshot_data = {
                    "event": "initial_snapshot",
                    "user_id": user_id,
                    "timestamp": _now(),
                    "data": {"nodes": [n.to_dict() for n in all_live], "count": len(all_live)},
                }
                await websocket.send_json(snapshot_data)
            except Exception as e:
                logger.error(f"[📶] Failed to send initial snapshot for user={user_id}: {e}", exc_info=True)
                await websocket.close(code=1011)
                return

            q: queue.Queue = queue.Queue()
            # Subscribe to each accessible node individually
            for nid in accessible_ids:
                registry.subscribe_node(nid, q)

            async def send_events():
                """Forward queued node events and emit periodic mesh heartbeats."""
                import time
                # 0 guarantees the first loop iteration sends a heartbeat immediately.
                last_heartbeat = 0
                try:
                    while True:
                        await _drain(q, websocket)

                        now = time.time()
                        if now - last_heartbeat > HEARTBEAT_INTERVAL_S:
                            live_nodes = _live_nodes()
                            await websocket.send_json({
                                "event": "mesh_heartbeat",
                                "user_id": user_id,
                                "timestamp": _now(),
                                "data": {
                                    "nodes": [{"node_id": n.node_id, "status": n._compute_status(), "stats": n.stats}
                                              for n in live_nodes]
                                },
                            })
                            last_heartbeat = now

                        # Poll interval for the thread-safe queue.
                        await asyncio.sleep(0.05)
                except WebSocketDisconnect:
                    logger.info(f"[📶] Sender disconnected for user={user_id}")
                except Exception as e:
                    logger.error(f"[nodes/stream/all_sender] CRASH for user={user_id}: {e}", exc_info=True)

            async def receive_events():
                """Answer client pings; any other inbound message is ignored."""
                try:
                    while True:
                        data = await websocket.receive_json()
                        if data.get("action") == "ping":
                            await websocket.send_json({
                                "event": "pong",
                                "user_id": user_id,
                                "timestamp": _now(),
                                "client_ts": data.get("ts")
                            })
                except WebSocketDisconnect:
                    pass
                except Exception as e:
                    logger.error(f"[nodes/stream/all_receiver] CRASH for user={user_id}: {e}", exc_info=True)

            sender_task = asyncio.create_task(send_events())
            receiver_task = asyncio.create_task(receive_events())

            try:
                # Either side ending (disconnect or crash) terminates the stream.
                done, pending = await asyncio.wait(
                    [sender_task, receiver_task],
                    return_when=asyncio.FIRST_COMPLETED
                )
                for t in pending:
                    t.cancel()
            except asyncio.CancelledError:
                sender_task.cancel()
                receiver_task.cancel()
            finally:
                for nid in accessible_ids:
                    registry.unsubscribe_node(nid, q)
                logger.info(f"[📶] Multi-node stream disconnected for user={user_id}")

        except Exception as e:
            logger.error(f"[nodes/stream/all] Error in stream handler for user={user_id}: {e}", exc_info=True)

    # ==================================================================
    #  FS EXPLORER ENDPOINTS (Modular Navigator)
    # ==================================================================

    @router.get("/{node_id}/fs/ls", response_model=schemas.DirectoryListing, summary="List Directory Content")
    def fs_ls(
        node_id: str,
        path: str = ".",
        session_id: str = "__fs_explorer__",
        user_id: str = Header(..., alias="X-User-ID"),
        db: Session = Depends(get_db)
    ):
        """
        List a directory on a node for the File Navigator.

        The caller must have access to the node. For any session other than the
        node-wide ``__fs_explorer__`` one, every returned entry is additionally
        tagged with ``is_synced`` (whether the same relative path exists in the
        session's workspace mirror on the hub).

        Raises:
            HTTPException: 404 when the node reports "Offline"; 500 for any other
                node error, an empty response, a missing orchestrator, or an
                unexpected failure.
        """
        _require_node_access(user_id, node_id, db)
        try:
            # The orchestrator attribute may be absent from the container.
            try:
                orchestrator = services.orchestrator
            except AttributeError:
                logger.error("[FS] Orchestrator service not found in ServiceContainer.")
                raise HTTPException(status_code=500, detail="Agent Orchestrator service is starting or unavailable.")

            listing = orchestrator.assistant.ls(node_id, path, session_id=session_id)

            if not listing:
                logger.error(f"[FS] Received empty response from node {node_id} for path {path}")
                raise HTTPException(status_code=500, detail="Node returned an empty response.")

            if isinstance(listing, dict) and "error" in listing:
                node_error = listing["error"]
                if node_error == "Offline":
                    http_status = 404
                else:
                    http_status = 500
                logger.warning(f"[FS] Explorer Error for {node_id}: {node_error}")
                raise HTTPException(status_code=http_status, detail=node_error)

            entries = listing.get("files", [])

            # M6: sync status only applies to real user sessions, not to the
            # node-wide navigator session.
            is_user_session = session_id != "__fs_explorer__"
            if is_user_session:
                mirror_root = orchestrator.mirror.get_workspace_path(session_id)
                for entry in entries:
                    entry["is_synced"] = os.path.exists(os.path.join(mirror_root, entry["path"]))

            return schemas.DirectoryListing(node_id=node_id, path=path, files=entries)
        except HTTPException:
            # Already a well-formed HTTP error; let it through unchanged.
            raise
        except Exception as e:
            logger.error(f"[FS] Unexpected error in fs_ls: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")

    @router.get("/{node_id}/fs/cat", summary="Read File Content")
    def fs_cat(
        node_id: str,
        path: str,
        session_id: str = "__fs_explorer__",
        user_id: str = Header(..., alias="X-User-ID"),
        db: Session = Depends(get_db)
    ):
        """
        Read the content of a file on a remote node.

        The caller must have access to the node. The node's response is returned
        as-is on success.

        Raises:
            HTTPException: 500 when the node returns nothing, reports an error,
                the orchestrator is unavailable, or an unexpected failure occurs.
        """
        _require_node_access(user_id, node_id, db)
        try:
            orchestrator = services.orchestrator
            res = orchestrator.assistant.cat(node_id, path, session_id=session_id)
            if not res:
                raise HTTPException(status_code=500, detail="Node returned an empty response.")
            if isinstance(res, dict) and "error" in res:
                raise HTTPException(status_code=500, detail=res["error"])
            return res # Expecting {"content": "..."}
        except HTTPException:
            # The errors raised above are intentional; without this clause the
            # generic handler below would log them as unexpected and overwrite
            # their detail with str(exception).
            raise
        except AttributeError:
            raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
        except Exception as e:
            logger.error(f"[FS] Cat error: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    @router.post("/{node_id}/fs/touch", summary="Create File or Directory")
    def fs_touch(
        node_id: str,
        req: schemas.FileWriteRequest,
        user_id: str = Header(..., alias="X-User-ID"),
        db: Session = Depends(get_db)
    ):
        """
        Create a new file or directory on the node.

        The caller must have access to the node. The request's content is sent
        UTF-8 encoded together with its ``is_dir`` flag and session id; the node's
        response is returned as-is on success.

        Raises:
            HTTPException: 500 when the node returns nothing, reports an error,
                the orchestrator is unavailable, or an unexpected failure occurs.
        """
        _require_node_access(user_id, node_id, db)
        try:
            orchestrator = services.orchestrator
            # Tolerate a missing content value: calling .encode on None would
            # raise AttributeError and be misreported as "Orchestrator unavailable."
            # NOTE(review): presumably content may be omitted for directories; confirm against the schema.
            payload = (req.content or "").encode('utf-8')
            res = orchestrator.assistant.write(
                node_id,
                req.path,
                payload,
                req.is_dir,
                session_id=req.session_id
            )
            if not res:
                raise HTTPException(status_code=500, detail="Node returned an empty response.")
            if isinstance(res, dict) and "error" in res:
                raise HTTPException(status_code=500, detail=res["error"])
            return res # Expecting {"success": bool, "message": str}
        except HTTPException:
            # The errors raised above are intentional; without this clause the
            # generic handler below would log them as unexpected and overwrite
            # their detail with str(exception).
            raise
        except AttributeError:
            raise HTTPException(status_code=500, detail="Orchestrator unavailable.")
        except Exception as e:
            logger.error(f"[FS] Touch error: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    @router.post("/{node_id}/fs/rm", summary="Delete File/Directory")
    def fs_rm(
        node_id: str,
        req: schemas.FileDeleteRequest,
        user_id: str = Header(..., alias="X-User-ID"),
        db: Session = Depends(get_db)
    ):
        """
        Delete a file or directory from a remote node.

        The caller must have access to the node. The node's response is returned
        as-is.

        Raises:
            HTTPException: 500 when the node returns nothing or an unexpected
                failure occurs.
        """
        _require_node_access(user_id, node_id, db)
        try:
            orchestrator = services.orchestrator
            res = orchestrator.assistant.rm(node_id, req.path, session_id=req.session_id)
            if not res:
                raise HTTPException(status_code=500, detail="Node returned an empty response.")
            # NOTE(review): unlike fs_cat / fs_touch, a response carrying an
            # "error" key is passed back to the client with a 200 status.
            return res
        except HTTPException:
            # Keep the intentional error above intact instead of letting the
            # generic handler re-wrap it and overwrite its detail.
            raise
        except Exception as e:
            logger.error(f"[FS] Delete error: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    return router


# ===========================================================================
#  Helpers
# ===========================================================================

def _generate_node_config_yaml(node: models.AgentNode) -> str:
    """Build the text of ``agent_config.yaml`` for *node*.

    Hub endpoints and the signing key are read from the environment
    (``HUB_PUBLIC_URL``, ``HUB_GRPC_ENDPOINT``, ``SECRET_KEY``) with built-in
    fallbacks. Scalar values are written as JSON-style double-quoted strings so
    that quotes, backslashes, or newlines inside a value cannot break the
    document. Skill option values are written as JSON, which is valid YAML
    flow syntax.

    Args:
        node: The node record; ``node_id``, ``display_name``, ``invite_token``
            and ``skill_config`` are read.

    Returns:
        The YAML document as a single newline-joined string.
    """

    def quoted(value) -> str:
        # str() first so a None value still renders as "None", as before.
        return json.dumps(str(value), ensure_ascii=False)

    hub_url = os.getenv("HUB_PUBLIC_URL", "https://ai.jerxie.com")
    hub_grpc = os.getenv("HUB_GRPC_ENDPOINT", "ai.jerxie.com:443")
    # NOTE(review): the fallback below is a well-known development key; a
    # deployment without SECRET_KEY set will hand it out to nodes.
    secret_key = os.getenv("SECRET_KEY", "dev-secret-key-1337")

    # Clean up gRPC endpoint - if it ends in :443, remove it as it's implied by TLS
    if hub_grpc.endswith(":443"):
        hub_grpc = hub_grpc[:-4]

    skill_cfg = node.skill_config or {}
    if isinstance(skill_cfg, str):
        try:
            skill_cfg = json.loads(skill_cfg)
        except Exception:
            skill_cfg = {}
    # A stored value that decodes to a list/number/etc. has no skills to emit.
    if not isinstance(skill_cfg, dict):
        skill_cfg = {}

    lines = [
        "# Cortex Hub — Agent Node Configuration",
        f"# Generated for node '{node.node_id}' — keep this file secret.",
        "",
        f"node_id: {quoted(node.node_id)}",
        f"node_description: {quoted(node.display_name)}",
        "",
        "# Hub connection",
        f"hub_url: {quoted(hub_url)}",
        f"grpc_endpoint: {quoted(hub_grpc)}",
        "",
        "# Authentication — do NOT share these secrets",
        f"invite_token: {quoted(node.invite_token)}",
        f"auth_token: {quoted(node.invite_token)}",
        "",
        "# HMAC signing key — must match the hub's SECRET_KEY exactly",
        f"secret_key: {quoted(secret_key)}",
        "",
        "# Skill configuration (mirrors admin settings; node respects these at startup)",
        "skills:",
    ]
    for skill, cfg in skill_cfg.items():
        if not isinstance(cfg, dict):
            continue
        enabled = cfg.get("enabled", True)
        lines.append(f"  {skill}:")
        lines.append(f"    enabled: {str(enabled).lower()}")
        for k, v in cfg.items():
            if k != "enabled" and v is not None:
                # JSON keeps strings quoted and renders bools/containers in a
                # form YAML parses back to the same value.
                lines.append(f"    {k}: {json.dumps(v, ensure_ascii=False, default=str)}")

    lines += [
        "",
        "# Workspace sync root — override if needed",
        "sync_root: \"/tmp/cortex-workspace\"",
        "",
        "# FS Explorer root — defaults to user home if not specified here",
        "# fs_root: \"/User/username/Documents\"",
        "",
        "# TLS — set to false only in dev",
        "tls: true",
    ]
    return "\n".join(lines)

def _get_node_or_404(node_id: str, db: Session) -> models.AgentNode:
    """Fetch the AgentNode row with the given ``node_id`` or raise HTTP 404."""
    match = (
        db.query(models.AgentNode)
        .filter(models.AgentNode.node_id == node_id)
        .first()
    )
    if match:
        return match
    raise HTTPException(status_code=404, detail=f"Node '{node_id}' not found.")


def _node_to_admin_detail(node: models.AgentNode, registry) -> schemas.AgentNodeAdminDetail:
    """Convert a node record into the full admin view, invite token included.

    When the node is present in the live registry its computed status and stats
    are used; otherwise the stored ``last_status`` (or "offline" when that is
    empty) and default stats are reported.
    """
    live = registry.get_node(node.node_id)
    if live:
        current_status = live._compute_status()
        node_stats = schemas.AgentNodeStats(**live.stats)
    else:
        current_status = node.last_status or "offline"
        node_stats = schemas.AgentNodeStats()

    detail_fields = dict(
        node_id=node.node_id,
        display_name=node.display_name,
        description=node.description,
        skill_config=node.skill_config or {},
        capabilities=node.capabilities or {},
        invite_token=node.invite_token,
        is_active=node.is_active,
        last_status=current_status,
        last_seen_at=node.last_seen_at,
        created_at=node.created_at,
        registered_by=node.registered_by,
    )

    # One response entry per group grant attached to the node.
    grants = []
    for grant in (node.group_access or []):
        grants.append(
            schemas.NodeAccessResponse(
                id=grant.id,
                node_id=grant.node_id,
                group_id=grant.group_id,
                access_level=grant.access_level,
                granted_at=grant.granted_at,
            )
        )

    return schemas.AgentNodeAdminDetail(group_access=grants, stats=node_stats, **detail_fields)


def _node_to_user_view(node: models.AgentNode, registry) -> schemas.AgentNodeUserView:
    """Convert a node record into the user-facing view (no sensitive fields).

    Status comes only from the live registry: a node that is not currently
    registered there is reported as "offline" regardless of what the database
    row says. ``available_skills`` lists the skills whose config is a dict with
    ``enabled`` truthy (or absent).
    """
    live = registry.get_node(node.node_id)
    # The record should only show online if it's currently connected and in the live gRPC registry map.
    # We default back to "offline" even if the DB record says "online" (zombie fix).
    status = live._compute_status() if live else "offline"

    skill_cfg = node.skill_config or {}
    if isinstance(skill_cfg, str):
        try:
            skill_cfg = json.loads(skill_cfg)
        except Exception:
            # Best effort: an undecodable stored config means "no skills".
            skill_cfg = {}
    # A stored value that decodes to a list/number/etc. would crash on .items().
    if not isinstance(skill_cfg, dict):
        skill_cfg = {}
    available = [skill for skill, cfg in skill_cfg.items() if isinstance(cfg, dict) and cfg.get("enabled", True)]
    stats = live.stats if live else {}

    return schemas.AgentNodeUserView(
        node_id=node.node_id,
        display_name=node.display_name,
        description=node.description,
        capabilities=node.capabilities or {},
        available_skills=available,
        last_status=status,
        last_seen_at=node.last_seen_at,
        stats=schemas.AgentNodeStats(**stats) if stats else schemas.AgentNodeStats()
    )


def _now() -> str:
    """Return the current UTC time as a naive ISO-8601 string (no offset suffix)."""
    from datetime import datetime, timezone
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the emitted format stays exactly as before.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


async def _drain(q: queue.Queue, websocket: WebSocket):
    """Send every event currently waiting in *q* over *websocket*, never blocking on the queue."""
    while True:
        try:
            pending_event = q.get_nowait()
        except queue.Empty:
            # Nothing left to forward right now.
            return
        await websocket.send_json(pending_event)
