import threading
import queue
import time
class AbstractNodeRegistry:
    """Contract for locating and tracking Agent Nodes.

    Every method here raises ``NotImplementedError``; a concrete registry
    is expected to override each of them.
    """

    def register(self, node_id: str, q: queue.Queue, metadata: dict):
        """Add the node ``node_id`` together with its queue and metadata."""
        raise NotImplementedError

    def update_stats(self, node_id: str, stats: dict):
        """Record fresh ``stats`` for the node ``node_id``."""
        raise NotImplementedError

    def get_best(self) -> str:
        """Return the id of the node the registry prefers.

        The selection policy is left to the implementation.
        """
        raise NotImplementedError

    def get_node(self, node_id: str) -> dict:
        """Return the registry record kept for ``node_id``."""
        raise NotImplementedError
class MemoryNodeRegistry(AbstractNodeRegistry):
    """In-memory implementation of the Node Registry.

    All state lives in plain Python containers guarded by a single
    ``threading.Lock``. The lock is not re-entrant, so methods must not call
    one another while holding it.
    """

    # Upper bound on the per-node event history kept by emit().
    MAX_EVENTS = 50

    def __init__(self):
        self.lock = threading.Lock()
        # node_id -> {"stats": {}, "queue": queue, "metadata": {}}
        # emit() lazily adds an "events" list (newest first) to each record.
        self.nodes = {}
        self.subscribers = set()  # WebSocket connection objects

    def register(self, node_id, q, metadata):
        """Create (or replace) the record for ``node_id``.

        Re-registering an existing id starts from a fresh record: stats are
        reset to an empty dict and any stored event history is dropped.
        """
        with self.lock:
            self.nodes[node_id] = {"stats": {}, "queue": q, "metadata": metadata}
        print(f"[📋] Registered Agent Node: {node_id}")

    def update_stats(self, node_id, stats):
        """Merge ``stats`` into the node's stats; unknown ids are ignored."""
        with self.lock:
            node = self.nodes.get(node_id)
            if node is not None:
                node["stats"].update(stats)

    def get_best(self):
        """Return the id of the node with the lowest ``active_worker_count``.

        Nodes that have not reported the stat yet are ranked as 999 so they
        sort after nodes that have. Ties go to the earliest-registered node.
        Returns ``None`` when no node is registered.
        """
        with self.lock:
            if not self.nodes:
                return None
            # min() keeps the first minimal element, matching a stable sort's [0].
            return min(
                self.nodes,
                key=lambda nid: self.nodes[nid]["stats"].get("active_worker_count", 999),
            )

    def get_node(self, node_id):
        """Return the live record for ``node_id``, or ``None`` if unknown.

        The returned dict is the registry's own object, not a copy.
        """
        with self.lock:
            return self.nodes.get(node_id)

    def list_nodes(self):
        """Return a snapshot list of all registered node ids."""
        with self.lock:
            return list(self.nodes)

    def subscribe(self, websocket):
        """Add ``websocket`` to the subscriber set."""
        with self.lock:
            self.subscribers.add(websocket)

    def unsubscribe(self, websocket):
        """Remove ``websocket`` from the subscriber set if it is present."""
        with self.lock:
            self.subscribers.discard(websocket)

    def emit(self, node_id, event, data, task_id=None):
        """Log an event and record it in the node's bounded history.

        The message is serialized to JSON (so ``data`` must be
        JSON-serializable) and a preview is printed. Nothing is pushed to
        ``self.subscribers`` here; per the original note the AI Hub bridge is
        meant to handle the actual WebSocket push. If ``node_id`` is
        registered, the message is prepended to its ``"events"`` list, which
        is capped at ``MAX_EVENTS`` entries. Unknown ids are only logged.
        """
        msg = {
            "node_id": node_id,
            "event": event,
            "data": data,
            "task_id": task_id,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        # In a real app this would use an async-friendly event loop or Redis
        # PUB/SUB. The caller is usually the sync gRPC thread, so pushing to
        # async WebSockets directly from here is deliberately avoided.
        import json  # kept local: json is not among the module-level imports

        payload = json.dumps(msg)
        print(f"[📡 EventBus] {node_id} -> {event}: {payload[:100]}...")

        # Internal registry record (optional). Done under the lock so the
        # read-modify-write cannot interleave with register()/update_stats()
        # or with a concurrent emit() from another thread.
        with self.lock:
            node = self.nodes.get(node_id)
            if node is None:
                return
            node["events"] = ([msg] + node.get("events", []))[: self.MAX_EVENTS]