diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index 6fd5840..ac18ac6 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -27,191 +27,102 @@ from agent_node.utils.watchdog import watchdog from agent_node.core.regex_patterns import ANSI_ESCAPE +from mesh_core.transport_grpc import GrpcMeshTransport +from mesh_core.node_engine import MeshNodeCore +import agent_node.config as config +from agent_node.utils.watchdog import watchdog +from agent_node.utils.network import get_secure_stub + logger = logging.getLogger(__name__) -class AgentNode: +class AgentNode(MeshNodeCore): """ - Agent Core: Orchestrates local skills and maintains gRPC connectivity. - Refactored for structural clarity and modular message handling. + Agent Core: Orchestrates local skills and maintains connectivity. + Now leverages MeshNodeCore for transport-agnostic orchestration. """ def __init__(self): - self.node_id = config.NODE_ID + # 1. Initialize Transport (gRPC by default for production) + # Pass the secure stub factory to keep existing security logic + self.transport = GrpcMeshTransport(config.NODE_ID, get_secure_stub) + + # 2. Initialize Core Engine + super().__init__(config.NODE_ID, self.transport) + + # 3. Initialize Agent-Specific Modules self.sandbox = SandboxEngine() self.sync_mgr = NodeSyncManager() self.skills = SkillManager(max_workers=config.MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr) self.watcher = WorkspaceWatcher(self._on_sync_delta) - self.task_queue = queue.Queue(maxsize=250) - self.stub = None - self.channel = None self._stop_event = threading.Event() - self._refresh_stub() - + self.io_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="NodeIO") self.io_semaphore = threading.Semaphore(50) self.write_locks = {} self.lock_map_mutex = threading.Lock() - def _refresh_stub(self): - """Force refreshes the gRPC channel and stub.""" - if self.channel: - try: self.channel.close() - except: pass - self.stub, self.channel = get_secure_stub() - self._setup_connectivity_watcher() + # 4. Bind Mesh Events + self.on_task = self._handle_task + self.on_cancel = self._handle_cancel + self.on_policy = self._on_policy_update + self.on_sync = self._handle_file_sync + self.on_ready = self._on_mesh_ready + self.on_disconnect = self._on_mesh_disconnect - def _setup_connectivity_watcher(self): - """Monitors gRPC channel state.""" - import grpc - self._last_grpc_state = None - def _on_state_change(state): - if not self._stop_event.is_set() and state != self._last_grpc_state: - print(f"[*] [gRPC-State] {state}", flush=True) - self._last_grpc_state = state - self.channel.subscribe(_on_state_change, try_to_connect=True) + def _on_mesh_ready(self, msg): + print(f"[Mesh] Connected and Authorized. Policy Synced.") + if hasattr(msg, 'policy_update'): + self._on_policy_update(msg.policy_update) + elif hasattr(msg, 'policy'): + self._on_policy_update(msg.policy) + + def _on_mesh_disconnect(self): + print(f"[Mesh] Disconnected from Hub.") + + def _on_policy_update(self, policy): + self.sandbox.sync(policy) + self._apply_skill_config(policy.skill_config_json) def sync_configuration(self): - """Handshake with the Orchestrator to sync policy and metadata.""" - while True: - config.reload() - self.node_id = config.NODE_ID - if not self.stub: self._refresh_stub() - - print(f"[*] Handshake with Orchestrator: {self.node_id}") - caps = self._collect_capabilities() - reg_req = agent_pb2.RegistrationRequest( - node_id=self.node_id, auth_token=config.AUTH_TOKEN, - node_description=config.NODE_DESC, - capabilities={k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in caps.items()} - ) - - try: - res = self.stub.SyncConfiguration(reg_req, timeout=10) - if res.success: - self.sandbox.sync(res.policy) - self._apply_skill_config(res.policy.skill_config_json) - print("[OK] Handshake successful. Policy Synced.") - break - else: - print(f"[Error] Rejection: {res.error_message}. Retrying in 5s...") - time.sleep(5) - except Exception as e: - print(f"[Error] Connection Fail: {str(e)}. Retrying in 5s...") - time.sleep(5) - - def _apply_skill_config(self, config_json): - """Applies dynamic skill configurations from the server.""" - if not config_json: return - try: - cfg = json.loads(config_json) - for skill in self.skills.skills.values(): - if hasattr(skill, "apply_config"): skill.apply_config(cfg) - except Exception as e: - logger.error(f"Error applying skill config: {e}") - - def _collect_capabilities(self) -> dict: - """Collects hardware and OS metadata.""" - from agent_node.utils.platform_metrics import get_platform_metrics - caps = get_platform_metrics().collect_capabilities() - try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.settimeout(0) - s.connect(('10.254.254.254', 1)) - caps["local_ip"] = s.getsockname()[0] - s.close() - except: caps["local_ip"] = "unknown" - return caps + """ Handshake now handled by Transport/Core. This remains for compatibility if needed. """ + # In the new SDK model, the transport handles the initial SyncConfiguration call + # or it's wrapped in the connect() flow. + pass def start_health_reporting(self): - """Launches the background health reporting stream.""" - from agent_node.utils.platform_metrics import get_platform_metrics - metrics_tool = get_platform_metrics() - + """ Launches background health reporting using the transport. """ def _report(): while not self._stop_event.is_set(): - try: - def _gen(): - while not self._stop_event.is_set(): - ids = self.skills.get_active_ids() - vmem = psutil.virtual_memory() if psutil else None - yield agent_pb2.Heartbeat( - node_id=self.node_id, - cpu_usage_percent=psutil.cpu_percent() if psutil else 0, - memory_usage_percent=vmem.percent if vmem else 0, - active_worker_count=len(ids), - max_worker_capacity=config.MAX_SKILL_WORKERS, - running_task_ids=ids, - cpu_count=psutil.cpu_count() if psutil else 0, - memory_used_gb=vmem.used/(1024**3) if vmem else 0, - memory_total_gb=vmem.total/(1024**3) if vmem else 0, - load_avg=metrics_tool.get_load_avg() - ) - time.sleep(max(0, config.HEALTH_REPORT_INTERVAL - 1.0)) - - for _ in self.stub.ReportHealth(_gen()): watchdog.tick() - except Exception as e: - time.sleep(5) + if self.transport.is_connected(): + try: + ids = self.skills.get_active_ids() + vmem = psutil.virtual_memory() if psutil else None + hb = agent_pb2.Heartbeat( + node_id=self.node_id, + cpu_usage_percent=psutil.cpu_percent() if psutil else 0, + memory_usage_percent=vmem.percent if vmem else 0, + active_worker_count=len(ids), + max_worker_capacity=config.MAX_SKILL_WORKERS, + running_task_ids=ids, + cpu_count=psutil.cpu_count() if psutil else 0, + memory_used_gb=vmem.used/(1024**3) if vmem else 0, + memory_total_gb=vmem.total/(1024**3) if vmem else 0 + ) + # Health is sent via a separate stream in gRPC, + # so we use the transport's specialized method if it exists + if hasattr(self.transport, 'send_health'): + self.transport.send_health(hb) + watchdog.tick() + except Exception as e: + logger.error(f"Health report error: {e}") + time.sleep(config.HEALTH_REPORT_INTERVAL) threading.Thread(target=_report, daemon=True, name="HealthReporter").start() def run_task_stream(self): - """Main bi-directional task stream with auto-reconnection.""" - while True: - try: - def _gen(): - yield agent_pb2.ClientTaskMessage(announce=agent_pb2.NodeAnnounce(node_id=self.node_id)) - last_heartbeat = time.time() - while not self._stop_event.is_set(): - # Use a small timeout to ensure we check the heartbeat timer regardless of traffic - try: - msg = self.task_queue.get(timeout=1.0) - try: - yield msg - except Exception as ye: - print(f"[!] Critical Error yielding TaskMessage: {ye}") - break - except queue.Empty: - pass - - # Absolute heartbeat check (every 10s) - if time.time() - last_heartbeat >= 10.0: - try: - yield agent_pb2.ClientTaskMessage( - skill_event=agent_pb2.SkillEvent(keep_alive=True) - ) - last_heartbeat = time.time() - except Exception as ye: - print(f"[!] Critical Error yielding KeepAlive: {ye}") - break - - responses = self.stub.TaskStream(_gen()) - print(f"[*] Task stream connected ({self.node_id}).") - for msg in responses: - watchdog.tick() - self._process_server_message(msg) - except Exception as e: - print(f"[Error] Task stream error: {e}") - self._refresh_stub() - time.sleep(5) - - def _process_server_message(self, msg): - """Routes inbound server messages to their respective handlers.""" - try: - kind = msg.WhichOneof('payload') - if not verify_server_message_signature(msg): - print(f"[!] Signature mismatch for {kind}. Proceeding anyway (DEBUG).") - - if kind == 'task_request': self._handle_task(msg.task_request) - elif kind == 'task_cancel': self._handle_cancel(msg.task_cancel) - elif kind == 'work_pool_update': self._handle_work_pool(msg.work_pool_update) - elif kind == 'file_sync': - # M6: Offload ALL file sync processing to executor to avoid blocking gRPC stream - self.io_executor.submit(self._handle_file_sync, msg.file_sync) - elif kind == 'policy_update': - self.sandbox.sync(msg.policy_update) - self._apply_skill_config(msg.policy_update.skill_config_json) - except Exception as e: - print(f"[!] Error processing server message '{kind}': {e}") - traceback.print_exc() + """ Starts the core engine which manages the task stream. """ + self.start() # From MeshNodeCore + while not self._stop_event.is_set(): + time.sleep(1) def _handle_cancel(self, cancel_req): """Cancels an active task or an entire session's background tasks.""" @@ -229,7 +140,7 @@ for tid in update.available_task_ids: import random time.sleep(random.uniform(0.1, 0.5)) - self.task_queue.put(agent_pb2.ClientTaskMessage( + self.send_message(agent_pb2.ClientTaskMessage( task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id) )) @@ -248,7 +159,7 @@ message=f"Drift in {len(drift)} files" if drift else "Synchronized", reconcile_paths=drift ) - self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, status=status))) + self.send_message(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, status=status))) def _on_sync_data(self, sid, file_data): """Offloads disk I/O to a background worker pool.""" @@ -319,11 +230,11 @@ """Splits large manifests into chunks for gRPC streaming.""" chunk_size = 1000 if not files: - self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, task_id=tid, manifest=agent_pb2.DirectoryManifest(root_path=root, is_final=True)))) + self.send_message(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, task_id=tid, manifest=agent_pb2.DirectoryManifest(root_path=root, is_final=True)))) return for i in range(0, len(files), chunk_size): chunk = files[i:i+chunk_size] - self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, task_id=tid, + self.send_message(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, task_id=tid, manifest=agent_pb2.DirectoryManifest(root_path=root, files=chunk, chunk_index=i//chunk_size, is_final=(i+chunk_size >= len(files)))))) def _handle_fs_write(self, session_id, rel_path, content, is_dir, task_id=""): @@ -404,7 +315,7 @@ if not chunk and idx > 0: break hasher.update(chunk) is_final = f.tell() >= size - self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=session_id, task_id=task_id, + self.send_message(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=session_id, task_id=task_id, file_data=agent_pb2.FilePayload(path=rel_path.replace("\\", "/"), chunk=zlib.compress(chunk), chunk_index=idx, is_final=is_final, hash=hasher.hexdigest() if is_final else "", compressed=True)))) if is_final: break idx += 1 @@ -421,7 +332,7 @@ def _on_event(self, event): """Forwards skill events to the gRPC stream.""" - self.task_queue.put(event if isinstance(event, agent_pb2.ClientTaskMessage) else agent_pb2.ClientTaskMessage(skill_event=event)) + self.send_message(event if isinstance(event, agent_pb2.ClientTaskMessage) else agent_pb2.ClientTaskMessage(skill_event=event)) def _on_finish(self, task_id, result, trace_id): """Finalizes a task and sends the response back to the Hub.""" @@ -440,16 +351,16 @@ logger.warning(f"Truncating excessive {field} ({len(val):,} bytes) for task {task_id}") setattr(res, field, val[:SAFETY_CAP // 2] + "\n... [TRUNCATED DUE TO SIZE] ...\n" + val[-(SAFETY_CAP // 2):]) - self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=res)) + self.send_message(agent_pb2.ClientTaskMessage(task_response=res)) def _send_sync_ok(self, sid, tid, msg): - self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, task_id=tid, status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=msg)))) + self.send_message(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, task_id=tid, status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=msg)))) def _send_sync_error(self, sid, tid, msg): - self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, task_id=tid, status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=msg)))) + self.send_message(agent_pb2.ClientTaskMessage(file_sync=agent_pb2.FileSyncMessage(session_id=sid, task_id=tid, status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=msg)))) def _on_sync_delta(self, session_id, payload): - self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=payload if isinstance(payload, agent_pb2.FileSyncMessage) else agent_pb2.FileSyncMessage(session_id=session_id, file_data=payload))) + self.send_message(agent_pb2.ClientTaskMessage(file_sync=payload if isinstance(payload, agent_pb2.FileSyncMessage) else agent_pb2.FileSyncMessage(session_id=session_id, file_data=payload))) def shutdown(self): """Gracefully shuts down the node.""" diff --git a/agent-node/src/mesh_core b/agent-node/src/mesh_core new file mode 120000 index 0000000..276313b --- /dev/null +++ b/agent-node/src/mesh_core @@ -0,0 +1 @@ +../../mesh-sdk/mesh_core \ No newline at end of file diff --git a/agent-node/src/shared_core/__init__.py b/agent-node/src/shared_core/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/agent-node/src/shared_core/__init__.py +++ /dev/null diff --git a/agent-node/src/shared_core/ignore.py b/agent-node/src/shared_core/ignore.py deleted file mode 100644 index 69e2884..0000000 --- a/agent-node/src/shared_core/ignore.py +++ /dev/null @@ -1,41 +0,0 @@ -import os -import fnmatch - -class CortexIgnore: - """Handles .cortexignore (and .gitignore) pattern matching.""" - def __init__(self, root_path, is_upstream=False): - self.root_path = root_path - self.is_upstream = is_upstream - self.patterns = self._load_patterns() - - def _load_patterns(self): - patterns = [".git", "node_modules", ".cortex_sync", "__pycache__", "*.pyc"] # Default ignores - if self.is_upstream: - patterns.append(".skills") - ignore_file = os.path.join(self.root_path, ".cortexignore") - if not os.path.exists(ignore_file): - ignore_file = os.path.join(self.root_path, ".gitignore") - - if os.path.exists(ignore_file): - with open(ignore_file, "r") as f: - for line in f: - line = line.strip() - if line and not line.startswith("#"): - patterns.append(line) - return patterns - - def is_ignored(self, rel_path): - """Returns True if the path matches any ignore pattern.""" - for pattern in self.patterns: - # Handle directory patterns - if pattern.endswith("/"): - if rel_path.startswith(pattern) or f"/{pattern}" in f"/{rel_path}": - return True - # Standard glob matching - if fnmatch.fnmatch(rel_path, pattern) or fnmatch.fnmatch(os.path.basename(rel_path), pattern): - return True - # Handle nested matches - for part in rel_path.split(os.sep): - if fnmatch.fnmatch(part, pattern): - return True - return False diff --git a/ai-hub/app/core/grpc/mesh_core b/ai-hub/app/core/grpc/mesh_core new file mode 120000 index 0000000..34136e5 --- /dev/null +++ b/ai-hub/app/core/grpc/mesh_core @@ -0,0 +1 @@ +../../../../mesh-sdk/mesh_core \ No newline at end of file diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index b4a50db..5acee2e 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -18,14 +18,41 @@ from app.core.grpc.utils.crypto import sign_payload, sign_bytes from app.config import settings +from mesh_core.server_engine import MeshServerCore +from mesh_core.transport import IMeshTransport + logger = logging.getLogger(__name__) +class GrpcServerTransport(IMeshTransport): + """ + gRPC implementation of IMeshTransport for the server side. + Wraps a single bi-directional stream. + """ + def __init__(self, context): + self.context = context + self.send_queue = queue.PriorityQueue() + self.listener = None + + def connect(self): pass + def set_listener(self, listener): self.listener = listener + def is_connected(self): return self.context.is_active() + def close(self): self.context.abort(grpc.StatusCode.CANCELLED, "Transport closed") + + def send(self, message, priority: int = 1): + # Security: Sign every outbound message before queuing + message.signature = "" + msg_bytes = message.SerializeToString(deterministic=True) + message.signature = sign_bytes(msg_bytes) + # PriorityQueue expects (priority, timestamp, item) to ensure stable ordering + self.send_queue.put((priority, time.time(), message)) + # M4: Token validation is now handled directly via NodeRegistryService class AgentOrchestrator(agent_pb2_grpc.AgentOrchestratorServicer): """Integrated gRPC Servicer for Agent Orchestration within AI Hub.""" def __init__(self, registry): self.registry = registry # Injected NodeRegistryService + self.mesh_core = MeshServerCore() self.journal = TaskJournal() self.pool = GlobalWorkPool() self.mirror = GhostMirrorManager(storage_root=os.path.join(settings.DATA_DIR, "mirrors")) @@ -33,8 +60,13 @@ self.io_locks_lock = threading.Lock() self.assistant = TaskAssistant(self.registry, self.journal, self.pool, self.mirror) self.pool.on_new_work = self._broadcast_work - self.manifest_accumulators = {} # NEW: For paginated manifests + self.manifest_accumulators = {} self.manifest_accumulators_lock = threading.Lock() + + # Bind Mesh Events + self.mesh_core.on_message_received = self._on_mesh_message + self.mesh_core.on_node_online = self._on_mesh_online + self.mesh_core.on_node_offline = self._on_mesh_offline # 4. Mesh Observation (Aggregated Health Dashboard) threading.Thread(target=self._monitor_mesh, daemon=True, name="MeshMonitor").start() @@ -42,6 +74,17 @@ # 5. Mirror Cleanup Loop (Purge archived sessions) threading.Thread(target=self._mirror_cleanup_loop, daemon=True, name="MirrorCleanup").start() + def _on_mesh_online(self, node): + logger.info(f"[Mesh] Node Online: {node.node_id}") + + def _on_mesh_offline(self, node): + logger.warning(f"[Mesh] Node Offline: {node.node_id}") + + def _on_mesh_message(self, node_id, msg): + node = self.registry.get_node(node_id) + if node: + self._handle_client_message(msg, node_id, node) + def _monitor_mesh(self): """Periodically records status of all nodes in the mesh for diagnostics.""" log_path = os.path.join(settings.DATA_DIR, "mesh_status.log") # Use DATA_DIR for visibility @@ -99,7 +142,7 @@ live_nodes = self.registry.list_nodes() for _node in live_nodes: try: - _node.send_message(agent_pb2.ServerTaskMessage( + self.mesh_core.dispatch(_node.node_id, agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id="system", control=agent_pb2.SyncControl( @@ -121,7 +164,7 @@ for node in live_nodes: logger.debug(f"[📢] Broadcasting availability to {node.node_id}") # Priority 1: Interactive/Work Dispatch - node.send_message(agent_pb2.ServerTaskMessage( + self.mesh_core.dispatch(node.node_id, agent_pb2.ServerTaskMessage( work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) ), priority=1) @@ -182,7 +225,7 @@ policy = self._build_sandbox_policy(skill_config) logger.info(f"[🔒] Pushing LIVE policy update to {node_id} (Mode: {policy.mode}, Allowed: {len(policy.allowed_commands)})") - node.queue.put(agent_pb2.ServerTaskMessage( + self.mesh_core.dispatch(node_id, agent_pb2.ServerTaskMessage( policy_update=policy )) @@ -226,111 +269,91 @@ def TaskStream(self, request_iterator, context): """Persistent bi-directional stream for dispatching work and collecting results.""" - # 1. Identification Handshake - node_id = "unknown" - node = None + node_id = "unknown" try: first_msg = next(request_iterator) if not first_msg.HasField("announce"): logger.warning("[!] Initial TaskStream message MUST be 'announce'") return - node_id = first_msg.announce.node_id - logger.info(f"[*] Node {node_id} Attempting to establish TaskStream...") - except StopIteration: - logger.warning("[!] Empty TaskStream received.") - return except Exception as e: logger.error(f"[!] TaskStream handshake error: {e}") return node = self.registry.get_node(node_id) if not node: - # Hub Restart Recovery: Attempt to pull last known registration from DB - logger.info(f"[*] Node {node_id} not in memory. Attempting DB recovery...") - if self.registry.try_recovery_register(node_id): - logger.info(f"[*] Node {node_id} recovered successfully.") - node = self.registry.get_node(node_id) - else: - logger.warning(f"[!] Node {node_id} recovery failed.") - - if not node: - logger.warning(f"[!] TaskStream rejected: Node {node_id} not registered via SyncConfiguration.") + if not self.registry.try_recovery_register(node_id): + logger.warning(f"[!] TaskStream rejected: Node {node_id} not registered.") return + node = self.registry.get_node(node_id) - # M6: Eject 'Zombie' connections. If this node already has a TaskStream context, - # it might be a stale connection from a previous incarnation of the agent. - # We abort the old one to ensure the new one takes over the LiveNodeRecord. + # M6: Eject 'Zombie' connections if node.stream: - logger.warning(f"[*] Node {node_id} has a STALE TaskStream. Ejecting Zombie connection...") - try: - node.stream.abort(grpc.StatusCode.ABORTED, "New connection established.") - # Give the old finally block a tiny slice of time to run deregister if needed, - # though we are about to overwrite it anyway. - time.sleep(0.1) - except Exception as ae: - logger.error(f"[!] Failed to abort stale stream for {node_id}: {ae}") + logger.warning(f"[*] Node {node_id} has a STALE TaskStream. Ejecting Zombie...") + try: node.stream.abort(grpc.StatusCode.ABORTED, "New connection established.") + except: pass + time.sleep(0.1) - node.stream = context # Track the active stream - logger.info(f"[*] Node {node_id} Online (TaskStream established)") + # 1. Initialize Transport + transport = GrpcServerTransport(context) + node.stream = context # Still need for assistant.reconcile legacy calls + + # 2. Register in Mesh Core + self.mesh_core.register_node(node_id, node.user_id, node.metadata, transport) try: self.registry.update_stats(node_id, {"status": "streaming"}) - # M6: Offload reconciliation to a background thread to prevent blocking the stream initialization - # especially when the database (on NFS) is experiencing latency. - # M6: Offload reconciliation to a background thread to prevent blocking the stream initialization - # especially when the database (on NFS) is experiencing latency. - if node_id != "media-windows-server": - threading.Thread( - target=self.assistant.reconcile_node, - args=(node_id,), - daemon=True, - name=f"Reconcile-{node_id}" - ).start() - else: - logger.info(f"[*] Skipping initial reconciliation for {node_id} to ensure TaskStream stability.") - + # Start Background Reader def _read_results(): try: for msg in request_iterator: - try: - self._handle_client_message(msg, node_id, node) - except Exception as inner_e: - logger.error(f"[!] Error processing task message from {node_id}: {inner_e}", exc_info=True) + self.mesh_core.handle_inbound(node_id, msg) except Exception as e: logger.warning(f"Results listener closed for {node_id}: {e}") threading.Thread(target=_read_results, daemon=True, name=f"Results-{node_id}").start() + # 3. Work Dispatcher (Main Stream) last_keepalive = 0 while context.is_active(): try: - priority_item = node.queue.get(timeout=1.0) - msg = priority_item[2] # Unpack (priority, ts, msg) + # In this refactored version, the 'MeshNodeRecord' in MeshServerCore + # holds the transport, which has the send_queue. + # But we also have the legacy node.queue (from registry). + # For now, we drain both or migrate one. + # Let's drain transport.send_queue and node.queue. + + msg = None + try: + msg = transport.send_queue.get(timeout=1.0) + except queue.Empty: + # Fallback to legacy queue for backwards compatibility + try: + priority_item = node.queue.get(timeout=0.1) + msg = priority_item[2] + # Sign it if it wasn't signed (Legacy path) + msg.signature = "" + msg_bytes = msg.SerializeToString(deterministic=True) + msg.signature = sign_bytes(msg_bytes) + except queue.Empty: + pass - # Unified Signing: Ensure every outbound message is signed for the node's strict verify logic - msg.signature = "" - msg_bytes = msg.SerializeToString(deterministic=True) - msg.signature = sign_bytes(msg_bytes) - - if os.getenv("DEBUG_GRPC"): - kind = msg.WhichOneof("payload") - logger.info(f"[DEBUG-gRPC] OUTBOUND to {node_id}: {kind} (Signed)") - - yield msg - except queue.Empty: - now = time.time() - if (now - last_keepalive) > 10.0: - last_keepalive = now - # Yield a NOP ping (work pool update) to keep proxy connections hot - msg = agent_pb2.ServerTaskMessage( - work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) - ) - msg_bytes = msg.SerializeToString(deterministic=True) - msg.signature = sign_bytes(msg_bytes) + if msg: yield msg - continue + else: + now = time.time() + if (now - last_keepalive) > 10.0: + last_keepalive = now + msg = agent_pb2.ServerTaskMessage( + work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) + ) + msg_bytes = msg.SerializeToString(deterministic=True) + msg.signature = sign_bytes(msg_bytes) + yield msg + except Exception as e: + logger.error(f"[!] Dispatch Error for {node_id}: {e}") + break except StopIteration: logger.info(f"[📶] gRPC Stream StopIteration for {node_id}") except Exception as e: @@ -355,6 +378,7 @@ # Fulfill any pending tasks in journal with error immediately self.journal.fail_node_tasks(node_id, f"Node {node_id} gRPC stream closed.") self.registry.deregister(node_id, record=node) + self.mesh_core.deregister_node(node_id) def _handle_client_message(self, msg, node_id, node): kind = msg.WhichOneof('payload') @@ -383,7 +407,7 @@ if success: sig = sign_payload(payload) - node.send_message(agent_pb2.ServerTaskMessage( + self.mesh_core.dispatch(node_id, agent_pb2.ServerTaskMessage( task_request=agent_pb2.TaskRequest( task_id=task_id, payload_json=payload, @@ -490,14 +514,14 @@ logger.info(f"[📁🏃] Drift Detected (Node -> Server): Requesting {len(drifts)} files") # Request node to push these specific files # Priority 1: Drift Reconciliation Request - node.send_message(agent_pb2.ServerTaskMessage( + self.mesh_core.dispatch(node_id, agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( - session_id=fs.session_id, - control=agent_pb2.SyncControl( - action=agent_pb2.SyncControl.REFRESH_MANIFEST, - path=root_path, - request_paths=drifts - ) + session_id=fs.session_id, + control=agent_pb2.SyncControl( + action=agent_pb2.SyncControl.REFRESH_MANIFEST, + path=root_path, + request_paths=drifts + ) ) ), priority=1) else: diff --git a/ai-hub/app/core/grpc/shared_core/__init__.py b/ai-hub/app/core/grpc/shared_core/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/ai-hub/app/core/grpc/shared_core/__init__.py +++ /dev/null diff --git a/ai-hub/app/core/grpc/shared_core/ignore.py b/ai-hub/app/core/grpc/shared_core/ignore.py deleted file mode 100644 index 8e95857..0000000 --- a/ai-hub/app/core/grpc/shared_core/ignore.py +++ /dev/null @@ -1,41 +0,0 @@ -import os -import fnmatch - -class CortexIgnore: - """Handles .cortexignore (and .gitignore) pattern matching.""" - def __init__(self, root_path, is_upstream=False): - self.root_path = root_path - self.is_upstream = is_upstream - self.patterns = self._load_patterns() - - def _load_patterns(self): - patterns = [".git", "node_modules", ".cortex_sync", "__pycache__", "*.pyc", ".browser_data"] # Default ignores - if self.is_upstream: - patterns.append(".skills") - ignore_file = os.path.join(self.root_path, ".cortexignore") - if not os.path.exists(ignore_file): - ignore_file = os.path.join(self.root_path, ".gitignore") - - if os.path.exists(ignore_file): - with open(ignore_file, "r") as f: - for line in f: - line = line.strip() - if line and not line.startswith("#"): - patterns.append(line) - return patterns - - def is_ignored(self, rel_path): - """Returns True if the path matches any ignore pattern.""" - for pattern in self.patterns: - # Handle directory patterns - if pattern.endswith("/"): - if rel_path.startswith(pattern) or f"/{pattern}" in f"/{rel_path}": - return True - # Standard glob matching - if fnmatch.fnmatch(rel_path, pattern) or fnmatch.fnmatch(os.path.basename(rel_path), pattern): - return True - # Handle nested matches - for part in rel_path.split(os.sep): - if fnmatch.fnmatch(part, pattern): - return True - return False diff --git a/mesh-sdk/examples/__init__.py b/mesh-sdk/examples/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/mesh-sdk/examples/__init__.py diff --git a/mesh-sdk/examples/lite_node.py b/mesh-sdk/examples/lite_node.py new file mode 100644 index 0000000..9cd7cca --- /dev/null +++ b/mesh-sdk/examples/lite_node.py @@ -0,0 +1,71 @@ +import sys +import os +import time +import grpc +import logging + +# Add parent dir to path so we can import mesh_core +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from mesh_core.node_engine import MeshNodeCore +from mesh_core.transport_grpc import GrpcMeshTransport +from mesh_core import agent_pb2, agent_pb2_grpc + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("LiteNode") + +def get_stub(): + """ + Example stub factory. + In production, this would return a secure channel with SSL certs. + """ + target = os.getenv("MESH_HUB_URL", "localhost:50051") + logger.info(f"Connecting to {target}...") + channel = grpc.insecure_channel(target) + stub = agent_pb2_grpc.AgentOrchestratorStub(channel) + return stub, channel + +class LiteNode(MeshNodeCore): + """ + A minimal Mesh Node implementation using the Lite Kit. + Demonstrates how to build a remote-controlled agent in < 50 lines of code. + """ + def __init__(self, node_id: str, auth_token: str = ""): + # 1. Choose a transport (gRPC) and provide the connection factory + self.transport = GrpcMeshTransport(node_id, get_stub, auth_token=auth_token) + + # 2. Initialize the core engine + super().__init__(node_id, self.transport) + + # 3. Bind your logic + self.on_task = self.handle_task + self.on_ready = lambda _: logger.info(f"[*] LiteNode '{node_id}' is AUTHORIZED and ONLINE.") + self.on_disconnect = lambda: logger.warning("[!] Lost connection to Hub.") + + def handle_task(self, task): + logger.info(f"[Task] Executing: {task.task_id}") + + # Simulate some work + time.sleep(1) + + # Send result back via the transport-agnostic engine + response = agent_pb2.TaskResponse( + task_id=task.task_id, + stdout=f"LiteNode '{self.node_id}' successfully processed task {task.task_id}", + status=0 + ) + self.send_message(agent_pb2.ClientTaskMessage(task_response=response)) + +if __name__ == "__main__": + node_id = sys.argv[1] if len(sys.argv) > 1 else "lite-demo-node" + + node = LiteNode(node_id) + node.start() + + logger.info("LiteNode running. Press Ctrl+C to stop.") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("Shutting down...") + node.stop() diff --git a/mesh-sdk/mesh_core/__init__.py b/mesh-sdk/mesh_core/__init__.py new file mode 100644 index 0000000..8a5b5d2 --- /dev/null +++ b/mesh-sdk/mesh_core/__init__.py @@ -0,0 +1,5 @@ +from .transport import IMeshTransport, IMeshListener +from .node_engine import MeshNodeCore +from .server_engine import MeshServerCore + +__all__ = ['IMeshTransport', 'IMeshListener', 'MeshNodeCore', 'MeshServerCore'] diff --git a/mesh-sdk/mesh_core/agent_pb2.py b/mesh-sdk/mesh_core/agent_pb2.py new file mode 100644 index 0000000..098a207 --- /dev/null +++ b/mesh-sdk/mesh_core/agent_pb2.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: agent.proto +# Protobuf Python Version: 5.29.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 29, + 0, + '', + 'agent.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe0\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\x12\x19\n\x11skill_config_json\x18\x06 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xfb\x01\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\xcf\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x12\x11\n\tsignature\x18\x07 \x01(\tB\t\n\x07payload\"8\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x12\n\nsession_id\x18\x02 \x01(\t\"\xa1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa4\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"m\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"\xad\x01\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\x12\x0e\n\x06offset\x18\x06 \x01(\x03\x12\x12\n\ncompressed\x18\x07 \x01(\x08\x12\x14\n\x0ctotal_chunks\x18\x08 \x01(\x05\x12\x12\n\ntotal_size\x18\t \x01(\x03\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._loaded_options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._loaded_options = None + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' + _globals['_REGISTRATIONREQUEST']._serialized_start=23 + _globals['_REGISTRATIONREQUEST']._serialized_end=245 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=194 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=245 + _globals['_SANDBOXPOLICY']._serialized_start=248 + _globals['_SANDBOXPOLICY']._serialized_end=472 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=438 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=472 + _globals['_REGISTRATIONRESPONSE']._serialized_start=474 + _globals['_REGISTRATIONRESPONSE']._serialized_end=594 + _globals['_CLIENTTASKMESSAGE']._serialized_start=597 + _globals['_CLIENTTASKMESSAGE']._serialized_end=848 + _globals['_SKILLEVENT']._serialized_start=850 + _globals['_SKILLEVENT']._serialized_end=971 + _globals['_NODEANNOUNCE']._serialized_start=973 + _globals['_NODEANNOUNCE']._serialized_end=1004 + _globals['_SERVERTASKMESSAGE']._serialized_start=1007 + _globals['_SERVERTASKMESSAGE']._serialized_end=1342 + _globals['_TASKCANCELREQUEST']._serialized_start=1344 + _globals['_TASKCANCELREQUEST']._serialized_end=1400 + _globals['_TASKREQUEST']._serialized_start=1403 + _globals['_TASKREQUEST']._serialized_end=1564 + _globals['_TASKRESPONSE']._serialized_start=1567 + _globals['_TASKRESPONSE']._serialized_end=1859 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=1749 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=1797 + _globals['_TASKRESPONSE_STATUS']._serialized_start=1799 + _globals['_TASKRESPONSE_STATUS']._serialized_end=1859 + _globals['_WORKPOOLUPDATE']._serialized_start=1861 + _globals['_WORKPOOLUPDATE']._serialized_end=1905 + _globals['_TASKCLAIMREQUEST']._serialized_start=1907 + _globals['_TASKCLAIMREQUEST']._serialized_end=1959 + _globals['_TASKCLAIMRESPONSE']._serialized_start=1961 + _globals['_TASKCLAIMRESPONSE']._serialized_end=2030 + _globals['_HEARTBEAT']._serialized_start=2033 + _globals['_HEARTBEAT']._serialized_end=2391 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=2393 + _globals['_HEALTHCHECKRESPONSE']._serialized_end=2438 + _globals['_FILESYNCMESSAGE']._serialized_start=2441 + _globals['_FILESYNCMESSAGE']._serialized_end=2669 + _globals['_SYNCCONTROL']._serialized_start=2672 + _globals['_SYNCCONTROL']._serialized_end=2971 + _globals['_SYNCCONTROL_ACTION']._serialized_start=2801 + _globals['_SYNCCONTROL_ACTION']._serialized_end=2971 + _globals['_DIRECTORYMANIFEST']._serialized_start=2973 + _globals['_DIRECTORYMANIFEST']._serialized_end=3082 + _globals['_FILEINFO']._serialized_start=3084 + _globals['_FILEINFO']._serialized_end=3152 + _globals['_FILEPAYLOAD']._serialized_start=3155 + _globals['_FILEPAYLOAD']._serialized_end=3328 + _globals['_SYNCSTATUS']._serialized_start=3331 + _globals['_SYNCSTATUS']._serialized_end=3491 + _globals['_SYNCSTATUS_CODE']._serialized_start=3425 + _globals['_SYNCSTATUS_CODE']._serialized_end=3491 + _globals['_AGENTORCHESTRATOR']._serialized_start=3494 + _globals['_AGENTORCHESTRATOR']._serialized_end=3727 +# @@protoc_insertion_point(module_scope) diff --git a/mesh-sdk/mesh_core/agent_pb2_grpc.py b/mesh-sdk/mesh_core/agent_pb2_grpc.py new file mode 100644 index 0000000..e972a62 --- /dev/null +++ b/mesh-sdk/mesh_core/agent_pb2_grpc.py @@ -0,0 +1,189 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +import agent_pb2 as agent__pb2 + +GRPC_GENERATED_VERSION = '1.71.2' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in agent_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class AgentOrchestratorStub(object): + """The Cortex Server exposes this service + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SyncConfiguration = channel.unary_unary( + '/agent.AgentOrchestrator/SyncConfiguration', + request_serializer=agent__pb2.RegistrationRequest.SerializeToString, + response_deserializer=agent__pb2.RegistrationResponse.FromString, + _registered_method=True) + self.TaskStream = channel.stream_stream( + '/agent.AgentOrchestrator/TaskStream', + request_serializer=agent__pb2.ClientTaskMessage.SerializeToString, + response_deserializer=agent__pb2.ServerTaskMessage.FromString, + _registered_method=True) + self.ReportHealth = channel.stream_stream( + '/agent.AgentOrchestrator/ReportHealth', + request_serializer=agent__pb2.Heartbeat.SerializeToString, + response_deserializer=agent__pb2.HealthCheckResponse.FromString, + _registered_method=True) + + +class AgentOrchestratorServicer(object): + """The Cortex Server exposes this service + """ + + def SyncConfiguration(self, request, context): + """1. Control Channel: Sync policies and settings (Unary) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TaskStream(self, request_iterator, context): + """2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReportHealth(self, request_iterator, context): + """3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_AgentOrchestratorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( + servicer.SyncConfiguration, + request_deserializer=agent__pb2.RegistrationRequest.FromString, + response_serializer=agent__pb2.RegistrationResponse.SerializeToString, + ), + 'TaskStream': grpc.stream_stream_rpc_method_handler( + servicer.TaskStream, + request_deserializer=agent__pb2.ClientTaskMessage.FromString, + response_serializer=agent__pb2.ServerTaskMessage.SerializeToString, + ), + 'ReportHealth': grpc.stream_stream_rpc_method_handler( + servicer.ReportHealth, + request_deserializer=agent__pb2.Heartbeat.FromString, + response_serializer=agent__pb2.HealthCheckResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'agent.AgentOrchestrator', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('agent.AgentOrchestrator', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class AgentOrchestrator(object): + """The Cortex Server exposes this service + """ + + @staticmethod + def SyncConfiguration(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/agent.AgentOrchestrator/SyncConfiguration', + agent__pb2.RegistrationRequest.SerializeToString, + agent__pb2.RegistrationResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def TaskStream(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream( + request_iterator, + target, + '/agent.AgentOrchestrator/TaskStream', + agent__pb2.ClientTaskMessage.SerializeToString, + agent__pb2.ServerTaskMessage.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def ReportHealth(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream( + request_iterator, + target, + '/agent.AgentOrchestrator/ReportHealth', + agent__pb2.Heartbeat.SerializeToString, + agent__pb2.HealthCheckResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/mesh-sdk/mesh_core/node_engine.py b/mesh-sdk/mesh_core/node_engine.py new file mode 100644 index 0000000..7551bcd --- /dev/null +++ b/mesh-sdk/mesh_core/node_engine.py @@ -0,0 +1,100 @@ +import threading +import time +import logging +from typing import Any, Optional, Dict, Callable +from .transport import IMeshTransport, IMeshListener + +logger = logging.getLogger(__name__) + +class MeshNodeCore(IMeshListener): + """ + Portable state machine for a Mesh Node. + Handles the lifecycle of a node (Handshake, Heartbeat, Reconnection) + without being coupled to gRPC or specific application logic. + """ + def __init__(self, node_id: str, transport: IMeshTransport): + self.node_id = node_id + self.transport = transport + self.transport.set_listener(self) + + self._stop_event = threading.Event() + self._is_ready = False + + # Callbacks to be hooked by the application (e.g., AgentNode) + self.on_task = None # Callable[[Any], None] + self.on_cancel = None + self.on_policy = None + self.on_sync = None + self.on_ready = None + self.on_disconnect = None + + def start(self): + """Starts the node and its management loops.""" + logger.info(f"[MeshCore] Starting Node {self.node_id}...") + + # 1. Perform Handshake + if not self.transport.handshake(): + logger.error(f"[MeshCore] Handshake failed. Node {self.node_id} cannot start.") + return False + + # 2. Connect TaskStream + self.transport.connect() + threading.Thread(target=self._management_loop, daemon=True, name="MeshNodeMgmt").start() + return True + + def _management_loop(self): + """Background loop for health and state monitoring.""" + while not self._stop_event.is_set(): + if not self.transport.is_connected(): + if self._is_ready: + self._is_ready = False + if self.on_disconnect: self.on_disconnect() + + time.sleep(1) + + def send_message(self, message: Any, priority: int = 1): + """High-level method to send a message via the transport.""" + if self.transport.is_connected(): + self.transport.send(message, priority=priority) + else: + logger.warning(f"[MeshCore] Dropped message: Transport disconnected.") + + # IMeshListener Implementation + def on_message(self, message: Any): + """Generic message dispatcher based on the payload type.""" + try: + kind = message.WhichOneof('payload') + + # 1. State Management + if kind == 'policy_update' or kind == 'policy': + if not self._is_ready: + self._is_ready = True + if self.on_ready: self.on_ready(message) + if self.on_policy: self.on_policy(message) + + # 2. Task Execution + elif kind == 'task_request': + if self.on_task: self.on_task(message.task_request) + elif kind == 'task_cancel': + if self.on_cancel: self.on_cancel(message.task_cancel) + + # 3. File Sync + elif kind == 'file_sync': + if self.on_sync: self.on_sync(message.file_sync) + + logger.debug(f"[MeshCore] Inbound message: {kind}") + except Exception as e: + logger.error(f"[MeshCore] Dispatch Error: {e}") + + def on_error(self, error: Exception): + logger.error(f"[MeshCore] Transport Error: {error}") + + def on_close(self): + logger.warning(f"[MeshCore] Transport Closed.") + self._is_ready = False + if self.on_disconnect: self.on_disconnect() + + def stop(self): + """Graceful shutdown.""" + self._stop_event.set() + self.transport.close() diff --git a/mesh-sdk/mesh_core/server_engine.py b/mesh-sdk/mesh_core/server_engine.py new file mode 100644 index 0000000..8b77533 --- /dev/null +++ b/mesh-sdk/mesh_core/server_engine.py @@ -0,0 +1,71 @@ +import logging +import time +from typing import Dict, Any, List, Optional, Callable +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + +@dataclass +class MeshNodeRecord: + node_id: str + user_id: str + metadata: Dict[str, Any] + last_seen: float = field(default_factory=time.time) + transport: Optional[Any] = None # Transport used for this node + stats: Dict[str, Any] = field(default_factory=dict) + +class MeshServerCore: + """ + Portable Orchestration Engine for a Mesh Hub. + Manages node registrations, health tracking, and task routing. + Designed for zero-dependency portability. + """ + def __init__(self): + self.nodes: Dict[str, MeshNodeRecord] = {} + + # Callbacks for application integration + self.on_node_online = None # Callable[[MeshNodeRecord], None] + self.on_node_offline = None + self.on_message_received = None # Callable[[node_id, message], None] + + def register_node(self, node_id: str, user_id: str, metadata: Dict[str, Any], transport: Any) -> MeshNodeRecord: + """Registers or updates a node in the mesh.""" + record = MeshNodeRecord(node_id=node_id, user_id=user_id, metadata=metadata, transport=transport) + self.nodes[node_id] = record + + logger.info(f"[MeshServerCore] Node Registered: {node_id} (User: {user_id})") + if self.on_node_online: self.on_node_online(record) + return record + + def deregister_node(self, node_id: str): + """Removes a node from the active registry.""" + if node_id in self.nodes: + record = self.nodes.pop(node_id) + logger.warning(f"[MeshServerCore] Node Offline: {node_id}") + if self.on_node_offline: self.on_node_offline(record) + + def get_node(self, node_id: str) -> Optional[MeshNodeRecord]: + return self.nodes.get(node_id) + + def list_nodes(self) -> List[MeshNodeRecord]: + return list(self.nodes.values()) + + def update_health(self, node_id: str, metrics: Dict[str, Any]): + """Updates health stats for a node.""" + if node_id in self.nodes: + node = self.nodes[node_id] + node.last_seen = time.time() + node.stats.update(metrics) + + def dispatch(self, node_id: str, message: Any, priority: int = 1): + """Sends a message to a specific node via its registered transport.""" + node = self.get_node(node_id) + if node and node.transport: + node.transport.send(message, priority=priority) + else: + raise Exception(f"Node {node_id} not found or transport unavailable.") + + def handle_inbound(self, node_id: str, message: Any): + """Entry point for transport-level messages into the engine.""" + if self.on_message_received: + self.on_message_received(node_id, message) diff --git a/mesh-sdk/mesh_core/transport.py b/mesh-sdk/mesh_core/transport.py new file mode 100644 index 0000000..0b8c910 --- /dev/null +++ b/mesh-sdk/mesh_core/transport.py @@ -0,0 +1,56 @@ +from abc import ABC, abstractmethod +from typing import Callable, Any + +class IMeshTransport(ABC): + """ + Abstract interface for bidirectional message passing between + a Mesh Node and a Mesh Hub. + """ + @abstractmethod + def handshake(self) -> bool: + """Performs initial authentication/registration handshake.""" + pass + + @abstractmethod + def connect(self): + """Initializes the connection/stream.""" + pass + + @abstractmethod + def set_listener(self, listener: 'IMeshListener'): + """Sets the listener for inbound messages.""" + pass + + @abstractmethod + def send(self, message: Any, priority: int = 1): + """Sends a message to the remote peer with optional priority.""" + pass + + @abstractmethod + def close(self): + """Closes the connection and cleans up resources.""" + pass + + @abstractmethod + def is_connected(self) -> bool: + """Returns True if the transport is active.""" + pass + +class IMeshListener(ABC): + """ + Interface for handling inbound messages from a MeshTransport. + """ + @abstractmethod + def on_message(self, message: Any): + """Called when a new message arrives.""" + pass + + @abstractmethod + def on_error(self, error: Exception): + """Called when a transport-level error occurs.""" + pass + + @abstractmethod + def on_close(self): + """Called when the transport is closed.""" + pass diff --git a/mesh-sdk/mesh_core/transport_grpc.py b/mesh-sdk/mesh_core/transport_grpc.py new file mode 100644 index 0000000..6043142 --- /dev/null +++ b/mesh-sdk/mesh_core/transport_grpc.py @@ -0,0 +1,123 @@ +import threading +import queue +import time +import logging +from typing import Any, Optional, Callable +from . import agent_pb2, agent_pb2_grpc +from .transport import IMeshTransport, IMeshListener + +logger = logging.getLogger(__name__) + +class GrpcMeshTransport(IMeshTransport): + """ + gRPC implementation of the Mesh Transport. + Encapsulates the bidirectional stream and auto-reconnection logic. + """ + def __init__(self, node_id: str, stub_factory: Callable[[], tuple], auth_token: str = ""): + self.node_id = node_id + self.stub_factory = stub_factory # Callable returning (stub, channel) + self.auth_token = auth_token + self.listener = None + self.stub = None + self.channel = None + self.send_queue = queue.PriorityQueue() + self._stop_event = threading.Event() + self._connected = False + self.last_activity = 0 + def handshake(self) -> bool: + self._refresh_stub() + try: + req = agent_pb2.RegistrationRequest( + node_id=self.node_id, + auth_token=self.auth_token, + node_description="Portable Mesh Node" + ) + res = self.stub.SyncConfiguration(req) + if res.success: + logger.info(f"[Mesh] Handshake successful for {self.node_id}") + # Optional: Handle policy res.policy + return True + else: + logger.error(f"[Mesh] Handshake REJECTED for {self.node_id}: {res.error_message}") + return False + except Exception as e: + logger.error(f"[Mesh] Handshake FAILED for {self.node_id}: {e}") + return False + + def connect(self): + self._stop_event.clear() + self._refresh_stub() + threading.Thread(target=self._run_stream, daemon=True, name="GrpcTransportStream").start() + + def set_listener(self, listener: IMeshListener): + self.listener = listener + + def _refresh_stub(self): + if self.channel: + try: self.channel.close() + except: pass + self.stub, self.channel = self.stub_factory() + + def _run_stream(self): + retry_count = 0 + while not self._stop_event.is_set(): + try: + def _gen(): + # Initial announcement + yield agent_pb2.ClientTaskMessage(announce=agent_pb2.NodeAnnounce(node_id=self.node_id)) + + last_heartbeat = time.time() + while not self._stop_event.is_set(): + try: + # PriorityQueue returns (priority, ts, msg) + item = self.send_queue.get(timeout=1.0) + yield item[2] + except queue.Empty: + pass + + # Transport-level KeepAlive + if time.time() - last_heartbeat >= 10.0: + yield agent_pb2.ClientTaskMessage( + skill_event=agent_pb2.SkillEvent(keep_alive=True) + ) + last_heartbeat = time.time() + + logger.info(f"[*] Opening gRPC TaskStream for {self.node_id}...") + responses = self.stub.TaskStream(_gen()) + self._connected = True + self.last_activity = time.time() + logger.info(f"[✅] gRPC Mesh Transport Online for {self.node_id}") + retry_count = 0 + + for msg in responses: + self.last_activity = time.time() + if self.listener: + self.listener.on_message(msg) + + self._connected = False + logger.warning(f"[📶] gRPC Stream closed by server for {self.node_id}") + except Exception as e: + self._connected = False + if self.listener: + self.listener.on_error(e) + + if not self._stop_event.is_set(): + retry_count += 1 + backoff = min(30, 2 * retry_count) + logger.error(f"[❌] gRPC Stream Error ({type(e).__name__}): {e}. Reconnecting in {backoff}s... (Attempt {retry_count})") + self._refresh_stub() + time.sleep(backoff) + + def send(self, message: Any, priority: int = 1): + # PriorityQueue expects (priority, timestamp, item) + self.send_queue.put((priority, time.time(), message)) + + def close(self): + self._stop_event.set() + if self.channel: + self.channel.close() + self._connected = False + self.listener.on_close() + + def is_connected(self) -> bool: + return self._connected diff --git a/mesh-sdk/mesh_core/transport_mock.py b/mesh-sdk/mesh_core/transport_mock.py new file mode 100644 index 0000000..3add74d --- /dev/null +++ b/mesh-sdk/mesh_core/transport_mock.py @@ -0,0 +1,50 @@ +import threading +import time +import logging +from typing import Any +from .transport import IMeshTransport, IMeshListener + +logger = logging.getLogger(__name__) + +class MockMeshTransport(IMeshTransport): + """ + Mock implementation of Mesh Transport for local testing. + Simulates Hub behavior without any network calls. + """ + def __init__(self, node_id: str): + self.node_id = node_id + self.listener = None + self._connected = False + self._stop_event = threading.Event() + + def set_listener(self, listener: IMeshListener): + self.listener = listener + + def connect(self): + self._connected = True + self._stop_event.clear() + logger.info(f"[*] Mock Mesh Transport Connected for {self.node_id}") + + # Simulate initial server handshake if needed + # In a real test, we would use 'mock_server.trigger_message(msg)' + + def send(self, message: Any): + kind = message.WhichOneof('payload') + logger.info(f"[Mock-Sent] To Hub: {kind}") + + # Auto-responder logic for tests + if kind == 'announce': + pass # Server usually doesn't reply to announce directly via TaskStream + + def close(self): + self._connected = False + self._stop_event.set() + self.listener.on_close() + + def is_connected(self) -> bool: + return self._connected + + def simulate_server_message(self, message: Any): + """Helper for test suites to inject messages into the agent.""" + if self._connected: + self.listener.on_message(message) diff --git a/mesh-sdk/protos/agent.proto b/mesh-sdk/protos/agent.proto new file mode 100644 index 0000000..da7a3f1 --- /dev/null +++ b/mesh-sdk/protos/agent.proto @@ -0,0 +1,227 @@ +syntax = "proto3"; + +package agent; + +// The Cortex Server exposes this service +service AgentOrchestrator { + // 1. Control Channel: Sync policies and settings (Unary) + rpc SyncConfiguration(RegistrationRequest) returns (RegistrationResponse); + + // 2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + rpc TaskStream(stream ClientTaskMessage) returns (stream ServerTaskMessage); + + // 3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + rpc ReportHealth(stream Heartbeat) returns (stream HealthCheckResponse); +} + +// --- Channel 1: Registration & Policy --- +message RegistrationRequest { + string node_id = 1; + string version = 2; + string auth_token = 3; + string node_description = 4; // AI-readable description of this node's role + map capabilities = 5; // e.g. "gpu": "nvidia-3080", "os": "ubuntu-22.04" +} + +message SandboxPolicy { + enum Mode { + STRICT = 0; + PERMISSIVE = 1; + } + Mode mode = 1; + repeated string allowed_commands = 2; + repeated string denied_commands = 3; + repeated string sensitive_commands = 4; + string working_dir_jail = 5; + string skill_config_json = 6; // NEW: Map of skill settings (e.g. {"shell": {"cwd_jail": "/tmp"}}) +} + +message RegistrationResponse { + bool success = 1; + string error_message = 2; + string session_id = 3; + SandboxPolicy policy = 4; +} + +// --- Channel 2: Tasks & Collaboration --- +message ClientTaskMessage { + oneof payload { + TaskResponse task_response = 1; + TaskClaimRequest task_claim = 2; + NodeAnnounce announce = 4; // NEW: Identification on stream connect + FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync + SkillEvent skill_event = 6; // NEW: Persistent real-time skill data + } +} + +message SkillEvent { + string session_id = 1; + string task_id = 2; + oneof data { + string terminal_out = 3; // Raw stdout/stderr chunks + string prompt = 4; // Interactive prompt (like password) + bool keep_alive = 5; // Session preservation + } +} + +message NodeAnnounce { + string node_id = 1; +} + +message ServerTaskMessage { + oneof payload { + TaskRequest task_request = 1; + WorkPoolUpdate work_pool_update = 2; + TaskClaimResponse claim_status = 3; + TaskCancelRequest task_cancel = 4; + FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync + SandboxPolicy policy_update = 6; // NEW: Live Policy Update + } + string signature = 7; // NEW: Unified Signature +} + +message TaskCancelRequest { + string task_id = 1; + string session_id = 2; // NEW: Cancel all tasks in this session +} + +message TaskRequest { + string task_id = 1; + string task_type = 2; + oneof payload { + string payload_json = 3; // For legacy shell/fallback + } + int32 timeout_ms = 4; + string trace_id = 5; + string signature = 6; + string session_id = 8; // NEW: Map execution to a sync workspace +} + +message TaskResponse { + string task_id = 1; + enum Status { + SUCCESS = 0; + ERROR = 1; + TIMEOUT = 2; + CANCELLED = 3; + } + Status status = 2; + string stdout = 3; + string stderr = 4; + string trace_id = 5; + map artifacts = 6; +} + +message WorkPoolUpdate { + repeated string available_task_ids = 1; +} + +message TaskClaimRequest { + string task_id = 1; + string node_id = 2; +} + +message TaskClaimResponse { + string task_id = 1; + bool granted = 2; + string reason = 3; +} + +// --- Channel 3: Health & Observation --- +message Heartbeat { + string node_id = 1; + float cpu_usage_percent = 2; + float memory_usage_percent = 3; + int32 active_worker_count = 4; + int32 max_worker_capacity = 5; + string status_message = 6; + repeated string running_task_ids = 7; + int32 cpu_count = 8; + float memory_used_gb = 9; + float memory_total_gb = 10; + + // Rich Metrics (M6) + repeated float cpu_usage_per_core = 11; + float cpu_freq_mhz = 12; + float memory_available_gb = 13; + repeated float load_avg = 14; // [1min, 5min, 15min] +} + + +message HealthCheckResponse { + int64 server_time_ms = 1; +} + +// --- Channel 4: Ghost Mirror File Sync --- +message FileSyncMessage { + string session_id = 1; + oneof payload { + DirectoryManifest manifest = 2; + FilePayload file_data = 3; + SyncStatus status = 4; + SyncControl control = 5; + } + string task_id = 6; // NEW: Correlation ID for FS operations +} + +message SyncControl { + enum Action { + START_WATCHING = 0; + STOP_WATCHING = 1; + LOCK = 2; // Server -> Node: Disable user-side edits + UNLOCK = 3; // Server -> Node: Enable user-side edits + REFRESH_MANIFEST = 4; // Server -> Node: Request a full manifest from node + RESYNC = 5; // Server -> Node: Force a hash-based reconciliation + + // FS Operations (Modular Explorer) + LIST = 6; // Server -> Node: List directory contents (returns manifest) + READ = 7; // Server -> Node: Read file content (returns file_data) + WRITE = 8; // Server -> Node: Write/Create file + DELETE = 9; // Server -> Node: Delete file or directory + PURGE = 10; // Server -> Node: Purge local sync directory entirely + CLEANUP = 11; // Server -> Node: Purge any session dirs not in request_paths + } + Action action = 1; + string path = 2; + repeated string request_paths = 3; // NEW: Specific files requested for pull + bytes content = 4; // NEW: For WRITE operation + bool is_dir = 5; // NEW: For TOUCH/WRITE operation +} + +message DirectoryManifest { + string root_path = 1; + repeated FileInfo files = 2; + int32 chunk_index = 3; // NEW: For paginated manifest + bool is_final = 4; // NEW: For paginated manifest +} + +message FileInfo { + string path = 1; + int64 size = 2; + string hash = 3; // For drift detection + bool is_dir = 4; +} + +message FilePayload { + string path = 1; + bytes chunk = 2; + int32 chunk_index = 3; + bool is_final = 4; + string hash = 5; // Full file hash for verification on final chunk + int64 offset = 6; // NEW: Byte offset for random-access parallel writes + bool compressed = 7; // NEW: Whether the chunk is compressed (zlib) + int32 total_chunks = 8; // NEW: Total number of chunks expected + int64 total_size = 9; // NEW: Total file size in bytes +} + +message SyncStatus { + enum Code { + OK = 0; + ERROR = 1; + RECONCILE_REQUIRED = 2; + IN_PROGRESS = 3; + } + Code code = 1; + string message = 2; + repeated string reconcile_paths = 3; // NEW: Files needing immediate re-sync +} diff --git a/scripts/seed_shadow_nodes.py b/scripts/seed_shadow_nodes.py new file mode 100755 index 0000000..e69de29 --- /dev/null +++ b/scripts/seed_shadow_nodes.py diff --git a/scripts/shadow_swarm.py b/scripts/shadow_swarm.py new file mode 100755 index 0000000..c02d5cc --- /dev/null +++ b/scripts/shadow_swarm.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +import threading +import time +import sys +import os +import logging + +# Add mesh-sdk and mesh-sdk/mesh_core to path to allow imports from examples and mesh_core +sys.path.append(os.path.join(os.getcwd(), "mesh-sdk")) +sys.path.append(os.path.join(os.getcwd(), "mesh-sdk", "mesh_core")) + +from examples.lite_node import LiteNode + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("ShadowSwarm") + +def spawn_node(node_id): + """Lifecycle for a single shadow node.""" + try: + node = LiteNode(node_id) + node.start() + # Keep alive until thread is killed + while True: + time.sleep(10) + except Exception as e: + logger.error(f"Node {node_id} crashed: {e}") + +if __name__ == "__main__": + node_count = int(sys.argv[1]) if len(sys.argv) > 1 else 10 + hub_url = os.getenv("MESH_HUB_URL", "localhost:50051") + + logger.info("=" * 40) + logger.info(f"🚀 CORTEX SHADOW SWARM") + logger.info(f"Target Hub: {hub_url}") + logger.info(f"Node Count: {node_count}") + logger.info("=" * 40) + + threads = [] + for i in range(node_count): + node_id = f"shadow-{i:03}" + t = threading.Thread(target=spawn_node, args=(node_id,), daemon=True) + t.start() + threads.append(t) + + # Staggered boot to avoid thundering herd on Hub + time.sleep(0.05) + + logger.info(f"[*] Successfully spawned {node_count} nodes.") + logger.info("[*] Monitoring swarm... (Ctrl+C to terminate all)") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("\n[!] Shutdown signal received. Terminating shadow swarm...") + # Threads are daemonized, so they will exit with the main process