import threading
import time
import logging
from typing import Any, Optional, Dict, Callable
from ..transport import IMeshTransport, IMeshListener
logger = logging.getLogger(__name__)
class MeshNodeCore(IMeshListener):
"""
Portable state machine for a Mesh Node.
Handles the lifecycle of a node (Handshake, Heartbeat, Reconnection)
without being coupled to gRPC or specific application logic.
"""
def __init__(self, node_id: str, transport: IMeshTransport):
self.node_id = node_id
self.transport = transport
self.transport.set_listener(self)
self._stop_event = threading.Event()
self._is_ready = False
# Callbacks to be hooked by the application (e.g., AgentNode)
self.on_task = None # Callable[[Any], None]
self.on_cancel = None
self.on_policy = None
self.on_sync = None
self.on_ready = None
self.on_disconnect = None
self.on_work_pool = None
def start(self):
"""Starts the node and its management loops."""
logger.info(f"[MeshCore] Starting Node {self.node_id}...")
# 1. Perform Handshake
policy = self.transport.handshake()
if not policy:
logger.error(f"[MeshCore] Handshake failed. Node {self.node_id} cannot start.")
return False
if self.on_policy and hasattr(policy, "mode"):
self.on_policy(policy)
# 2. Connect TaskStream
self.transport.connect()
threading.Thread(target=self._management_loop, daemon=True, name="MeshNodeMgmt").start()
return True
def _management_loop(self):
"""Background loop for health and state monitoring."""
while not self._stop_event.is_set():
if not self.transport.is_connected():
if self._is_ready:
self._is_ready = False
if self.on_disconnect: self.on_disconnect()
time.sleep(1)
def send_message(self, message: Any, priority: int = 1):
"""High-level method to send a message via the transport."""
if self.transport.is_connected():
self.transport.send(message, priority=priority)
else:
logger.warning(f"[MeshCore] Dropped message: Transport disconnected.")
# IMeshListener Implementation
def on_message(self, message: Any):
"""Generic message dispatcher based on the payload type."""
try:
kind = message.WhichOneof('payload')
print(f" [📥] MeshNodeCore Inbound: {kind}")
# 1. State Management
if kind == 'policy_update' or kind == 'policy':
if not self._is_ready:
self._is_ready = True
if self.on_ready: self.on_ready(message)
if self.on_policy: self.on_policy(getattr(message, kind))
# 2. Task Execution
elif kind == 'task_request':
if self.on_task: self.on_task(message.task_request)
elif kind == 'task_cancel':
if self.on_cancel: self.on_cancel(message.task_cancel)
# 3. File Sync
elif kind == 'file_sync':
if self.on_sync: self.on_sync(message.file_sync)
# 4. Work Pool
elif kind == 'work_pool_update':
if self.on_work_pool: self.on_work_pool(message.work_pool_update)
logger.debug(f"[MeshCore] Inbound message: {kind}")
except Exception as e:
logger.error(f"[MeshCore] Dispatch Error: {e}")
def on_error(self, error: Exception):
logger.error(f"[MeshCore] Transport Error: {error}")
def on_close(self):
logger.warning(f"[MeshCore] Transport Closed.")
self._is_ready = False
if self.on_disconnect: self.on_disconnect()
def stop(self):
"""Graceful shutdown."""
self._stop_event.set()
self.transport.close()