diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index 6fe2319..b537607 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -282,6 +282,108 @@ return schemas.UserNodePreferences(**node_prefs) if node_prefs else schemas.UserNodePreferences() # ================================================================== + # M4: Config YAML Download (admin only) + # ================================================================== + + @router.get( + "/admin/{node_id}/config.yaml", + response_model=schemas.NodeConfigYamlResponse, + summary="[Admin] Download Node Config YAML", + ) + def download_node_config_yaml(node_id: str, admin_id: str, db: Session = Depends(get_db)): + """ + Generate and return the agent_config.yaml content an admin downloads + and places alongside the node client software before deployment. + + The YAML contains: + - node_id, hub_url, invite_token (pre-signed) + - skill enable/disable toggles matching the admin's skill_config + + Example usage: + curl -o agent_config.yaml $HUB/nodes/admin/{node_id}/config.yaml?admin_id=... + # Deploy the node software with this file present. + """ + import os + _require_admin(admin_id, db) + node = _get_node_or_404(node_id, db) + + hub_url = os.getenv("HUB_PUBLIC_URL", "https://ai.jerxie.com") + hub_grpc = os.getenv("HUB_GRPC_ENDPOINT", "ai.jerxie.com:50051") + skill_cfg = node.skill_config or {} + + lines = [ + "# Cortex Hub — Agent Node Configuration", + f"# Generated for node '{node_id}' — keep this file secret.", + "", + f"node_id: \"{node_id}\"", + f"node_description: \"{node.display_name}\"", + "", + "# Hub connection", + f"hub_url: \"{hub_url}\"", + f"grpc_endpoint: \"{hub_grpc}\"", + "", + "# Authentication — do NOT share this token", + f"invite_token: \"{node.invite_token}\"", + "", + "# Skill configuration (mirrors admin settings; node respects these at startup)", + "skills:", + ] + for skill, cfg in skill_cfg.items(): + enabled = cfg.get("enabled", True) + lines.append(f" {skill}:") + lines.append(f" enabled: {str(enabled).lower()}") + for k, v in cfg.items(): + if k != "enabled" and v is not None: + lines.append(f" {k}: {v}") + + lines += [ + "", + "# Workspace sync root — override if needed", + "sync_root: \"/tmp/cortex-workspace\"", + "", + "# TLS — set to false only in dev", + "tls: true", + ] + + config_yaml = "\n".join(lines) + return schemas.NodeConfigYamlResponse(node_id=node_id, config_yaml=config_yaml) + + # ================================================================== + # M4: Invite Token Validation (called internally by gRPC server) + # ================================================================== + + @router.post("/validate-token", summary="[Internal] Validate Node Invite Token") + def validate_invite_token(token: str, node_id: str, db: Session = Depends(get_db)): + """ + Internal HTTP endpoint called by the gRPC SyncConfiguration handler + to validate an invite_token before accepting a node connection. + + Returns the node's skill_config (sandbox policy) on success so the + gRPC server can populate the SandboxPolicy response. + + Response: + 200 { valid: true, node_id, skill_config, display_name } + 401 { valid: false, reason } + """ + node = db.query(models.AgentNode).filter( + models.AgentNode.node_id == node_id, + models.AgentNode.invite_token == token, + models.AgentNode.is_active == True, + ).first() + + if not node: + logger.warning(f"[M4] Token validation FAILED for node_id='{node_id}'") + return {"valid": False, "reason": "Invalid token or unknown node."} + + logger.info(f"[M4] Token validated OK for node_id='{node_id}'") + return { + "valid": True, + "node_id": node.node_id, + "display_name": node.display_name, + "skill_config": node.skill_config or {}, + } + + # ================================================================== # WEBSOCKET — Single-node live event stream # ================================================================== diff --git a/ai-hub/app/api/routes/sessions.py b/ai-hub/app/api/routes/sessions.py index 42cb378..6cb2095 100644 --- a/ai-hub/app/api/routes/sessions.py +++ b/ai-hub/app/api/routes/sessions.py @@ -28,10 +28,39 @@ stt_provider_name=request.stt_provider_name, tts_provider_name=request.tts_provider_name ) + + # M3: Auto-attach user's default nodes from preferences + import uuid as _uuid + user = db.query(models.User).filter(models.User.id == request.user_id).first() + if user: + node_prefs = (user.preferences or {}).get("nodes", {}) + default_nodes = node_prefs.get("default_node_ids", []) + if default_nodes: + new_session.sync_workspace_id = f"session-{new_session.id}-{_uuid.uuid4().hex[:8]}" + new_session.attached_node_ids = list(default_nodes) + new_session.node_sync_status = { + nid: {"status": "pending", "last_sync": None} + for nid in default_nodes + } + db.commit() + db.refresh(new_session) + + # Notify live nodes that they've been attached + registry = services.node_registry_service + for nid in default_nodes: + try: + registry.emit(nid, "info", { + "message": f"Auto-attached to session {new_session.id}", + "workspace_id": new_session.sync_workspace_id, + }) + except Exception: + pass + return new_session except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to create session: {e}") + @router.post("/{session_id}/chat", response_model=schemas.ChatResponse, summary="Send a Message in a Session") async def chat_in_session( session_id: int, @@ -228,4 +257,139 @@ except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get audio: {e}") + # ================================================================== + # M3: Session ↔ Node Attachment + # ================================================================== + + @router.post("/{session_id}/nodes", response_model=schemas.SessionNodeStatusResponse, + summary="Attach Nodes to Session") + def attach_nodes_to_session( + session_id: int, + request: schemas.NodeAttachRequest, + db: Session = Depends(get_db) + ): + """ + Attach one or more Agent Nodes to a chat session. + - Generates a stable sync_workspace_id on first attachment (used as Ghost Mirror session_id). + - Records per-node sync status initialized to 'pending'. + - Emits sync_push events via the NodeRegistryService event bus. + """ + import uuid + from datetime import datetime + + session = db.query(models.Session).filter( + models.Session.id == session_id, + models.Session.is_archived == False + ).first() + if not session: + raise HTTPException(status_code=404, detail="Session not found.") + + # Generate stable workspace ID on first attachment + if not session.sync_workspace_id: + session.sync_workspace_id = f"session-{session_id}-{uuid.uuid4().hex[:8]}" + + # Merge node list (avoid duplicates) + current_nodes = list(session.attached_node_ids or []) + new_nodes = [nid for nid in request.node_ids if nid not in current_nodes] + current_nodes.extend(new_nodes) + session.attached_node_ids = current_nodes + + # Initialize sync status for new nodes + sync_status = dict(session.node_sync_status or {}) + now_iso = datetime.utcnow().isoformat() + for nid in new_nodes: + sync_status[nid] = {"status": "pending", "last_sync": None} + + # Emit event to live UI stream + try: + services.node_registry_service.emit( + nid, "info", + {"message": f"Attached to session {session_id}", "workspace_id": session.sync_workspace_id}, + ) + except Exception: + pass + + session.node_sync_status = sync_status + db.commit() + db.refresh(session) + + return schemas.SessionNodeStatusResponse( + session_id=session_id, + sync_workspace_id=session.sync_workspace_id, + nodes=[ + schemas.NodeSyncStatusEntry( + node_id=nid, + status=sync_status.get(nid, {}).get("status", "pending"), + last_sync=sync_status.get(nid, {}).get("last_sync"), + ) + for nid in session.attached_node_ids + ] + ) + + @router.delete("/{session_id}/nodes/{node_id}", summary="Detach Node from Session") + def detach_node_from_session( + session_id: int, + node_id: str, + db: Session = Depends(get_db) + ): + session = db.query(models.Session).filter( + models.Session.id == session_id, + models.Session.is_archived == False + ).first() + if not session: + raise HTTPException(status_code=404, detail="Session not found.") + + nodes = list(session.attached_node_ids or []) + if node_id not in nodes: + raise HTTPException(status_code=404, detail=f"Node '{node_id}' not attached to this session.") + + nodes.remove(node_id) + session.attached_node_ids = nodes + + status = dict(session.node_sync_status or {}) + status.pop(node_id, None) + session.node_sync_status = status + + db.commit() + return {"message": f"Node '{node_id}' detached from session {session_id}."} + + @router.get("/{session_id}/nodes", response_model=schemas.SessionNodeStatusResponse, + summary="Get Session Node Status") + def get_session_nodes(session_id: int, db: Session = Depends(get_db)): + """ + Returns all nodes attached to a session and their current sync status. + Merges persisted sync_status with live connection state from the registry. + """ + session = db.query(models.Session).filter( + models.Session.id == session_id, + models.Session.is_archived == False + ).first() + if not session: + raise HTTPException(status_code=404, detail="Session not found.") + + registry = services.node_registry_service + sync_status = session.node_sync_status or {} + entries = [] + for nid in (session.attached_node_ids or []): + live = registry.get_node(nid) + persisted = sync_status.get(nid, {}) + # If node is live and was previously pending, show as 'connected' + if live and persisted.get("status") == "pending": + status_val = "connected" + else: + status_val = persisted.get("status", "pending") + + entries.append(schemas.NodeSyncStatusEntry( + node_id=nid, + status=status_val, + last_sync=persisted.get("last_sync"), + error=persisted.get("error"), + )) + + return schemas.SessionNodeStatusResponse( + session_id=session_id, + sync_workspace_id=session.sync_workspace_id, + nodes=entries, + ) + return router \ No newline at end of file diff --git a/ai-hub/app/api/schemas.py b/ai-hub/app/api/schemas.py index beee122..e6df482 100644 --- a/ai-hub/app/api/schemas.py +++ b/ai-hub/app/api/schemas.py @@ -144,14 +144,44 @@ """Defines the shape of a session object returned by the API.""" id: int user_id: str - title: str - provider_name: str + title: Optional[str] = None + provider_name: Optional[str] = None stt_provider_name: Optional[str] = None tts_provider_name: Optional[str] = None feature_name: str created_at: datetime + # M3: Node attachment + sync_workspace_id: Optional[str] = None + attached_node_ids: List[str] = [] + node_sync_status: dict = {} model_config = ConfigDict(from_attributes=True) +# --- M3: Session Node Attachment Schemas --- + +class NodeAttachRequest(BaseModel): + """Attach one or more nodes to a session.""" + node_ids: List[str] + +class NodeSyncStatusEntry(BaseModel): + """Per-node sync status within a session.""" + node_id: str + status: str # 'pending' | 'syncing' | 'synced' | 'error' + last_sync: Optional[str] = None + error: Optional[str] = None + +class SessionNodeStatusResponse(BaseModel): + """Response showing all attached nodes and their sync state.""" + session_id: int + sync_workspace_id: Optional[str] = None + nodes: List[NodeSyncStatusEntry] = [] + +# --- M4: Node Config YAML --- + +class NodeConfigYamlResponse(BaseModel): + """The generated config YAML content an admin downloads to set up a node.""" + node_id: str + config_yaml: str # Full YAML string ready to save as agent_config.yaml + class Message(BaseModel): """Defines the shape of a single message within a session's history.""" id: int diff --git a/ai-hub/app/db/migrate.py b/ai-hub/app/db/migrate.py index 696524a..63e2e33 100644 --- a/ai-hub/app/db/migrate.py +++ b/ai-hub/app/db/migrate.py @@ -42,8 +42,12 @@ # Session table migrations session_columns = [c["name"] for c in inspector.get_columns("sessions")] session_required_columns = [ - ("stt_provider_name", "TEXT"), - ("tts_provider_name", "TEXT") + ("stt_provider_name", "TEXT"), + ("tts_provider_name", "TEXT"), + # M3: Agent Node attachment + ("sync_workspace_id", "TEXT"), + ("attached_node_ids", "TEXT"), + ("node_sync_status", "TEXT"), ] for col_name, col_type in session_required_columns: if col_name not in session_columns: @@ -57,6 +61,7 @@ else: logger.info(f"Column '{col_name}' already exists in 'sessions'.") + # Agent Nodes table migrations if inspector.has_table("agent_nodes"): node_columns = [c["name"] for c in inspector.get_columns("agent_nodes")] diff --git a/ai-hub/app/db/models.py b/ai-hub/app/db/models.py index 9626f8e..294d38b 100644 --- a/ai-hub/app/db/models.py +++ b/ai-hub/app/db/models.py @@ -73,14 +73,14 @@ SQLAlchemy model for the 'sessions' table. Each session represents a single conversation between a user and the AI. - It links a user to a series of messages. + It links a user to a series of messages, and optionally to Agent Nodes + for workspace synchronization and task execution. """ __tablename__ = 'sessions' # Primary key for the session. id = Column(Integer, primary_key=True, index=True) # The ID of the user who owns this session. - # We add the ForeignKey to establish the link to the 'users' table. user_id = Column(String, ForeignKey('users.id'), index=True, nullable=False) # A title for the conversation, which can be generated by the AI. title = Column(String, index=True, nullable=True) @@ -96,20 +96,19 @@ # Flag to indicate if the session has been archived or soft-deleted. is_archived = Column(Boolean, default=False, nullable=False) - # Defines a one-to-many relationship with the Message table. - # 'back_populates' tells SQLAlchemy that there's a corresponding relationship - # on the other side. 'cascade' ensures that when a session is deleted, - # all its associated messages are also deleted. + # --- Agent Node Integration (M3) --- + # Stable workspace ID used as Ghost Mirror session_id across all attached nodes. + # Set on first node attachment; never changes for the lifetime of the session. + sync_workspace_id = Column(String, nullable=True, index=True) + # JSON list of attached node_ids: ["node-alpha", "node-beta"] + attached_node_ids = Column(JSON, default=[], nullable=True) + # Per-node sync status snapshot: {"node-alpha": {"status": "synced", "last_sync": "..."}} + node_sync_status = Column(JSON, default={}, nullable=True) + messages = relationship("Message", back_populates="session", cascade="all, delete-orphan") - - # Defines a many-to-one relationship back to the User table. - # This allows us to access the parent User object from a Session object. user = relationship("User", back_populates="sessions") def __repr__(self): - """ - Provides a helpful string representation of the object for debugging. - """ return f"" diff --git a/poc-grpc-agent/orchestrator/services/grpc_server.py b/poc-grpc-agent/orchestrator/services/grpc_server.py index 923684e..de92314 100644 --- a/poc-grpc-agent/orchestrator/services/grpc_server.py +++ b/poc-grpc-agent/orchestrator/services/grpc_server.py @@ -1,6 +1,11 @@ import threading import queue import time +import os +try: + import requests as _requests # optional; only needed for M4 token validation +except ImportError: + _requests = None from protos import agent_pb2, agent_pb2_grpc from orchestrator.core.registry import MemoryNodeRegistry from orchestrator.core.journal import TaskJournal @@ -9,6 +14,12 @@ from orchestrator.services.assistant import TaskAssistant from orchestrator.utils.crypto import sign_payload +# M4: Hub HTTP API for invite-token validation +# Calls POST /nodes/validate-token before accepting any SyncConfiguration. +# Set HUB_API_URL=http://localhost:8000 (or 0 to skip validation in dev mode). +HUB_API_URL = os.getenv("HUB_API_URL", "") # empty = skip validation (dev) +HUB_API_PATH = "/nodes/validate-token" + class AgentOrchestrator(agent_pb2_grpc.AgentOrchestratorServicer): """Refactored gRPC Servicer for Agent Orchestration.""" def __init__(self): @@ -51,19 +62,55 @@ )) def SyncConfiguration(self, request, context): - """Standard Handshake: Authenticate and Send Policy.""" - # Pre-registration for metadata search + """M4 Authenticated Handshake: Validate invite_token, then send policy.""" + node_id = request.node_id + invite_token = request.auth_token # field in RegistrationRequest proto + + # --- M4: Token validation via Hub API --- + if HUB_API_URL and _requests: + try: + resp = _requests.post( + f"{HUB_API_URL}{HUB_API_PATH}", + params={"node_id": node_id, "token": invite_token}, + timeout=5, + ) + payload = resp.json() + if not payload.get("valid"): + reason = payload.get("reason", "Token rejected") + print(f"[🔒] SyncConfiguration REJECTED {node_id}: {reason}") + return agent_pb2.RegistrationResponse( + success=False, + message=reason, + ) + skill_cfg = payload.get("skill_config", {}) + print(f"[🔑] Token validated for {node_id} (display: {payload.get('display_name')})") + except Exception as e: + # If Hub is unreachable in dev, fall through with a warning + print(f"[⚠️] Hub token validation unavailable ({e}); proceeding without auth.") + skill_cfg = {} + else: + # Dev mode: skip validation + skill_cfg = {} + print(f"[⚠️] HUB_API_URL not set — skipping invite_token validation for {node_id}") + + # Build allowed_commands from skill_config (shell skill) + shell_cfg = skill_cfg.get("shell", {}) + if shell_cfg.get("enabled", True): + allowed_commands = ["ls", "cat", "echo", "pwd", "uname", "curl", "python3", "git"] + else: + allowed_commands = [] # Shell disabled by admin + + # Register the node in the local in-memory registry self.registry.register(request.node_id, queue.Queue(), { - "desc": request.node_description, - "caps": dict(request.capabilities) + "desc": request.node_description, + "caps": dict(request.capabilities), }) - - # 12-Factor Sandbox Policy (Standardized Mode) + return agent_pb2.RegistrationResponse( - success=True, + success=True, policy=agent_pb2.SandboxPolicy( - mode=agent_pb2.SandboxPolicy.STRICT, - allowed_commands=["ls", "uname", "echo", "sleep"] + mode=agent_pb2.SandboxPolicy.STRICT, + allowed_commands=allowed_commands, ) )