diff --git a/ai-hub/Dockerfile b/ai-hub/Dockerfile index 049400f..2def01f 100644 --- a/ai-hub/Dockerfile +++ b/ai-hub/Dockerfile @@ -17,6 +17,7 @@ # 5. Expose the port the app runs on EXPOSE 8000 +EXPOSE 50051 # 6. Define the command to run the application # --host 0.0.0.0 makes the server accessible from outside the container diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index b537607..bac3b15 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -26,9 +26,14 @@ import uuid import secrets import logging +import io +import zipfile +import os from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends +from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session + from app.api.dependencies import ServiceContainer, get_db from app.api import schemas from app.db import models @@ -229,17 +234,18 @@ if not live: raise HTTPException(status_code=503, detail=f"Node '{node_id}' is not connected.") - task_id = str(uuid.uuid4()) + task_id = request.task_id or str(uuid.uuid4()) + + # M6: Use the integrated Protobufs & Crypto from app/core/grpc + from app.protos import agent_pb2 + from app.core.grpc.utils.crypto import sign_payload + + payload = request.command or json.dumps(request.browser_action or {}) registry.emit(node_id, "task_assigned", {"command": request.command, "session_id": request.session_id}, task_id=task_id) try: - import sys - sys.path.insert(0, "/app/poc-grpc-agent") - from protos import agent_pb2 - from orchestrator.utils.crypto import sign_payload - payload = request.command or json.dumps(request.browser_action) task_req = agent_pb2.TaskRequest( task_id=task_id, payload_json=payload, @@ -247,10 +253,12 @@ timeout_ms=request.timeout_ms, session_id=request.session_id or "", ) + # Push directly to the node's live gRPC outbound queue live.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req)) registry.emit(node_id, "task_start", {"command": 
request.command}, task_id=task_id) - except ImportError: - logger.warning("[nodes] poc-grpc-agent not on path; dispatch is stub only.") + except Exception as e: + logger.error(f"[nodes/dispatch] Failed to put task onto queue for {node_id}: {e}") + raise HTTPException(status_code=500, detail="Internal Dispatch Error") return schemas.NodeDispatchResponse(task_id=task_id, status="accepted") @@ -348,6 +356,72 @@ config_yaml = "\n".join(lines) return schemas.NodeConfigYamlResponse(node_id=node_id, config_yaml=config_yaml) + @router.get( + "/admin/{node_id}/download", + summary="[Admin] Download Pre-configured Agent Node Bundle", + ) + def admin_download_node_bundle(node_id: str, admin_id: str, db: Session = Depends(get_db)): + """ + Bundle everything needed to run an Agent Node into a single ZIP: + - agent_node (source) + - protos (schemas) + - shared_core (ignore rules) + - requirements.txt + - agent_config.yaml (pre-signed invite token) + + Admins download this, unzip on their target machine, and run: + pip install -r requirements.txt + python3 -m agent_node.main + """ + _require_admin(admin_id, db) + node = _get_node_or_404(node_id, db) + + # 1. Generate the same config YAML as the other endpoint + # (Internal DRY: calling the helper logic) + config_resp = download_node_config_yaml(node_id, admin_id, db) + config_yaml = config_resp.config_yaml + + # 2. 
Build the ZIP in-memory + buf = io.BytesIO() + source_root = "/app/poc-grpc-agent" + + with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zip_file: + # Helper to add a directory + def add_dir(dir_name): + path = os.path.join(source_root, dir_name) + for root, dirs, files in os.walk(path): + for file in files: + if "__pycache__" in root: continue + abs_file = os.path.join(root, file) + rel_path = os.path.relpath(abs_file, source_root) + zip_file.write(abs_file, rel_path) + + # Add source folders + add_dir("agent_node") + add_dir("protos") + add_dir("shared_core") + + # Add requirements + req_path = os.path.join(source_root, "requirements.txt") + if os.path.exists(req_path): + zip_file.write(req_path, "requirements.txt") + + # Add the CUSTOM config YAML + zip_file.writestr("agent_config.yaml", config_yaml) + + # Add a README for quick start + readme = f"# Cortex Agent Node: {node_id}\n\n1. Install deps: pip install -r requirements.txt\n2. Run node: python3 -m agent_node.main\n" + zip_file.writestr("README.md", readme) + + buf.seek(0) + filename = f"cortex-node-{node_id}.zip" + return StreamingResponse( + buf, + media_type="application/zip", + headers={"Content-Disposition": f"attachment; filename={filename}"} + ) + + + # ================================================================== # M4: Invite Token Validation (called internally by gRPC server) # ================================================================== @@ -380,6 +454,7 @@ "valid": True, "node_id": node.node_id, "display_name": node.display_name, + "user_id": node.user_id, "skill_config": node.skill_config or {}, } diff --git a/ai-hub/app/api/schemas.py b/ai-hub/app/api/schemas.py index e6df482..421fba8 100644 --- a/ai-hub/app/api/schemas.py +++ b/ai-hub/app/api/schemas.py @@ -352,6 +352,7 @@ class NodeDispatchRequest(BaseModel): """Dispatch a shell or browser action to a specific node.""" + task_id: Optional[str] = None # NEW: Support client-side generated task IDs command: str = "" browser_action:
Optional[dict] = None session_id: Optional[str] = None diff --git a/ai-hub/app/app.py b/ai-hub/app/app.py index 3d2c7eb..2e2ee06 100644 --- a/ai-hub/app/app.py +++ b/ai-hub/app/app.py @@ -42,8 +42,22 @@ print_config(settings) create_db_and_tables() run_migrations() + + # --- Start gRPC Orchestrator (M6) --- + try: + from app.core.grpc.services.grpc_server import serve_grpc + registry = app.state.services.node_registry_service + app.state.grpc_server = serve_grpc(registry, port=50051) + logger.info("[M6] Agent Orchestrator gRPC server started on port 50051.") + except Exception as e: + logger.error(f"[M6] Failed to start gRPC server: {e}") + yield print("Application shutdown...") + # --- Stop gRPC Orchestrator --- + if hasattr(app.state, 'grpc_server'): + logger.info("[M6] Stopping gRPC server...") + app.state.grpc_server.stop(0) # Access the vector_store from the application state to save it if hasattr(app.state, 'vector_store'): app.state.vector_store.save_index() @@ -130,6 +144,8 @@ services.with_service("tool_service", service=ToolService()) services.with_service("node_registry_service", service=NodeRegistryService()) + app.state.services = services + # Create and include the API router, injecting the service api_router = create_api_router(services=services) app.include_router(api_router) diff --git a/ai-hub/app/core/grpc/core/journal.py b/ai-hub/app/core/grpc/core/journal.py new file mode 100644 index 0000000..b223f2f --- /dev/null +++ b/ai-hub/app/core/grpc/core/journal.py @@ -0,0 +1,34 @@ +import threading + +class TaskJournal: + """State machine for tracking tasks through their asynchronous lifecycle.""" + def __init__(self): + self.lock = threading.Lock() + self.tasks = {} # task_id -> { "event": Event, "result": None, "node_id": str } + + def register(self, task_id, node_id=None): + """Initializes state for a new task and returns its notification event.""" + event = threading.Event() + with self.lock: + self.tasks[task_id] = {"event": event, "result": 
None, "node_id": node_id} + return event + + def fulfill(self, task_id, result): + """Processes a result from a node and triggers the waiting thread.""" + with self.lock: + if task_id in self.tasks: + self.tasks[task_id]["result"] = result + self.tasks[task_id]["event"].set() + return True + return False + + def get_result(self, task_id): + """Returns the result associated with the given task ID.""" + with self.lock: + data = self.tasks.get(task_id) + return data["result"] if data else None + + def pop(self, task_id): + """Removes the task's state from the journal.""" + with self.lock: + return self.tasks.pop(task_id, None) diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py new file mode 100644 index 0000000..6054d62 --- /dev/null +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -0,0 +1,81 @@ +import os +import shutil +import hashlib +from typing import Dict, List +from app.core.grpc.shared_core.ignore import CortexIgnore +from app.protos import agent_pb2 + +class GhostMirrorManager: + """Manages local server-side copies of node workspaces.""" + def __init__(self, storage_root="/app/data/mirrors"): + self.storage_root = storage_root + if not os.path.exists(self.storage_root): + os.makedirs(self.storage_root, exist_ok=True) + + def get_ignore_filter(self, session_id: str) -> CortexIgnore: + """Returns a CortexIgnore instance for a session.""" + return CortexIgnore(self.get_workspace_path(session_id)) + + def get_workspace_path(self, session_id: str) -> str: + """Returns the local absolute path for a session's mirror.""" + path = os.path.join(self.storage_root, session_id) + os.makedirs(path, exist_ok=True) + return path + + def write_file_chunk(self, session_id: str, file_payload: agent_pb2.FilePayload): + """Writes a chunk of data to the local mirror.""" + workspace = self.get_workspace_path(session_id) + + # Phase 3 ignore filter + ignore_filter = self.get_ignore_filter(session_id) + if ignore_filter.is_ignored(file_payload.path): + 
print(f" [📁🚷] Ignoring write to {file_payload.path}") + return + + # Prevent path traversal + safe_path = os.path.normpath(os.path.join(workspace, file_payload.path)) + if not safe_path.startswith(workspace): + raise ValueError(f"Malicious path detected: {file_payload.path}") + + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + + mode = "ab" if file_payload.chunk_index > 0 else "wb" + with open(safe_path, mode) as f: + f.write(file_payload.chunk) + + if file_payload.is_final and file_payload.hash: + self._verify_hash(safe_path, file_payload.hash) + + def generate_manifest(self, session_id: str) -> agent_pb2.DirectoryManifest: + """Generates a manifest of the current local mirror state.""" + workspace = self.get_workspace_path(session_id) + ignore_filter = self.get_ignore_filter(session_id) + files = [] + for root, dirs, filenames in os.walk(workspace): + # Efficiently prune skipped directories + dirs[:] = [d for d in dirs if not ignore_filter.is_ignored(os.path.relpath(os.path.join(root, d), workspace))] + + for filename in filenames: + abs_path = os.path.join(root, filename) + rel_path = os.path.relpath(abs_path, workspace) + + if ignore_filter.is_ignored(rel_path): + continue + + with open(abs_path, "rb") as f: + file_hash = hashlib.sha256(f.read()).hexdigest() + + files.append(agent_pb2.FileInfo( + path=rel_path, + size=os.path.getsize(abs_path), + hash=file_hash, + is_dir=False + )) + return agent_pb2.DirectoryManifest(root_path=workspace, files=files) + + def _verify_hash(self, path: str, expected_hash: str): + content = open(path, "rb").read() + actual_hash = hashlib.sha256(content).hexdigest() + if actual_hash != expected_hash: + print(f"[⚠️] Hash Mismatch for {path}: expected {expected_hash}, got {actual_hash}") + # In a real system, we'd trigger a re-download/re-sync diff --git a/ai-hub/app/core/grpc/core/pool.py b/ai-hub/app/core/grpc/core/pool.py new file mode 100644 index 0000000..f53a1db --- /dev/null +++ b/ai-hub/app/core/grpc/core/pool.py @@ 
-0,0 +1,29 @@ +import threading + +class GlobalWorkPool: + """Thread-safe pool of unassigned tasks that can be claimed by any node.""" + def __init__(self): + self.lock = threading.Lock() + self.available = {} # task_id -> payload + self.on_new_work = None # Callback to notify nodes + + def push_work(self, task_id, payload): + """Adds new task to global discovery pool.""" + with self.lock: + self.available[task_id] = payload + print(f" [📦] New Shared Task: {task_id}") + if self.on_new_work: + self.on_new_work(task_id) + + def claim(self, task_id, node_id): + """Allows a node to pull a specific task from the pool.""" + with self.lock: + if task_id in self.available: + print(f" [📦] Task {task_id} Claimed by {node_id}") + return True, self.available.pop(task_id) + return False, None + + def list_available(self): + """Returns IDs of all currently available unclaimed tasks.""" + with self.lock: + return list(self.available.keys()) diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py new file mode 100644 index 0000000..17083ef --- /dev/null +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -0,0 +1,199 @@ +import time +import json +import os +import hashlib +from app.core.grpc.utils.crypto import sign_payload, sign_browser_action +from app.protos import agent_pb2 + +class TaskAssistant: + """The 'Brain' of the Orchestrator: High-Level AI API for Dispatching Tasks.""" + def __init__(self, registry, journal, pool, mirror=None): + self.registry = registry + self.journal = journal + self.pool = pool + self.mirror = mirror + self.memberships = {} # session_id -> list(node_id) + + def push_workspace(self, node_id, session_id): + """Initial unidirectional push from server ghost mirror to a node.""" + node = self.registry.get_node(node_id) + if not node or not self.mirror: return + + print(f"[📁📤] Initiating Workspace Push for Session {session_id} to {node_id}") + + # Track for recovery + if session_id not in self.memberships: + 
self.memberships[session_id] = [] + if node_id not in self.memberships[session_id]: + self.memberships[session_id].append(node_id) + + manifest = self.mirror.generate_manifest(session_id) + + # 1. Send Manifest + node.queue.put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + manifest=manifest + ) + )) + + # 2. Send File Data + for file_info in manifest.files: + if not file_info.is_dir: + self.push_file(node_id, session_id, file_info.path) + + def push_file(self, node_id, session_id, rel_path): + """Pushes a specific file to a node (used for drift recovery).""" + node = self.registry.get_node(node_id) + if not node: return + + workspace = self.mirror.get_workspace_path(session_id) + abs_path = os.path.join(workspace, rel_path) + + if not os.path.exists(abs_path): + print(f" [📁❓] Requested file {rel_path} not found in mirror") + return + + with open(abs_path, "rb") as f: + full_data = f.read() + full_hash = hashlib.sha256(full_data).hexdigest() + f.seek(0) + + index = 0 + while True: + chunk = f.read(1024 * 1024) # 1MB chunks + is_final = len(chunk) < 1024 * 1024 + + node.queue.put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + file_data=agent_pb2.FilePayload( + path=rel_path, + chunk=chunk, + chunk_index=index, + is_final=is_final, + hash=full_hash if is_final else "" + ) + ) + )) + + if is_final or not chunk: + break + index += 1 + + def reconcile_node(self, node_id): + """Forces a re-sync check for all sessions this node belongs to.""" + print(f" [📁🔄] Triggering Resync Check for {node_id}...") + for sid, nodes in self.memberships.items(): + if node_id in nodes: + # Re-push manifest to trigger node-side drift check + self.push_workspace(node_id, sid) + + def broadcast_file_chunk(self, session_id: str, sender_node_id: str, file_payload): + """Broadcasts a file chunk received from one node to all other nodes in the mesh.""" + print(f" [📁📢] Broadcasting {file_payload.path} 
from {sender_node_id} to other nodes...") + for node_id in self.registry.list_nodes(): + if node_id == sender_node_id: + continue + + node = self.registry.get_node(node_id) + if not node: + continue + + # Forward the exact same FileSyncMessage + node.queue.put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + file_data=file_payload + ) + )) + + def lock_workspace(self, node_id, session_id): + """Disables user-side synchronization from a node during AI refactors.""" + self.control_sync(node_id, session_id, action="LOCK") + + def unlock_workspace(self, node_id, session_id): + """Re-enables user-side synchronization from a node.""" + self.control_sync(node_id, session_id, action="UNLOCK") + + def request_manifest(self, node_id, session_id, path="."): + """Requests a full directory manifest from a node for drift checking.""" + node = self.registry.get_node(node_id) + if not node: return + node.queue.put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.REFRESH_MANIFEST, path=path) + ) + )) + + def control_sync(self, node_id, session_id, action="START", path="."): + """Sends a SyncControl command to a node (e.g. 
START_WATCHING, LOCK).""" + node = self.registry.get_node(node_id) + if not node: return + + action_map = { + "START": agent_pb2.SyncControl.START_WATCHING, + "STOP": agent_pb2.SyncControl.STOP_WATCHING, + "LOCK": agent_pb2.SyncControl.LOCK, + "UNLOCK": agent_pb2.SyncControl.UNLOCK + } + proto_action = action_map.get(action, agent_pb2.SyncControl.START_WATCHING) + + node.queue.put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + control=agent_pb2.SyncControl(action=proto_action, path=path) + ) + )) + + def dispatch_single(self, node_id, cmd, timeout=30, session_id=None): + """Dispatches a shell command to a specific node.""" + node = self.registry.get_node(node_id) + if not node: return {"error": f"Node {node_id} Offline"} + + tid = f"task-{int(time.time()*1000)}" + event = self.journal.register(tid, node_id) + + # 12-Factor Signing Logic + sig = sign_payload(cmd) + req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( + task_id=tid, payload_json=cmd, signature=sig, session_id=session_id)) + + print(f"[📤] Dispatching shell {tid} to {node_id}") + node["queue"].put(req) + + if event.wait(timeout): + res = self.journal.get_result(tid) + self.journal.pop(tid) + return res + self.journal.pop(tid) + return {"error": "Timeout"} + + def dispatch_browser(self, node_id, action, timeout=60, session_id=None): + """Dispatches a browser action to a directed session node.""" + node = self.registry.get_node(node_id) + if not node: return {"error": f"Node {node_id} Offline"} + + tid = f"br-{int(time.time()*1000)}" + event = self.journal.register(tid, node_id) + + # Secure Browser Signing + sig = sign_browser_action( + agent_pb2.BrowserAction.ActionType.Name(action.action), + action.url, + action.session_id + ) + + req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( + task_id=tid, browser_action=action, signature=sig, session_id=session_id)) + + print(f"[🌐📤] Dispatching browser {tid} to {node_id}") + 
node["queue"].put(req) + + if event.wait(timeout): + res = self.journal.get_result(tid) + self.journal.pop(tid) + return res + self.journal.pop(tid) + return {"error": "Timeout"} diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py new file mode 100644 index 0000000..e4f3511 --- /dev/null +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -0,0 +1,265 @@ +import threading +import queue +import time +import os +import logging +import json +import uuid + +try: + import requests as _requests +except ImportError: + _requests = None + +from app.protos import agent_pb2, agent_pb2_grpc +from app.core.grpc.core.journal import TaskJournal +from app.core.grpc.core.pool import GlobalWorkPool +from app.core.grpc.core.mirror import GhostMirrorManager +from app.core.grpc.services.assistant import TaskAssistant +from app.core.grpc.utils.crypto import sign_payload +from app.config import settings + +logger = logging.getLogger(__name__) + +# M4: Hub HTTP API for invite-token validation +HUB_API_URL = os.getenv("HUB_API_URL", "http://localhost:8000") +HUB_API_PATH = "/nodes/validate-token" + +# M4: Hub HTTP API for invite-token validation +# Calls POST /nodes/validate-token before accepting any SyncConfiguration. +# Set HUB_API_URL=http://localhost:8000 (or 0 to skip validation in dev mode). +HUB_API_URL = os.getenv("HUB_API_URL", "") # empty = skip validation (dev) +HUB_API_PATH = "/nodes/validate-token" + +class AgentOrchestrator(agent_pb2_grpc.AgentOrchestratorServicer): + """Integrated gRPC Servicer for Agent Orchestration within AI Hub.""" + def __init__(self, registry): + self.registry = registry # Injected NodeRegistryService + self.journal = TaskJournal() + self.pool = GlobalWorkPool() + self.mirror = GhostMirrorManager() + self.assistant = TaskAssistant(self.registry, self.journal, self.pool, self.mirror) + self.pool.on_new_work = self._broadcast_work + + # 4. 
Mesh Observation (Aggregated Health Dashboard) + threading.Thread(target=self._monitor_mesh, daemon=True, name="MeshMonitor").start() + + def _monitor_mesh(self): + """Periodically prints status of all nodes in the mesh.""" + while True: + time.sleep(10) + active_nodes = self.registry.list_nodes() + print("\n" + "="*50) + print(f"📡 CORTEX MESH DASHBOARD | {len(active_nodes)} Nodes Online") + print("-" * 50) + if not active_nodes: + print(" No nodes currently connected.") + for nid in active_nodes: + node = self.registry.get_node(nid) + stats = node.get("stats", {}) + tasks = stats.get("running", []) + capability = node.get("metadata", {}).get("caps", {}) + print(f" 🟢 {nid:15} | Workers: {stats.get('active_worker_count', 0)} | Running: {len(tasks)} tasks") + print(f" Capabilities: {capability}") + print("="*50 + "\n", flush=True) + + def _broadcast_work(self, _): + """Pushes work notifications to all active nodes.""" + live_nodes = self.registry.list_nodes() + for node in live_nodes: + logger.debug(f"[📢] Broadcasting availability to {node.node_id}") + node.queue.put(agent_pb2.ServerTaskMessage( + work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) + )) + + def SyncConfiguration(self, request, context): + """M4 Authenticated Handshake: Validate invite_token via Hub DB, then send policy.""" + node_id = request.node_id + invite_token = request.auth_token + + # --- M4: Token validation via Hub API --- + user_id = "default" + skill_cfg = {} + + if HUB_API_URL and _requests: + try: + resp = _requests.post( + f"{HUB_API_URL}{HUB_API_PATH}", + params={"node_id": node_id, "token": invite_token}, + timeout=5, + ) + payload = resp.json() + if not payload.get("valid"): + reason = payload.get("reason", "Token rejected") + logger.warning(f"[🔒] SyncConfiguration REJECTED {node_id}: {reason}") + return agent_pb2.RegistrationResponse(success=False, message=reason) + + user_id = payload.get("user_id", "default") + skill_cfg = 
payload.get("skill_config", {}) + logger.info(f"[🔑] Token validated for {node_id} (owner: {user_id})") + except Exception as e: + logger.error(f"[⚠️] Hub token validation unavailable ({e}); proceeding as 'default' user.") + + # Build allowed_commands from skill_config + shell_cfg = skill_cfg.get("shell", {}) + if shell_cfg.get("enabled", True): + allowed_commands = ["ls", "cat", "echo", "pwd", "uname", "curl", "python3", "git"] + else: + allowed_commands = [] + + # Register the node in the centralized AI Hub registry + self.registry.register(request.node_id, user_id, { + "desc": request.node_description, + "caps": dict(request.capabilities), + }) + + return agent_pb2.RegistrationResponse( + success=True, + policy=agent_pb2.SandboxPolicy( + mode=agent_pb2.SandboxPolicy.STRICT, + allowed_commands=allowed_commands, + ) + ) + + def TaskStream(self, request_iterator, context): + """Persistent Bi-directional Stream for Command & Control.""" + node_id = "unknown" + try: + # 1. Blocking wait for Node Identity + first_msg = next(request_iterator) + if first_msg.WhichOneof('payload') != 'announce': + logger.error("[!] Stream rejected: No NodeAnnounce") + return + + node_id = first_msg.announce.node_id + node = self.registry.get_node(node_id) + if not node: + logger.error(f"[!] Stream rejected: Node {node_id} not registered") + return + + logger.info(f"[📶] gRPC Stream Online: {node_id}") + self.assistant.reconcile_node(node_id) + + def _read_results(): + try: + for msg in request_iterator: + self._handle_client_message(msg, node_id, node) + except Exception as e: + logger.warning(f"Results listener closed for {node_id}: {e}") + + threading.Thread(target=_read_results, daemon=True, name=f"Results-{node_id}").start() + + # 3. 
Work Dispatcher (Main Stream) + last_keepalive = 0 + while context.is_active(): + try: + msg = node.queue.get(timeout=1.0) + yield msg + except queue.Empty: + now = time.time() + if (now - last_keepalive) > 15.0: + last_keepalive = now + if self.pool.available: + yield agent_pb2.ServerTaskMessage( + work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) + ) + continue + except StopIteration: pass + except Exception as e: + logger.error(f"[!] TaskStream Error for {node_id}: {e}") + finally: + self.registry.deregister(node_id) + + def _handle_client_message(self, msg, node_id, node): + kind = msg.WhichOneof('payload') + if kind == 'task_claim': + task_id = msg.task_claim.task_id + success, payload = self.pool.claim(task_id, node_id) + + node.queue.put(agent_pb2.ServerTaskMessage( + claim_status=agent_pb2.TaskClaimResponse( + task_id=task_id, + granted=success, + reason="Task successfully claimed" if success else "Task already claimed by another node" + ) + )) + # M6: Notify UI that a node is claiming a global task + self.registry.emit(node_id, "task_claim", {"task_id": task_id, "granted": success}) + + if success: + sig = sign_payload(payload) + node.queue.put(agent_pb2.ServerTaskMessage( + task_request=agent_pb2.TaskRequest( + task_id=task_id, + payload_json=payload, + signature=sig + ) + )) + + elif kind == 'task_response': + tr = msg.task_response + res_obj = {"stdout": tr.stdout, "status": tr.status} + if tr.HasField("browser_result"): + br = tr.browser_result + res_obj["browser"] = { + "url": br.url, "title": br.title, "has_snapshot": len(br.snapshot) > 0, + "eval": br.eval_result + } + self.journal.fulfill(tr.task_id, res_obj) + + # M6: Emit to EventBus for UI streaming + event_type = "task_complete" if tr.status == agent_pb2.TaskResponse.SUCCESS else "task_error" + self.registry.emit(node_id, event_type, res_obj, task_id=tr.task_id) + + elif kind == 'browser_event': + e = msg.browser_event + event_data = {} + if 
e.HasField("console_msg"): + event_data = {"type": "console", "text": e.console_msg.text, "level": e.console_msg.level} + elif e.HasField("network_req"): + event_data = {"type": "network", "method": e.network_req.method, "url": e.network_req.url} + + # M6: Stream live browser logs to UI + self.registry.emit(node_id, "browser_event", event_data) + + elif kind == 'file_sync': + fs = msg.file_sync + if fs.HasField("file_data"): + self.mirror.write_file_chunk(fs.session_id, fs.file_data) + self.assistant.broadcast_file_chunk(fs.session_id, node_id, fs.file_data) + # M6: Emit sync progress (rarely to avoid flood, but good for large pushes) + if fs.file_data.chunk_index % 10 == 0: + self.registry.emit(node_id, "sync_progress", {"path": fs.file_data.path, "chunk": fs.file_data.chunk_index}) + elif fs.HasField("status"): + print(f" [📁] Sync Status from {node_id}: {fs.status.message}") + self.registry.emit(node_id, "sync_status", {"message": fs.status.message, "code": fs.status.code}) + if fs.status.code == agent_pb2.SyncStatus.RECONCILE_REQUIRED: + for path in fs.status.reconcile_paths: + self.assistant.push_file(node_id, fs.session_id, path) + + def ReportHealth(self, request_iterator, context): + """Collect Health Metrics and Feed Policy Updates.""" + for hb in request_iterator: + self.registry.update_stats(hb.node_id, { + "active_worker_count": hb.active_worker_count, + "running": list(hb.running_task_ids) + }) + yield agent_pb2.HealthCheckResponse(server_time_ms=int(time.time()*1000)) + +def serve_grpc(registry, port=50051): + """Starts the gRPC Orchestrator Server for Agent Nodes.""" + import grpc + from concurrent import futures + from app.protos import agent_pb2_grpc + + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + orchestrator = AgentOrchestrator(registry) + agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(orchestrator, server) + + addr = f"[::]:{port}" + server.add_insecure_port(addr) + + logger.info(f"🚀 CORTEX gRPC Orchestrator 
starting on {addr}") + server.start() + return server + diff --git a/ai-hub/app/core/grpc/shared_core/__init__.py b/ai-hub/app/core/grpc/shared_core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/ai-hub/app/core/grpc/shared_core/__init__.py diff --git a/ai-hub/app/core/grpc/shared_core/ignore.py b/ai-hub/app/core/grpc/shared_core/ignore.py new file mode 100644 index 0000000..c3f0cb5 --- /dev/null +++ b/ai-hub/app/core/grpc/shared_core/ignore.py @@ -0,0 +1,38 @@ +import os +import fnmatch + +class CortexIgnore: + """Handles .cortexignore (and .gitignore) pattern matching.""" + def __init__(self, root_path): + self.root_path = root_path + self.patterns = self._load_patterns() + + def _load_patterns(self): + patterns = [".git", "node_modules", ".cortex_sync", "__pycache__", "*.pyc"] # Default ignores + ignore_file = os.path.join(self.root_path, ".cortexignore") + if not os.path.exists(ignore_file): + ignore_file = os.path.join(self.root_path, ".gitignore") + + if os.path.exists(ignore_file): + with open(ignore_file, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + patterns.append(line) + return patterns + + def is_ignored(self, rel_path): + """Returns True if the path matches any ignore pattern.""" + for pattern in self.patterns: + # Handle directory patterns + if pattern.endswith("/"): + if rel_path.startswith(pattern) or f"/{pattern}" in f"/{rel_path}": + return True + # Standard glob matching + if fnmatch.fnmatch(rel_path, pattern) or fnmatch.fnmatch(os.path.basename(rel_path), pattern): + return True + # Handle nested matches + for part in rel_path.split(os.sep): + if fnmatch.fnmatch(part, pattern): + return True + return False diff --git a/ai-hub/app/core/grpc/utils/crypto.py b/ai-hub/app/core/grpc/utils/crypto.py new file mode 100644 index 0000000..d4a170c --- /dev/null +++ b/ai-hub/app/core/grpc/utils/crypto.py @@ -0,0 +1,18 @@ +import hmac +import hashlib +from app.config import settings 
+SECRET_KEY = settings.SECRET_KEY + +def sign_payload(payload: str) -> str: + """Signs a string payload using HMAC-SHA256.""" + return hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).hexdigest() + +def sign_browser_action(action_type: str, url: str, session_id: str) -> str: + """Signs a browser action based on its key identify fields.""" + sign_base = f"{action_type}:{url}:{session_id}" + return sign_payload(sign_base) + +def verify_signature(payload: str, signature: str) -> bool: + """Verifies a signature against a payload using HMAC-SHA256.""" + expected = sign_payload(payload) + return hmac.compare_digest(signature, expected) diff --git a/ai-hub/app/protos/__init__.py b/ai-hub/app/protos/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/ai-hub/app/protos/__init__.py diff --git a/ai-hub/app/protos/agent.proto b/ai-hub/app/protos/agent.proto new file mode 100644 index 0000000..5e3932d --- /dev/null +++ b/ai-hub/app/protos/agent.proto @@ -0,0 +1,246 @@ +syntax = "proto3"; + +package agent; + +// The Cortex Server exposes this service +service AgentOrchestrator { + // 1. Control Channel: Sync policies and settings (Unary) + rpc SyncConfiguration(RegistrationRequest) returns (RegistrationResponse); + + // 2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + rpc TaskStream(stream ClientTaskMessage) returns (stream ServerTaskMessage); + + // 3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + rpc ReportHealth(stream Heartbeat) returns (stream HealthCheckResponse); +} + +// --- Channel 1: Registration & Policy --- +message RegistrationRequest { + string node_id = 1; + string version = 2; + string auth_token = 3; + string node_description = 4; // AI-readable description of this node's role + map capabilities = 5; // e.g. 
"gpu": "nvidia-3080", "os": "ubuntu-22.04" +} + +message SandboxPolicy { + enum Mode { + STRICT = 0; + PERMISSIVE = 1; + } + Mode mode = 1; + repeated string allowed_commands = 2; + repeated string denied_commands = 3; + repeated string sensitive_commands = 4; + string working_dir_jail = 5; +} + +message RegistrationResponse { + bool success = 1; + string error_message = 2; + string session_id = 3; + SandboxPolicy policy = 4; +} + +// --- Channel 2: Tasks & Collaboration --- +message ClientTaskMessage { + oneof payload { + TaskResponse task_response = 1; + TaskClaimRequest task_claim = 2; + BrowserEvent browser_event = 3; + NodeAnnounce announce = 4; // NEW: Identification on stream connect + FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync + } +} + +message NodeAnnounce { + string node_id = 1; +} + +message BrowserEvent { + string session_id = 1; + oneof event { + ConsoleMessage console_msg = 2; + NetworkRequest network_req = 3; + } +} + +message ServerTaskMessage { + oneof payload { + TaskRequest task_request = 1; + WorkPoolUpdate work_pool_update = 2; + TaskClaimResponse claim_status = 3; + TaskCancelRequest task_cancel = 4; + FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync + } +} + +message TaskCancelRequest { + string task_id = 1; +} + +message TaskRequest { + string task_id = 1; + string task_type = 2; + oneof payload { + string payload_json = 3; // For legacy shell/fallback + BrowserAction browser_action = 7; // NEW: Structured Browser Skill + } + int32 timeout_ms = 4; + string trace_id = 5; + string signature = 6; + string session_id = 8; // NEW: Map execution to a sync workspace +} + +message BrowserAction { + enum ActionType { + NAVIGATE = 0; + CLICK = 1; + TYPE = 2; + SCREENSHOT = 3; + GET_DOM = 4; + HOVER = 5; + SCROLL = 6; + CLOSE = 7; + EVAL = 8; + GET_A11Y = 9; + } + ActionType action = 1; + string url = 2; + string selector = 3; + string text = 4; + string session_id = 5; + int32 x = 6; + int32 y = 7; +} + +message TaskResponse { + 
string task_id = 1; + enum Status { + SUCCESS = 0; + ERROR = 1; + TIMEOUT = 2; + CANCELLED = 3; + } + Status status = 2; + string stdout = 3; + string stderr = 4; + string trace_id = 5; + map artifacts = 6; + + // NEW: Structured Skill Results + oneof result { + BrowserResponse browser_result = 7; + } +} + +message BrowserResponse { + string url = 1; + string title = 2; + bytes snapshot = 3; + string dom_content = 4; + string a11y_tree = 5; + string eval_result = 6; + repeated ConsoleMessage console_history = 7; + repeated NetworkRequest network_history = 8; +} + +message ConsoleMessage { + string level = 1; + string text = 2; + int64 timestamp_ms = 3; +} + +message NetworkRequest { + string method = 1; + string url = 2; + int32 status = 3; + string resource_type = 4; + int64 latency_ms = 5; +} + +message WorkPoolUpdate { + repeated string available_task_ids = 1; +} + +message TaskClaimRequest { + string task_id = 1; + string node_id = 2; +} + +message TaskClaimResponse { + string task_id = 1; + bool granted = 2; + string reason = 3; +} + +// --- Channel 3: Health & Observation --- +message Heartbeat { + string node_id = 1; + float cpu_usage_percent = 2; + float memory_usage_percent = 3; + int32 active_worker_count = 4; + int32 max_worker_capacity = 5; + string status_message = 6; + repeated string running_task_ids = 7; +} + +message HealthCheckResponse { + int64 server_time_ms = 1; +} + +// --- Channel 4: Ghost Mirror File Sync --- +message FileSyncMessage { + string session_id = 1; + oneof payload { + DirectoryManifest manifest = 2; + FilePayload file_data = 3; + SyncStatus status = 4; + SyncControl control = 5; + } +} + +message SyncControl { + enum Action { + START_WATCHING = 0; + STOP_WATCHING = 1; + LOCK = 2; // Server -> Node: Disable user-side edits + UNLOCK = 3; // Server -> Node: Enable user-side edits + REFRESH_MANIFEST = 4; // Server -> Node: Request a full manifest from node + RESYNC = 5; // Server -> Node: Force a hash-based reconciliation + } + 
Action action = 1; + string path = 2; +} + +message DirectoryManifest { + string root_path = 1; + repeated FileInfo files = 2; +} + +message FileInfo { + string path = 1; + int64 size = 2; + string hash = 3; // For drift detection + bool is_dir = 4; +} + +message FilePayload { + string path = 1; + bytes chunk = 2; + int32 chunk_index = 3; + bool is_final = 4; + string hash = 5; // Full file hash for verification on final chunk +} + +message SyncStatus { + enum Code { + OK = 0; + ERROR = 1; + RECONCILE_REQUIRED = 2; + IN_PROGRESS = 3; + } + Code code = 1; + string message = 2; + repeated string reconcile_paths = 3; // NEW: Files needing immediate re-sync +} diff --git a/ai-hub/app/protos/agent_pb2.py b/ai-hub/app/protos/agent_pb2.py new file mode 100644 index 0000000..3472d01 --- /dev/null +++ b/ai-hub/app/protos/agent_pb2.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: protos/agent.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 
\x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xff\x01\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x42\t\n\x07payload\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\x8d\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 
\x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 
\x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xc1\x01\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xd3\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x42\t\n\x07payload\"\xaf\x01\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\"g\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 
\x01(\x08\"_\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'protos.agent_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' + _globals['_REGISTRATIONREQUEST']._serialized_start=30 + _globals['_REGISTRATIONREQUEST']._serialized_end=252 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=201 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=252 + _globals['_SANDBOXPOLICY']._serialized_start=255 + _globals['_SANDBOXPOLICY']._serialized_end=452 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=418 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=452 + _globals['_REGISTRATIONRESPONSE']._serialized_start=454 + _globals['_REGISTRATIONRESPONSE']._serialized_end=574 + 
_globals['_CLIENTTASKMESSAGE']._serialized_start=577 + _globals['_CLIENTTASKMESSAGE']._serialized_end=832 + _globals['_NODEANNOUNCE']._serialized_start=834 + _globals['_NODEANNOUNCE']._serialized_end=865 + _globals['_BROWSEREVENT']._serialized_start=868 + _globals['_BROWSEREVENT']._serialized_end=1003 + _globals['_SERVERTASKMESSAGE']._serialized_start=1006 + _globals['_SERVERTASKMESSAGE']._serialized_end=1275 + _globals['_TASKCANCELREQUEST']._serialized_start=1277 + _globals['_TASKCANCELREQUEST']._serialized_end=1313 + _globals['_TASKREQUEST']._serialized_start=1316 + _globals['_TASKREQUEST']._serialized_end=1525 + _globals['_BROWSERACTION']._serialized_start=1528 + _globals['_BROWSERACTION']._serialized_end=1816 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1682 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=1816 + _globals['_TASKRESPONSE']._serialized_start=1819 + _globals['_TASKRESPONSE']._serialized_end=2171 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2051 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2099 + _globals['_TASKRESPONSE_STATUS']._serialized_start=2101 + _globals['_TASKRESPONSE_STATUS']._serialized_end=2161 + _globals['_BROWSERRESPONSE']._serialized_start=2174 + _globals['_BROWSERRESPONSE']._serialized_end=2394 + _globals['_CONSOLEMESSAGE']._serialized_start=2396 + _globals['_CONSOLEMESSAGE']._serialized_end=2463 + _globals['_NETWORKREQUEST']._serialized_start=2465 + _globals['_NETWORKREQUEST']._serialized_end=2569 + _globals['_WORKPOOLUPDATE']._serialized_start=2571 + _globals['_WORKPOOLUPDATE']._serialized_end=2615 + _globals['_TASKCLAIMREQUEST']._serialized_start=2617 + _globals['_TASKCLAIMREQUEST']._serialized_end=2669 + _globals['_TASKCLAIMRESPONSE']._serialized_start=2671 + _globals['_TASKCLAIMRESPONSE']._serialized_end=2740 + _globals['_HEARTBEAT']._serialized_start=2743 + _globals['_HEARTBEAT']._serialized_end=2936 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=2938 + 
_globals['_HEALTHCHECKRESPONSE']._serialized_end=2983 + _globals['_FILESYNCMESSAGE']._serialized_start=2986 + _globals['_FILESYNCMESSAGE']._serialized_end=3197 + _globals['_SYNCCONTROL']._serialized_start=3200 + _globals['_SYNCCONTROL']._serialized_end=3375 + _globals['_SYNCCONTROL_ACTION']._serialized_start=3272 + _globals['_SYNCCONTROL_ACTION']._serialized_end=3375 + _globals['_DIRECTORYMANIFEST']._serialized_start=3377 + _globals['_DIRECTORYMANIFEST']._serialized_end=3447 + _globals['_FILEINFO']._serialized_start=3449 + _globals['_FILEINFO']._serialized_end=3517 + _globals['_FILEPAYLOAD']._serialized_start=3519 + _globals['_FILEPAYLOAD']._serialized_end=3614 + _globals['_SYNCSTATUS']._serialized_start=3617 + _globals['_SYNCSTATUS']._serialized_end=3777 + _globals['_SYNCSTATUS_CODE']._serialized_start=3711 + _globals['_SYNCSTATUS_CODE']._serialized_end=3777 + _globals['_AGENTORCHESTRATOR']._serialized_start=3780 + _globals['_AGENTORCHESTRATOR']._serialized_end=4013 +# @@protoc_insertion_point(module_scope) diff --git a/ai-hub/app/protos/agent_pb2_grpc.py b/ai-hub/app/protos/agent_pb2_grpc.py new file mode 100644 index 0000000..f551b0b --- /dev/null +++ b/ai-hub/app/protos/agent_pb2_grpc.py @@ -0,0 +1,138 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from protos import agent_pb2 as protos_dot_agent__pb2 + + +class AgentOrchestratorStub(object): + """The Cortex Server exposes this service + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. 
+ """ + self.SyncConfiguration = channel.unary_unary( + '/agent.AgentOrchestrator/SyncConfiguration', + request_serializer=protos_dot_agent__pb2.RegistrationRequest.SerializeToString, + response_deserializer=protos_dot_agent__pb2.RegistrationResponse.FromString, + ) + self.TaskStream = channel.stream_stream( + '/agent.AgentOrchestrator/TaskStream', + request_serializer=protos_dot_agent__pb2.ClientTaskMessage.SerializeToString, + response_deserializer=protos_dot_agent__pb2.ServerTaskMessage.FromString, + ) + self.ReportHealth = channel.stream_stream( + '/agent.AgentOrchestrator/ReportHealth', + request_serializer=protos_dot_agent__pb2.Heartbeat.SerializeToString, + response_deserializer=protos_dot_agent__pb2.HealthCheckResponse.FromString, + ) + + +class AgentOrchestratorServicer(object): + """The Cortex Server exposes this service + """ + + def SyncConfiguration(self, request, context): + """1. Control Channel: Sync policies and settings (Unary) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TaskStream(self, request_iterator, context): + """2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReportHealth(self, request_iterator, context): + """3. 
Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_AgentOrchestratorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( + servicer.SyncConfiguration, + request_deserializer=protos_dot_agent__pb2.RegistrationRequest.FromString, + response_serializer=protos_dot_agent__pb2.RegistrationResponse.SerializeToString, + ), + 'TaskStream': grpc.stream_stream_rpc_method_handler( + servicer.TaskStream, + request_deserializer=protos_dot_agent__pb2.ClientTaskMessage.FromString, + response_serializer=protos_dot_agent__pb2.ServerTaskMessage.SerializeToString, + ), + 'ReportHealth': grpc.stream_stream_rpc_method_handler( + servicer.ReportHealth, + request_deserializer=protos_dot_agent__pb2.Heartbeat.FromString, + response_serializer=protos_dot_agent__pb2.HealthCheckResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'agent.AgentOrchestrator', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. 
+class AgentOrchestrator(object): + """The Cortex Server exposes this service + """ + + @staticmethod + def SyncConfiguration(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration', + protos_dot_agent__pb2.RegistrationRequest.SerializeToString, + protos_dot_agent__pb2.RegistrationResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TaskStream(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream', + protos_dot_agent__pb2.ClientTaskMessage.SerializeToString, + protos_dot_agent__pb2.ServerTaskMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ReportHealth(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth', + protos_dot_agent__pb2.Heartbeat.SerializeToString, + protos_dot_agent__pb2.HealthCheckResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/ai-hub/requirements.txt b/ai-hub/requirements.txt index 5f71a02..7a74f93 100644 --- a/ai-hub/requirements.txt +++ b/ai-hub/requirements.txt @@ -19,4 +19,6 @@ PyJWT tenacity litellm -tiktoken \ No newline at end of file +tiktoken 
+grpcio==1.62.1 +grpcio-tools==1.62.1 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index d5c1206..6f6c15a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,8 +25,13 @@ build: ./ai-hub container_name: ai_hub_service restart: unless-stopped + ports: + - "50051:50051" environment: - PATH_PREFIX=/api/v1 + - HUB_API_URL=http://localhost:8000 + - HUB_PUBLIC_URL=https://ai.jerxie.com + - HUB_GRPC_ENDPOINT=ai.jerxie.com:443 - OIDC_CLIENT_ID=cortex-server - OIDC_CLIENT_SECRET=aYc2j1lYUUZXkBFFUndnleZI - OIDC_SERVER_URL=https://auth.jerxie.com diff --git a/ui/client-app/src/App.js b/ui/client-app/src/App.js index 9a76f5b..9e49cf6 100644 --- a/ui/client-app/src/App.js +++ b/ui/client-app/src/App.js @@ -7,6 +7,7 @@ import LoginPage from "./pages/LoginPage"; import SettingsPage from "./pages/SettingsPage"; import ProfilePage from "./pages/ProfilePage"; +import NodesPage from "./pages/NodesPage"; import { getUserStatus, logout, getUserProfile } from "./services/apiService"; const Icon = ({ path, onClick, className }) => ( @@ -32,7 +33,8 @@ const [userId, setUserId] = useState(null); const [userProfile, setUserProfile] = useState(null); - const authenticatedPages = ["voice-chat", "coding-assistant", "settings", "profile"]; + const authenticatedPages = ["voice-chat", "coding-assistant", "settings", "profile", "nodes"]; + useEffect(() => { const urlParams = new URLSearchParams(window.location.search); @@ -136,6 +138,8 @@ return ; case "profile": return ; + case "nodes": + return ; case "login": return ; default: diff --git a/ui/client-app/src/components/MultiNodeConsole.js b/ui/client-app/src/components/MultiNodeConsole.js new file mode 100644 index 0000000..33c97f2 --- /dev/null +++ b/ui/client-app/src/components/MultiNodeConsole.js @@ -0,0 +1,97 @@ +import React, { useEffect, useState, useRef } from 'react'; +import { getNodeStreamUrl } from '../services/apiService'; + +const MultiNodeConsole = ({ attachedNodeIds }) => { + const 
[logs, setLogs] = useState({}); // node_id -> array of log strings + const scrollRefs = useRef({}); + + useEffect(() => { + if (!attachedNodeIds || attachedNodeIds.length === 0) return; + + const ws = new WebSocket(getNodeStreamUrl()); + + ws.onmessage = (event) => { + const msg = JSON.parse(event.data); + if (!attachedNodeIds.includes(msg.node_id)) return; + + const timestamp = new Date().toLocaleTimeString(); + let logLine = ""; + + switch (msg.event) { + case 'task_assigned': + logLine = `[${timestamp}] 📥 ASSIGNED: ${msg.data.command || 'browser task'}`; + break; + case 'task_start': + logLine = `[${timestamp}] 🚀 START: Running payload...`; + break; + case 'task_complete': + logLine = `[${timestamp}] ✅ COMPLETE: Success`; + break; + case 'task_error': + logLine = `[${timestamp}] ❌ ERROR: ${JSON.stringify(msg.data)}`; + break; + case 'browser_event': + const type = msg.data.type === 'console' ? '🖥️' : '🌐'; + logLine = `[${timestamp}] ${type} ${msg.data.text || msg.data.url}`; + break; + case 'sync_status': + logLine = `[${timestamp}] 📁 SYNC: ${msg.data.message}`; + break; + default: + return; // Ignore heartbeats etc. + } + + setLogs(prev => ({ + ...prev, + [msg.node_id]: [...(prev[msg.node_id] || []), logLine].slice(-100) + })); + }; + + return () => ws.close(); + }, [attachedNodeIds]); + + // Handle auto-scroll + useEffect(() => { + Object.keys(scrollRefs.current).forEach(nodeId => { + const ref = scrollRefs.current[nodeId]; + if (ref) ref.scrollTop = ref.scrollHeight; + }); + }, [logs]); + + if (!attachedNodeIds || attachedNodeIds.length === 0) return null; + + return ( +
+
+ + + AGENT EXECUTION MESH + + Live gRPC Stream +
+ +
+ {attachedNodeIds.map(nodeId => ( +
+
+ {nodeId} + NODE_CONTEXT +
+
scrollRefs.current[nodeId] = el} + className="flex-1 overflow-y-auto p-3 space-y-1 scrollbar-thin scrollbar-thumb-gray-800" + > + {(logs[nodeId] || ["[*] Awaiting task..."]).map((line, i) => ( +
+ {line} +
+ ))} +
+
+ ))} +
+
+ ); +}; + +export default MultiNodeConsole; diff --git a/ui/client-app/src/components/Navbar.js b/ui/client-app/src/components/Navbar.js index c20d74e..a70712d 100644 --- a/ui/client-app/src/components/Navbar.js +++ b/ui/client-app/src/components/Navbar.js @@ -6,7 +6,9 @@ { name: "Home", icon: "M10 20v-6h4v6h5v-8h3L12 3 2 12h3v8z", page: "home" }, { name: "Voice Chat", icon: "M12 1a3 3 0 0 1 3 3v7a3 3 0 1 1-6 0V4a3 3 0 0 1 3-3zm5 10a5 5 0 0 1-10 0H5a7 7 0 0 0 14 0h-2zm-5 11v-4h-2v4h2z", page: "voice-chat" }, { name: "Coding Assistant", icon: "M9 16l-4-4 4-4M15 16l4-4-4-4", page: "coding-assistant" }, + { name: "Agent Nodes", icon: "M5 12h14M12 5l7 7-7 7", page: "nodes" }, { name: "History", icon: "M22 12h-4l-3 9L9 3l-3 9H2", page: "history", disabled: true }, + { name: "Favorites", icon: "M20.84 4.61a5.5 5.5 0 0 0-7.78 0L12 5.67l-1.06-1.06a5.5 5.5 0 0 0-7.78 7.78l1.06 1.06L12 21.23l7.78-7.78 1.06-1.06a5.5 5.5 0 0 0 0-7.78z", page: "favorites", disabled: true }, ]; diff --git a/ui/client-app/src/pages/CodingAssistantPage.js b/ui/client-app/src/pages/CodingAssistantPage.js index 56659bc..03767d3 100644 --- a/ui/client-app/src/pages/CodingAssistantPage.js +++ b/ui/client-app/src/pages/CodingAssistantPage.js @@ -1,8 +1,12 @@ import React, { useState, useRef, useEffect } from "react"; import ChatArea from "../components/ChatArea"; import SessionSidebar from "../components/SessionSidebar"; +import MultiNodeConsole from "../components/MultiNodeConsole"; import useCodeAssistant from "../hooks/useCodeAssistant"; -import { updateSession } from "../services/apiService"; +import { + updateSession, getSessionNodeStatus, attachNodesToSession, + detachNodeFromSession, getUserAccessibleNodes +} from "../services/apiService"; const CodeAssistantPage = () => { const pageContainerRef = useRef(null); @@ -25,8 +29,51 @@ } = useCodeAssistant({ pageContainerRef }); const [showConfigModal, setShowConfigModal] = useState(false); + const [showNodeSelector, setShowNodeSelector] = 
useState(false); const [sidebarRefreshTick, setSidebarRefreshTick] = useState(0); + // M3/M6 Node Integration State + const [sessionNodeStatus, setSessionNodeStatus] = useState({}); // node_id -> { status, last_sync } + const [accessibleNodes, setAccessibleNodes] = useState([]); + const [attachedNodeIds, setAttachedNodeIds] = useState([]); + const [workspaceId, setWorkspaceId] = useState(""); + const [showConsole, setShowConsole] = useState(false); + + const fetchNodeInfo = async () => { + if (!sessionId) return; + try { + const [status, nodes] = await Promise.all([ + getSessionNodeStatus(sessionId), + getUserAccessibleNodes() + ]); + setSessionNodeStatus(status.node_sync_status || {}); + setAttachedNodeIds(status.attached_node_ids || []); + setWorkspaceId(status.sync_workspace_id || ""); + setAccessibleNodes(nodes); + } catch (e) { + console.warn("M3: Failed to fetch session node info", e); + } + }; + + useEffect(() => { + fetchNodeInfo(); + const interval = setInterval(fetchNodeInfo, 5000); // Polling status + return () => clearInterval(interval); + }, [sessionId]); + + const handleToggleNode = async (nodeId, isAttached) => { + try { + if (isAttached) { + await detachNodeFromSession(sessionId, nodeId); + } else { + await attachNodesToSession(sessionId, [nodeId]); + } + fetchNodeInfo(); + } catch (err) { + alert(err.message); + } + }; + const handleSaveQuickConfig = async () => { try { if (sessionId && localActiveLLM) { @@ -45,6 +92,13 @@ } }, [chatHistory]); + // Antigravity: Auto-show console when processing + useEffect(() => { + if (isProcessing && attachedNodeIds.length > 0) { + setShowConsole(true); + } + }, [isProcessing, attachedNodeIds]); + return (
{/* Main content area */} -
-
- {/* Chat Area */} +
+
+ {/* Chat Area & Header */}
-
+

