import os
import httpx
import pytest
import json
# Root of the API under test; override with SYNC_TEST_BASE_URL to target
# another deployment.
BASE_URL = os.environ.get("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1")
def _headers():
    """Build the request headers that identify the calling test user.

    The user ID is read from ``SYNC_TEST_USER_ID`` on every call; when the
    variable is unset the header is sent with an empty value.
    """
    user_id = os.environ.get("SYNC_TEST_USER_ID", "")
    return {"X-User-ID": user_id}
def test_get_providers():
    """The providers endpoint answers 200 with a JSON list."""
    url = f"{BASE_URL}/users/me/config/providers"
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(url, headers=_headers())
        assert resp.status_code == 200, f"Expected 200 OK: {resp.text}"
        assert isinstance(resp.json(), list)
def test_get_binaries_status():
    """The provisioning binaries status endpoint answers 200 with a JSON object."""
    url = f"{BASE_URL}/nodes/provision/binaries/status"
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(url, headers=_headers())
        assert resp.status_code == 200, f"Expected 200 OK: {resp.text}"
        assert isinstance(resp.json(), dict)
def test_get_agent_telemetry_404():
    """Requesting telemetry for an unknown agent ID yields 404."""
    url = f"{BASE_URL}/agents/non_existent_agent_id/telemetry"
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(url, headers=_headers())
        assert resp.status_code == 404, f"Expected 404 Not Found: {resp.status_code}"
def test_get_agent_dependencies_404():
    """Requesting dependencies for an unknown agent ID yields 404."""
    url = f"{BASE_URL}/agents/non_existent_agent_id/dependencies"
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(url, headers=_headers())
        assert resp.status_code == 404, f"Expected 404 Not Found: {resp.status_code}"
def test_export_config():
    """The config export answers 200 and declares a YAML content type."""
    url = f"{BASE_URL}/users/me/config/export"
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(url, headers=_headers())
        assert resp.status_code == 200, f"Expected 200 OK: {resp.text}"
        content_type = resp.headers.get("Content-Type")
        assert content_type == "application/x-yaml"
def test_verify_tts_invalid_key():
    """TTS verification with a junk API key returns 200 with success == False."""
    body = {
        "provider_name": "google_gemini",
        "api_key": "junk_invalid_key_123",
        "model": "",
    }
    url = f"{BASE_URL}/users/me/config/verify_tts"
    with httpx.Client(timeout=15.0) as http:
        resp = http.post(url, headers=_headers(), json=body)
        assert resp.status_code == 200
        result = resp.json()
        assert result["success"] is False
def test_verify_stt_invalid_key():
    """STT verification with a junk API key returns 200 with success == False."""
    body = {
        "provider_name": "gemini",
        "api_key": "junk_invalid_key_123",
        "model": "",
    }
    url = f"{BASE_URL}/users/me/config/verify_stt"
    with httpx.Client(timeout=15.0) as http:
        resp = http.post(url, headers=_headers(), json=body)
        assert resp.status_code == 200
        result = resp.json()
        assert result["success"] is False
def test_export_import_config():
    """Round-trip: export the config, then upload the same bytes to import."""
    export_url = f"{BASE_URL}/users/me/config/export"
    import_url = f"{BASE_URL}/users/me/config/import"
    with httpx.Client(timeout=10.0) as http:
        # Step 1: fetch the exported document.
        exported = http.get(export_url, headers=_headers())
        assert exported.status_code == 200

        # Step 2: send it back as a multipart file upload.
        upload = {
            "file": ("cortex_config.yaml", exported.content, "application/x-yaml"),
        }
        imported = http.post(import_url, headers=_headers(), files=upload)
        assert imported.status_code == 200

        # The import response is expected to carry an "llm" key.
        assert "llm" in imported.json()
def test_read_root():
    """The API root answers 200 with the fixed running-status payload."""
    expected = {"status": "AI Model Hub is running!"}
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(f"{BASE_URL}/")
        assert resp.status_code == 200
        assert resp.json() == expected
def test_get_status():
    """The status endpoint answers 200 and exposes the expected keys."""
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(f"{BASE_URL}/status")
        assert resp.status_code == 200
        body = resp.json()
        for key in ("status", "oidc_enabled", "version"):
            assert key in body
