Newer
Older
cortex-hub / mesh-sdk / mesh_core / server_engine.py
@Antigravity AI Antigravity AI 17 days ago 2 KB half done refactoring
import logging
import time
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

@dataclass
class MeshNodeRecord:
    node_id: str
    user_id: str
    metadata: Dict[str, Any]
    last_seen: float = field(default_factory=time.time)
    transport: Optional[Any] = None  # Transport used for this node
    stats: Dict[str, Any] = field(default_factory=dict)

class MeshServerCore:
    """
    Portable Orchestration Engine for a Mesh Hub.
    Manages node registrations, health tracking, and task routing.
    Designed for zero-dependency portability.
    """
    def __init__(self):
        self.nodes: Dict[str, MeshNodeRecord] = {}
        
        # Callbacks for application integration
        self.on_node_online = None  # Callable[[MeshNodeRecord], None]
        self.on_node_offline = None
        self.on_message_received = None # Callable[[node_id, message], None]

    def register_node(self, node_id: str, user_id: str, metadata: Dict[str, Any], transport: Any) -> MeshNodeRecord:
        """Registers or updates a node in the mesh."""
        record = MeshNodeRecord(node_id=node_id, user_id=user_id, metadata=metadata, transport=transport)
        self.nodes[node_id] = record
        
        logger.info(f"[MeshServerCore] Node Registered: {node_id} (User: {user_id})")
        if self.on_node_online: self.on_node_online(record)
        return record

    def deregister_node(self, node_id: str):
        """Removes a node from the active registry."""
        if node_id in self.nodes:
            record = self.nodes.pop(node_id)
            logger.warning(f"[MeshServerCore] Node Offline: {node_id}")
            if self.on_node_offline: self.on_node_offline(record)

    def get_node(self, node_id: str) -> Optional[MeshNodeRecord]:
        return self.nodes.get(node_id)

    def list_nodes(self) -> List[MeshNodeRecord]:
        return list(self.nodes.values())

    def update_health(self, node_id: str, metrics: Dict[str, Any]):
        """Updates health stats for a node."""
        if node_id in self.nodes:
            node = self.nodes[node_id]
            node.last_seen = time.time()
            node.stats.update(metrics)

    def dispatch(self, node_id: str, message: Any, priority: int = 1):
        """Sends a message to a specific node via its registered transport."""
        node = self.get_node(node_id)
        if node and node.transport:
            node.transport.send(message, priority=priority)
        else:
            raise Exception(f"Node {node_id} not found or transport unavailable.")

    def handle_inbound(self, node_id: str, message: Any):
        """Entry point for transport-level messages into the engine."""
        if self.on_message_received:
            self.on_message_received(node_id, message)