import pytest
import httpx
import os
import uuid
from conftest import BASE_URL
def _headers():
uid = os.getenv("SYNC_TEST_USER_ID", "")
return {"X-User-ID": uid}
def test_agent_lifecycle_and_api_coverage():
"""
Test suite covering Agent API endpoints:
1. Register Node
2. Register Template
3. Register Session
4. Register Instance
5. Register Trigger
6. Verify Agent Periodical Execution via Session Messages
7. List Agents
8. Stop Agent
9. Remove Agent
"""
node_id = f"test-agent-node-{uuid.uuid4().hex[:8]}"
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
with httpx.Client(timeout=10.0) as client:
# 1. Register a test node specifically for this agent testing
node_payload = {
"node_id": node_id,
"display_name": "Agent Test Node",
"is_active": True,
"skill_config": {"shell": {"enabled": True}}
}
r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json=node_payload)
# If conflicts, clear first
if r_node.status_code in (400, 409):
client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": admin_id})
r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json=node_payload)
assert r_node.status_code == 200, f"Node registration failed: {r_node.text}"
# 2. Deploy Agent using the unified endpoint (matching UI behavior)
deploy_payload = {
"name": "Cron Print Agent",
"description": "Periodically prints to the node console",
"system_prompt": "You are a cron agent. Run shell tasks periodically.",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"trigger_type": "interval",
"interval_seconds": 5,
"default_prompt": "Hello test agent! Just reply the word 'Acknowledged' and nothing else.",
"initial_prompt": None
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy unified endpoint failed: {r_deploy.text}"
deploy_res = r_deploy.json()
instance_id = deploy_res["instance_id"]
session_id = deploy_res["session_id"]
template_id = deploy_res["template_id"]
# 3. VERIFY NODE BINDING (Fixing the exact edge case)
r_sess_check = client.get(f"{BASE_URL}/sessions/{session_id}", headers=_headers())
assert r_sess_check.status_code == 200, "Could not fetch agent session"
assert node_id in (r_sess_check.json().get("attached_node_ids") or []), "Node ID was NOT attached to the session during deployment!"
# We need to fetch the trigger ID for later checks
r_trig_get = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
trigger_id = r_trig_get.json()[0]["id"]
# 6. Verify Agent Periodical Execution
print("\n[test] Waiting for background interval scheduler to wake the agent (timeout 60s)...")
import time
messages = []
for _ in range(30): # 30 * 2s = 60s
r_msgs = client.get(f"{BASE_URL}/sessions/{session_id}/messages", headers=_headers())
assert r_msgs.status_code == 200, f"Failed to fetch session messages: {r_msgs.text}"
messages = r_msgs.json()["messages"]
if any(m["sender"] == "assistant" for m in messages):
break
time.sleep(2)
print(f"\n[test] Agent Messages Count: {len(messages)}")
assert any(m["sender"] == "assistant" for m in messages), f"The agent failed to generate any response within 60s! History: {messages}"
# 7. Test if agent is in the active list
r_list = client.get(f"{BASE_URL}/agents", headers=_headers())
assert r_list.status_code == 200
agents = r_list.json()
assert any(a["id"] == instance_id for a in agents), "Instance not found in active list"
# Fetch triggers back
r_trig_get = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
assert r_trig_get.status_code == 200
assert any(t["id"] == trigger_id for t in r_trig_get.json()), "Trigger not found on instance"
# 8. Test Stop Update / Config Update
r_stop = client.patch(f"{BASE_URL}/agents/{instance_id}/status", json={"status": "stopped"}, headers=_headers())
assert r_stop.status_code == 200
assert r_stop.json()["status"] == "stopped"
r_cfg = client.patch(f"{BASE_URL}/agents/{instance_id}/config", json={"name": "Updated Cron Agent", "mesh_node_id": node_id}, headers=_headers())
assert r_cfg.status_code == 200
# 9. Test Remove (delete agent directly, verifying cascading trigger deletion)
r_del_agent = client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
assert r_del_agent.status_code == 200, f"Cascading delete failed: {r_del_agent.text}"
# Cleanup Node
client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": admin_id})
def test_agent_webhook_trigger():
"""
Test Agent Webhook Triggering:
1. Deploy agent with webhook trigger
2. Obtain secret token
3. Call webhook with custom prompt + token
4. Verify response contains custom prompt indicator
"""
node_id = f"test-webhook-node-{uuid.uuid4().hex[:8]}"
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
with httpx.Client(timeout=10.0) as client:
# 1. Register a test node
node_payload = {
"node_id": node_id,
"display_name": "Webhook Test Node",
"is_active": True,
"skill_config": {"shell": {"enabled": True}}
}
client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json=node_payload)
# 2. Deploy Agent with Webhook Trigger
deploy_payload = {
"name": "Webhook Agent",
"system_prompt": "You are a helpful assistant. Just acknowledge the received webhook prompt.",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"trigger_type": "webhook",
"default_prompt": "Standard Webhook Prompt",
"initial_prompt": None
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
deploy_res = r_deploy.json()
instance_id = deploy_res["instance_id"]
session_id = deploy_res["session_id"]
# 3. Get the webhook secret
r_trig = client.get(f"{BASE_URL}/agents/{instance_id}/triggers", headers=_headers())
assert r_trig.status_code == 200
webhook_trigger = next(t for t in r_trig.json() if t["trigger_type"] == "webhook")
secret = webhook_trigger["webhook_secret"]
assert secret is not None
# 4. Trigger the Webhook with a custom prompt
custom_msg = "INTER-AGENT-SIGNAL-BEEP-BOOP"
r_hook = client.post(
f"{BASE_URL}/agents/{instance_id}/webhook",
params={"token": secret},
json={"prompt": f"Please respond exactly with: {custom_msg}"}
)
assert r_hook.status_code == 202, f"Webhook trigger failed: {r_hook.text}"
# 5. Wait for agent to process
print(f"\n[test] Waiting for agent to process webhook signal '{custom_msg}'...")
import time
found = False
for _ in range(30):
r_msgs = client.get(f"{BASE_URL}/sessions/{session_id}/messages", headers=_headers())
msgs = r_msgs.json()["messages"]
# Look for assistant response containing our custom signal
if any(custom_msg in (m.get("content") or "") for m in msgs if m["sender"] == "assistant"):
found = True
break
time.sleep(2)
assert found, "The agent did not process the custom webhook prompt correctly."
print("[test] Webhook custom prompt processed successfully!")
# 6. Cleanup
client.delete(f"{BASE_URL}/agents/{instance_id}", headers=_headers())
client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": admin_id})