Newer
Older
cortex-hub / mesh-sdk / mesh_core / node_engine.py
@Antigravity AI Antigravity AI 17 days ago 3 KB half done refactoring
import threading
import time
import logging
from typing import Any, Optional, Dict, Callable
from .transport import IMeshTransport, IMeshListener

logger = logging.getLogger(__name__)

class MeshNodeCore(IMeshListener):
    """
    Portable state machine for a Mesh Node.
    Handles the lifecycle of a node (Handshake, Heartbeat, Reconnection) 
    without being coupled to gRPC or specific application logic.
    """
    def __init__(self, node_id: str, transport: IMeshTransport):
        self.node_id = node_id
        self.transport = transport
        self.transport.set_listener(self)
        
        self._stop_event = threading.Event()
        self._is_ready = False
        
        # Callbacks to be hooked by the application (e.g., AgentNode)
        self.on_task = None  # Callable[[Any], None]
        self.on_cancel = None
        self.on_policy = None
        self.on_sync = None
        self.on_ready = None
        self.on_disconnect = None

    def start(self):
        """Starts the node and its management loops."""
        logger.info(f"[MeshCore] Starting Node {self.node_id}...")
        
        # 1. Perform Handshake
        if not self.transport.handshake():
            logger.error(f"[MeshCore] Handshake failed. Node {self.node_id} cannot start.")
            return False

        # 2. Connect TaskStream
        self.transport.connect()
        threading.Thread(target=self._management_loop, daemon=True, name="MeshNodeMgmt").start()
        return True

    def _management_loop(self):
        """Background loop for health and state monitoring."""
        while not self._stop_event.is_set():
            if not self.transport.is_connected():
                if self._is_ready:
                    self._is_ready = False
                    if self.on_disconnect: self.on_disconnect()
            
            time.sleep(1)

    def send_message(self, message: Any, priority: int = 1):
        """High-level method to send a message via the transport."""
        if self.transport.is_connected():
            self.transport.send(message, priority=priority)
        else:
            logger.warning(f"[MeshCore] Dropped message: Transport disconnected.")

    # IMeshListener Implementation
    def on_message(self, message: Any):
        """Generic message dispatcher based on the payload type."""
        try:
            kind = message.WhichOneof('payload')
            
            # 1. State Management
            if kind == 'policy_update' or kind == 'policy':
                if not self._is_ready:
                    self._is_ready = True
                    if self.on_ready: self.on_ready(message)
                if self.on_policy: self.on_policy(message)
            
            # 2. Task Execution
            elif kind == 'task_request':
                if self.on_task: self.on_task(message.task_request)
            elif kind == 'task_cancel':
                if self.on_cancel: self.on_cancel(message.task_cancel)
            
            # 3. File Sync
            elif kind == 'file_sync':
                if self.on_sync: self.on_sync(message.file_sync)
                
            logger.debug(f"[MeshCore] Inbound message: {kind}")
        except Exception as e:
            logger.error(f"[MeshCore] Dispatch Error: {e}")

    def on_error(self, error: Exception):
        logger.error(f"[MeshCore] Transport Error: {error}")

    def on_close(self):
        logger.warning(f"[MeshCore] Transport Closed.")
        self._is_ready = False
        if self.on_disconnect: self.on_disconnect()

    def stop(self):
        """Graceful shutdown."""
        self._stop_event.set()
        self.transport.close()