diff --git a/agent-node/agent_node/node.py b/agent-node/agent_node/node.py index 12823c6..4b6efac 100644 --- a/agent-node/agent_node/node.py +++ b/agent-node/agent_node/node.py @@ -15,7 +15,7 @@ from agent_node.core.watcher import WorkspaceWatcher from agent_node.utils.auth import verify_task_signature from agent_node.utils.network import get_secure_stub -from agent_node.config import NODE_ID, NODE_DESC, AUTH_TOKEN, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS, DEBUG_GRPC +from agent_node.config import NODE_ID, NODE_DESC, AUTH_TOKEN, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS, DEBUG_GRPC, FS_ROOT class AgentNode: @@ -364,7 +364,7 @@ def _get_base_dir(self, session_id, create=False): """Helper to resolve the effective root for a session (Watcher > SyncDir).""" if session_id == "__fs_explorer__": - root = os.getenv("CORTEX_FS_ROOT", "/") + root = FS_ROOT print(f" [📁] Explorer Root: {root}") return root diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index b0fe226..e15b53f 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -26,8 +26,9 @@ import queue import uuid import secrets +from typing import Optional, Annotated import logging -from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends, Query, Header from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session @@ -53,6 +54,36 @@ raise HTTPException(status_code=403, detail="Admin access required.") return user + def _require_node_access(user_id: str, node_id: str, db: Session): + """ + Ensures the user has permission to interact with a specific node. + - Admins always have access to mesh operations. + - Normal users must have explicit group access or be in a group with node policy. + """ + user = db.query(models.User).filter(models.User.id == user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found.") + + # Admin ALWAYS has access to the underlying mesh features (terminal/FS) + if user.role == "admin": + return user + + # Check explicit group access + access = db.query(models.NodeGroupAccess).filter( + models.NodeGroupAccess.node_id == node_id, + models.NodeGroupAccess.group_id == user.group_id + ).first() + if access: + return user + + # Check group policy whitelist + if user.group and user.group.policy: + policy_nodes = user.group.policy.get("nodes", []) + if isinstance(policy_nodes, list) and node_id in policy_nodes: + return user + + raise HTTPException(status_code=403, detail=f"Access Denied: You do not have permission to access node '{node_id}'.") + # ================================================================== # ADMIN ENDPOINTS # ================================================================== @@ -231,7 +262,10 @@ # ================================================================== @router.get("/", response_model=list[schemas.AgentNodeUserView], summary="List Accessible Nodes") - def list_accessible_nodes(user_id: str, db: Session = Depends(get_db)): + def list_accessible_nodes( + user_id: str = Query(...), + db: Session = Depends(get_db) + ): """ Returns nodes the calling user's group has access to. Merges live connection state from the in-memory registry. @@ -240,44 +274,53 @@ if not user: raise HTTPException(status_code=404, detail="User not found.") - # Admin sees everything; users see only group-granted nodes - if user.role == "admin": - nodes = db.query(models.AgentNode).filter(models.AgentNode.is_active == True).all() - else: - # Nodes accessible via user's group (relational) - accesses = db.query(models.NodeGroupAccess).filter( - models.NodeGroupAccess.group_id == user.group_id - ).all() - node_ids = set([a.node_id for a in accesses]) - - # Nodes accessible via group policy whitelist - if user.group and user.group.policy: - policy_nodes = user.group.policy.get("nodes", []) - if isinstance(policy_nodes, list): - for nid in policy_nodes: - node_ids.add(nid) + # Both admins and users only see nodes explicitly granted to their group in this user-facing list. + # This prevents the 'Personal Preferences' and 'Mesh Explorer' from showing ungranted nodes. + + # Nodes accessible via user's group (relational) + accesses = db.query(models.NodeGroupAccess).filter( + models.NodeGroupAccess.group_id == user.group_id + ).all() + node_ids = set([a.node_id for a in accesses]) + + # Nodes accessible via group policy whitelist + if user.group and user.group.policy: + policy_nodes = user.group.policy.get("nodes", []) + if isinstance(policy_nodes, list): + for nid in policy_nodes: + node_ids.add(nid) - nodes = db.query(models.AgentNode).filter( - models.AgentNode.node_id.in_(list(node_ids)), - models.AgentNode.is_active == True - ).all() + nodes = db.query(models.AgentNode).filter( + models.AgentNode.node_id.in_(list(node_ids)), + models.AgentNode.is_active == True + ).all() registry = _registry() return [_node_to_user_view(n, registry) for n in nodes] @router.get("/{node_id}/status", summary="Quick Node Online Check") - def get_node_status(node_id: str): + def get_node_status( + node_id: str, + user_id: str = Header(..., alias="X-User-ID"), + db: Session = Depends(get_db) + ): + _require_node_access(user_id, node_id, db) live = _registry().get_node(node_id) if not live: return {"node_id": node_id, "status": "offline"} return {"node_id": node_id, "status": live._compute_status(), "stats": live.stats} @router.get("/{node_id}/terminal", summary="Read Node Terminal History (AI Use Case)") - def get_node_terminal(node_id: str): + def get_node_terminal( + node_id: str, + user_id: str = Header(..., alias="X-User-ID"), + db: Session = Depends(get_db) + ): """ AI-Specific: Returns the most recent 150 terminal interaction chunks for a live node. This provides context for the AI reasoning agent. """ + _require_node_access(user_id, node_id, db) live = _registry().get_node(node_id) if not live: return {"node_id": node_id, "status": "offline", "terminal": []} @@ -289,11 +332,17 @@ } @router.post("/{node_id}/dispatch", response_model=schemas.NodeDispatchResponse, summary="Dispatch Task to Node") - def dispatch_to_node(node_id: str, request: schemas.NodeDispatchRequest): + def dispatch_to_node( + node_id: str, + request: schemas.NodeDispatchRequest, + user_id: str = Query(...), + db: Session = Depends(get_db) + ): """ Queue a shell or browser task to an online node. Emits task_assigned immediately so the live UI shows it. """ + _require_node_access(user_id, node_id, db) registry = _registry() live = registry.get_node(node_id) if not live: @@ -328,11 +377,17 @@ return schemas.NodeDispatchResponse(task_id=task_id, status="accepted") @router.post("/{node_id}/cancel", summary="Cancel/Interrupt Task on Node") - def cancel_on_node(node_id: str, task_id: str = ""): + def cancel_on_node( + node_id: str, + task_id: str = "", + user_id: str = Header(..., alias="X-User-ID"), + db: Session = Depends(get_db) + ): """ Sends a TaskCancelRequest to the specified node. For shell skills, this typically translates to SIGINT (Ctrl+C). """ + _require_node_access(user_id, node_id, db) registry = _registry() live = registry.get_node(node_id) if not live: @@ -358,9 +413,10 @@ user = db.query(models.User).filter(models.User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found.") - existing_prefs = user.preferences or {} - existing_prefs["nodes"] = prefs.model_dump() - user.preferences = existing_prefs + # Create a new dictionary to ensure SQLAlchemy detects the change to the JSON column + current_prefs = dict(user.preferences or {}) + current_prefs["nodes"] = prefs.model_dump() + user.preferences = current_prefs db.commit() return {"message": "Node preferences saved.", "nodes": prefs.model_dump()} @@ -568,8 +624,8 @@ # Create ZIP in memory zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: - # 1. Add Agent Node source files from /app/agent-node-source - source_dir = "/app/agent-node-source" + # 1. Add Agent Node source files from /app/agent-node + source_dir = "/app/agent-node" if os.path.exists(source_dir): for root, dirs, files in os.walk(source_dir): # Exclude unwanted directories @@ -662,11 +718,25 @@ # ================================================================== @router.websocket("/{node_id}/stream") - async def node_event_stream(websocket: WebSocket, node_id: str): + async def node_event_stream( + websocket: WebSocket, + node_id: str, + user_id: str = Query(...) + ): """ Single-node live event stream with Full-Duplex communication. Provides gaming-fast terminal polling, sending commands inbound over the same WS connection! """ + from app.db.session import get_db_session + with get_db_session() as db: + try: + _require_node_access(user_id, node_id, db) + except HTTPException as e: + await websocket.accept() + await websocket.send_json({"event": "error", "message": e.detail}) + await websocket.close(code=4003) + return + await websocket.accept() registry = _registry() @@ -776,6 +846,30 @@ Powers the split-window multi-pane execution UI. High-performance edition: streams events with millisecond latency. """ + from app.db.session import get_db_session + + # 1. Identify accessible nodes for this user based on group policy + accessible_ids = [] + with get_db_session() as db: + user = db.query(models.User).filter(models.User.id == user_id).first() + if not user: + logger.warning(f"[📶] User {user_id} not found for global stream.") + return + + # Nodes accessible via user's group + accesses = db.query(models.NodeGroupAccess).filter( + models.NodeGroupAccess.group_id == user.group_id + ).all() + accessible_ids = [a.node_id for a in accesses] + + # Nodes in group policy + if user.group and user.group.policy: + policy_nodes = user.group.policy.get("nodes", []) + if isinstance(policy_nodes, list): + accessible_ids.extend(policy_nodes) + + accessible_ids = list(set(accessible_ids)) + try: await websocket.accept() except Exception as e: @@ -783,13 +877,12 @@ return registry = _registry() - logger.info(f"[📶] Multi-node stream connected for user={user_id}") + logger.info(f"[📶] Multi-node stream connected for user={user_id}. Accessible nodes: {len(accessible_ids)}") try: - # 1. Send initial snapshot immediately + # 2. Send initial snapshot of only accessible live nodes try: - all_live = registry.list_nodes(user_id=user_id) - logger.info(f"[📶] Sending initial snapshot for user={user_id} with {len(all_live)} nodes") + all_live = [registry.get_node(nid) for nid in accessible_ids if registry.get_node(nid)] snapshot_data = { "event": "initial_snapshot", "user_id": user_id, @@ -797,26 +890,26 @@ "data": {"nodes": [n.to_dict() for n in all_live], "count": len(all_live)}, } await websocket.send_json(snapshot_data) - logger.info(f"[📶] Initial snapshot sent successfully for user={user_id}") except Exception as e: logger.error(f"[📶] Failed to send initial snapshot for user={user_id}: {e}", exc_info=True) - await websocket.close(code=1011) # Internal Error + await websocket.close(code=1011) return q: queue.Queue = queue.Queue() - registry.subscribe_user(user_id, q) + # Subscribe to each accessible node individually + for nid in accessible_ids: + registry.subscribe_node(nid, q) async def send_events(): import time last_heartbeat = 0 try: while True: - # Drain all events from queue and send await _drain(q, websocket) now = time.time() if now - last_heartbeat > HEARTBEAT_INTERVAL_S: - live_nodes = registry.list_nodes(user_id=user_id) + live_nodes = [registry.get_node(nid) for nid in accessible_ids if registry.get_node(nid)] await websocket.send_json({ "event": "mesh_heartbeat", "user_id": user_id, @@ -828,7 +921,6 @@ }) last_heartbeat = now - # High-frequency polling (20Hz) for gaming-fast UI updates await asyncio.sleep(0.05) except WebSocketDisconnect: logger.info(f"[📶] Sender disconnected for user={user_id}") @@ -836,10 +928,8 @@ logger.error(f"[nodes/stream/all_sender] CRASH for user={user_id}: {e}", exc_info=True) async def receive_events(): - """Keep connection alive and handle client-initiated pings/close.""" try: while True: - # Consume client messages to prevent buffer bloat data = await websocket.receive_json() if data.get("action") == "ping": await websocket.send_json({ @@ -849,11 +939,10 @@ "client_ts": data.get("ts") }) except WebSocketDisconnect: - logger.info(f"[📶] Receiver disconnected for user={user_id}") + pass except Exception as e: logger.error(f"[nodes/stream/all_receiver] CRASH for user={user_id}: {e}", exc_info=True) - # Run sender and receiver concurrently sender_task = asyncio.create_task(send_events()) receiver_task = asyncio.create_task(receive_events()) @@ -868,23 +957,30 @@ sender_task.cancel() receiver_task.cancel() finally: - registry.unsubscribe_user(user_id, q) + for nid in accessible_ids: + registry.unsubscribe_node(nid, q) logger.info(f"[📶] Multi-node stream disconnected for user={user_id}") except Exception as e: logger.error(f"[nodes/stream/all] Error in stream handler for user={user_id}: {e}", exc_info=True) - # Socket will be closed by FastAPI on uncaught exception if not already closed # ================================================================== # FS EXPLORER ENDPOINTS (Modular Navigator) # ================================================================== @router.get("/{node_id}/fs/ls", response_model=schemas.DirectoryListing, summary="List Directory Content") - def fs_ls(node_id: str, path: str = ".", session_id: str = "__fs_explorer__"): + def fs_ls( + node_id: str, + path: str = ".", + session_id: str = "__fs_explorer__", + user_id: str = Header(..., alias="X-User-ID"), + db: Session = Depends(get_db) + ): """ Request a directory listing from a node. Returns a tree-structured list for the File Navigator. """ + _require_node_access(user_id, node_id, db) try: # Defensive check for orchestrator service injection try: @@ -921,10 +1017,17 @@ raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @router.get("/{node_id}/fs/cat", summary="Read File Content") - def fs_cat(node_id: str, path: str, session_id: str = "__fs_explorer__"): + def fs_cat( + node_id: str, + path: str, + session_id: str = "__fs_explorer__", + user_id: str = Header(..., alias="X-User-ID"), + db: Session = Depends(get_db) + ): """ Read the content of a file on a remote node. """ + _require_node_access(user_id, node_id, db) try: orchestrator = services.orchestrator res = orchestrator.assistant.cat(node_id, path, session_id=session_id) @@ -940,10 +1043,16 @@ raise HTTPException(status_code=500, detail=str(e)) @router.post("/{node_id}/fs/touch", summary="Create File or Directory") - def fs_touch(node_id: str, req: schemas.FileWriteRequest): + def fs_touch( + node_id: str, + req: schemas.FileWriteRequest, + user_id: str = Header(..., alias="X-User-ID"), + db: Session = Depends(get_db) + ): """ Create a new file or directory on the node. """ + _require_node_access(user_id, node_id, db) try: orchestrator = services.orchestrator res = orchestrator.assistant.write( @@ -965,10 +1074,16 @@ raise HTTPException(status_code=500, detail=str(e)) @router.post("/{node_id}/fs/rm", summary="Delete File/Directory") - def fs_rm(node_id: str, req: schemas.FileDeleteRequest): + def fs_rm( + node_id: str, + req: schemas.FileDeleteRequest, + user_id: str = Header(..., alias="X-User-ID"), + db: Session = Depends(get_db) + ): """ Delete a file or directory from a remote node. """ + _require_node_access(user_id, node_id, db) try: orchestrator = services.orchestrator res = orchestrator.assistant.rm(node_id, req.path, session_id=req.session_id) @@ -989,9 +1104,13 @@ def _generate_node_config_yaml(node: models.AgentNode) -> str: """Helper to generate the agent_config.yaml content.""" hub_url = os.getenv("HUB_PUBLIC_URL", "https://ai.jerxie.com") - hub_grpc = os.getenv("HUB_GRPC_ENDPOINT", "ai.jerxie.com:50051") + hub_grpc = os.getenv("HUB_GRPC_ENDPOINT", "ai.jerxie.com:443") secret_key = os.getenv("SECRET_KEY", "dev-secret-key-1337") + # Clean up gRPC endpoint - if it ends in :443, remove it as it's implied by TLS + if hub_grpc.endswith(":443"): + hub_grpc = hub_grpc[:-4] + skill_cfg = node.skill_config or {} if isinstance(skill_cfg, str): try: @@ -1035,6 +1154,9 @@ "# Workspace sync root — override if needed", "sync_root: \"/tmp/cortex-workspace\"", "", + "# FS Explorer root — defaults to user home if not specified here", + "# fs_root: \"/User/username/Documents\"", + "", "# TLS — set to false only in dev", "tls: true", ] diff --git a/ai-hub/app/api/routes/skills.py b/ai-hub/app/api/routes/skills.py index f088c2d..a6aa2a6 100644 --- a/ai-hub/app/api/routes/skills.py +++ b/ai-hub/app/api/routes/skills.py @@ -94,6 +94,10 @@ db_skill = db.query(models.Skill).filter(models.Skill.id == skill_id).first() if not db_skill: raise HTTPException(status_code=404, detail="Skill not found") + + # Block modification of system skills + if db_skill.is_system: + raise HTTPException(status_code=403, detail="System skills cannot be modified.") if db_skill.owner_id != current_user.id and current_user.role != 'admin': raise HTTPException(status_code=403, detail="Not authorized to update this skill") @@ -123,6 +127,10 @@ if not db_skill: raise HTTPException(status_code=404, detail="Skill not found") + # Block deletion of system skills + if db_skill.is_system: + raise HTTPException(status_code=403, detail="System skills cannot be deleted.") + if db_skill.owner_id != current_user.id and current_user.role != 'admin': raise HTTPException(status_code=403, detail="Not authorized to delete this skill") diff --git a/ai-hub/app/api/routes/user.py b/ai-hub/app/api/routes/user.py index b001d5f..1dea9c6 100644 --- a/ai-hub/app/api/routes/user.py +++ b/ai-hub/app/api/routes/user.py @@ -464,12 +464,15 @@ if prefs.tts: resolve_clone_from("tts", prefs.tts) if prefs.stt: resolve_clone_from("stt", prefs.stt) - user.preferences = { + # Preserve other keys like 'nodes' + current_prefs = dict(user.preferences or {}) + current_prefs.update({ "llm": prefs.llm, "tts": prefs.tts, "stt": prefs.stt, "statuses": prefs.statuses or {} - } + }) + user.preferences = current_prefs # --- Enterprise RBAC Sync --- # ONLY admins can sync to Global Settings and persist to config.yaml diff --git a/ai-hub/app/core/orchestration/architect.py b/ai-hub/app/core/orchestration/architect.py index 3ae31c4..f5f03dd 100644 --- a/ai-hub/app/core/orchestration/architect.py +++ b/ai-hub/app/core/orchestration/architect.py @@ -41,6 +41,13 @@ db=db, user_id=user_id, prompt_service=prompt_service, prompt_slug=prompt_slug, tools=tools ) + + # DEBUG: Log the total prompt size to detect bloated contexts + total_chars = sum(len(m.get("content", "") or "") for m in messages) + logging.info(f"[Architect] Starting autonomous loop (Turn 1). Prompt Size: {total_chars} chars across {len(messages)} messages.") + for i, m in enumerate(messages): + content = m.get("content", "") or "" + logging.info(f"[Architect] Msg {i} ({m['role']}): {content[:500]}...") # 2. Setup Mesh Observation mesh_bridge = queue.Queue() @@ -86,7 +93,12 @@ accumulated_reasoning = "" tool_calls_map = {} + chunk_count = 0 async for chunk in prediction: + chunk_count += 1 + if chunk_count == 1: + logging.info(f"[Architect] First chunk received after {time.time() - turn_start_time:.2f}s") + if not chunk.choices: continue delta = chunk.choices[0].delta @@ -221,7 +233,7 @@ kwargs["tools"] = tools kwargs["tool_choice"] = "auto" try: - return await llm_provider.acompletion(messages=messages, **kwargs) + return await llm_provider.acompletion(messages=messages, timeout=60, **kwargs) except Exception as e: logging.error(f"[Architect] LLM Exception: {e}") return None diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index 3606e70..0054d64 100644 --- a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -322,7 +322,10 @@ if not is_tty_char: node.terminal_history.append(f"$ {cmd}\n") elif event_type == "task_stdout" and isinstance(data, str): - node.terminal_history.append(data) + # NEW: Strip ANSI codes for AI readability + ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + clean_output = ansi_escape.sub('', data) + node.terminal_history.append(clean_output) elif event_type == "skill_event" and isinstance(data, dict): if data.get("type") == "output": output_data = data.get("data", "") diff --git a/ai-hub/app/core/services/rag.py b/ai-hub/app/core/services/rag.py index e81c465..b7c01b1 100644 --- a/ai-hub/app/core/services/rag.py +++ b/ai-hub/app/core/services/rag.py @@ -1,4 +1,8 @@ +import logging +import re from typing import List, Tuple, Optional + +logger = logging.getLogger(__name__) from sqlalchemy.orm import Session, joinedload from app.db import models @@ -127,7 +131,7 @@ shell_config = (node.skill_config or {}).get("shell", {}) if shell_config.get("enabled"): - sandbox = shell_config.get("sandbox", {}) + sandbox = shell_config.get("sandbox") or {} mode = sandbox.get("mode", "PERMISSIVE") allowed = sandbox.get("allowed_commands", []) denied = sandbox.get("denied_commands", []) @@ -149,13 +153,27 @@ if registry: live = registry.get_node(node.node_id) if live and live.terminal_history: - history = live.terminal_history[-40:] + # Grab recent chunks and join + history_blob = "".join(live.terminal_history[-40:]) + + # Extreme Sanity Check: Strip ANSI again just in case, and limit total size + ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + clean_history = ansi_escape.sub('', history_blob) + + # Limit to 2000 chars to avoid bloating the context / breaking LLMs + if len(clean_history) > 2000: + clean_history = "...[truncated]...\n" + clean_history[-2000:] + mesh_context += " Recent Terminal Output:\n" mesh_context += " ```\n" - for line in history: mesh_context += f" {line}" - if not history[-1].endswith('\n'): mesh_context += "\n" + mesh_context += f" {clean_history}" + if not clean_history.endswith('\n'): mesh_context += "\n" mesh_context += " ```\n" mesh_context += "\n" + + logger.info(f"[RAG] Mesh Context gathered. Length: {len(mesh_context)} chars.") + if mesh_context: + logger.info(f"[RAG] Mesh Context excerpt: {mesh_context[:200]}...") # Accumulators for the DB save at the end full_answer = "" diff --git a/ui/client-app/src/pages/NodesPage.js b/ui/client-app/src/pages/NodesPage.js index 7e782c5..7caf16a 100644 --- a/ui/client-app/src/pages/NodesPage.js +++ b/ui/client-app/src/pages/NodesPage.js @@ -518,13 +518,15 @@
* Skills are defined via the Node Manifest in the bundle.
No agent nodes accessible to your group.
- ) : ( - accessibleNodes.map(node => { + {accessibleNodes.length > 0 && ( ++ Determines where the agent should look for files on the node when starting a chat. +
- Determines where the agent should look for files on the node when starting a chat. -
Customize your individual experience
+Auto-attach these nodes to new sessions:
+No agent nodes are currently assigned to your group.
++ Determines where the agent should look for files on the node when starting a chat. +
+