Newer
Older
cortex-hub / poc-grpc-agent / test_mesh.py

import time
import subprocess
import os
import signal

def run_mesh_test():
    print("[🚀] Starting Collaborative Mesh Test...")
    print("[🛡️] Orchestrator: Starting...")
    
    # 1. Start Orchestrator
    orchestrator = subprocess.Popen(
        ["python3", "-m", "orchestrator.app"],
        cwd="/app/poc-grpc-agent",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1
    )
    
    time.sleep(3) # Wait for start
    
    print("[🤖] Node Alpha: Starting...")
    # 2. Start Agent Node 1
    node1 = subprocess.Popen(
        ["python3", "-m", "agent_node.main"],
        cwd="/app/poc-grpc-agent",
        env={**os.environ, "AGENT_NODE_ID": "node-alpha", "CORTEX_SYNC_DIR": "/tmp/cortex-sync-alpha"},
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1
    )
    
    print("[🤖] Node Beta: Starting...")
    # 3. Start Agent Node 2
    node2 = subprocess.Popen(
        ["python3", "-m", "agent_node.main"],
        cwd="/app/poc-grpc-agent",
        env={**os.environ, "AGENT_NODE_ID": "node-beta", "CORTEX_SYNC_DIR": "/tmp/cortex-sync-beta"},
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1
    )
    
    print("[⏳] Running simulation for 60 seconds...")
    start_time = time.time()
    
    # Simple thread to print outputs in real-time
    import threading
    def pipe_output(name, pipe):
        for line in pipe:
            print(f"[{name}] {line.strip()}")

    threading.Thread(target=pipe_output, args=("ORCH", orchestrator.stdout), daemon=True).start()
    threading.Thread(target=pipe_output, args=("N1", node1.stdout), daemon=True).start()
    threading.Thread(target=pipe_output, args=("N2", node2.stdout), daemon=True).start()
    
    # Simulate a local edit on Node Alpha (N1) after a delay to test real-time sync
    def simulate_local_edit():
        time.sleep(22)
        root_alpha = "/tmp/cortex-sync-alpha/test-session-001"
        os.makedirs(root_alpha, exist_ok=True)
        
        # 1. Create .cortexignore
        print(f"\n[📝] User Sim: Creating .cortexignore on Node Alpha...")
        with open(os.path.join(root_alpha, ".cortexignore"), "w") as f:
            f.write("*.tmp\nsecret.txt\n")

        # 2. Edit hello.py (Should Sync)
        sync_file = os.path.join(root_alpha, "hello.py")
        print(f"[📝] User Sim: Editing {sync_file} (Should Sync)...")
        with open(sync_file, "a") as f:
            f.write("\n# Phase 3: Regular edit\n")

        # 3. Create secret.txt (Should be IGNORED)
        secret_file = os.path.join(root_alpha, "secret.txt")
        print(f"[📝] User Sim: Creating {secret_file} (Should be IGNORED)...")
        with open(secret_file, "w") as f:
            f.write("THIS SHOULD NOT SYNC")

        time.sleep(20) # Wait for Lock reliably
        # 4. Workspace LOCK Test
        print(f"\n[🔒] User Sim: Node Alpha should be LOCKED by Orchestrator now...")
        locked_file = os.path.join(root_alpha, "hello.py")
        with open(locked_file, "a") as f:
            f.write("\n# USER TRYING TO EDIT WHILE LOCKED\n")
            
    threading.Thread(target=simulate_local_edit, daemon=True).start()

    time.sleep(60)
    
    # 4. Cleanup
    print("\n[🛑] Test Finished. Terminating processes...")
    orchestrator.terminate()
    node1.terminate()
    node2.terminate()
    
    time.sleep(2)
    orchestrator.kill()
    node1.kill()
    node2.kill()
    print("[✅] Done.")

if __name__ == "__main__":
    run_mesh_test()