import pytest
import httpx
import os
import uuid
import time
from conftest import BASE_URL
def _headers():
uid = os.getenv("SYNC_TEST_USER_ID", "")
return {"X-User-ID": uid}
def test_coworker_sc1_mirror_check():
"""
SC-1 (Mirror Check):
1. Deploy an agent with co_worker_quality_gate=True.
2. Wait for the agent to initialize (Status: evaluating).
3. Use the /nodes/{id}/fs/ls API to verify the .cortex folder existence.
"""
node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
instance_id = None
with httpx.Client(timeout=30.0) as client:
try:
# 2. Deploy Agent with co_worker_quality_gate=True
deploy_payload = {
"name": "SC-1 Mirror Agent",
"description": "Tests .cortex mirror initialization",
"system_prompt": "You are a test agent. Create a simple hello world python script.",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-1.5-flash",
"trigger_type": "interval",
"interval_seconds": 60,
"co_worker_quality_gate": True,
"initial_prompt": "Create app.py that prints hello.",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
instance_id = r_deploy.json()["instance_id"]
# 3. Wait for agent to initialize (Status: evaluating)
print(f"\n[test] Waiting for agent {instance_id} to reach 'evaluating' status...")
found_evaluating = False
sync_workspace_id = r_deploy.json().get("sync_workspace_id")
for _ in range(30): # 60s timeout
r_agent = client.get(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
if r_agent.status_code == 200:
agent = r_agent.json()
status = agent.get("evaluation_status")
print(f" [debug] Current status: '{status}'")
if status and status != "None":
found_evaluating = True
break
time.sleep(2)
assert found_evaluating, f"Agent did not reach 'evaluating' status."
# 4. Use the /nodes/{id}/fs/ls API to verify the .cortex folder existence
params = {"path": ".cortex", "session_id": sync_workspace_id}
r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params=params, headers=_headers())
assert r_ls.status_code == 200, f"Failed to ls .cortex: {r_ls.text}"
data = r_ls.json()
filenames = [f["name"] for f in data.get("files", [])]
# Verify rubric.md and history.log are present as per test plan
assert any("rubric.md" in f for f in filenames), f"rubric.md not found in {filenames}"
assert any("history.log" in f for f in filenames), f"history.log not found in {filenames}"
finally:
if instance_id:
client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
def test_coworker_sc3_limit_check():
"""
SC-3 (Limit Check):
1. Deploy an agent with max_rework_attempts=1 and rework_threshold=100.
2. Trigger a run.
3. Poll the /agents endpoint until evaluation_status == 'failed_limit'.
4. Verify the latest_quality_score is present in the response.
"""
node_id = os.getenv("SYNC_TEST_NODE2", "test-node-2")
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
instance_id = None
with httpx.Client(timeout=30.0) as client:
try:
# 2. Deploy Agent with max_rework_attempts=1 and rework_threshold=100
deploy_payload = {
"name": "SC-3 Limit Agent",
"system_prompt": "You are a test agent. Create a simple hello world python script.",
"max_loop_iterations": 5,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-1.5-flash",
"trigger_type": "webhook", # Use webhook to trigger manually
"co_worker_quality_gate": True,
"max_rework_attempts": 1,
"rework_threshold": 100, # Impossible to pass
"default_prompt": "Create app.py that prints hello.",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
instance_id = r_deploy.json()["instance_id"]
# 3. Get the webhook secret and trigger it
r_trig = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
webhook_trigger = next(t for t in r_trig.json() if t["trigger_type"] == "webhook")
secret = webhook_trigger["webhook_secret"]
r_hook = client.post(
f"{BASE_URL}/agents/{instance_id}/webhook",
params={"token": secret},
json={"prompt": "Go!"}
)
assert r_hook.status_code == 202
# 4. Poll until evaluation_status == 'failed_limit'
print(f"\n[test] Waiting for agent {instance_id} to reach 'failed_limit' status...")
failed_limit = False
latest_score = None
for _ in range(120): # 240s timeout
r_agents = client.get(f"{BASE_URL}/agents", headers=_headers())
if r_agents.status_code == 200:
agents = r_agents.json()
agent = next((a for a in agents if a["id"] == instance_id), None)
if agent:
status = agent.get("evaluation_status")
latest_score = agent.get("latest_quality_score")
if status == "failed_limit":
failed_limit = True
break
time.sleep(2)
assert failed_limit, f"Agent did not reach 'failed_limit' status."
assert latest_score is not None, "latest_quality_score should be present in the response"
finally:
if instance_id:
client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
def test_coworker_sc2_rework_loop():
"""
SC-2 (The Rework Loop):
1. Deploy agent with a conflicting requirement to force at least one failure.
2. Poll for evaluation_status to be 'reworking'.
3. Verify history.log has a failed entry.
"""
node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
instance_id = None
with httpx.Client(timeout=30.0) as client:
try:
# 2. Deploy Agent with rework loop
deploy_payload = {
"name": "SC-2 Rework Agent",
"system_prompt": "You are a stubborn tester.",
"max_loop_iterations": 2,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-1.5-flash",
"trigger_type": "webhook",
"co_worker_quality_gate": True,
"max_rework_attempts": 3,
"rework_threshold": 100,
"default_prompt": "Create app.py that prints hello, but deliberately make a syntax error on your first try.",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200
instance_id = r_deploy.json()["instance_id"]
r_trig = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
secret = next(t for t in r_trig.json() if t["trigger_type"] == "webhook")["webhook_secret"]
client.post(f"{BASE_URL}/agents/{instance_id}/webhook", params={"token": secret}, json={"prompt": "Go!"})
found_reworking = False
for _ in range(120):
r_agents = client.get(f"{BASE_URL}/agents", headers=_headers())
if r_agents.status_code == 200:
agent = next((a for a in r_agents.json() if a["id"] == instance_id), None)
if agent and agent.get("evaluation_status") == "reworking":
found_reworking = True
break
time.sleep(2)
assert found_reworking, "Agent never entered 'reworking' status."
sync_workspace_id = r_deploy.json().get("sync_workspace_id")
r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": ".cortex/history.log", "session_id": sync_workspace_id}, headers=_headers())
assert r_ls.status_code == 200
assert "score" in r_ls.text, "history.log should contain score entries if it reached rework phase."
finally:
if instance_id: client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
def test_coworker_sc4_context_compaction():
"""
SC-4 (Context Compaction):
1. Simply deploy an agent and mock/simulate conditions if actual compaction is hard to trigger.
(This is a placeholder test that checks the agent can take higher max_rework_attempts bounds.)
"""
node_id = os.getenv("SYNC_TEST_NODE2", "test-node-2")
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
instance_id = None
with httpx.Client(timeout=30.0) as client:
try:
# Register node check removed (leveraging session-scoped nodes)
deploy_payload = {
"name": "SC-4 Compaction Agent",
"system_prompt": "Tester",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-1.5-flash",
"trigger_type": "interval",
"co_worker_quality_gate": True,
"max_rework_attempts": 5,
"rework_threshold": 95,
"default_prompt": "Placeholder",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200
instance_id = r_deploy.json()["instance_id"]
finally:
if instance_id: client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())