import logging
import time
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class MeshNodeRecord:
node_id: str
user_id: str
metadata: Dict[str, Any]
last_seen: float = field(default_factory=time.time)
transport: Optional[Any] = None # Transport used for this node
stats: Dict[str, Any] = field(default_factory=dict)
class MeshServerCore:
"""
Portable Orchestration Engine for a Mesh Hub.
Manages node registrations, health tracking, and task routing.
Designed for zero-dependency portability.
"""
def __init__(self):
self.nodes: Dict[str, MeshNodeRecord] = {}
# Callbacks for application integration
self.on_node_online = None # Callable[[MeshNodeRecord], None]
self.on_node_offline = None
self.on_message_received = None # Callable[[node_id, message], None]
def register_node(self, node_id: str, user_id: str, metadata: Dict[str, Any], transport: Any) -> MeshNodeRecord:
"""Registers or updates a node in the mesh."""
record = MeshNodeRecord(node_id=node_id, user_id=user_id, metadata=metadata, transport=transport)
self.nodes[node_id] = record
logger.info(f"[MeshServerCore] Node Registered: {node_id} (User: {user_id})")
if self.on_node_online: self.on_node_online(record)
return record
def deregister_node(self, node_id: str):
"""Removes a node from the active registry."""
if node_id in self.nodes:
record = self.nodes.pop(node_id)
logger.warning(f"[MeshServerCore] Node Offline: {node_id}")
if self.on_node_offline: self.on_node_offline(record)
def get_node(self, node_id: str) -> Optional[MeshNodeRecord]:
return self.nodes.get(node_id)
def list_nodes(self) -> List[MeshNodeRecord]:
return list(self.nodes.values())
def update_health(self, node_id: str, metrics: Dict[str, Any]):
"""Updates health stats for a node."""
if node_id in self.nodes:
node = self.nodes[node_id]
node.last_seen = time.time()
node.stats.update(metrics)
def dispatch(self, node_id: str, message: Any, priority: int = 1):
"""Sends a message to a specific node via its registered transport."""
node = self.get_node(node_id)
if node and node.transport:
node.transport.send(message, priority=priority)
else:
raise Exception(f"Node {node_id} not found or transport unavailable.")
def handle_inbound(self, node_id: str, message: Any):
"""Entry point for transport-level messages into the engine."""
if self.on_message_received:
self.on_message_received(node_id, message)