# cortex-hub / ai-hub / integration_tests / test_agents.py
import pytest
import httpx
import os
import uuid
from conftest import BASE_URL

def _headers():
    """Build the request headers that identify the test user.

    The user id is read from the ``SYNC_TEST_USER_ID`` environment variable
    and falls back to an empty string when the variable is unset.
    """
    return {"X-User-ID": os.environ.get("SYNC_TEST_USER_ID", "")}

def test_agent_lifecycle_and_api_coverage():
    """
    End-to-end coverage of the Agent API endpoints against the hub at BASE_URL.

    Steps:
    1. Register a dedicated test node (re-creating it on an id conflict)
    2. Deploy an agent through the unified /agents/deploy endpoint
    3. Verify the node was attached to the agent's session
    4. Fetch the trigger created by the deployment
    5. Verify periodic execution via the session messages
    6. List agents and re-fetch the instance's triggers
    7. Stop the agent and update its config
    8. Remove the agent

    The deployed agent and the test node are removed in a ``finally`` block,
    so a failing assertion does not leak resources on the hub.
    """
    import time  # only this test needs it; kept local to the block

    node_id = f"test-agent-node-{uuid.uuid4().hex[:8]}"
    admin_id = os.getenv("SYNC_TEST_USER_ID", "")
    admin_params = {"admin_id": admin_id}
    # Set once the deploy succeeds; reset to None after a successful delete so
    # the cleanup below knows whether an agent still has to be removed.
    instance_id = None

    with httpx.Client(timeout=10.0) as client:
        try:
            # 1. Register a test node specifically for this agent testing
            node_payload = {
                "node_id": node_id,
                "display_name": "Agent Test Node",
                "is_active": True,
                "skill_config": {"shell": {"enabled": True}}
            }
            r_node = client.post(f"{BASE_URL}/nodes/admin", params=admin_params, json=node_payload)
            # If conflicts, clear first
            if r_node.status_code in (400, 409):
                client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params=admin_params)
                r_node = client.post(f"{BASE_URL}/nodes/admin", params=admin_params, json=node_payload)
            assert r_node.status_code == 200, f"Node registration failed: {r_node.text}"

            # 2. Deploy Agent using the unified endpoint (matching UI behavior)
            deploy_payload = {
                "name": "Cron Print Agent",
                "description": "Periodically prints to the node console",
                "system_prompt": "You are a cron agent. Run shell tasks periodically.",
                "max_loop_iterations": 1,
                "mesh_node_id": node_id,
                "provider_name": "gemini",
                "trigger_type": "interval",
                "interval_seconds": 5,
                "default_prompt": "Hello test agent! Just reply the word 'Acknowledged' and nothing else.",
                "initial_prompt": None
            }
            r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
            assert r_deploy.status_code == 200, f"Deploy unified endpoint failed: {r_deploy.text}"
            deploy_res = r_deploy.json()

            instance_id = deploy_res["instance_id"]
            session_id = deploy_res["session_id"]
            assert "template_id" in deploy_res, f"Deploy response has no template_id: {deploy_res}"

            # 3. Verify node binding: the deployment must attach the node to the session
            r_sess_check = client.get(f"{BASE_URL}/sessions/{session_id}", headers=_headers())
            assert r_sess_check.status_code == 200, "Could not fetch agent session"
            assert node_id in (r_sess_check.json().get("attached_node_ids") or []), "Node ID was NOT attached to the session during deployment!"

            # 4. Fetch the trigger ID for later checks
            r_trig_get = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
            assert r_trig_get.status_code == 200, f"Failed to fetch triggers: {r_trig_get.text}"
            triggers = r_trig_get.json()
            assert triggers, "Deployment did not create any trigger for the agent"
            trigger_id = triggers[0]["id"]

            # 5. Verify Agent Periodical Execution.
            # Poll instead of sleeping a fixed 15 seconds: same time budget in
            # the worst case, but the test proceeds as soon as the agent replied.
            print("\n[test] Waiting up to 15 seconds to allow background interval scheduler to wake the agent...")
            deadline = time.monotonic() + 15
            while True:
                r_msgs = client.get(f"{BASE_URL}/sessions/{session_id}/messages", headers=_headers())
                assert r_msgs.status_code == 200, f"Failed to fetch session messages: {r_msgs.text}"
                messages = r_msgs.json()["messages"]
                agent_replied = any(m["sender"] == "assistant" for m in messages)
                if agent_replied or time.monotonic() >= deadline:
                    break
                time.sleep(1)

            print(f"\n[test] Agent Messages Count: {len(messages)}")
            assert len(messages) > 0, "The agent failed to generate any response during its execution loop! It was not invoked or crashed silently."
            assert agent_replied, "No assistant (agent) messages generated in history!"

            # 6. Test if agent is in the active list
            r_list = client.get(f"{BASE_URL}/agents", headers=_headers())
            assert r_list.status_code == 200
            agents = r_list.json()
            assert any(a["id"] == instance_id for a in agents), "Instance not found in active list"

            # Fetch triggers back
            r_trig_get = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
            assert r_trig_get.status_code == 200
            assert any(t["id"] == trigger_id for t in r_trig_get.json()), "Trigger not found on instance"

            # 7. Test Stop Update / Config Update
            r_stop = client.patch(f"{BASE_URL}/agents/{instance_id}/status", json={"status": "stopped"}, headers=_headers())
            assert r_stop.status_code == 200
            assert r_stop.json()["status"] == "stopped"

            r_cfg = client.patch(f"{BASE_URL}/agents/{instance_id}/config", json={"name": "Updated Cron Agent", "mesh_node_id": node_id}, headers=_headers())
            assert r_cfg.status_code == 200

            # 8. Test Remove (delete agent directly, verifying cascading trigger deletion)
            r_del_agent = client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
            assert r_del_agent.status_code == 200, f"Cascading delete failed: {r_del_agent.text}"
            instance_id = None  # already removed; nothing left for the cleanup to delete
        finally:
            # Best-effort cleanup. Transport errors are reported but not raised
            # so they cannot mask the assertion that actually failed the test.
            try:
                if instance_id is not None:
                    client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
                client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params=admin_params)
            except httpx.HTTPError as exc:
                print(f"\n[test] Cleanup request failed: {exc}")