diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index 81b9f80..7434831 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -73,6 +73,7 @@ self.on_sync = self._handle_file_sync self.on_ready = self._on_mesh_ready self.on_disconnect = self._on_mesh_disconnect + self.on_work_pool = self._handle_work_pool def _on_mesh_ready(self, msg): print(f"[Mesh] Connected and Authorized. Policy Synced.") diff --git a/mesh-sdk/mesh_core/engines/node.py b/mesh-sdk/mesh_core/engines/node.py index 36b4a7e..b3a669e 100644 --- a/mesh-sdk/mesh_core/engines/node.py +++ b/mesh-sdk/mesh_core/engines/node.py @@ -27,6 +27,7 @@ self.on_sync = None self.on_ready = None self.on_disconnect = None + self.on_work_pool = None def start(self): """Starts the node and its management loops.""" @@ -75,17 +76,21 @@ if not self._is_ready: self._is_ready = True if self.on_ready: self.on_ready(message) - if self.on_policy: self.on_policy(message) - + if self.on_policy: self.on_policy(getattr(message, kind)) + # 2. Task Execution elif kind == 'task_request': if self.on_task: self.on_task(message.task_request) elif kind == 'task_cancel': if self.on_cancel: self.on_cancel(message.task_cancel) - + # 3. File Sync elif kind == 'file_sync': if self.on_sync: self.on_sync(message.file_sync) + + # 4. Work Pool + elif kind == 'work_pool_update': + if self.on_work_pool: self.on_work_pool(message.work_pool_update) logger.debug(f"[MeshCore] Inbound message: {kind}") except Exception as e: diff --git a/mesh-sdk/mesh_core/transport/grpc.py b/mesh-sdk/mesh_core/transport/grpc.py index 73f3a36..5bed339 100644 --- a/mesh-sdk/mesh_core/transport/grpc.py +++ b/mesh-sdk/mesh_core/transport/grpc.py @@ -33,6 +33,7 @@ self._stop_event = threading.Event() self._connected = False self._health_thread_started = False + self._health_thread = None self.last_activity = 0 self._send_counter = itertools.count() # thread-safe atomic counter; avoids protobuf comparison in heapq @@ -192,9 +193,11 @@ self.health_queue.put(heartbeat) def _start_health_stream(self): - if self._health_thread_started: return + if self._health_thread is not None and self._health_thread.is_alive(): + return self._health_thread_started = True - threading.Thread(target=self._run_health_stream, daemon=True, name="GrpcHealthStream").start() + self._health_thread = threading.Thread(target=self._run_health_stream, daemon=True, name="GrpcHealthStream") + self._health_thread.start() def _run_health_stream(self): while not self._stop_event.is_set(): @@ -219,9 +222,11 @@ except Exception as e: logger.error(f"[Mesh] Health Stream Error: {e}") time.sleep(5) + self._health_thread_started = False def close(self): self._stop_event.set() + self._health_thread_started = False if self.channel: self.channel.close() self._connected = False