def test_get_auth_config():
    """The users config endpoint answers 200 and reports the login options."""
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(f"{BASE_URL}/users/config")
        assert resp.status_code == 200
        body = resp.json()
        for key in ("oidc_configured", "allow_password_login"):
            assert key in body
def test_mcp_initialize():
    """A JSON-RPC ``initialize`` call returns the expected protocol version."""
    request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {},
    }
    with httpx.Client(timeout=10.0) as http:
        resp = http.post(f"{BASE_URL}/mcp/", headers=_headers(), json=request)
        assert resp.status_code == 200
        reply = resp.json()
        assert reply["jsonrpc"] == "2.0"
        assert reply["id"] == 1
        assert "result" in reply
        assert reply["result"]["protocolVersion"] == "2025-11-25"
def test_mcp_sse_connection():
    """The SSE stream opens with 200 and emits an ``endpoint`` event."""
    with httpx.Client(timeout=10.0) as http:
        with http.stream("GET", f"{BASE_URL}/mcp/sse", headers=_headers()) as stream:
            assert stream.status_code == 200
            # any() stops consuming the stream at the first matching line.
            saw_endpoint = any(
                line.startswith("event: endpoint") for line in stream.iter_lines()
            )
            if not saw_endpoint:
                pytest.fail("Did not receive endpoint event")
def test_mcp_tools_list():
    """A JSON-RPC ``tools/list`` call returns a non-empty tool list."""
    request = {
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/list",
        "params": {},
    }
    with httpx.Client(timeout=10.0) as http:
        resp = http.post(f"{BASE_URL}/mcp/", headers=_headers(), json=request)
        assert resp.status_code == 200
        reply = resp.json()
        assert reply["jsonrpc"] == "2.0"
        assert reply["id"] == 2
        assert "result" in reply
        assert "tools" in reply["result"]
        assert len(reply["result"]["tools"]) > 0
def test_mcp_tools_call_get_app_info():
    """A JSON-RPC ``tools/call`` for ``get_app_info`` returns result content."""
    call = {"name": "get_app_info", "arguments": {}}
    request = {
        "jsonrpc": "2.0",
        "id": 3,
        "method": "tools/call",
        "params": call,
    }
    with httpx.Client(timeout=10.0) as http:
        resp = http.post(f"{BASE_URL}/mcp/", headers=_headers(), json=request)
        assert resp.status_code == 200
        reply = resp.json()
        assert reply["jsonrpc"] == "2.0"
        assert reply["id"] == 3
        assert "result" in reply
        assert "content" in reply["result"]
def test_admin_config():
    """The admin config answers 200 with ``app``, ``oidc`` and ``swarm`` sections."""
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(f"{BASE_URL}/admin/config", headers=_headers())
        assert resp.status_code == 200
        sections = resp.json()
        for name in ("app", "oidc", "swarm"):
            assert name in sections
def test_admin_config_app():
    """PUT the app config back with its current ``allow_password_login`` value.

    The current value is read first and written back unchanged, so the PUT
    path is exercised without altering that setting.
    """
    with httpx.Client(timeout=10.0) as client:
        r = client.get(f"{BASE_URL}/admin/config", headers=_headers())
        # Check the read before indexing into the body, so a failing GET is
        # reported as a status mismatch rather than a KeyError/JSON error.
        assert r.status_code == 200, f"Expected 200 OK: {r.text}"
        current = r.json()["app"]

        payload = {"allow_password_login": current["allow_password_login"]}
        r_put = client.put(f"{BASE_URL}/admin/config/app", json=payload, headers=_headers())
        assert r_put.status_code == 200
def test_admin_config_oidc_test():
    """The OIDC test endpoint answers 200 with a ``success`` field."""
    body = {"server_url": "https://accounts.google.com"}
    url = f"{BASE_URL}/admin/config/oidc/test"
    with httpx.Client(timeout=10.0) as http:
        resp = http.post(url, json=body, headers=_headers())
        assert resp.status_code == 200
        assert "success" in resp.json()
def test_admin_config_swarm_test():
    """Exercise the swarm test endpoints: nonce echo (GET) and endpoint probe (POST)."""
    import urllib.parse

    nonce = "test-nonce-123"
    with httpx.Client(timeout=10.0) as http:
        echo = http.get(f"{BASE_URL}/admin/config/swarm/test/{nonce}", headers=_headers())
        assert echo.status_code == 200
        assert echo.json()["nonce"] == nonce

        # Reduce BASE_URL to scheme://host:port for the external endpoint.
        parts = urllib.parse.urlparse(BASE_URL)
        origin = f"{parts.scheme}://{parts.netloc}"
        body = {"external_endpoint": origin}
        probe = http.post(f"{BASE_URL}/admin/config/swarm/test", json=body, headers=_headers())
        assert probe.status_code == 200
        assert "success" in probe.json()
