import grpc
import time
import os
import sys
from concurrent import futures
# Add root to path to find protos
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from protos import agent_pb2, agent_pb2_grpc
from orchestrator.config import (
CERT_CA, CERT_SERVER_CRT, CERT_SERVER_KEY,
GRPC_HOST, GRPC_PORT, SIMULATION_DELAY_SEC, MAX_WORKERS
)
from orchestrator.services.grpc_server import AgentOrchestrator
def _read_bytes(path):
    """Return the entire contents of the file at *path* as bytes."""
    with open(path, 'rb') as handle:
        return handle.read()


def serve():
    """Start the TLS-secured gRPC orchestrator, run the simulation, then block.

    Reads the server key, server certificate and CA bundle from the paths
    configured in ``orchestrator.config``, binds a secure port on
    ``GRPC_HOST:GRPC_PORT`` with client certificates required, registers an
    ``AgentOrchestrator`` servicer, runs ``_run_simulation`` once, and then
    waits until the server terminates.

    Raises:
        OSError: If any of the certificate/key files cannot be opened.
    """
    print(f"[🛡️] Boss Plane Orchestrator Starting on {GRPC_HOST}:{GRPC_PORT}...")

    # 1. SSL/TLS Setup
    # Files are read in the order key -> cert -> CA. Passing the CA bundle
    # together with require_client_auth=True makes the server demand a client
    # certificate signed by that CA.
    creds = grpc.ssl_server_credentials(
        [(_read_bytes(CERT_SERVER_KEY), _read_bytes(CERT_SERVER_CRT))],
        root_certificates=_read_bytes(CERT_CA),
        require_client_auth=True,
    )

    # 2. Server Initialization
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=MAX_WORKERS))
    orch = AgentOrchestrator()
    agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(orch, server)
    server.add_secure_port(f'{GRPC_HOST}:{GRPC_PORT}', creds)

    # 3. Start
    server.start()
    print("[🛡️] Boss Plane Refactored & Online.", flush=True)

    try:
        # 4. Simulation Launcher
        # (In Production, this would be an API interface or Webhook handler)
        _run_simulation(orch)
        server.wait_for_termination()
    finally:
        # Shut the started server down explicitly on every exit path
        # (simulation failure, Ctrl-C, normal termination) rather than
        # relying on interpreter teardown. The original exception, if any,
        # still propagates to the caller.
        server.stop(grace=None)
def _browser_field(result, key):
    """Return ``result['browser'][key]``, or None if any level is missing/empty."""
    browser = (result or {}).get('browser') or {}
    return browser.get(key)


def _run_simulation(orch):
    """Drive a scripted end-to-end exercise against the connected nodes.

    The sequence is: wait ``SIMULATION_DELAY_SEC``, push two shared tasks to
    ``orch.pool``, list nodes from ``orch.registry``, then (if any node is
    present) push/sync a workspace, run a shell command, lock the workspace,
    and issue several browser actions through ``orch.assistant``. Fixed
    ``time.sleep`` pauses between phases mean the call blocks for well over a
    minute when nodes are available.

    Args:
        orch: Object exposing ``pool``, ``registry`` and ``assistant``
            attributes (presumably the ``AgentOrchestrator`` servicer
            created in ``serve`` -- confirm against caller).

    Returns:
        None. Returns early, after the shared-work phase, when no nodes are
        registered.
    """
    workspace_session = "test-session-001"
    browser_session = "antigravity-session-1"

    time.sleep(SIMULATION_DELAY_SEC)
    print("\n[🧠] AI Simulation Start...", flush=True)

    # Collaborative Mesh Test: Pushing Shared Work
    print("[📦📤] Pushing shared tasks to Global Work Pool...")
    orch.pool.push_work("shared-mesh-001", "uname -a")
    time.sleep(2)
    orch.pool.push_work("shared-mesh-002", "uptime")
    time.sleep(5)  # Let nodes claim

    active_nodes = orch.registry.list_nodes()
    if not active_nodes:
        print("[!] No nodes available for direct task simulation.")
        return

    # All single-node phases below target the first listed node.
    target_node = active_nodes[0]

    # Ghost Mirror Sync Phase 1 & 2
    print("\n[🧠] AI Phase: Ghost Mirror Workspace Sync (Multi-Node Broadcast)...")
    for node_id in active_nodes:
        orch.assistant.push_workspace(node_id, workspace_session)
    time.sleep(2)
    # Start watching only on the first node to test broadcast to others
    orch.assistant.control_sync(target_node, workspace_session, action="START")

    # Phase 3: Context-Aware Skills (Shell + Browser)
    print("\n[🧠] AI Phase 3: Executing Context-Aware Shell Task...")
    res_single = orch.assistant.dispatch_single(
        target_node, 'ls -la', session_id=workspace_session
    )
    print(f" CWD Listing Output: {res_single}", flush=True)

    # Phase 3: LOCK Test (Simulate an AI edit phase where user edits are blocked)
    time.sleep(10)
    print("\n[🔒] Orchestrator: Locking Node 0 to prevent user interference (Phase 3)...")
    orch.assistant.lock_workspace(target_node, workspace_session)

    # Phase 4: Browser Bridge
    print("\n[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)...")
    nav_action = agent_pb2.BrowserAction(
        action=agent_pb2.BrowserAction.NAVIGATE,
        url="https://google.com",
        session_id=workspace_session,
    )
    res_browser = orch.assistant.dispatch_browser(
        target_node, nav_action, session_id=workspace_session
    )
    print(f" Browser Result: {res_browser}", flush=True)

    # Stay alive for diagnostics
    time.sleep(60)

    # Phase 4 Pro: Perception & Evaluation
    # NOTE(review): the actions below carry a different session id than the
    # navigation above and are dispatched without a session_id keyword; kept
    # exactly as in the original -- confirm this is intentional.
    print("\n[🧠] AI Phase 4 Pro: Perception & Advanced Logic...")
    a11y_action = agent_pb2.BrowserAction(
        action=agent_pb2.BrowserAction.GET_A11Y,
        session_id=browser_session,
    )
    res_a11y = orch.assistant.dispatch_browser(target_node, a11y_action)
    # _browser_field tolerates a None result or a None 'browser' entry so a
    # failed dispatch prints None instead of raising AttributeError.
    print(f" A11y Result: {_browser_field(res_a11y, 'a11y')}")

    eval_action = agent_pb2.BrowserAction(
        action=agent_pb2.BrowserAction.EVAL,
        text="window.performance.now()",
        session_id=browser_session,
    )
    res_eval = orch.assistant.dispatch_browser(target_node, eval_action)
    print(f" Eval Result: {_browser_field(res_eval, 'eval')}")

    # Real-time Events
    print("\n[🧠] AI Phase 4 Pro: Triggering Real-time Events (Tunneling)...")
    trigger_action = agent_pb2.BrowserAction(
        action=agent_pb2.BrowserAction.EVAL,
        text="console.log('Refactored Hello!'); fetch('https://example.com/api/ping');",
        session_id=browser_session,
    )
    orch.assistant.dispatch_browser(target_node, trigger_action)
# Script entry point: launch the orchestrator only when run directly,
# not when this module is imported.
if __name__ == "__main__":
    serve()