@@ -70,8 +124,50 @@
Coding Assistant - Enhanced with Skills & Prompts + + Mesh: {attachedNodeIds.length} Nodes Active +
+ + {/* Nodes Indicator Bar (M3/M6) */} +
+ {attachedNodeIds.length === 0 ? ( + No nodes attached + ) : ( + attachedNodeIds.map(nid => { + const status = sessionNodeStatus[nid]?.status || 'pending'; + return ( +
+
+
+ {nid}: {status.toUpperCase()} +
+
+ ) + }) + )} + + {attachedNodeIds.length > 0 && ( + + )} +
+ {!isConfigured && (
@@ -93,12 +189,12 @@
-
+
Token Usage
-
+
80 ? 'bg-red-500' : 'bg-indigo-500'}`} style={{ width: `${Math.min(tokenUsage?.percentage || 0, 100)}%` }} @@ -113,7 +209,7 @@ onClick={() => handleSendChat("/new")} className="text-xs font-bold px-4 py-2 bg-indigo-600 hover:bg-indigo-700 text-white rounded-lg transition-all shadow-md hover:shadow-indigo-500/20 active:scale-95" > - + NEW SESSION + + NEW

@@ -127,10 +223,73 @@ />
+ + {/* Antigravity Console (M6) */} + {showConsole && attachedNodeIds.length > 0 && ( +
+ +
+ )}
+ {/* Node Selector Modal (M3) */} + {showNodeSelector && ( +
+
+
+

+ + Mesh Node Selection +

+ +
+
+

+ Select agent nodes to attach to this session. Attached nodes share the workspace {workspaceId}. +

+ +
+ {accessibleNodes.length === 0 && No nodes available for your account.} + {accessibleNodes.map(node => { + const isAttached = attachedNodeIds.includes(node.node_id); + const isOnline = node.last_status === 'online' || node.last_status === 'idle'; + + return ( +
+
+
+
+

{node.display_name}

+

{node.node_id}

+
+
+ +
+ ); + })} +
+
+
+ +
+
+
+ )} + {/* Error Modal */} {showErrorModal && (
diff --git a/ui/client-app/src/pages/NodesPage.js b/ui/client-app/src/pages/NodesPage.js new file mode 100644 index 0000000..adb4762 --- /dev/null +++ b/ui/client-app/src/pages/NodesPage.js @@ -0,0 +1,353 @@ +import React, { useState, useEffect, useCallback } from 'react'; +import { + getAdminNodes, adminCreateNode, adminUpdateNode, + adminGrantNodeAccess, adminRevokeNodeAccess, + adminDownloadNodeBundle, getUserAccessibleNodes, + getAdminGroups, getNodeStreamUrl +} from '../services/apiService'; + +const NodesPage = ({ user }) => { + const [activeTab, setActiveTab] = useState(user?.role === 'admin' ? 'manage' : 'status'); + const [nodes, setNodes] = useState([]); + const [groups, setGroups] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [showCreateModal, setShowCreateModal] = useState(false); + const [newNode, setNewNode] = useState({ node_id: '', display_name: '', description: '', skill_config: { shell: { enabled: true }, browser: { enabled: true }, sync: { enabled: true } } }); + + // WebSocket state for live updates + const [meshStatus, setMeshStatus] = useState({}); // node_id -> { status, stats } + const [recentEvents, setRecentEvents] = useState([]); // Array of event objects + + const isAdmin = user?.role === 'admin'; + + const fetchData = useCallback(async () => { + setLoading(true); + try { + if (isAdmin) { + const [nodesData, groupsData] = await Promise.all([getAdminNodes(), getAdminGroups()]); + setNodes(nodesData); + setGroups(groupsData); + } else { + const nodesData = await getUserAccessibleNodes(); + setNodes(nodesData); + } + } catch (err) { + setError(err.message); + } finally { + setLoading(false); + } + }, [isAdmin]); + + useEffect(() => { + fetchData(); + }, [fetchData]); + + // WebSocket Connection for Live Mesh Status + useEffect(() => { + const wsUrl = getNodeStreamUrl(); + const ws = new WebSocket(wsUrl); + + ws.onmessage = (event) => { + const msg = 
JSON.parse(event.data); + if (msg.event === 'initial_snapshot') { + const statusMap = {}; + msg.data.nodes.forEach(n => { + statusMap[n.node_id] = { status: n.status, stats: n.stats }; + }); + setMeshStatus(statusMap); + } else if (msg.event === 'mesh_heartbeat') { + const statusMap = { ...meshStatus }; + msg.data.nodes.forEach(n => { + statusMap[n.node_id] = { status: n.status, stats: n.stats }; + }); + setMeshStatus(statusMap); + } else if (['task_start', 'task_complete', 'task_error', 'info'].includes(msg.event)) { + // Add to recent events log + setRecentEvents(prev => [msg, ...prev].slice(0, 50)); + } + }; + + return () => ws.close(); + }, [userId]); // Wait, I don't have userId here, I'll use user.id + + const handleCreateNode = async (e) => { + e.preventDefault(); + try { + await adminCreateNode(newNode); + setShowCreateModal(false); + fetchData(); + } catch (err) { + alert(err.message); + } + }; + + const toggleNodeActive = async (node) => { + try { + await adminUpdateNode(node.node_id, { is_active: !node.is_active }); + fetchData(); + } catch (err) { + alert(err.message); + } + }; + + return ( +
+ {/* Header */} +
+
+
+

+ 🚀 Agent Node Mesh +

+

+ Manage distributed execution nodes and monitor live health. +

+
+
+ + {isAdmin && ( + + )} +
+
+ + {/* Tabs */} +
+ {isAdmin && ( + + )} + +
+
+ + {/* Main Content */} +
+ {loading ? ( +
+
+
+ ) : error ? ( +
+ Error: {error} +
+ ) : activeTab === 'manage' ? ( + /* ADMIN MANAGEMENT VIEW */ +
+
+ {nodes.map(node => ( +
+
+
+

{node.display_name}

+ + {node.is_active ? 'Active' : 'Disabled'} + + ID: {node.node_id} +
+

{node.description || 'No description provided.'}

+ +
+ {Object.entries(node.skill_config || {}).map(([skill, cfg]) => ( +
+ {skill} + {cfg?.enabled ? ( + + ) : ( + + )} +
+ ))} +
+
+ +
+ + + {/* Access control button could open another modal */} + +
+
+ ))} +
+
+ ) : ( + /* LIVE MONITOR VIEW (M6) */ +
+ {/* Status Cards */} +
+ {nodes.map(node => { + const live = meshStatus[node.node_id]; + const isOnline = live?.status === 'online' || live?.status === 'idle' || live?.status === 'busy'; + + return ( +
+
+ +
+
+

{node.display_name}

+

{node.node_id}

+
+
+ + + {live?.status || (node.last_status || 'offline')} + +
+
+ +
+
+
Tasks Running
+
{live?.stats?.active_tasks || 0}
+
+
+
Uptime Score
+
{(live?.stats?.success_rate ? (live.stats.success_rate * 100).toFixed(1) : "100")}%
+
+
+ +
+
+ CPU +
+
+
+
+
+ MEM +
+
+
+
+
+
+ ); + })} +
+ + {/* Event Timeline */} +
+
+

+ + Execution Live Bus +

+
+
+ {recentEvents.length === 0 && ( +
Listening for mesh events...
+ )} + {recentEvents.map((evt, i) => ( +
+ [{evt.timestamp?.split('T')[1].split('.')[0]}] + {evt.node_id.slice(0, 8)} + + {evt.label || evt.event}: {JSON.stringify(evt.data)} + +
+ ))} +
+
+
+ )} +
+ + {/* CREATE NODE MODAL */} + {showCreateModal && ( +
+
+
+

Register Agent Node

+ +
+
+
+ + setNewNode({ ...newNode, node_id: e.target.value })} + /> +
+
+ + setNewNode({ ...newNode, display_name: e.target.value })} + /> +
+
+ +