Newer
Older
cortex-hub / poc-grpc-agent / test_mesh.py

import time
import subprocess
import os
import signal

def run_mesh_test():
    print("[🚀] Starting Collaborative Mesh Test...")
    print("[🛡️] Orchestrator: Starting...")
    
    # 1. Start Orchestrator
    orchestrator = subprocess.Popen(
        ["python3", "-m", "orchestrator.app"],
        cwd="/app/poc-grpc-agent",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1
    )
    
    time.sleep(3) # Wait for start
    
    print("[🤖] Node Alpha: Starting...")
    # 2. Start Agent Node 1
    node1 = subprocess.Popen(
        ["python3", "-m", "agent_node.main"],
        cwd="/app/poc-grpc-agent",
        env={**os.environ, "AGENT_NODE_ID": "node-alpha", "CORTEX_SYNC_DIR": "/tmp/cortex-sync-alpha"},
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1
    )
    
    print("[🤖] Node Beta: Starting...")
    # 3. Start Agent Node 2
    node2 = subprocess.Popen(
        ["python3", "-m", "agent_node.main"],
        cwd="/app/poc-grpc-agent",
        env={**os.environ, "AGENT_NODE_ID": "node-beta", "CORTEX_SYNC_DIR": "/tmp/cortex-sync-beta"},
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1
    )
    
    print("[⏳] Running simulation for 40 seconds...")
    start_time = time.time()
    
    # Simple thread to print outputs in real-time
    import threading
    def pipe_output(name, pipe):
        for line in pipe:
            print(f"[{name}] {line.strip()}")

    threading.Thread(target=pipe_output, args=("ORCH", orchestrator.stdout), daemon=True).start()
    threading.Thread(target=pipe_output, args=("N1", node1.stdout), daemon=True).start()
    threading.Thread(target=pipe_output, args=("N2", node2.stdout), daemon=True).start()
    
    # Simulate a local edit on Node Alpha (N1) after a delay to test real-time sync
    def simulate_local_edit():
        time.sleep(22) # Wait for initial push and START_WATCHING
        sync_file = "/tmp/cortex-sync-alpha/test-session-001/hello.py"
        print(f"\n[📝] User Sim: Editing {sync_file} locally on Node Alpha...")
        with open(sync_file, "a") as f:
            f.write("\n# BROADCAST TEST: User added this line on Alpha!\n")
            
    threading.Thread(target=simulate_local_edit, daemon=True).start()

    time.sleep(40)
    
    # 4. Cleanup
    print("\n[🛑] Test Finished. Terminating processes...")
    orchestrator.terminate()
    node1.terminate()
    node2.terminate()
    
    time.sleep(2)
    orchestrator.kill()
    node1.kill()
    node2.kill()
    print("[✅] Done.")

if __name__ == "__main__":
    run_mesh_test()