import time
import subprocess
import os
import signal
def run_mesh_test():
print("[🚀] Starting Collaborative Mesh Test...")
print("[🛡️] Orchestrator: Starting...")
# 1. Start Orchestrator
orchestrator = subprocess.Popen(
["python3", "-m", "orchestrator.app"],
cwd="/app/poc-grpc-agent",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
time.sleep(3) # Wait for start
print("[🤖] Node Alpha: Starting...")
# 2. Start Agent Node 1
node1 = subprocess.Popen(
["python3", "-m", "agent_node.main"],
cwd="/app/poc-grpc-agent",
env={**os.environ, "AGENT_NODE_ID": "node-alpha", "CORTEX_SYNC_DIR": "/tmp/cortex-sync-alpha"},
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
print("[🤖] Node Beta: Starting...")
# 3. Start Agent Node 2
node2 = subprocess.Popen(
["python3", "-m", "agent_node.main"],
cwd="/app/poc-grpc-agent",
env={**os.environ, "AGENT_NODE_ID": "node-beta", "CORTEX_SYNC_DIR": "/tmp/cortex-sync-beta"},
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
print("[⏳] Running simulation for 40 seconds...")
start_time = time.time()
# Simple thread to print outputs in real-time
import threading
def pipe_output(name, pipe):
for line in pipe:
print(f"[{name}] {line.strip()}")
threading.Thread(target=pipe_output, args=("ORCH", orchestrator.stdout), daemon=True).start()
threading.Thread(target=pipe_output, args=("N1", node1.stdout), daemon=True).start()
threading.Thread(target=pipe_output, args=("N2", node2.stdout), daemon=True).start()
# Simulate a local edit on Node Alpha (N1) after a delay to test real-time sync
def simulate_local_edit():
time.sleep(22) # Wait for initial push and START_WATCHING
sync_file = "/tmp/cortex-sync-alpha/test-session-001/hello.py"
print(f"\n[📝] User Sim: Editing {sync_file} locally on Node Alpha...")
with open(sync_file, "a") as f:
f.write("\n# BROADCAST TEST: User added this line on Alpha!\n")
threading.Thread(target=simulate_local_edit, daemon=True).start()
time.sleep(40)
# 4. Cleanup
print("\n[🛑] Test Finished. Terminating processes...")
orchestrator.terminate()
node1.terminate()
node2.terminate()
time.sleep(2)
orchestrator.kill()
node1.kill()
node2.kill()
print("[✅] Done.")
if __name__ == "__main__":
run_mesh_test()