diff --git a/agent-node/src/agent_node/core/updater.py b/agent-node/src/agent_node/core/updater.py index 7900eb9..ef4acaf 100644 --- a/agent-node/src/agent_node/core/updater.py +++ b/agent-node/src/agent_node/core/updater.py @@ -94,7 +94,13 @@ """ Single version check against the hub. If a newer version is available, triggers bootstrap_installer.py and restarts (does not return if applied). + Disabled when CORTEX_DISABLE_AUTO_UPDATE=1 (set this in Docker/container + environments where updates are delivered via image rebuilds, not in-place). """ + if os.environ.get("CORTEX_DISABLE_AUTO_UPDATE", "").strip().lower() in ("1", "true", "yes"): + logger.debug("[Updater] Skipped — CORTEX_DISABLE_AUTO_UPDATE is set.") + return + local = _read_local_version() logger.info(f"[Updater] Local version: {local}") diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index 7434831..da7c652 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -103,6 +103,11 @@ """ Launches background health reporting using the transport. """ def _report(): while not self._stop_event.is_set(): + # Always tick — proves this thread is alive regardless of hub + # connectivity. Watchdog fires only if the reporter itself deadlocks, + # not merely because the hub is temporarily unreachable. 
+ watchdog.tick() + is_conn = self.transport.is_connected() logger.debug(f"HealthReporter: connected={is_conn}") if is_conn: @@ -110,26 +115,22 @@ ids = self.skills.get_active_ids() vmem = psutil.virtual_memory() if psutil else None hb = agent_pb2.Heartbeat( - node_id=self.node_id, + node_id=self.node_id, cpu_usage_percent=psutil.cpu_percent() if psutil else 0, memory_usage_percent=vmem.percent if vmem else 0, - active_worker_count=len(ids), - max_worker_capacity=config.MAX_SKILL_WORKERS, + active_worker_count=len(ids), + max_worker_capacity=config.MAX_SKILL_WORKERS, running_task_ids=ids, cpu_count=psutil.cpu_count() if psutil else 0, memory_used_gb=vmem.used/(1024**3) if vmem else 0, memory_total_gb=vmem.total/(1024**3) if vmem else 0 ) - # Health is sent via a separate stream in gRPC, - # so we use the transport's specialized method if it exists if hasattr(self.transport, 'send_health'): self.transport.send_health(hb) - watchdog.tick() - logger.debug("HealthReporter: ticked watchdog") except Exception as e: logger.error(f"Health report error: {e}") else: - logger.warning("HealthReporter: skipped tick because not connected") + logger.warning("HealthReporter: hub unreachable, heartbeat skipped (gRPC reconnecting)") time.sleep(config.HEALTH_REPORT_INTERVAL) threading.Thread(target=_report, daemon=True, name="HealthReporter").start() diff --git a/mesh-sdk/mesh_core/engines/node.py b/mesh-sdk/mesh_core/engines/node.py index 33fd7d1..e802957 100644 --- a/mesh-sdk/mesh_core/engines/node.py +++ b/mesh-sdk/mesh_core/engines/node.py @@ -9,17 +9,30 @@ class MeshNodeCore(IMeshListener): """ Portable state machine for a Mesh Node. - Handles the lifecycle of a node (Handshake, Heartbeat, Reconnection) + Handles the lifecycle of a node (Handshake, Heartbeat, Reconnection) without being coupled to gRPC or specific application logic. """ + + # Single source of truth: proto field name → (callback_attr, extractor). + # To add a new message kind: add one entry here. 
Missing entries emit a + # WARNING in production immediately — no silent drops possible. + _DISPATCH = { + 'policy_update': ('on_policy', lambda m: m.policy_update), + 'policy': ('on_policy', lambda m: m.policy), + 'task_request': ('on_task', lambda m: m.task_request), + 'task_cancel': ('on_cancel', lambda m: m.task_cancel), + 'file_sync': ('on_sync', lambda m: m.file_sync), + 'work_pool_update': ('on_work_pool', lambda m: m.work_pool_update), + } + def __init__(self, node_id: str, transport: IMeshTransport): self.node_id = node_id self.transport = transport self.transport.set_listener(self) - + self._stop_event = threading.Event() self._is_ready = False - + # Callbacks to be hooked by the application (e.g., AgentNode) self.on_task = None # Callable[[Any], None] self.on_cancel = None @@ -66,38 +79,33 @@ # IMeshListener Implementation def on_message(self, message: Any): - """Generic message dispatcher based on the payload type.""" + """Routes inbound server messages via _DISPATCH. To handle a new proto + field, add one entry to _DISPATCH — no other changes needed.""" try: kind = message.WhichOneof('payload') print(f" [📥] MeshNodeCore Inbound: {kind}") - - # 1. State Management - if kind == 'policy_update' or kind == 'policy': - if not self._is_ready: - self._is_ready = True - if self.on_ready: self.on_ready(getattr(message, kind)) - if self.on_policy: self.on_policy(getattr(message, kind)) - # 2. Task Execution - elif kind == 'task_request': - if self.on_task: self.on_task(message.task_request) - elif kind == 'task_cancel': - if self.on_cancel: self.on_cancel(message.task_cancel) + entry = self._DISPATCH.get(kind) + if entry is None: + logger.warning(f"[MeshCore] Unhandled message kind: '{kind}' — add entry to _DISPATCH") + return - # 3. File Sync - elif kind == 'file_sync': - if self.on_sync: self.on_sync(message.file_sync) + cb_attr, extractor = entry + payload = extractor(message) - # 4. 
Work Pool - elif kind == 'work_pool_update': - if self.on_work_pool: self.on_work_pool(message.work_pool_update) + # Lifecycle hook: first policy message marks the node as ready. + if cb_attr == 'on_policy' and not self._is_ready: + self._is_ready = True + if self.on_ready: + self.on_ready(payload) - else: - logger.warning(f"[MeshCore] Unhandled message kind: '{kind}' — add a dispatch case") + cb = getattr(self, cb_attr) + if cb: + cb(payload) - logger.debug(f"[MeshCore] Inbound message: {kind}") + logger.debug(f"[MeshCore] Dispatched: {kind} → {cb_attr}") except Exception as e: - logger.error(f"[MeshCore] Dispatch Error: {e}") + # NOTE(review): 'kind' is unbound if WhichOneof() itself raised — guard so the handler can't NameError. + logger.error(f"[MeshCore] Dispatch Error ({locals().get('kind', '?')}): {e}") def on_error(self, error: Exception): logger.error(f"[MeshCore] Transport Error: {error}")