import pytest
import httpx
import os
import uuid
import time
from conftest import BASE_URL
def _headers():
uid = os.getenv("SYNC_TEST_USER_ID", "")
return {"X-User-ID": uid}
def test_coworker_sc1_mirror_check():
    """SC-1 (Mirror Check).

    Registers a throwaway mesh node, deploys an agent on it with
    ``co_worker_quality_gate=True``, waits until the agent reports an
    ``evaluation_status`` of ``evaluating``, then lists the node's
    ``.cortex`` directory via ``/nodes/{id}/fs/ls`` and checks that the
    mirror files (``rubric.md`` and ``history.log``) exist. Cleans up
    the agent and the node afterwards.
    """
    node_id = f"test-coworker-sc1-{uuid.uuid4().hex[:8]}"
    admin_id = os.getenv("SYNC_TEST_USER_ID", "")
    instance_id = None
    with httpx.Client(timeout=30.0) as client:
        try:
            # Step 1: register a test node with shell + sync skills enabled.
            r_node = client.post(
                f"{BASE_URL}/nodes/admin",
                params={"admin_id": admin_id},
                json={
                    "node_id": node_id,
                    "display_name": "Co-Worker SC-1 Node",
                    "is_active": True,
                    "skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}},
                },
            )
            assert r_node.status_code == 200, f"Node registration failed: {r_node.text}"

            # Step 2: deploy the quality-gated agent onto that node.
            # The long interval keeps the agent from running twice mid-test.
            r_deploy = client.post(
                f"{BASE_URL}/agents/deploy",
                json={
                    "name": "SC-1 Mirror Agent",
                    "description": "Tests .cortex mirror initialization",
                    "system_prompt": "You are a test agent. Create a simple hello world python script.",
                    "max_loop_iterations": 1,
                    "mesh_node_id": node_id,
                    "provider_name": "gemini",
                    "model_name": "gemini-1.5-flash",  # explicitly pin flash
                    "trigger_type": "interval",
                    "interval_seconds": 60,
                    "co_worker_quality_gate": True,
                    "default_prompt": "Create app.py that prints hello.",
                },
                headers=_headers(),
            )
            assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
            instance_id = r_deploy.json()["instance_id"]

            # Step 3: poll every 2s (30 tries = 60s budget) until the agent
            # reaches the 'evaluating' evaluation status.
            print(f"\n[test] Waiting for agent {instance_id} to reach 'evaluating' status...")
            found_evaluating = False
            for _attempt in range(30):
                r_agent = client.get(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
                if r_agent.status_code == 200 and r_agent.json().get("evaluation_status") == "evaluating":
                    found_evaluating = True
                    break
                time.sleep(2)
            assert found_evaluating, f"Agent did not reach 'evaluating' status."

            # Step 4: the .cortex mirror folder must now hold the rubric
            # and history files, per the test plan.
            r_ls = client.get(
                f"{BASE_URL}/nodes/{node_id}/fs/ls",
                params={"path": ".cortex"},
                headers=_headers(),
            )
            assert r_ls.status_code == 200, f"Failed to ls .cortex: {r_ls.text}"
            filenames = [entry["name"] for entry in r_ls.json()]
            assert any("rubric.md" in f for f in filenames), f"rubric.md not found in {filenames}"
            assert any("history.log" in f for f in filenames), f"history.log not found in {filenames}"
        finally:
            # Best-effort cleanup: agent first, then the node itself.
            if instance_id:
                client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
            client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": admin_id})
def test_coworker_sc3_limit_check():
    """SC-3 (Limit Check).

    1. Deploy an agent with ``max_rework_attempts=1`` and
       ``rework_threshold=100`` (impossible to pass) on a fresh node.
    2. Trigger a run via its webhook.
    3. Poll ``/agents`` until ``evaluation_status == 'failed_limit'``.
    4. Verify ``latest_quality_score`` is present in the response.

    Cleans up the agent and the node afterwards.
    """
    node_id = f"test-coworker-sc3-{uuid.uuid4().hex[:8]}"
    admin_id = os.getenv("SYNC_TEST_USER_ID", "")
    instance_id = None
    with httpx.Client(timeout=30.0) as client:
        try:
            # 1. Register a test node with shell + sync skills enabled.
            node_payload = {
                "node_id": node_id,
                "display_name": "Co-Worker SC-3 Node",
                "is_active": True,
                "skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}},
            }
            r_node = client.post(
                f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json=node_payload
            )
            assert r_node.status_code == 200, f"Node registration failed: {r_node.text}"

            # 2. Deploy agent with max_rework_attempts=1 and an unreachable
            #    rework_threshold so the quality gate must hit its limit.
            deploy_payload = {
                "name": "SC-3 Limit Agent",
                "system_prompt": "You are a test agent. Create a simple hello world python script.",
                "max_loop_iterations": 1,
                "mesh_node_id": node_id,
                "provider_name": "gemini",
                "model_name": "gemini-1.5-flash",
                "trigger_type": "webhook",  # webhook lets us trigger manually
                "co_worker_quality_gate": True,
                "max_rework_attempts": 1,
                "rework_threshold": 100,  # impossible to pass
                "default_prompt": "Create app.py that prints hello.",
            }
            r_deploy = client.post(
                f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers()
            )
            assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
            instance_id = r_deploy.json()["instance_id"]

            # 3. Fetch the webhook secret and fire the trigger.
            #    Fix: check the status code before parsing, and use a default
            #    on next() so a missing webhook trigger fails with a clear
            #    assertion message instead of a bare StopIteration.
            r_trig = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
            assert r_trig.status_code == 200, f"Failed to list triggers: {r_trig.text}"
            webhook_trigger = next(
                (t for t in r_trig.json() if t["trigger_type"] == "webhook"), None
            )
            assert webhook_trigger is not None, f"No webhook trigger found: {r_trig.text}"
            secret = webhook_trigger["webhook_secret"]
            r_hook = client.post(
                f"{BASE_URL}/agents/{instance_id}/webhook",
                params={"token": secret},
                json={"prompt": "Go!"},
            )
            assert r_hook.status_code == 202, f"Webhook trigger failed: {r_hook.text}"

            # 4. Poll every 2s (60 tries = 120s budget) until the agent
            #    reports 'failed_limit', capturing the latest quality score.
            print(f"\n[test] Waiting for agent {instance_id} to reach 'failed_limit' status...")
            failed_limit = False
            latest_score = None
            for _ in range(60):
                r_agents = client.get(f"{BASE_URL}/agents", headers=_headers())
                if r_agents.status_code == 200:
                    agent = next(
                        (a for a in r_agents.json() if a["id"] == instance_id), None
                    )
                    if agent:
                        latest_score = agent.get("latest_quality_score")
                        if agent.get("evaluation_status") == "failed_limit":
                            failed_limit = True
                            break
                time.sleep(2)
            assert failed_limit, "Agent did not reach 'failed_limit' status."
            assert latest_score is not None, "latest_quality_score should be present in the response"
        finally:
            # Best-effort cleanup: agent first, then the node itself.
            if instance_id:
                client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
            client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": admin_id})