# Newer
# Older
# cortex-hub / ai-hub / integration_tests / test_coworker_flow.py
import pytest
import httpx
import os
import uuid
import time
from conftest import BASE_URL

def _headers():
    """Build the request headers that identify the test user.

    The user id is read from the ``SYNC_TEST_USER_ID`` environment variable
    on every call; when the variable is unset the header value is an empty
    string.
    """
    return {"X-User-ID": os.environ.get("SYNC_TEST_USER_ID", "")}

def test_coworker_sc1_mirror_check():
    """
    SC-1 (Mirror Check):
    1. Deploy an agent with co_worker_quality_gate=True.
    2. Wait for the agent to initialize. The poll accepts the first
       evaluation_status that is non-empty and not the string "None"; it does
       not require the literal value 'evaluating'.
    3. Use the /nodes/{id}/fs/ls API to verify the .cortex folder contains
       rubric.md and history.log.

    The deployed agent is deleted in the ``finally`` block whether or not the
    assertions pass.
    """
    node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
    instance_id = None

    with httpx.Client(timeout=30.0) as client:
        try:
            # 1. Deploy Agent with co_worker_quality_gate=True
            deploy_payload = {
                "name": "SC-1 Mirror Agent",
                "description": "Tests .cortex mirror initialization",
                "system_prompt": "You are a test agent. Create a simple hello world python script.",
                "max_loop_iterations": 1,
                "mesh_node_id": node_id,
                "provider_name": "gemini",
                "model_name": "gemini-1.5-flash",
                "trigger_type": "interval",
                "interval_seconds": 60,
                "co_worker_quality_gate": True,
                "initial_prompt": "Create app.py that prints hello.",
            }
            r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
            assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
            # Parse the deploy response once and reuse it for both ids.
            deploy_data = r_deploy.json()
            instance_id = deploy_data["instance_id"]
            sync_workspace_id = deploy_data.get("sync_workspace_id")

            # 2. Wait for agent to initialize (Status: evaluating)
            print(f"\n[test] Waiting for agent {instance_id} to reach 'evaluating' status...")
            found_evaluating = False
            last_status = None  # kept only to make a timeout failure diagnosable
            for _ in range(30):  # 30 polls with a 2s sleep each => ~60s budget
                r_agent = client.get(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
                if r_agent.status_code == 200:
                    last_status = r_agent.json().get("evaluation_status")
                    print(f"  [debug] Current status: '{last_status}'")
                    # The string "None" is treated the same as a missing status.
                    if last_status and last_status != "None":
                        found_evaluating = True
                        break
                time.sleep(2)

            assert found_evaluating, (
                f"Agent did not reach 'evaluating' status (last evaluation_status: {last_status!r})."
            )

            # 3. Use the /nodes/{id}/fs/ls API to verify the .cortex folder existence
            params = {"path": ".cortex", "session_id": sync_workspace_id}
            r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params=params, headers=_headers())
            assert r_ls.status_code == 200, f"Failed to ls .cortex: {r_ls.text}"
            filenames = [entry["name"] for entry in r_ls.json().get("files", [])]
            # Verify rubric.md and history.log are present as per test plan
            # (substring match, so path-qualified names are accepted too).
            assert any("rubric.md" in name for name in filenames), f"rubric.md not found in {filenames}"
            assert any("history.log" in name for name in filenames), f"history.log not found in {filenames}"

        finally:
            # Always remove the agent so repeated runs do not accumulate instances.
            if instance_id:
                client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())

def test_coworker_sc3_limit_check():
    """
    SC-3 (Limit Check):
    1. Deploy an agent with max_rework_attempts=1 and rework_threshold=100.
    2. Trigger a run through the agent's webhook.
    3. Poll the /agents endpoint until evaluation_status == 'failed_limit'.
    4. Verify the latest_quality_score is present in the response.

    The deployed agent is deleted in the ``finally`` block whether or not the
    assertions pass.
    """
    node_id = os.getenv("SYNC_TEST_NODE2", "test-node-2")
    instance_id = None

    with httpx.Client(timeout=30.0) as client:
        try:
            # 1. Deploy Agent with max_rework_attempts=1 and rework_threshold=100
            deploy_payload = {
                "name": "SC-3 Limit Agent",
                "system_prompt": "You are a test agent. Create a simple hello world python script.",
                "max_loop_iterations": 2,
                "mesh_node_id": node_id,
                "provider_name": "gemini",
                "model_name": "gemini-1.5-flash",
                "trigger_type": "webhook",  # Use webhook to trigger manually
                "co_worker_quality_gate": True,
                "max_rework_attempts": 1,
                "rework_threshold": 100,  # Impossible to pass
                "default_prompt": "Create app.py that prints hello.",
            }
            r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
            assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
            instance_id = r_deploy.json()["instance_id"]

            # 2. Get the webhook secret and trigger it
            r_trig = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
            assert r_trig.status_code == 200, f"Failed to list triggers: {r_trig.text}"
            # Use a default so a missing trigger fails with a readable message
            # instead of a bare StopIteration.
            webhook_trigger = next(
                (t for t in r_trig.json() if t["trigger_type"] == "webhook"), None
            )
            assert webhook_trigger is not None, f"No webhook trigger found: {r_trig.text}"
            secret = webhook_trigger["webhook_secret"]

            # The webhook call authenticates with the token query parameter;
            # no X-User-ID header is sent here.
            r_hook = client.post(
                f"{BASE_URL}/agents/{instance_id}/webhook",
                params={"token": secret},
                json={"prompt": "Go!"},
            )
            assert r_hook.status_code == 202, f"Webhook trigger failed: {r_hook.text}"

            # 3. Poll until evaluation_status == 'failed_limit'
            print(f"\n[test] Waiting for agent {instance_id} to reach 'failed_limit' status...")
            failed_limit = False
            latest_score = None
            last_status = None  # kept only to make a timeout failure diagnosable
            for _ in range(180):  # 180 polls with a 2s sleep each => ~360s budget
                r_agents = client.get(f"{BASE_URL}/agents", headers=_headers())
                if r_agents.status_code == 200:
                    agent = next((a for a in r_agents.json() if a["id"] == instance_id), None)
                    if agent:
                        last_status = agent.get("evaluation_status")
                        latest_score = agent.get("latest_quality_score")
                        if last_status == "failed_limit":
                            failed_limit = True
                            break
                time.sleep(2)

            assert failed_limit, (
                f"Agent did not reach 'failed_limit' status (last evaluation_status: {last_status!r})."
            )
            # 4. The score reported alongside the terminal status must be set.
            assert latest_score is not None, "latest_quality_score should be present in the response"

        finally:
            # Always remove the agent so repeated runs do not accumulate instances.
            if instance_id:
                client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())