def test_nodes_admin_mesh_reset():
    """The mesh reset endpoint responds with either 200 or 500."""
    hdrs = _headers()
    query = {"admin_id": hdrs["X-User-ID"]}
    with httpx.Client(timeout=10.0) as http:
        resp = http.post(f"{BASE_URL}/nodes/admin/mesh/reset", params=query, headers=hdrs)
        assert resp.status_code in (200, 500)
def test_get_provision_scripts():
    """The provision script and binary endpoints answer 200, 403 or 404 for a fake token."""
    node_id = "dummy-node"
    allowed = (200, 403, 404)
    with httpx.Client(timeout=10.0) as http:
        script = http.get(
            f"{BASE_URL}/nodes/provision/sh/{node_id}",
            params={"token": "fake-token"},
            headers=_headers(),
        )
        assert script.status_code in allowed

        binary = http.get(
            f"{BASE_URL}/nodes/provision/binary/{node_id}/linux",
            params={"token": "fake-token"},
            headers=_headers(),
        )
        assert binary.status_code in allowed
def test_agent_update_endpoints():
    """Hit the agent version, download and installer endpoints with the agent token.

    Each endpoint may answer with any status in its accepted set; the token
    comes from ``SECRET_KEY`` with a built-in fallback.
    """
    token = os.getenv("SECRET_KEY", "integration-secret-key-123")
    agent_headers = {"X-Agent-Token": token}
    # (endpoint name, accepted status codes), requested in this order.
    expectations = (
        ("version", (200, 404)),
        ("download", (200, 404, 503)),
        ("installer", (200, 404)),
    )
    with httpx.Client(timeout=10.0) as http:
        for endpoint, accepted in expectations:
            resp = http.get(f"{BASE_URL}/agent/{endpoint}", headers=agent_headers)
            assert resp.status_code in accepted
@pytest.mark.asyncio
async def test_mcp_messages():
    """Open an MCP SSE session, then post a notification to its messages URL.

    The first ``data:`` line on the SSE stream is treated as the messages
    URL; its ``session_id`` query parameter is extracted and used for the
    follow-up POST, which must answer 200, 202 or 204.
    """
    import urllib.parse

    payload = {
        "jsonrpc": "2.0",
        "id": 4,
        "method": "notifications/initialized",
        "params": {}
    }
    data_prefix = "data: "
    async with httpx.AsyncClient(timeout=10.0) as client:
        # 1. Connect to SSE to get session_id
        async with client.stream("GET", f"{BASE_URL}/mcp/sse", headers=_headers()) as r:
            assert r.status_code == 200
            session_id = None
            async for line in r.aiter_lines():
                if line.startswith(data_prefix):
                    # Drop only the leading field name. str.replace() would
                    # also remove any later "data: " inside the URL. strip()
                    # guards against a trailing line terminator.
                    messages_url = line[len(data_prefix):].strip()
                    query = urllib.parse.urlparse(messages_url).query
                    params = urllib.parse.parse_qs(query)
                    session_id = params.get("session_id", [None])[0]
                    break
            assert session_id is not None, "Failed to get session_id"

            # 2. Send message using a separate client to avoid connection sharing issues.
            # NOTE(review): the POST is issued while the SSE stream is still
            # open, presumably because the session lives only as long as the
            # stream; confirm against the server implementation.
            async with httpx.AsyncClient(timeout=10.0) as client2:
                r_msg = await client2.post(
                    f"{BASE_URL}/mcp/messages",
                    params={"session_id": session_id},
                    headers=_headers(),
                    json=payload,
                )
                assert r_msg.status_code in (200, 202, 204)
def test_users_login_callback_fail():
    """The login callback with a dummy code answers with one of the accepted statuses."""
    query = {"code": "dummy_code", "state": "http://localhost:3000"}
    accepted = (400, 302, 307, 401, 500)
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(f"{BASE_URL}/users/login/callback", params=query)
        assert resp.status_code in accepted
