diff --git a/poc-grpc-agent/orchestrator/app.py b/poc-grpc-agent/orchestrator/app.py index 7507f55..0812478 100644 --- a/poc-grpc-agent/orchestrator/app.py +++ b/poc-grpc-agent/orchestrator/app.py @@ -45,9 +45,17 @@ time.sleep(SIMULATION_DELAY_SEC) print("\n[🧠] AI Simulation Start...", flush=True) - target_node = "agent-node-007" + # Collaborative Mesh Test: Pushing Shared Work + print("[📦📤] Pushing shared tasks to Global Work Pool...") + orch.pool.push_work("shared-mesh-001", "uname -a") + time.sleep(2) + orch.pool.push_work("shared-mesh-002", "uptime") + time.sleep(5) # Let nodes claim - # Phase 1: Shell + target_node = "agent-node-001" + + # Phase 1: Direct Shell Task + print(f"\n[📤] Dispatching shell task to {target_node}") res_single = orch.assistant.dispatch_single(target_node, 'uname -a') print(f" Uname Output: {res_single}", flush=True) diff --git a/poc-grpc-agent/orchestrator/core/pool.py b/poc-grpc-agent/orchestrator/core/pool.py index 222a28b..f53a1db 100644 --- a/poc-grpc-agent/orchestrator/core/pool.py +++ b/poc-grpc-agent/orchestrator/core/pool.py @@ -4,8 +4,17 @@ """Thread-safe pool of unassigned tasks that can be claimed by any node.""" def __init__(self): self.lock = threading.Lock() - self.available = {"shared-001": "uname -a", "shared-002": "uptime"} + self.available = {} # task_id -> payload + self.on_new_work = None # Callback to notify nodes + def push_work(self, task_id, payload): + """Adds new task to global discovery pool.""" + with self.lock: + self.available[task_id] = payload + print(f" [📦] New Shared Task: {task_id}") + if self.on_new_work: + self.on_new_work(task_id) + def claim(self, task_id, node_id): """Allows a node to pull a specific task from the pool.""" with self.lock: diff --git a/poc-grpc-agent/orchestrator/services/grpc_server.py b/poc-grpc-agent/orchestrator/services/grpc_server.py index 7112d89..27d116c 100644 --- a/poc-grpc-agent/orchestrator/services/grpc_server.py +++ b/poc-grpc-agent/orchestrator/services/grpc_server.py @@ -15,7 +15,17 @@ self.journal = TaskJournal() self.pool = GlobalWorkPool() self.assistant = TaskAssistant(self.registry, self.journal, self.pool) + self.pool.on_new_work = self._broadcast_work + def _broadcast_work(self, _): + """Pushes work notifications to all active nodes.""" + with self.registry.lock: + for node_id, node in self.registry.nodes.items(): + print(f" [📢] Broadcasting availability to {node_id}") + node["queue"].put(agent_pb2.ServerTaskMessage( + work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) + )) + def SyncConfiguration(self, request, context): """Standard Handshake: Authenticate and Send Policy.""" # Pre-registration for metadata search @@ -58,19 +68,21 @@ threading.Thread(target=_read_results, daemon=True, name=f"Results-{node_id}").start() # 3. Work Dispatcher (Main Stream) + last_keepalive = 0 while context.is_active(): try: # Non-blocking wait to check context periodically msg = node["queue"].get(timeout=1.0) yield msg - print(f" [🚀] Streamed message to {node_id}") except queue.Empty: - # Minimal background traffic keeps connection alive - if self.pool.available: - # Only broadcast every 5s or if state changes in a real system - yield agent_pb2.ServerTaskMessage( - work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) - ) + # Occasional broadcast to nodes to ensure pool sync + now = time.time() + if (now - last_keepalive) > 10.0: + last_keepalive = now + if self.pool.available: + yield agent_pb2.ServerTaskMessage( + work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) + ) continue except StopIteration: pass diff --git a/poc-grpc-agent/scripts/multi_node_test.sh b/poc-grpc-agent/scripts/multi_node_test.sh new file mode 100755 index 0000000..1396986 --- /dev/null +++ b/poc-grpc-agent/scripts/multi_node_test.sh @@ -0,0 +1,47 @@ +#!/bin/bash +# 🚀 Collaborative Multi-Agent Mesh Test Script + +echo "--- 🛠️ Antigravity Mesh Bootstrap ---" + +# 1. Start Orchestrator in Background +export PYTHONPATH=. +export PYTHONUNBUFFERED=1 +export SIMULATION_DELAY_SEC=10 +python3 orchestrator/app.py > server_mesh.log 2>&1 & +ORCH_PID=$! +echo "[🛡️] Orchestrator PID: $ORCH_PID" + +sleep 3 + +# 2. Start Agent Node 001 +export AGENT_NODE_ID="agent-node-001" +python3 agent_node/main.py > node_001.log 2>&1 & +NODE_001_PID=$! +echo "[🤖] Agent 001 PID: $NODE_001_PID" + +# 3. Start Agent Node 002 +export AGENT_NODE_ID="agent-node-002" +python3 agent_node/main.py > node_002.log 2>&1 & +NODE_002_PID=$! +echo "[🤖] Agent 002 PID: $NODE_002_PID" + +echo "[⏳] Simulation Running (30s)..." +sleep 40 + +# 4. Cleanup +echo "\n--- 🛑 Shutdown Mesh ---" +kill -TERM $NODE_001_PID +kill -TERM $NODE_002_PID +kill -TERM $ORCH_PID +sleep 3 + +echo "\n--- 📊 Mesh Observations ---" +echo "--- SERVER LOG ---" +tail -n 20 server_mesh.log +echo "\n--- NODE 001 LOG ---" +tail -n 15 node_001.log +echo "\n--- NODE 002 LOG ---" +tail -n 15 node_002.log + +# Cleanup logs if needed +# rm server_mesh.log node_001.log node_002.log