def test_coworker_sc2_rework_loop():
    """
    SC-2 (The Rework Loop):
    1. Deploy agent with a conflicting requirement to force at least one failure.
    2. Trigger a run through the agent's webhook.
    3. Poll for evaluation_status to be 'reworking'.
    4. Read .cortex/history.log and verify it contains the text "score".
       NOTE(review): this only checks for the substring, not for a failed
       entry specifically.

    The deployed agent is deleted in the ``finally`` block whether or not the
    assertions pass.
    """
    node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
    instance_id = None

    with httpx.Client(timeout=30.0) as client:
        try:
            # 1. Deploy Agent with rework loop
            deploy_payload = {
                "name": "SC-2 Rework Agent",
                "system_prompt": "You are a stubborn tester.",
                "max_loop_iterations": 2,
                "mesh_node_id": node_id,
                "provider_name": "gemini",
                "model_name": "gemini-1.5-flash",
                "trigger_type": "webhook",
                "co_worker_quality_gate": True,
                "max_rework_attempts": 3,
                # NOTE(review): presumably above the maximum score so every
                # attempt is rejected -- confirm against the API's score range.
                "rework_threshold": 101,
                "default_prompt": "Create app.py that prints hello, but deliberately make a syntax error on your first try.",
            }
            r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
            assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
            deploy_data = r_deploy.json()
            instance_id = deploy_data["instance_id"]
            sync_workspace_id = deploy_data.get("sync_workspace_id")

            # 2. Get the webhook secret and trigger it
            r_trig = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
            assert r_trig.status_code == 200, f"Failed to list triggers: {r_trig.text}"
            # Use a default so a missing trigger fails with a readable message
            # instead of a bare StopIteration.
            webhook_trigger = next(
                (t for t in r_trig.json() if t["trigger_type"] == "webhook"), None
            )
            assert webhook_trigger is not None, f"No webhook trigger found: {r_trig.text}"
            secret = webhook_trigger["webhook_secret"]

            r_hook = client.post(
                f"{BASE_URL}/agents/{instance_id}/webhook",
                params={"token": secret},
                json={"prompt": "Go!"},
            )
            # Fail fast if the run was never accepted, rather than polling
            # for two minutes on an agent that was not triggered.
            assert r_hook.status_code == 202, f"Webhook trigger failed: {r_hook.text}"

            # 3. Poll until evaluation_status == 'reworking'
            found_reworking = False
            for _ in range(120):  # 120 polls with a 2s sleep each => ~240s budget
                r_agents = client.get(f"{BASE_URL}/agents", headers=_headers())
                if r_agents.status_code == 200:
                    agent = next((a for a in r_agents.json() if a["id"] == instance_id), None)
                    if agent and agent.get("evaluation_status") == "reworking":
                        found_reworking = True
                        break
                time.sleep(2)

            assert found_reworking, "Agent never entered 'reworking' status."

            # 4. Inspect the history log written under the .cortex folder
            r_cat = client.get(
                f"{BASE_URL}/nodes/{node_id}/fs/cat",
                params={"path": ".cortex/history.log", "session_id": sync_workspace_id},
                headers=_headers(),
            )
            assert r_cat.status_code == 200, f"Failed to cat history.log: {r_cat.text}"
            assert "score" in r_cat.text, "history.log should contain score entries if it reached rework phase."

        finally:
            # Always remove the agent so repeated runs do not accumulate instances.
            if instance_id:
                client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())

def test_coworker_sc4_context_compaction():
    """
    SC-4 (Context Compaction):
    Placeholder test. It does not exercise compaction itself; it only checks
    that the deploy endpoint accepts an agent configured with a higher
    max_rework_attempts bound (5) and returns HTTP 200.

    The deployed agent is deleted in the ``finally`` block whether or not the
    assertion passes.
    """
    node_id = os.getenv("SYNC_TEST_NODE2", "test-node-2")
    instance_id = None

    with httpx.Client(timeout=30.0) as client:
        try:
            # Register node check removed (leveraging session-scoped nodes)

            deploy_payload = {
                "name": "SC-4 Compaction Agent",
                "system_prompt": "Tester",
                "max_loop_iterations": 1,
                "mesh_node_id": node_id,
                "provider_name": "gemini",
                "model_name": "gemini-1.5-flash",
                "trigger_type": "interval",
                "co_worker_quality_gate": True,
                "max_rework_attempts": 5,
                "rework_threshold": 95,
                "default_prompt": "Placeholder",
            }
            r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
            assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
            instance_id = r_deploy.json()["instance_id"]
        finally:
            # Always remove the agent so repeated runs do not accumulate instances.
            if instance_id:
                client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())