def test_users_login_redirect():
    """The login endpoint answers with a 307/302 redirect carrying a Location header."""
    with httpx.Client(timeout=10.0) as http:
        # Redirects are not followed, so the redirect response itself is inspected.
        resp = http.get(f"{BASE_URL}/users/login", follow_redirects=False)
        assert resp.status_code in (307, 302)
        assert "location" in resp.headers
def test_users_logout():
    """Logout answers 200 with the fixed confirmation message."""
    expected = {"message": "Logged out successfully"}
    with httpx.Client(timeout=10.0) as http:
        resp = http.post(f"{BASE_URL}/users/logout", headers=_headers())
        assert resp.status_code == 200
        assert resp.json() == expected
def test_users_password_unauthorized():
    """A password change sent without the user headers is rejected with 401."""
    body = {
        "current_password": "old_password",
        "new_password": "new_password",
    }
    with httpx.Client(timeout=10.0) as http:
        # No headers are passed on purpose: the request is unauthenticated.
        resp = http.put(f"{BASE_URL}/users/password", json=body)
        assert resp.status_code == 401
def test_users_password():
    """Change the password from "admin" to "new_password", then change it back."""
    url = f"{BASE_URL}/users/password"
    change = {
        "current_password": "admin",
        "new_password": "new_password",
    }
    revert = {
        "current_password": "new_password",
        "new_password": "admin",
    }
    with httpx.Client(timeout=10.0) as http:
        changed = http.put(url, json=change, headers=_headers())
        assert changed.status_code == 200

        # Put the original password back so later runs start from the same state.
        restored = http.put(url, json=revert, headers=_headers())
        assert restored.status_code == 200
def test_admin_config_oidc_put():
    """PUT placeholder OIDC settings and expect 200."""
    # NOTE(review): nothing in this test restores the previous OIDC settings;
    # confirm that overwriting them is acceptable for the target environment.
    settings = {
        "server_url": "https://accounts.google.com",
        "client_id": "client_id",
        "client_secret": "client_secret",
    }
    with httpx.Client(timeout=10.0) as http:
        resp = http.put(f"{BASE_URL}/admin/config/oidc", json=settings, headers=_headers())
        assert resp.status_code == 200
def test_admin_config_swarm_put():
    """PUT a swarm external endpoint and expect 200."""
    # NOTE(review): the previous swarm endpoint is not restored afterwards;
    # confirm that is acceptable for the target environment.
    settings = {"external_endpoint": "http://localhost:8002"}
    with httpx.Client(timeout=10.0) as http:
        resp = http.put(f"{BASE_URL}/admin/config/swarm", json=settings, headers=_headers())
        assert resp.status_code == 200
def test_mcp_manifest():
    """The MCP manifest is served from the well-known path at the server origin."""
    import urllib.parse

    # The manifest is requested from scheme://host:port, outside BASE_URL's path.
    parts = urllib.parse.urlparse(BASE_URL)
    origin = f"{parts.scheme}://{parts.netloc}"
    with httpx.Client(timeout=10.0) as http:
        resp = http.get(f"{origin}/.well-known/mcp/manifest.json")
        assert resp.status_code == 200
def test_agent_binary():
    """The Linux agent binary endpoint answers 200, 404 or 503 for the agent token."""
    # Read the token from SECRET_KEY, as test_agent_update_endpoints does, so
    # the test also works against servers with a non-default secret. The
    # fallback is the previously hard-coded value.
    secret_key = os.getenv("SECRET_KEY", "integration-secret-key-123")
    headers = {"X-Agent-Token": secret_key}
    with httpx.Client(timeout=10.0) as client:
        r = client.get(f"{BASE_URL}/agent/binary/linux", headers=headers)
        assert r.status_code in (200, 404, 503)
def test_agent_installer_ps1():
    """The PowerShell installer endpoint answers 200 or 404 for the agent token."""
    # Read the token from SECRET_KEY, as test_agent_update_endpoints does, so
    # the test also works against servers with a non-default secret. The
    # fallback is the previously hard-coded value.
    secret_key = os.getenv("SECRET_KEY", "integration-secret-key-123")
    headers = {"X-Agent-Token": secret_key}
    with httpx.Client(timeout=10.0) as client:
        r = client.get(f"{BASE_URL}/agent/installer/ps1", headers=headers)
        assert r.status_code in (200, 404)