Newer
Older
cortex-hub / poc-grpc-agent / orchestrator / core / registry.py
import threading
import queue
import time

class AbstractNodeRegistry:
    """Contract that every Agent Node registry implementation must satisfy.

    Each method here is a placeholder that raises ``NotImplementedError``;
    concrete registries override all of them.
    """

    def register(self, node_id: str, q: queue.Queue, metadata: dict):
        """Record a node under ``node_id`` together with its queue and metadata."""
        raise NotImplementedError

    def update_stats(self, node_id: str, stats: dict):
        """Merge or store the latest ``stats`` reported for ``node_id``."""
        raise NotImplementedError

    def get_best(self) -> str:
        """Return the id of the node the registry considers the best choice."""
        raise NotImplementedError

    def get_node(self, node_id: str) -> dict:
        """Return the stored record for ``node_id``."""
        raise NotImplementedError

class MemoryNodeRegistry(AbstractNodeRegistry):
    """In-memory implementation of the Node Registry.

    Every read and write of ``self.nodes`` and ``self.subscribers`` happens
    while holding ``self.lock``. The lock is a plain ``threading.Lock`` (not
    re-entrant), so callers must not invoke these methods while they already
    hold it.
    """

    # Upper bound on the number of recent events kept per node by ``emit``.
    MAX_EVENTS_PER_NODE = 50

    def __init__(self):
        self.lock = threading.Lock()
        # node_id -> {"stats": {}, "queue": queue, "metadata": {}, ["events": [...]]}
        self.nodes = {}
        self.subscribers = set()  # WebSocket connection objects

    def register(self, node_id: str, q, metadata: dict):
        """Create (or replace) the record for ``node_id``.

        Re-registering an existing id starts from a fresh record: its stats
        are reset and any previously recorded events are dropped.
        """
        with self.lock:
            self.nodes[node_id] = {"stats": {}, "queue": q, "metadata": metadata}
            print(f"[📋] Registered Agent Node: {node_id}")

    def update_stats(self, node_id: str, stats: dict):
        """Merge ``stats`` into the node's stats; unknown ids are ignored."""
        with self.lock:
            node = self.nodes.get(node_id)
            if node is not None:
                node["stats"].update(stats)

    def get_best(self):
        """Pick the agent with the lowest active worker count.

        Returns:
            The node id with the smallest ``active_worker_count`` stat, or
            ``None`` when no node is registered. Nodes that have not reported
            the stat are ranked as if their count were 999. Ties go to the
            node that was registered first.
        """
        with self.lock:
            if not self.nodes:
                return None
            # min() returns the first minimal item in iteration order, which
            # matches taking element 0 of a stable sort without sorting at all.
            return min(
                self.nodes,
                key=lambda nid: self.nodes[nid]["stats"].get("active_worker_count", 999),
            )

    def get_node(self, node_id: str):
        """Return the live record dict for ``node_id``, or ``None`` if unknown.

        The returned dict is the registry's own object, not a copy.
        """
        with self.lock:
            return self.nodes.get(node_id)

    def list_nodes(self):
        """Return a snapshot list of all registered node ids."""
        with self.lock:
            return list(self.nodes)

    def subscribe(self, websocket):
        """Add ``websocket`` to the subscriber set."""
        with self.lock:
            self.subscribers.add(websocket)

    def unsubscribe(self, websocket):
        """Remove ``websocket`` from the subscriber set; no-op if absent."""
        with self.lock:
            self.subscribers.discard(websocket)

    def emit(self, node_id, event, data, task_id=None):
        """Log an event and record it in the node's recent-event history.

        This method does not push anything to ``self.subscribers``. For the
        POC it only prints the event; per the original design notes, a
        separate bridge is expected to deliver events to WebSocket clients,
        because this is usually called from a synchronous gRPC thread.

        Args:
            node_id: Id of the node the event belongs to.
            event: Event name.
            data: Event payload; must be JSON-serializable.
            task_id: Optional id of the task the event relates to.

        Raises:
            TypeError: If ``data`` (or another field) cannot be encoded by
                ``json.dumps``.
        """
        import json  # kept local: json is not imported at module level

        msg = {
            "node_id": node_id,
            "event": event,
            "data": data,
            "task_id": task_id,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        # In a real app, this would use an async-friendly event loop or
        # Redis PUB/SUB instead of a log line.
        payload = json.dumps(msg)
        print(f"[📡 EventBus] {node_id} -> {event}: {payload[:100]}...")

        # Internal registry record (optional). Done under the lock so a
        # concurrent register() cannot swap the node dict out between the
        # lookup and the write.
        with self.lock:
            node = self.nodes.get(node_id)
            if node is None:
                return
            # Newest first, capped at MAX_EVENTS_PER_NODE entries.
            history = node.get("events", [])
            node["events"] = ([msg] + history)[: self.MAX_EVENTS_PER_NODE]