diff --git a/docs/architecture/cortex_agent_node_plan.md b/docs/architecture/cortex_agent_node_plan.md index d393bfa..376cca0 100644 --- a/docs/architecture/cortex_agent_node_plan.md +++ b/docs/architecture/cortex_agent_node_plan.md @@ -75,7 +75,9 @@ - **Skill-Based Extensibility**: Unified `BaseSkill` interface for Shell, Browser, and future capabilities. - **Graceful Shutdown**: Implemented `SIGTERM`/`SIGINT` handling with clean browser-actor cleanup. - **Global Work Pool**: Shared task discovery and **Task Claiming** to prevent rework across nodes. + - **Standardized Work-Stealing**: Implementation of `TaskClaimResponse` to handle race conditions during task selection. - **Hanging Task Recovery**: Remote cancellation and automatic retries in the `TaskAssistant`. + - **Aggregated Health Dashboard**: Real-time server-side monitoring of all connected nodes, their health, and active tasks. - **Outcome**: A professional, scalable, and extensible distributed agent mesh. ### Phase 6: Scaling & Frontend UI Integration diff --git a/docs/architecture/cortex_project_todo.md b/docs/architecture/cortex_project_todo.md index ee0c435..2ee2ecf 100644 --- a/docs/architecture/cortex_project_todo.md +++ b/docs/architecture/cortex_project_todo.md @@ -38,8 +38,8 @@ - **Description**: Migrate `NodeRegistry` and `WorkPool` from in-memory to a persistent backend (Postgres/Redis). - **Priority**: Deferred until **Full System Integration** phase. -### [ ] Workspace Mirroring & Efficient File Sync -- **Description**: Maintain a local server-side mirror of node workspaces for Zero-latency AI perception. +### [ ] Workspace Mirroring & Efficient File Sync - ✅ [PHASE 2 COMPLETE](file:///app/docs/architecture/workspace_mirroring_design.md) +- **Description**: Real-time bidirectional sync with multi-node broadcast propagation established. ### [ ] Real-time gRPC Log Streaming - **Description**: Bidirectional stream for live `stdout/stderr`. 
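The "Standardized Work-Stealing" item in the plan above hinges on first-claim-wins semantics: many nodes race to claim the same pool task, and the server grants exactly one while the rest receive a `TaskClaimResponse` denial. A minimal sketch of that idea, assuming a hypothetical `WorkPoolSketch` class (the actual `GlobalWorkPool.claim` implementation may differ):

```python
import threading

class WorkPoolSketch:
    """Illustrative first-claim-wins task pool.

    Hypothetical names; not the Cortex `GlobalWorkPool` itself.
    """
    def __init__(self):
        self._lock = threading.Lock()
        self._available = {}   # task_id -> payload
        self._claimed = {}     # task_id -> winning node_id

    def push(self, task_id, payload):
        with self._lock:
            self._available[task_id] = payload

    def claim(self, task_id, node_id):
        """Atomically grant a task to exactly one node.

        Returns (granted, payload_or_reason); losers of the race get a
        denial string they can surface as a TaskClaimResponse reason.
        """
        with self._lock:
            if task_id in self._claimed:
                return False, f"already claimed by {self._claimed[task_id]}"
            payload = self._available.pop(task_id, None)
            if payload is None:
                return False, "unknown task"
            self._claimed[task_id] = node_id
            return True, payload
```

Because the check-and-pop happens under one lock, two concurrent claims for the same task cannot both be granted, which is the race the `TaskClaimResponse` message exists to report.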
diff --git a/docs/architecture/workspace_mirroring_design.md b/docs/architecture/workspace_mirroring_design.md new file mode 100644 index 0000000..788d53c --- /dev/null +++ b/docs/architecture/workspace_mirroring_design.md @@ -0,0 +1,65 @@ +# Cortex Agent Node: Workspace Mirroring & Real-time Sync + +## 📌 Overview +This document outlines the architecture for the **Cortex Workspace Sync System**, designed to provide the AI with "Zero-latency Perception" of remote node files while maintaining high-fidelity synchronization across a collaborative agent mesh. + +The core innovation is the **"Ghost Mirror" pattern**, where the Cortex Server maintains a local, hash-verified copy of the node's workspace to allow instantaneous AI analysis without gRPC round-trips for every file read. + +--- + +## 🏗️ Core Concepts + +### 1. The Session-Based Attachment +When a user begins an AI session, they can "Attach Nodes" to that specific execution context. +- **Workspace Selection**: During attachment, the user chooses the **Source of Truth (SoT)** for the files. +- **Unique Identification**: Every sync folder in a node uses a session-specific UUID to prevent path collisions. + +### 2. Dual Source-of-Truth Models + +#### A. Server-Primary (The "Scratchpad" Model) +Used for new projects or AI-generated tasks. +- **Root**: A temporary or persistent directory on the Cortex Server. +- **Propagation**: On initialization, the server pushes the initial manifest and data to all attached nodes. +- **Lifecycle**: Node folders are "mirrors" of the server. Any code generated by the AI on the server is instantly pushed down to the nodes for execution. + +#### B. Node-Primary (The "Local Codebase" Model) +Used for existing local repositories (e.g., a professional dev machine). +- **Root**: A specific local folder on one "Lead" Agent Node. +- **Mirroring**: The server pulls the initial state to create a local **Ghost Mirror**. 
+- **Change Detection**: The Lead Node uses OS-level watchers (`inotify`, `FSEvents`) to detect changes and stream deltas back to the server.
+- **Propagation**: The server then broadcasts these changes to any other "Support" nodes attached to the session.
+
+---
+
+## 🚀 Technical Architecture
+
+### 1. Change Detection & Reconciliation
+- **Watcher-Driven Deltas**: Nodes stream `(File Path, Hash, ContentDelta)` messages via the gRPC `TaskStream` whenever local files change.
+- **Hashed State Checks**: Before the AI begins a complex multi-file refactor, the server runs a quick "Fingerprint" check (comparing top-level directory hashes) to ensure the Ghost Mirror is not drifting from the node.
+
+### 2. Efficiency & Large Data Handling
+- **Cortex-Ignore**: Native support for `.cortexignore` (and inherited `.gitignore`) to exclude heavy assets (`node_modules`, `.git`, `.venv`).
+- **Chunked Deduplication**: Chunk-based transfers send only the chunks that changed, minimizing bandwidth during repeated refactors.
+
+### 3. Conflict Resolution & Safety
+- **Workspace Locking**: During AI "Write" operations, the target node's sync folder can be briefly locked to prevent concurrent manual edits from corrupting the refactor.
+- **Deterministic Reconciliation**: If a conflict is detected (the node changed during AI planning), the server enters a **Reconcile Mode**, forcing a pull of the node state before retrying the edit.
+
+---
+
+## 🛠️ Implementation Phasing
+
+### Phase 1: Ghost Mirror Foundations
+- [x] Implement `FSMirror` service on the Orchestrator.
+- [x] Implement basic gRPC `FileSync` messages in `agent.proto`.
+- [x] Support for `server-to-node` initial push.
+
+### Phase 2: Active Watchers & Multi-Node Sync
+- [x] Integrate local directory watchers on Agent Nodes.
+- [x] Implement `node-to-server` delta streaming.
+- [x] Multi-node propagation (Server broadcasts Lead Node updates to Support Nodes).
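The "Fingerprint" drift check described under Hashed State Checks can be sketched as follows. This is illustrative only: the function name and exact hashing scheme are assumptions, not the actual `FSMirror` implementation, but the property it demonstrates (any content change anywhere in the tree changes the top-level fingerprint) is what the drift check relies on:

```python
import hashlib
import os

def workspace_fingerprint(root: str) -> str:
    """Digest over sorted (relative path, content hash) pairs.

    Illustrative sketch of a directory-level fingerprint: two trees with
    identical paths and contents produce the same digest, so comparing the
    Ghost Mirror's fingerprint with the node's detects drift cheaply.
    """
    digest = hashlib.sha256()
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # make the traversal order deterministic
        for name in sorted(filenames):
            abs_path = os.path.join(dirpath, name)
            rel_path = os.path.relpath(abs_path, root)
            with open(abs_path, "rb") as f:
                file_hash = hashlib.sha256(f.read()).hexdigest()
            # Fold both the path and its content hash into the fingerprint
            digest.update(rel_path.encode())
            digest.update(file_hash.encode())
    return digest.hexdigest()
```

In a real reconciliation flow the node would only stream per-file hashes (or deltas) when the top-level fingerprints disagree, keeping the common no-drift case to a single round-trip.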
+ +### Phase 3: Conflict Handling & Optimization +- [ ] Implementation of `.cortexignore` filtering. +- [ ] Workspace state locking and concurrency guards. +- [ ] Verification of Content-Addressable Storage (CAS) to optimize binary transfers. diff --git a/poc-grpc-agent/.nfs00000000006c17bd0000003f b/poc-grpc-agent/.nfs00000000006c17bd0000003f deleted file mode 100644 index fbc49e2..0000000 --- a/poc-grpc-agent/.nfs00000000006c17bd0000003f +++ /dev/null @@ -1,52 +0,0 @@ -[🛡️] Boss Plane Orchestrator Starting on [::]:50051... -[🛡️] Boss Plane Refactored & Online. -[📋] Registered Agent Node: agent-node-007 -[📶] Stream Online for agent-node-007 - [📦] Task shared-001 Claimed by agent-node-007 - [📦] Task shared-002 Claimed by agent-node-007 - [🚀] Streamed message to agent-node-007 - [🚀] Streamed message to agent-node-007 - -[🧠] AI Simulation Start... -[📤] Dispatching shell task-1772515596627 to agent-node-007 - [🚀] Streamed message to agent-node-007 - Uname Output: {'stdout': 'Linux d1ceb63b86a7 6.10.11-linuxkit #1 SMP Thu Oct 3 10:17:28 UTC 2024 aarch64 GNU/Linux\n', 'status': 0} - -[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)... -[🌐📤] Dispatching browser br-1772515596633 to agent-node-007 - [🚀] Streamed message to agent-node-007 - [🌐] Net Inspect: GET https://example.com/ - Nav Result: {'stdout': '', 'status': 0, 'browser': {'url': 'https://example.com/', 'title': 'Example Domain', 'has_snapshot': False, 'a11y': None, 'eval': ''}} - -[🧠] AI Phase 4 Pro: Perception & Advanced Logic... -[🌐📤] Dispatching browser br-1772515596761 to agent-node-007 - [🚀] Streamed message to agent-node-007 - A11y Result: {"role": "WebArea", "name": "Example Domain", "children": [{"role": "heading", "name": "Example Doma... -[🌐📤] Dispatching browser br-1772515596767 to agent-node-007 - [🚀] Streamed message to agent-node-007 - Eval Result: 92.89999997615814 - -[🧠] AI Phase 4 Pro: Triggering Real-time Events (Tunneling)... 
-[🌐📤] Dispatching browser br-1772515596772 to agent-node-007 - [🚀] Streamed message to agent-node-007 - [🖥️] Live Console: Refactored Hello! - [🖥️] Live Console: Failed to load resource: the server responded with a status of 404 () -Exception in thread Results-agent-node-007: -Traceback (most recent call last): - File "/usr/local/lib/python3.11/threading.py", line 1045, in _bootstrap_inner - self.run() - File "/usr/local/lib/python3.11/threading.py", line 982, in run - self._target(*self._args, **self._kwargs) - File "/app/poc-grpc-agent/orchestrator/services/grpc_server.py", line 55, in _read_results - for msg in request_iterator: - File "/home/vscode/.local/lib/python3.11/site-packages/grpc/_server.py", line 488, in __next__ - return self._next() - ^^^^^^^^^^^^ - File "/home/vscode/.local/lib/python3.11/site-packages/grpc/_server.py", line 480, in _next - request = self._look_for_request() - ^^^^^^^^^^^^^^^^^^^^^^^^ - File "/home/vscode/.local/lib/python3.11/site-packages/grpc/_server.py", line 462, in _look_for_request - _raise_rpc_error(self._state) - File "/home/vscode/.local/lib/python3.11/site-packages/grpc/_server.py", line 162, in _raise_rpc_error - raise rpc_error -grpc.RpcError diff --git a/poc-grpc-agent/agent_node/config.py b/poc-grpc-agent/agent_node/config.py index d0c5f62..6ed6746 100644 --- a/poc-grpc-agent/agent_node/config.py +++ b/poc-grpc-agent/agent_node/config.py @@ -18,3 +18,4 @@ # Resource Limits MAX_SKILL_WORKERS = int(os.getenv("MAX_SKILL_WORKERS", "5")) HEALTH_REPORT_INTERVAL = int(os.getenv("HEALTH_REPORT_INTERVAL", "10")) +SYNC_DIR = os.getenv("CORTEX_SYNC_DIR", "/tmp/cortex-sync") diff --git a/poc-grpc-agent/agent_node/core/sync.py b/poc-grpc-agent/agent_node/core/sync.py new file mode 100644 index 0000000..16f70cf --- /dev/null +++ b/poc-grpc-agent/agent_node/core/sync.py @@ -0,0 +1,52 @@ +import os +import hashlib +from agent_node.config import SYNC_DIR +from protos import agent_pb2 + +class NodeSyncManager: + """Handles local 
filesystem synchronization on the Agent Node.""" + def __init__(self, base_sync_dir=SYNC_DIR): + self.base_sync_dir = base_sync_dir + if not os.path.exists(self.base_sync_dir): + os.makedirs(self.base_sync_dir, exist_ok=True) + + def get_session_dir(self, session_id: str) -> str: + """Returns the unique identifier directory for this session's sync.""" + path = os.path.join(self.base_sync_dir, session_id) + os.makedirs(path, exist_ok=True) + return path + + def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest): + """Prepares the local directory based on the server manifest.""" + session_dir = self.get_session_dir(session_id) + print(f"[📁] Preparing Sync Directory: {session_dir}") + # In Phase 1, we just ensure directories exist + for file_info in manifest.files: + if file_info.is_dir: + os.makedirs(os.path.join(session_dir, file_info.path), exist_ok=True) + + def write_chunk(self, session_id: str, payload: agent_pb2.FilePayload) -> bool: + """Writes a file chunk to the local session directory.""" + session_dir = self.get_session_dir(session_id) + target_path = os.path.normpath(os.path.join(session_dir, payload.path)) + + if not target_path.startswith(session_dir): + return False # Path traversal guard + + os.makedirs(os.path.dirname(target_path), exist_ok=True) + + mode = "ab" if payload.chunk_index > 0 else "wb" + with open(target_path, mode) as f: + f.write(payload.chunk) + + if payload.is_final and payload.hash: + return self._verify(target_path, payload.hash) + return True + + def _verify(self, path, expected_hash): + with open(path, "rb") as f: + actual = hashlib.sha256(f.read()).hexdigest() + if actual != expected_hash: + print(f"[⚠️] Sync Hash Mismatch for {path}") + return False + return True diff --git a/poc-grpc-agent/agent_node/core/watcher.py b/poc-grpc-agent/agent_node/core/watcher.py new file mode 100644 index 0000000..8ddee21 --- /dev/null +++ b/poc-grpc-agent/agent_node/core/watcher.py @@ -0,0 +1,88 @@ + +import time 
+import os +import hashlib +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +from protos import agent_pb2 + +class SyncEventHandler(FileSystemEventHandler): + """Listens for FS events and triggers gRPC delta pushes.""" + def __init__(self, session_id, root_path, callback): + self.session_id = session_id + self.root_path = root_path + self.callback = callback + self.last_sync = {} # path -> last_hash + + def on_modified(self, event): + if not event.is_directory: + self._process_change(event.src_path) + + def on_created(self, event): + if not event.is_directory: + self._process_change(event.src_path) + + def on_moved(self, event): + # Simplification: treat move as a delete and create, or just process the dest + self._process_change(event.dest_path) + + def _process_change(self, abs_path): + rel_path = os.path.relpath(abs_path, self.root_path) + # Basic ignore filter (v1) + if any(part.startswith('.') for part in rel_path.split(os.sep)) or "node_modules" in rel_path: + return + + try: + with open(abs_path, "rb") as f: + content = f.read() + file_hash = hashlib.sha256(content).hexdigest() + + if self.last_sync.get(rel_path) == file_hash: + return # No actual change + + self.last_sync[rel_path] = file_hash + print(f" [📁📤] Detected Change: {rel_path}") + + # Chunk and Send + chunk_size = 64 * 1024 + for i in range(0, len(content), chunk_size): + chunk = content[i:i + chunk_size] + is_final = i + chunk_size >= len(content) + payload = agent_pb2.FilePayload( + path=rel_path, + chunk=chunk, + chunk_index=i // chunk_size, + is_final=is_final, + hash=file_hash if is_final else "" + ) + self.callback(self.session_id, payload) + except Exception as e: + print(f" [!] 
Watcher Error for {rel_path}: {e}") + +class WorkspaceWatcher: + """Manages FS observers for active synchronization.""" + def __init__(self, callback): + self.callback = callback + self.observers = {} # session_id -> observer + + def start_watching(self, session_id, root_path): + if session_id in self.observers: + self.stop_watching(session_id) + + print(f"[*] Starting Watcher for Session {session_id} at {root_path}") + handler = SyncEventHandler(session_id, root_path, self.callback) + observer = Observer() + observer.schedule(handler, root_path, recursive=True) + observer.start() + self.observers[session_id] = observer + + def stop_watching(self, session_id): + if session_id in self.observers: + print(f"[*] Stopping Watcher for Session {session_id}") + self.observers[session_id].stop() + self.observers[session_id].join() + del self.observers[session_id] + + def shutdown(self): + for sid in list(self.observers.keys()): + self.stop_watching(sid) diff --git a/poc-grpc-agent/agent_node/node.py b/poc-grpc-agent/agent_node/node.py index 0500e5c..c7fab99 100644 --- a/poc-grpc-agent/agent_node/node.py +++ b/poc-grpc-agent/agent_node/node.py @@ -2,9 +2,12 @@ import queue import time import sys +import os from protos import agent_pb2, agent_pb2_grpc from agent_node.skills.manager import SkillManager from agent_node.core.sandbox import SandboxEngine +from agent_node.core.sync import NodeSyncManager +from agent_node.core.watcher import WorkspaceWatcher from agent_node.utils.auth import create_auth_token, verify_task_signature from agent_node.utils.network import get_secure_stub from agent_node.config import NODE_ID, NODE_DESC, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS @@ -15,6 +18,8 @@ self.node_id = node_id self.skills = SkillManager(max_workers=MAX_SKILL_WORKERS) self.sandbox = SandboxEngine() + self.sync_mgr = NodeSyncManager() + self.watcher = WorkspaceWatcher(self._on_sync_delta) self.task_queue = queue.Queue() self.stub = get_secure_stub() @@ -92,12 +97,58 @@ 
             self._send_response(msg.task_cancel.task_id, None, agent_pb2.TaskResponse.CANCELLED)

         elif kind == 'work_pool_update':
-            # Claim logical idle tasks from global pool
+            # Claim idle tasks from the global pool with slight randomized jitter
+            # to prevent a thundering herd where every node claims the same task at the same instant.
             if len(self.skills.get_active_ids()) < MAX_SKILL_WORKERS:
                 for tid in msg.work_pool_update.available_task_ids:
+                    # Randomized delay so claims from different nodes are spread out
+                    import random
+                    time.sleep(random.uniform(0.1, 0.5))
+
                     self.task_queue.put(agent_pb2.ClientTaskMessage(
                         task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id)
                     ))
+
+        elif kind == 'claim_status':
+            status = "GRANTED" if msg.claim_status.granted else "DENIED"
+            print(f"  [📦] Claim {msg.claim_status.task_id}: {status} ({msg.claim_status.reason})", flush=True)
+
+        elif kind == 'file_sync':
+            self._handle_file_sync(msg.file_sync)
+
+    def _on_sync_delta(self, session_id, file_payload):
+        """Callback from the watcher to push local changes to the server."""
+        self.task_queue.put(agent_pb2.ClientTaskMessage(
+            file_sync=agent_pb2.FileSyncMessage(
+                session_id=session_id,
+                file_data=file_payload
+            )
+        ))
+
+    def _handle_file_sync(self, fs):
+        """Processes inbound file synchronization messages from the Orchestrator."""
+        sid = fs.session_id
+        if fs.HasField("manifest"):
+            self.sync_mgr.handle_manifest(sid, fs.manifest)
+        elif fs.HasField("file_data"):
+            success = self.sync_mgr.write_chunk(sid, fs.file_data)
+            if fs.file_data.is_final:
+                print(f"  [📁] File Received: {fs.file_data.path} (Verified: {success})")
+                status = agent_pb2.SyncStatus.OK if success else agent_pb2.SyncStatus.ERROR
+                self.task_queue.put(agent_pb2.ClientTaskMessage(
+                    file_sync=agent_pb2.FileSyncMessage(
+                        session_id=sid,
+                        status=agent_pb2.SyncStatus(code=status, message=f"File {fs.file_data.path} synced")
+                    )
+                ))
+        elif fs.HasField("control"):
+            ctrl = fs.control
+            if ctrl.action ==
agent_pb2.SyncControl.START_WATCHING: + # Path relative to sync dir or absolute + watch_path = ctrl.path if os.path.isabs(ctrl.path) else os.path.join(self.sync_mgr.get_session_dir(sid), ctrl.path) + self.watcher.start_watching(sid, watch_path) + elif ctrl.action == agent_pb2.SyncControl.STOP_WATCHING: + self.watcher.stop_watching(sid) def _handle_task(self, task): print(f"[*] Task Launch: {task.task_id}", flush=True) diff --git a/poc-grpc-agent/orchestrator/app.py b/poc-grpc-agent/orchestrator/app.py index 0812478..30d40b3 100644 --- a/poc-grpc-agent/orchestrator/app.py +++ b/poc-grpc-agent/orchestrator/app.py @@ -52,12 +52,25 @@ orch.pool.push_work("shared-mesh-002", "uptime") time.sleep(5) # Let nodes claim - target_node = "agent-node-001" + active_nodes = orch.registry.list_nodes() + if not active_nodes: + print("[!] No nodes available for direct task simulation.") + return + target_node = active_nodes[0] # Phase 1: Direct Shell Task print(f"\n[📤] Dispatching shell task to {target_node}") res_single = orch.assistant.dispatch_single(target_node, 'uname -a') print(f" Uname Output: {res_single}", flush=True) + + # Ghost Mirror Sync Phase 1 & 2 + print("\n[🧠] AI Phase: Ghost Mirror Workspace Sync (Multi-Node Broadcast)...") + for node_id in active_nodes: + orch.assistant.push_workspace(node_id, "test-session-001") + + time.sleep(2) + # Start watching only on the first node to test broadcast to others + orch.assistant.control_sync(active_nodes[0], "test-session-001", action="START") # Phase 4: Browser Bridge print("\n[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)...") diff --git a/poc-grpc-agent/orchestrator/core/mirror.py b/poc-grpc-agent/orchestrator/core/mirror.py new file mode 100644 index 0000000..6af816a --- /dev/null +++ b/poc-grpc-agent/orchestrator/core/mirror.py @@ -0,0 +1,62 @@ +import os +import shutil +import hashlib +from typing import Dict, List +from protos import agent_pb2 + +class GhostMirrorManager: + """Manages local server-side copies 
of node workspaces.""" + def __init__(self, storage_root="/app/data/mirrors"): + self.storage_root = storage_root + if not os.path.exists(self.storage_root): + os.makedirs(self.storage_root, exist_ok=True) + + def get_workspace_path(self, session_id: str) -> str: + """Returns the local absolute path for a session's mirror.""" + path = os.path.join(self.storage_root, session_id) + os.makedirs(path, exist_ok=True) + return path + + def write_file_chunk(self, session_id: str, file_payload: agent_pb2.FilePayload): + """Writes a chunk of data to the local mirror.""" + workspace = self.get_workspace_path(session_id) + # Prevent path traversal + safe_path = os.path.normpath(os.path.join(workspace, file_payload.path)) + if not safe_path.startswith(workspace): + raise ValueError(f"Malicious path detected: {file_payload.path}") + + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + + mode = "ab" if file_payload.chunk_index > 0 else "wb" + with open(safe_path, mode) as f: + f.write(file_payload.chunk) + + if file_payload.is_final and file_payload.hash: + self._verify_hash(safe_path, file_payload.hash) + + def generate_manifest(self, session_id: str) -> agent_pb2.DirectoryManifest: + """Generates a manifest of the current local mirror state.""" + workspace = self.get_workspace_path(session_id) + files = [] + for root, _, filenames in os.walk(workspace): + for filename in filenames: + abs_path = os.path.join(root, filename) + rel_path = os.path.relpath(abs_path, workspace) + + with open(abs_path, "rb") as f: + file_hash = hashlib.sha256(f.read()).hexdigest() + + files.append(agent_pb2.FileInfo( + path=rel_path, + size=os.path.getsize(abs_path), + hash=file_hash, + is_dir=False + )) + return agent_pb2.DirectoryManifest(root_path=workspace, files=files) + + def _verify_hash(self, path: str, expected_hash: str): + content = open(path, "rb").read() + actual_hash = hashlib.sha256(content).hexdigest() + if actual_hash != expected_hash: + print(f"[⚠️] Hash Mismatch for {path}: 
expected {expected_hash}, got {actual_hash}") + # In a real system, we'd trigger a re-download/re-sync diff --git a/poc-grpc-agent/orchestrator/core/registry.py b/poc-grpc-agent/orchestrator/core/registry.py index b40e6e4..71a0fc9 100644 --- a/poc-grpc-agent/orchestrator/core/registry.py +++ b/poc-grpc-agent/orchestrator/core/registry.py @@ -34,3 +34,7 @@ def get_node(self, node_id): with self.lock: return self.nodes.get(node_id) + + def list_nodes(self): + with self.lock: + return list(self.nodes.keys()) diff --git a/poc-grpc-agent/orchestrator/services/assistant.py b/poc-grpc-agent/orchestrator/services/assistant.py index 67266ba..693c8f8 100644 --- a/poc-grpc-agent/orchestrator/services/assistant.py +++ b/poc-grpc-agent/orchestrator/services/assistant.py @@ -1,14 +1,100 @@ import time import json +import os from orchestrator.utils.crypto import sign_payload, sign_browser_action from protos import agent_pb2 class TaskAssistant: """The 'Brain' of the Orchestrator: High-Level AI API for Dispatching Tasks.""" - def __init__(self, registry, journal, pool): + def __init__(self, registry, journal, pool, mirror=None): self.registry = registry self.journal = journal self.pool = pool + self.mirror = mirror + + def push_workspace(self, node_id, session_id): + """Initial unidirectional push from server ghost mirror to a node.""" + node = self.registry.get_node(node_id) + if not node or not self.mirror: return + + print(f"[📁📤] Initiating Workspace Push for Session {session_id} to {node_id}") + manifest = self.mirror.generate_manifest(session_id) + + # 1. Send Manifest + node["queue"].put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + manifest=manifest + ) + )) + + # 2. 
Send File Data + for file_info in manifest.files: + if not file_info.is_dir: + self._push_file(node, session_id, file_info) + + def _push_file(self, node, session_id, file_info): + """Chunks and sends a single file to a node.""" + full_path = os.path.join(self.mirror.get_workspace_path(session_id), file_info.path) + chunk_size = 1024 * 64 # 64KB chunks + + with open(full_path, "rb") as f: + index = 0 + while True: + chunk = f.read(chunk_size) + is_final = len(chunk) < chunk_size + + payload = agent_pb2.FilePayload( + path=file_info.path, + chunk=chunk, + chunk_index=index, + is_final=is_final, + hash=file_info.hash if is_final else "" + ) + + node["queue"].put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + file_data=payload + ) + )) + + if is_final or not chunk: + break + index += 1 + + def broadcast_file_chunk(self, session_id: str, sender_node_id: str, file_payload): + """Broadcasts a file chunk received from one node to all other nodes in the mesh.""" + print(f" [📁📢] Broadcasting {file_payload.path} from {sender_node_id} to other nodes...") + for node_id in self.registry.list_nodes(): + if node_id == sender_node_id: + continue + + node = self.registry.get_node(node_id) + if not node: + continue + + # Forward the exact same FileSyncMessage + node["queue"].put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + file_data=file_payload + ) + )) + + def control_sync(self, node_id, session_id, action="START", path="."): + """Sends a SyncControl command to a node (e.g. 
START_WATCHING).""" + node = self.registry.get_node(node_id) + if not node: return + + proto_action = agent_pb2.SyncControl.START_WATCHING if action == "START" else agent_pb2.SyncControl.STOP_WATCHING + + node["queue"].put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + control=agent_pb2.SyncControl(action=proto_action, path=path) + ) + )) def dispatch_single(self, node_id, cmd, timeout=30): """Dispatches a shell command to a specific node.""" diff --git a/poc-grpc-agent/orchestrator/services/grpc_server.py b/poc-grpc-agent/orchestrator/services/grpc_server.py index 27d116c..f650120 100644 --- a/poc-grpc-agent/orchestrator/services/grpc_server.py +++ b/poc-grpc-agent/orchestrator/services/grpc_server.py @@ -5,6 +5,7 @@ from orchestrator.core.registry import MemoryNodeRegistry from orchestrator.core.journal import TaskJournal from orchestrator.core.pool import GlobalWorkPool +from orchestrator.core.mirror import GhostMirrorManager from orchestrator.services.assistant import TaskAssistant from orchestrator.utils.crypto import sign_payload @@ -14,8 +15,31 @@ self.registry = MemoryNodeRegistry() self.journal = TaskJournal() self.pool = GlobalWorkPool() - self.assistant = TaskAssistant(self.registry, self.journal, self.pool) + self.mirror = GhostMirrorManager() + self.assistant = TaskAssistant(self.registry, self.journal, self.pool, self.mirror) self.pool.on_new_work = self._broadcast_work + + # 4. 
Mesh Observation (Aggregated Health Dashboard) + threading.Thread(target=self._monitor_mesh, daemon=True, name="MeshMonitor").start() + + def _monitor_mesh(self): + """Periodically prints status of all nodes in the mesh.""" + while True: + time.sleep(10) + active_nodes = self.registry.list_nodes() + print("\n" + "="*50) + print(f"📡 CORTEX MESH DASHBOARD | {len(active_nodes)} Nodes Online") + print("-" * 50) + if not active_nodes: + print(" No nodes currently connected.") + for nid in active_nodes: + node = self.registry.get_node(nid) + stats = node.get("stats", {}) + tasks = stats.get("running", []) + capability = node.get("metadata", {}).get("caps", {}) + print(f" 🟢 {nid:15} | Workers: {stats.get('active_worker_count', 0)} | Running: {len(tasks)} tasks") + print(f" Capabilities: {capability}") + print("="*50 + "\n", flush=True) def _broadcast_work(self, _): """Pushes work notifications to all active nodes.""" @@ -92,11 +116,27 @@ def _handle_client_message(self, msg, node_id, node): kind = msg.WhichOneof('payload') if kind == 'task_claim': - success, payload = self.pool.claim(msg.task_claim.task_id, node_id) + task_id = msg.task_claim.task_id + success, payload = self.pool.claim(task_id, node_id) + + # Send status response back to the node first + node["queue"].put(agent_pb2.ServerTaskMessage( + claim_status=agent_pb2.TaskClaimResponse( + task_id=task_id, + granted=success, + reason="Task successfully claimed" if success else "Task already claimed by another node" + ) + )) + if success: sig = sign_payload(payload) node["queue"].put(agent_pb2.ServerTaskMessage( - task_request=agent_pb2.TaskRequest(task_id=msg.task_claim.task_id, payload_json=payload, signature=sig))) + task_request=agent_pb2.TaskRequest( + task_id=task_id, + payload_json=payload, + signature=sig + ) + )) elif kind == 'task_response': res_obj = {"stdout": msg.task_response.stdout, "status": msg.task_response.status} @@ -115,6 +155,17 @@ content = e.console_msg.text if e.HasField("console_msg") else 
f"{e.network_req.method} {e.network_req.url}" print(f" {prefix}: {content}", flush=True) + elif kind == 'file_sync': + # Handle inbound file data from nodes (Node-Primary model) + fs = msg.file_sync + if fs.HasField("file_data"): + print(f" [📁📥] Mirroring {fs.file_data.path} (chunk {fs.file_data.chunk_index})") + self.mirror.write_file_chunk(fs.session_id, fs.file_data) + # BROADCAST to other nodes in the mesh + self.assistant.broadcast_file_chunk(fs.session_id, node_id, fs.file_data) + elif fs.HasField("status"): + print(f" [📁] Sync Status from {node_id}: {fs.status.message}") + def ReportHealth(self, request_iterator, context): """Collect Health Metrics and Feed Policy Updates.""" for hb in request_iterator: diff --git a/poc-grpc-agent/protos/agent.proto b/poc-grpc-agent/protos/agent.proto index 35226bd..64a0813 100644 --- a/poc-grpc-agent/protos/agent.proto +++ b/poc-grpc-agent/protos/agent.proto @@ -49,6 +49,7 @@ TaskClaimRequest task_claim = 2; BrowserEvent browser_event = 3; NodeAnnounce announce = 4; // NEW: Identification on stream connect + FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync } } @@ -70,6 +71,7 @@ WorkPoolUpdate work_pool_update = 2; TaskClaimResponse claim_status = 3; TaskCancelRequest task_cancel = 4; + FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync } } @@ -185,3 +187,54 @@ message HealthCheckResponse { int64 server_time_ms = 1; } + +// --- Channel 4: Ghost Mirror File Sync --- +message FileSyncMessage { + string session_id = 1; + oneof payload { + DirectoryManifest manifest = 2; + FilePayload file_data = 3; + SyncStatus status = 4; + SyncControl control = 5; + } +} + +message SyncControl { + enum Action { + START_WATCHING = 0; + STOP_WATCHING = 1; + } + Action action = 1; + string path = 2; +} + +message DirectoryManifest { + string root_path = 1; + repeated FileInfo files = 2; +} + +message FileInfo { + string path = 1; + int64 size = 2; + string hash = 3; // For drift detection + bool is_dir = 4; +} + +message 
FilePayload { + string path = 1; + bytes chunk = 2; + int32 chunk_index = 3; + bool is_final = 4; + string hash = 5; // Full file hash for verification on final chunk +} + +message SyncStatus { + enum Code { + OK = 0; + ERROR = 1; + RECONCILE_REQUIRED = 2; + IN_PROGRESS = 3; + } + Code code = 1; + string message = 2; +} diff --git a/poc-grpc-agent/protos/agent_pb2.py b/poc-grpc-agent/protos/agent_pb2.py index f51d6f2..338fb99 100644 --- a/poc-grpc-agent/protos/agent_pb2.py +++ b/poc-grpc-agent/protos/agent_pb2.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: agent.proto +# source: protos/agent.proto # Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor @@ -14,65 +14,81 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xd2\x01\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 
\x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x42\t\n\x07payload\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xe0\x01\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xbd\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 
\x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 
\x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xc1\x01\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 
\x01(\x0b\x32\x14.agent.SandboxPolicy\"\xff\x01\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x42\t\n\x07payload\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\x8d\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xbd\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 
\x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 
\x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xc1\x01\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xd3\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x42\t\n\x07payload\"w\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\"/\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"_\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\"\x87\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 
\x01(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'protos.agent_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' - _globals['_REGISTRATIONREQUEST']._serialized_start=23 - _globals['_REGISTRATIONREQUEST']._serialized_end=245 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=194 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=245 - _globals['_SANDBOXPOLICY']._serialized_start=248 - _globals['_SANDBOXPOLICY']._serialized_end=445 - _globals['_SANDBOXPOLICY_MODE']._serialized_start=411 - _globals['_SANDBOXPOLICY_MODE']._serialized_end=445 - _globals['_REGISTRATIONRESPONSE']._serialized_start=447 - _globals['_REGISTRATIONRESPONSE']._serialized_end=567 - _globals['_CLIENTTASKMESSAGE']._serialized_start=570 - _globals['_CLIENTTASKMESSAGE']._serialized_end=780 - _globals['_NODEANNOUNCE']._serialized_start=782 - _globals['_NODEANNOUNCE']._serialized_end=813 - _globals['_BROWSEREVENT']._serialized_start=816 - _globals['_BROWSEREVENT']._serialized_end=951 - 
_globals['_SERVERTASKMESSAGE']._serialized_start=954
-  _globals['_SERVERTASKMESSAGE']._serialized_end=1178
-  _globals['_TASKCANCELREQUEST']._serialized_start=1180
-  _globals['_TASKCANCELREQUEST']._serialized_end=1216
-  _globals['_TASKREQUEST']._serialized_start=1219
-  _globals['_TASKREQUEST']._serialized_end=1408
-  _globals['_BROWSERACTION']._serialized_start=1411
-  _globals['_BROWSERACTION']._serialized_end=1699
-  _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1565
-  _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=1699
-  _globals['_TASKRESPONSE']._serialized_start=1702
-  _globals['_TASKRESPONSE']._serialized_end=2054
-  _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=1934
-  _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=1982
-  _globals['_TASKRESPONSE_STATUS']._serialized_start=1984
-  _globals['_TASKRESPONSE_STATUS']._serialized_end=2044
-  _globals['_BROWSERRESPONSE']._serialized_start=2057
-  _globals['_BROWSERRESPONSE']._serialized_end=2277
-  _globals['_CONSOLEMESSAGE']._serialized_start=2279
-  _globals['_CONSOLEMESSAGE']._serialized_end=2346
-  _globals['_NETWORKREQUEST']._serialized_start=2348
-  _globals['_NETWORKREQUEST']._serialized_end=2452
-  _globals['_WORKPOOLUPDATE']._serialized_start=2454
-  _globals['_WORKPOOLUPDATE']._serialized_end=2498
-  _globals['_TASKCLAIMREQUEST']._serialized_start=2500
-  _globals['_TASKCLAIMREQUEST']._serialized_end=2552
-  _globals['_TASKCLAIMRESPONSE']._serialized_start=2554
-  _globals['_TASKCLAIMRESPONSE']._serialized_end=2623
-  _globals['_HEARTBEAT']._serialized_start=2626
-  _globals['_HEARTBEAT']._serialized_end=2819
-  _globals['_HEALTHCHECKRESPONSE']._serialized_start=2821
-  _globals['_HEALTHCHECKRESPONSE']._serialized_end=2866
-  _globals['_AGENTORCHESTRATOR']._serialized_start=2869
-  _globals['_AGENTORCHESTRATOR']._serialized_end=3102
+  _globals['_REGISTRATIONREQUEST']._serialized_start=30
+  _globals['_REGISTRATIONREQUEST']._serialized_end=252
+  _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=201
+  _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=252
+  _globals['_SANDBOXPOLICY']._serialized_start=255
+  _globals['_SANDBOXPOLICY']._serialized_end=452
+  _globals['_SANDBOXPOLICY_MODE']._serialized_start=418
+  _globals['_SANDBOXPOLICY_MODE']._serialized_end=452
+  _globals['_REGISTRATIONRESPONSE']._serialized_start=454
+  _globals['_REGISTRATIONRESPONSE']._serialized_end=574
+  _globals['_CLIENTTASKMESSAGE']._serialized_start=577
+  _globals['_CLIENTTASKMESSAGE']._serialized_end=832
+  _globals['_NODEANNOUNCE']._serialized_start=834
+  _globals['_NODEANNOUNCE']._serialized_end=865
+  _globals['_BROWSEREVENT']._serialized_start=868
+  _globals['_BROWSEREVENT']._serialized_end=1003
+  _globals['_SERVERTASKMESSAGE']._serialized_start=1006
+  _globals['_SERVERTASKMESSAGE']._serialized_end=1275
+  _globals['_TASKCANCELREQUEST']._serialized_start=1277
+  _globals['_TASKCANCELREQUEST']._serialized_end=1313
+  _globals['_TASKREQUEST']._serialized_start=1316
+  _globals['_TASKREQUEST']._serialized_end=1505
+  _globals['_BROWSERACTION']._serialized_start=1508
+  _globals['_BROWSERACTION']._serialized_end=1796
+  _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1662
+  _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=1796
+  _globals['_TASKRESPONSE']._serialized_start=1799
+  _globals['_TASKRESPONSE']._serialized_end=2151
+  _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2031
+  _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2079
+  _globals['_TASKRESPONSE_STATUS']._serialized_start=2081
+  _globals['_TASKRESPONSE_STATUS']._serialized_end=2141
+  _globals['_BROWSERRESPONSE']._serialized_start=2154
+  _globals['_BROWSERRESPONSE']._serialized_end=2374
+  _globals['_CONSOLEMESSAGE']._serialized_start=2376
+  _globals['_CONSOLEMESSAGE']._serialized_end=2443
+  _globals['_NETWORKREQUEST']._serialized_start=2445
+  _globals['_NETWORKREQUEST']._serialized_end=2549
+  _globals['_WORKPOOLUPDATE']._serialized_start=2551
+  _globals['_WORKPOOLUPDATE']._serialized_end=2595
+  _globals['_TASKCLAIMREQUEST']._serialized_start=2597
+  _globals['_TASKCLAIMREQUEST']._serialized_end=2649
+  _globals['_TASKCLAIMRESPONSE']._serialized_start=2651
+  _globals['_TASKCLAIMRESPONSE']._serialized_end=2720
+  _globals['_HEARTBEAT']._serialized_start=2723
+  _globals['_HEARTBEAT']._serialized_end=2916
+  _globals['_HEALTHCHECKRESPONSE']._serialized_start=2918
+  _globals['_HEALTHCHECKRESPONSE']._serialized_end=2963
+  _globals['_FILESYNCMESSAGE']._serialized_start=2966
+  _globals['_FILESYNCMESSAGE']._serialized_end=3177
+  _globals['_SYNCCONTROL']._serialized_start=3179
+  _globals['_SYNCCONTROL']._serialized_end=3298
+  _globals['_SYNCCONTROL_ACTION']._serialized_start=3251
+  _globals['_SYNCCONTROL_ACTION']._serialized_end=3298
+  _globals['_DIRECTORYMANIFEST']._serialized_start=3300
+  _globals['_DIRECTORYMANIFEST']._serialized_end=3370
+  _globals['_FILEINFO']._serialized_start=3372
+  _globals['_FILEINFO']._serialized_end=3440
+  _globals['_FILEPAYLOAD']._serialized_start=3442
+  _globals['_FILEPAYLOAD']._serialized_end=3537
+  _globals['_SYNCSTATUS']._serialized_start=3540
+  _globals['_SYNCSTATUS']._serialized_end=3675
+  _globals['_SYNCSTATUS_CODE']._serialized_start=3609
+  _globals['_SYNCSTATUS_CODE']._serialized_end=3675
+  _globals['_AGENTORCHESTRATOR']._serialized_start=3678
+  _globals['_AGENTORCHESTRATOR']._serialized_end=3911
 # @@protoc_insertion_point(module_scope)
diff --git a/poc-grpc-agent/protos/agent_pb2_grpc.py b/poc-grpc-agent/protos/agent_pb2_grpc.py
index b91c8a0..f551b0b 100644
--- a/poc-grpc-agent/protos/agent_pb2_grpc.py
+++ b/poc-grpc-agent/protos/agent_pb2_grpc.py
@@ -2,7 +2,7 @@
 """Client and server classes corresponding to protobuf-defined services."""
 import grpc

-from . import agent_pb2 as agent__pb2
+from protos import agent_pb2 as protos_dot_agent__pb2


 class AgentOrchestratorStub(object):
@@ -17,18 +17,18 @@
         """
         self.SyncConfiguration = channel.unary_unary(
                 '/agent.AgentOrchestrator/SyncConfiguration',
-                request_serializer=agent__pb2.RegistrationRequest.SerializeToString,
-                response_deserializer=agent__pb2.RegistrationResponse.FromString,
+                request_serializer=protos_dot_agent__pb2.RegistrationRequest.SerializeToString,
+                response_deserializer=protos_dot_agent__pb2.RegistrationResponse.FromString,
                 )
         self.TaskStream = channel.stream_stream(
                 '/agent.AgentOrchestrator/TaskStream',
-                request_serializer=agent__pb2.ClientTaskMessage.SerializeToString,
-                response_deserializer=agent__pb2.ServerTaskMessage.FromString,
+                request_serializer=protos_dot_agent__pb2.ClientTaskMessage.SerializeToString,
+                response_deserializer=protos_dot_agent__pb2.ServerTaskMessage.FromString,
                 )
         self.ReportHealth = channel.stream_stream(
                 '/agent.AgentOrchestrator/ReportHealth',
-                request_serializer=agent__pb2.Heartbeat.SerializeToString,
-                response_deserializer=agent__pb2.HealthCheckResponse.FromString,
+                request_serializer=protos_dot_agent__pb2.Heartbeat.SerializeToString,
+                response_deserializer=protos_dot_agent__pb2.HealthCheckResponse.FromString,
                 )
@@ -62,18 +62,18 @@
     rpc_method_handlers = {
             'SyncConfiguration': grpc.unary_unary_rpc_method_handler(
                     servicer.SyncConfiguration,
-                    request_deserializer=agent__pb2.RegistrationRequest.FromString,
-                    response_serializer=agent__pb2.RegistrationResponse.SerializeToString,
+                    request_deserializer=protos_dot_agent__pb2.RegistrationRequest.FromString,
+                    response_serializer=protos_dot_agent__pb2.RegistrationResponse.SerializeToString,
             ),
             'TaskStream': grpc.stream_stream_rpc_method_handler(
                     servicer.TaskStream,
-                    request_deserializer=agent__pb2.ClientTaskMessage.FromString,
-                    response_serializer=agent__pb2.ServerTaskMessage.SerializeToString,
+                    request_deserializer=protos_dot_agent__pb2.ClientTaskMessage.FromString,
+                    response_serializer=protos_dot_agent__pb2.ServerTaskMessage.SerializeToString,
             ),
             'ReportHealth': grpc.stream_stream_rpc_method_handler(
                     servicer.ReportHealth,
-                    request_deserializer=agent__pb2.Heartbeat.FromString,
-                    response_serializer=agent__pb2.HealthCheckResponse.SerializeToString,
+                    request_deserializer=protos_dot_agent__pb2.Heartbeat.FromString,
+                    response_serializer=protos_dot_agent__pb2.HealthCheckResponse.SerializeToString,
             ),
     }
     generic_handler = grpc.method_handlers_generic_handler(
@@ -98,8 +98,8 @@
             timeout=None,
             metadata=None):
         return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration',
-            agent__pb2.RegistrationRequest.SerializeToString,
-            agent__pb2.RegistrationResponse.FromString,
+            protos_dot_agent__pb2.RegistrationRequest.SerializeToString,
+            protos_dot_agent__pb2.RegistrationResponse.FromString,
             options, channel_credentials,
             insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@@ -115,8 +115,8 @@
             timeout=None,
             metadata=None):
         return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream',
-            agent__pb2.ClientTaskMessage.SerializeToString,
-            agent__pb2.ServerTaskMessage.FromString,
+            protos_dot_agent__pb2.ClientTaskMessage.SerializeToString,
+            protos_dot_agent__pb2.ServerTaskMessage.FromString,
             options, channel_credentials,
             insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@@ -132,7 +132,7 @@
             timeout=None,
             metadata=None):
         return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth',
-            agent__pb2.Heartbeat.SerializeToString,
-            agent__pb2.HealthCheckResponse.FromString,
+            protos_dot_agent__pb2.Heartbeat.SerializeToString,
+            protos_dot_agent__pb2.HealthCheckResponse.FromString,
             options, channel_credentials,
             insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
diff --git a/poc-grpc-agent/requirements.txt b/poc-grpc-agent/requirements.txt
index 6e33b6d..3e04bee
100644
--- a/poc-grpc-agent/requirements.txt
+++ b/poc-grpc-agent/requirements.txt
@@ -2,3 +2,4 @@
 grpcio-tools==1.62.1
 PyJWT==2.8.0
 playwright==1.42.0
+watchdog==4.0.0
diff --git a/poc-grpc-agent/test_mesh.py b/poc-grpc-agent/test_mesh.py
new file mode 100644
index 0000000..ecb8cc2
--- /dev/null
+++ b/poc-grpc-agent/test_mesh.py
@@ -0,0 +1,85 @@
+
+import time
+import subprocess
+import os
+import signal
+
+def run_mesh_test():
+    print("[🚀] Starting Collaborative Mesh Test...")
+    print("[🛡️] Orchestrator: Starting...")
+
+    # 1. Start Orchestrator
+    orchestrator = subprocess.Popen(
+        ["python3", "-m", "orchestrator.app"],
+        cwd="/app/poc-grpc-agent",
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        text=True,
+        bufsize=1
+    )
+
+    time.sleep(3)  # Wait for start
+
+    print("[🤖] Node Alpha: Starting...")
+    # 2. Start Agent Node 1
+    node1 = subprocess.Popen(
+        ["python3", "-m", "agent_node.main"],
+        cwd="/app/poc-grpc-agent",
+        env={**os.environ, "AGENT_NODE_ID": "node-alpha", "CORTEX_SYNC_DIR": "/tmp/cortex-sync-alpha"},
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        text=True,
+        bufsize=1
+    )
+
+    print("[🤖] Node Beta: Starting...")
+    # 3. Start Agent Node 2
+    node2 = subprocess.Popen(
+        ["python3", "-m", "agent_node.main"],
+        cwd="/app/poc-grpc-agent",
+        env={**os.environ, "AGENT_NODE_ID": "node-beta", "CORTEX_SYNC_DIR": "/tmp/cortex-sync-beta"},
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        text=True,
+        bufsize=1
+    )
+
+    print("[⏳] Running simulation for 40 seconds...")
+    start_time = time.time()
+
+    # Simple thread to print outputs in real-time
+    import threading
+    def pipe_output(name, pipe):
+        for line in pipe:
+            print(f"[{name}] {line.strip()}")
+
+    threading.Thread(target=pipe_output, args=("ORCH", orchestrator.stdout), daemon=True).start()
+    threading.Thread(target=pipe_output, args=("N1", node1.stdout), daemon=True).start()
+    threading.Thread(target=pipe_output, args=("N2", node2.stdout), daemon=True).start()
+
+    # Simulate a local edit on Node Alpha (N1) after a delay to test real-time sync
+    def simulate_local_edit():
+        time.sleep(22)  # Wait for initial push and START_WATCHING
+        sync_file = "/tmp/cortex-sync-alpha/test-session-001/hello.py"
+        print(f"\n[📝] User Sim: Editing {sync_file} locally on Node Alpha...")
+        with open(sync_file, "a") as f:
+            f.write("\n# BROADCAST TEST: User added this line on Alpha!\n")
+
+    threading.Thread(target=simulate_local_edit, daemon=True).start()
+
+    time.sleep(40)
+
+    # 4. Cleanup
+    print("\n[🛑] Test Finished. Terminating processes...")
+    orchestrator.terminate()
+    node1.terminate()
+    node2.terminate()
+
+    time.sleep(2)
+    orchestrator.kill()
+    node1.kill()
+    node2.kill()
+    print("[✅] Done.")
+
+if __name__ == "__main__":
+    run_mesh_test()
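The `FilePayload` contract in the proto above (in-order chunks, `is_final` marker, full-file hash on the last chunk) implies a reassemble-then-verify step on the receiving side of the Ghost Mirror. A minimal sketch of what the server's `mirror.write_file_chunk(...)` call could do — the `WorkspaceMirror` class name, SHA-256 as the hash function, and the `<root>/<session_id>/<path>` on-disk layout are all assumptions, not fixed by this diff:

```python
import hashlib
import os


class WorkspaceMirror:
    """Minimal Ghost Mirror sketch: reassembles chunked FilePayload messages on disk.

    Assumed layout: <root>/<session_id>/<payload.path>. Hash algorithm assumed SHA-256.
    """

    def __init__(self, root: str):
        self.root = root

    def write_file_chunk(self, session_id: str, payload) -> bool:
        """Append one chunk; on the final chunk, verify the full-file hash.

        `payload` mirrors the proto fields: path, chunk, chunk_index, is_final, hash.
        Returns False on hash mismatch, so the caller can answer with
        SyncStatus.RECONCILE_REQUIRED.
        """
        target = os.path.join(self.root, session_id, payload.path)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        # chunk_index 0 starts a fresh file; later chunks append in order
        mode = "wb" if payload.chunk_index == 0 else "ab"
        with open(target, mode) as f:
            f.write(payload.chunk)
        if payload.is_final:
            with open(target, "rb") as f:
                digest = hashlib.sha256(f.read()).hexdigest()
            if digest != payload.hash:
                return False
        return True
```

A production version would also track the expected `chunk_index` per path to reject out-of-order or duplicate chunks before trusting the append-mode write.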