Newer
Older
cortex-hub / poc-grpc-agent / orchestrator / app.py
import grpc
import time
import os
import sys
from concurrent import futures

# Add root to path to find protos
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from protos import agent_pb2, agent_pb2_grpc
from orchestrator.config import (
    CERT_CA, CERT_SERVER_CRT, CERT_SERVER_KEY, 
    GRPC_HOST, GRPC_PORT, SIMULATION_DELAY_SEC, MAX_WORKERS
)
from orchestrator.services.grpc_server import AgentOrchestrator

def serve():
    """Start the TLS-secured gRPC orchestrator, run the demo, then block.

    Loads the server key, server certificate and CA bundle from the paths
    configured in ``orchestrator.config``, builds server credentials that
    require a client certificate, registers an ``AgentOrchestrator``
    servicer on a thread-pool-backed gRPC server, and binds it to
    ``GRPC_HOST:GRPC_PORT``. After the server is started the simulation is
    run synchronously on the calling thread; only when it returns does this
    function block in ``wait_for_termination()``.

    Raises:
        OSError: if any of the three credential files cannot be opened.
    """
    print(f"[🛡️] Boss Plane Orchestrator Starting on {GRPC_HOST}:{GRPC_PORT}...")

    # --- TLS material (read order: key, certificate, CA) ---
    with open(CERT_SERVER_KEY, 'rb') as key_file:
        private_key = key_file.read()
    with open(CERT_SERVER_CRT, 'rb') as cert_file:
        certificate_chain = cert_file.read()
    with open(CERT_CA, 'rb') as ca_file:
        root_certificates = ca_file.read()

    server_credentials = grpc.ssl_server_credentials(
        [(private_key, certificate_chain)],
        root_certificates=root_certificates,
        require_client_auth=True,  # clients must present a certificate
    )

    # --- Server wiring ---
    worker_pool = futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
    server = grpc.server(worker_pool)
    orch = AgentOrchestrator()
    agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(orch, server)

    bind_address = f'{GRPC_HOST}:{GRPC_PORT}'
    server.add_secure_port(bind_address, server_credentials)

    # --- Go live ---
    server.start()
    print("[🛡️] Boss Plane Refactored & Online.", flush=True)

    # Demo driver. In production this slot would be taken by an API
    # interface or webhook handler instead of a scripted simulation.
    _run_simulation(orch)

    server.wait_for_termination()

def _run_simulation(orch):
    """Run the scripted demo sequence against a started orchestrator.

    The steps run strictly in order, separated by fixed sleeps:

    1. Wait ``SIMULATION_DELAY_SEC``, then push two shell commands to
       ``orch.pool``.
    2. Read the node list from ``orch.registry``; stop here if it is empty.
    3. Call ``push_workspace`` for every listed node and start sync on the
       first one.
    4. Dispatch ``ls -la`` to the first node, then lock its workspace.
    5. Send NAVIGATE, GET_A11Y and two EVAL browser actions to the first
       node, printing the results of the first three.

    Args:
        orch: Object exposing ``pool``, ``registry`` and ``assistant``
            (the ``AgentOrchestrator`` instance when called from serve()).

    Returns:
        None. Results are only printed.
    """
    workspace_session = "test-session-001"
    browser_session = "antigravity-session-1"

    time.sleep(SIMULATION_DELAY_SEC)
    print("\n[🧠] AI Simulation Start...", flush=True)

    # Collaborative mesh test: (task id, command, pause after push in seconds).
    # The longer final pause gives nodes time to claim the pooled work.
    print("[📦📤] Pushing shared tasks to Global Work Pool...")
    pooled_tasks = (
        ("shared-mesh-001", "uname -a", 2),
        ("shared-mesh-002", "uptime", 5),
    )
    for task_id, command, pause_sec in pooled_tasks:
        orch.pool.push_work(task_id, command)
        time.sleep(pause_sec)

    nodes = orch.registry.list_nodes()
    if not nodes:
        print("[!] No nodes available for direct task simulation.")
        return
    primary = nodes[0]

    # Ghost Mirror sync: push to every node, then start sync on one.
    print("\n[🧠] AI Phase: Ghost Mirror Workspace Sync (Multi-Node Broadcast)...")
    assistant = orch.assistant
    for member in nodes:
        assistant.push_workspace(member, workspace_session)

    time.sleep(2)
    # Sync is started on the first node only, to test broadcast to the others.
    assistant.control_sync(nodes[0], workspace_session, action="START")

    # Phase 3: shell task executed in the session context.
    print("\n[🧠] AI Phase 3: Executing Context-Aware Shell Task...")
    listing = assistant.dispatch_single(
        primary, 'ls -la', session_id=workspace_session
    )
    print(f"    CWD Listing Output: {listing}", flush=True)

    # Phase 3 lock test: simulate an AI edit phase where user edits are blocked.
    time.sleep(10)
    print("\n[🔒] Orchestrator: Locking Node 0 to prevent user interference (Phase 3)...")
    assistant.lock_workspace(nodes[0], workspace_session)

    # Phase 4: browser bridge navigation.
    print("\n[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)...")
    BrowserAction = agent_pb2.BrowserAction
    navigate = BrowserAction(
        action=BrowserAction.NAVIGATE,
        url="https://google.com",
        session_id=workspace_session,
    )
    nav_reply = assistant.dispatch_browser(
        primary, navigate, session_id=workspace_session
    )
    print(f"    Browser Result: {nav_reply}", flush=True)

    # Long pause that keeps the process alive for diagnostics.
    time.sleep(60)

    # Phase 4 Pro: perception (accessibility tree) and script evaluation.
    print("\n[🧠] AI Phase 4 Pro: Perception & Advanced Logic...")
    read_a11y = BrowserAction(
        action=BrowserAction.GET_A11Y,
        session_id=browser_session,
    )
    a11y_reply = assistant.dispatch_browser(primary, read_a11y)
    a11y_payload = a11y_reply.get('browser', {}).get('a11y')
    print(f"    A11y Result: {a11y_payload}")

    run_script = BrowserAction(
        action=BrowserAction.EVAL,
        text="window.performance.now()",
        session_id=browser_session,
    )
    eval_reply = assistant.dispatch_browser(primary, run_script)
    eval_payload = eval_reply.get('browser', {}).get('eval')
    print(f"    Eval Result: {eval_payload}")

    # Real-time events: the evaluated script logs to the console and issues
    # a fetch; the dispatch result is intentionally not inspected.
    print("\n[🧠] AI Phase 4 Pro: Triggering Real-time Events (Tunneling)...")
    fire_events = BrowserAction(
        action=BrowserAction.EVAL,
        text="console.log('Refactored Hello!'); fetch('https://example.com/api/ping');",
        session_id=browser_session,
    )
    assistant.dispatch_browser(primary, fire_events)

# Script entry point: start the orchestrator only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    serve()