import pytest
import httpx
import os
import time
from conftest import BASE_URL
def _headers():
    """Build the HTTP headers that identify the test user.

    The user id is read from the ``SYNC_TEST_USER_ID`` environment variable
    each time the function is called; an empty string is used when the
    variable is not set.
    """
    return {"X-User-ID": os.environ.get("SYNC_TEST_USER_ID", "")}
def test_parallel_rubric_generation():
    """
    Verifies that rubric generation and main agent execution happen in parallel.

    The test deploys a webhook-triggered agent with ``co_worker_quality_gate``
    enabled, fires its webhook, and then polls the agent's
    ``evaluation_status`` in two phases:

    1. Until a status mentioning both "Main Agent" and "Executing" is seen.
       Any "Initiating parallel rubric" status observed on the way is
       recorded and printed, but not required, because that window may be
       too short to catch by polling.
    2. Until the status contains "PASSED" or "failed".

    The deployed agent is deleted at the end, whether or not the test passed.
    """
    node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
    # The environment does not change during the test, so build headers once.
    headers = _headers()
    instance_id = None

    with httpx.Client(timeout=30.0) as client:
        try:
            # 1. Deploy Agent with co_worker_quality_gate=True
            deploy_payload = {
                "name": "Parallel Coworker Test",
                "description": "Tests parallel rubric generation",
                "system_prompt": "You are a helpful assistant. Provide a brief summary of the history of the internet.",
                "max_loop_iterations": 1,
                "mesh_node_id": node_id,
                "provider_name": "gemini",
                "model_name": "gemini-1.5-flash",
                "trigger_type": "webhook",
                "co_worker_quality_gate": True,
                "default_prompt": "Tell me about the history of the internet.",
            }
            r_deploy = client.post(
                f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=headers
            )
            assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
            instance_id = r_deploy.json()["instance_id"]

            # 2. Trigger the agent
            r_trig = client.get(
                f"{BASE_URL}/agents/{instance_id}/triggers", headers=headers
            )
            assert r_trig.status_code == 200, f"Listing triggers failed: {r_trig.text}"
            # Use a default so a missing trigger yields a readable assertion
            # failure instead of a bare StopIteration.
            webhook = next(
                (t for t in r_trig.json() if t.get("trigger_type") == "webhook"),
                None,
            )
            assert webhook is not None, f"No webhook trigger found: {r_trig.text}"
            secret = webhook["webhook_secret"]
            # The webhook call carries its secret as a query token and, as in
            # the original test, is sent without the user-id header.
            r_hook = client.post(
                f"{BASE_URL}/agents/{instance_id}/webhook",
                params={"token": secret},
                json={"prompt": "Go!"},
            )
            # Fail fast: if the trigger was rejected, polling can never succeed.
            assert r_hook.status_code < 400, f"Webhook trigger failed: {r_hook.text}"

            # 3. Poll for the parallel status
            print(f"\n[test] Polling for parallel status for agent {instance_id}...")
            found_parallel_status = False
            found_executing_status = False
            # The window for "Initiating parallel rubric" might be small,
            # so we poll frequently.
            for _ in range(200):
                r_agent = client.get(f"{BASE_URL}/agents/{instance_id}", headers=headers)
                # Non-200 responses are skipped and retried on the next tick.
                if r_agent.status_code == 200:
                    status = r_agent.json().get("evaluation_status")
                    print(f" [debug] Status: '{status}'")
                    if status and "Initiating parallel rubric" in status:
                        found_parallel_status = True
                    if status and "Main Agent" in status and "Executing" in status:
                        found_executing_status = True
                        # Reaching 'Executing' is the required signal; the
                        # parallel-rubric status is informational only.
                        break
                time.sleep(0.5)
            print(f"[test] Parallel rubric status observed: {found_parallel_status}")
            assert found_executing_status, "Agent did not reach executing status."

            # 4. Wait for completion and evaluation
            print(f"[test] Waiting for agent {instance_id} to finish evaluation...")
            passed_or_failed = False
            for _ in range(300):
                r_agent = client.get(f"{BASE_URL}/agents/{instance_id}", headers=headers)
                if r_agent.status_code == 200:
                    status = r_agent.json().get("evaluation_status")
                    print(f" [debug] Final Status Path: '{status}'")
                    # "failed" also matches "failed_limit" (substring check).
                    if status and ("PASSED" in status or "failed" in status):
                        passed_or_failed = True
                        break
                time.sleep(2)
            assert passed_or_failed, "Agent did not finish evaluation."
        finally:
            # Always remove the deployed agent so reruns start clean.
            if instance_id:
                client.delete(f"{BASE_URL}/agents/{instance_id}", headers=headers)