import time
import subprocess
import os
import signal
def _spawn(module, extra_env=None):
    """Start ``python3 -m <module>`` from the PoC directory.

    stderr is merged into stdout and the pipe is opened in line-buffered
    text mode so a reader thread can relay output line by line.

    Args:
        module: Dotted module name passed to ``python3 -m``.
        extra_env: Optional mapping of variables layered on top of the
            current environment. When omitted the child simply inherits
            the parent's environment.

    Returns:
        The started ``subprocess.Popen`` object.
    """
    env = {**os.environ, **extra_env} if extra_env else None
    return subprocess.Popen(
        ["python3", "-m", module],
        cwd="/app/poc-grpc-agent",
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    )


def _pipe_output(name, pipe):
    """Relay every line read from *pipe* to stdout, prefixed with ``[name]``."""
    for line in pipe:
        print(f"[{name}] {line.strip()}")


def _simulate_local_edit():
    """Play the scripted user edits inside Node Alpha's sync directory.

    Timeline (seconds after this function starts):
        22  write ``.cortexignore``, append to ``hello.py`` and create
            ``secret.txt`` (which the ignore file lists).
        42  append to ``hello.py`` again, at the point where the script
            expects the workspace to be locked.
        57  overwrite ``hello.py`` with garbage to simulate sync drift.
    """
    time.sleep(22)
    root_alpha = "/tmp/cortex-sync-alpha/test-session-001"
    os.makedirs(root_alpha, exist_ok=True)

    # 1. Create .cortexignore
    print("\n[📝] User Sim: Creating .cortexignore on Node Alpha...")
    with open(os.path.join(root_alpha, ".cortexignore"), "w", encoding="utf-8") as f:
        f.write("*.tmp\nsecret.txt\n")

    # 2. Edit hello.py (Should Sync)
    sync_file = os.path.join(root_alpha, "hello.py")
    print(f"[📝] User Sim: Editing {sync_file} (Should Sync)...")
    with open(sync_file, "a", encoding="utf-8") as f:
        f.write("\n# Phase 3: Regular edit\n")

    # 3. Create secret.txt (Should be IGNORED)
    secret_file = os.path.join(root_alpha, "secret.txt")
    print(f"[📝] User Sim: Creating {secret_file} (Should be IGNORED)...")
    with open(secret_file, "w", encoding="utf-8") as f:
        f.write("THIS SHOULD NOT SYNC")

    time.sleep(20)  # Wait for Lock reliably

    # 4. Workspace LOCK Test
    # The original code wrote to an undefined name here, which raised
    # NameError and silently ended this thread before step 5 could run.
    # NOTE(review): hello.py is assumed to be the intended target because it
    # is the only tracked file this script creates -- confirm.
    locked_file = sync_file
    print("\n[🔒] User Sim: Node Alpha should be LOCKED by Orchestrator now...")
    with open(locked_file, "a", encoding="utf-8") as f:
        f.write("\n# USER TRYING TO EDIT WHILE LOCKED\n")

    time.sleep(15)

    # 5. Drift Recovery Test
    print(f"\n[💥] User Sim: Corrupting {sync_file} on Node Alpha (Simulating Sync Drift)...")
    with open(sync_file, "w", encoding="utf-8") as f:
        f.write("CORRUPTED CONTENT")


def _shutdown(processes, grace_seconds=2):
    """Terminate *processes*, kill any still alive after the grace period.

    Every process is waited on so that none is left unreaped.
    """
    for proc in processes:
        proc.terminate()
    # One shared deadline: the total grace period does not grow with the
    # number of processes.
    deadline = time.monotonic() + grace_seconds
    for proc in processes:
        try:
            proc.wait(timeout=max(0.0, deadline - time.monotonic()))
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()


def run_mesh_test():
    """Run the collaborative mesh simulation end to end.

    Starts the orchestrator and two agent nodes as subprocesses, relays
    their output with ``ORCH`` / ``N1`` / ``N2`` prefixes, plays the scripted
    edits on Node Alpha in a background thread, lets everything run for
    60 seconds and then shuts the processes down.

    Shutdown happens in a ``finally`` block, so processes that were already
    started are stopped even if a later launch fails or the run is
    interrupted.
    """
    import threading  # function-local: not part of the file-level imports

    processes = []
    readers = []

    def launch(tag, module, extra_env=None):
        # Start the reader right away so output is shown in real time and the
        # child cannot stall on a full pipe while later steps sleep.
        proc = _spawn(module, extra_env)
        processes.append(proc)
        reader = threading.Thread(
            target=_pipe_output, args=(tag, proc.stdout), daemon=True
        )
        reader.start()
        readers.append(reader)

    print("[🚀] Starting Collaborative Mesh Test...")
    try:
        # 1. Start Orchestrator
        print("[🛡️] Orchestrator: Starting...")
        launch("ORCH", "orchestrator.app")
        time.sleep(3)  # Wait for start

        # 2. Start Agent Node 1
        print("[🤖] Node Alpha: Starting...")
        launch(
            "N1",
            "agent_node.main",
            {
                "AGENT_NODE_ID": "node-alpha",
                "CORTEX_SYNC_DIR": "/tmp/cortex-sync-alpha",
            },
        )

        # 3. Start Agent Node 2
        print("[🤖] Node Beta: Starting...")
        launch(
            "N2",
            "agent_node.main",
            {
                "AGENT_NODE_ID": "node-beta",
                "CORTEX_SYNC_DIR": "/tmp/cortex-sync-beta",
            },
        )

        print("[⏳] Running simulation for 60 seconds...")
        # Simulate local edits on Node Alpha (N1) after a delay to test
        # real-time sync.
        threading.Thread(target=_simulate_local_edit, daemon=True).start()
        time.sleep(60)
    finally:
        # 4. Cleanup
        print("\n[🛑] Test Finished. Terminating processes...")
        _shutdown(processes)
        # Give the readers a moment to flush whatever the children printed
        # while exiting; they are daemon threads, so a stuck one cannot hang us.
        for reader in readers:
            reader.join(timeout=1)
    print("[✅] Done.")
if __name__ == "__main__":
    # Run the mesh simulation only when this file is executed as a script,
    # not when it is imported as a module.
    run_mesh_test()