import os
import httpx
import pytest
import uuid
BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1")
def _get_user_id() -> str:
"""Gets the active super-admin or test runner user ID."""
return os.getenv("SYNC_TEST_USER_ID", "c4401d34-8784-4d6e-93a0-c702bd202b66")
def _headers():
return {"X-User-ID": _get_user_id(), "Authorization": os.getenv("SYNC_TEST_AUTH_TOKEN", "")}
import subprocess
import time
def test_node_full_lifecycle_and_api_coverage():
user_id = _get_user_id()
node_id = f"test-integration-node-{uuid.uuid4().hex[:8]}"
display_name = f"Integration Lifecycle {node_id}"
payload = {
"node_id": node_id,
"display_name": display_name,
"is_active": True,
"skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}}
}
try:
with httpx.Client(timeout=15.0) as client:
# --- ADMIN ENDPOINTS ---
# POST /nodes/admin
r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, json=payload, headers=_headers())
assert r_node.status_code == 200
node_data = r_node.json()
invite_token = node_data["invite_token"]
# GET /nodes/admin
r_list = client.get(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, headers=_headers())
assert r_list.status_code == 200
assert any(n["node_id"] == node_id for n in r_list.json())
# GET /nodes/admin/{node_id}
r_get = client.get(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers())
assert r_get.status_code == 200
# PATCH /nodes/admin/{node_id}
r_patch = client.patch(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, json={"display_name": "Updated Name"}, headers=_headers())
assert r_patch.status_code == 200
# POST /nodes/admin/{node_id}/access
group_r = client.post(f"{BASE_URL}/users/admin/groups", json={"name": f"Group for {node_id}"}, headers=_headers())
group_id = group_r.json()["id"]
client.put(f"{BASE_URL}/users/admin/users/{user_id}/group", json={"group_id": group_id}, headers=_headers())
acc_r = client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers())
assert acc_r.status_code == 200
# DELETE /nodes/admin/{node_id}/access/{group_id} (Revoke and re-grant for test)
rev_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}/access/{group_id}", params={"admin_id": user_id}, headers=_headers())
assert rev_r.status_code == 200
client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers())
# GET /nodes/admin/{node_id}/config.yaml
conf_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/config.yaml", params={"admin_id": user_id}, headers=_headers())
assert conf_r.status_code == 200
# GET /nodes/provision/{node_id}
prov_r = client.get(f"{BASE_URL}/nodes/provision/{node_id}", params={"token": invite_token}, headers=_headers())
assert prov_r.status_code == 200
# GET /nodes/admin/{node_id}/download
dl_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/download", params={"admin_id": user_id}, headers=_headers())
assert dl_r.status_code == 200
# POST /nodes/validate-token (Internal)
val_r = client.post(f"{BASE_URL}/nodes/validate-token", params={"token": invite_token, "node_id": node_id}, headers=_headers())
assert val_r.status_code == 200
# --- SPAWN NODE IMPERATIVELY ---
network = "cortex-hub_default"
image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True)
image_id = image_proc.stdout.strip()
subprocess.run([
"docker", "run", "-d", "--name", node_id, "--network", network,
"-e", f"HUB_URL=http://ai-hub:8000", "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={invite_token}",
"-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "AGENT_TLS_ENABLED=false",
"-v", f"{node_id}_sync:/tmp/cortex-sync", image_id
], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Wait for connection
connected = False
for _ in range(30):
st = client.get(f"{BASE_URL}/nodes/{node_id}/status", headers=_headers())
if st.status_code == 200 and st.json().get("status") == "online":
connected = True
break
time.sleep(1)
assert connected, "Node never successfully connected to Hub"
# --- USER ENDPOINTS ---
# GET /nodes/
n_list = client.get(f"{BASE_URL}/nodes/", params={"user_id": user_id}, headers=_headers())
assert n_list.status_code == 200
# GET /nodes/{node_id}/status (Already tested above)
# GET /nodes/{node_id}/terminal
term_r = client.get(f"{BASE_URL}/nodes/{node_id}/terminal", headers=_headers())
assert term_r.status_code == 200
# POST /nodes/{node_id}/dispatch
dp_r = client.post(f"{BASE_URL}/nodes/{node_id}/dispatch", params={"user_id": user_id}, json={"command": "ls -la /"}, headers=_headers())
assert dp_r.status_code == 200
task_id = dp_r.json().get("task_id")
# POST /nodes/{node_id}/cancel
can_r = client.post(f"{BASE_URL}/nodes/{node_id}/cancel", params={"task_id": task_id}, headers=_headers())
assert can_r.status_code == 200
# PATCH & GET /nodes/preferences
pref_p = client.patch(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, json={"default_node_ids": [node_id]}, headers=_headers())
assert pref_p.status_code == 200
pref_g = client.get(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, headers=_headers())
assert pref_g.status_code == 200
# --- FILE NAVIGATOR ENDPOINTS ---
r_sess = client.post(f"{BASE_URL}/sessions/", headers=_headers(), json={"user_id": user_id, "provider_name": "gemini", "feature_name": "swarm_control"})
sess_id = str(r_sess.json().get("id"))
# POST /fs/touch
test_fname = f"test_file_{uuid.uuid4().hex[:6]}.txt"
test_file_path = test_fname # NO LEADING SLASH
fs_touch = client.post(f"{BASE_URL}/nodes/{node_id}/fs/touch", json={"path": test_file_path, "is_dir": False, "session_id": sess_id}, headers=_headers())
assert fs_touch.status_code == 200, fs_touch.text
# GET /fs/ls (POLL)
found = False
for _ in range(5):
fs_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": "/", "session_id": sess_id}, headers=_headers())
assert fs_ls.status_code == 200
items = fs_ls.json().get("files", [])
if any(item.get("name") == test_fname for item in items):
found = True
break
time.sleep(1)
assert found, f"Expected {test_fname} not found in ls output: {items}"
# POST /fs/upload
files = {"file": ("test_file2.txt", b"Hello Cortex!")}
fs_up = client.post(f"{BASE_URL}/nodes/{node_id}/fs/upload", params={"path": "/", "session_id": sess_id}, files=files, headers=_headers())
assert fs_up.status_code == 200
# GET /fs/cat (POLL)
found_content = False
for _ in range(5):
fs_cat = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
if fs_cat.status_code == 200 and getattr(fs_cat, "text", "") and "Hello Cortex!" in getattr(fs_cat, "text", ""):
found_content = True
break
time.sleep(1)
assert found_content, "Uploaded content not returned by cat"
# GET /fs/download
fs_dl = client.get(f"{BASE_URL}/nodes/{node_id}/fs/download", params={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
assert fs_dl.status_code == 200
assert fs_dl.content == b"Hello Cortex!"
# POST /fs/rm (Delete both files)
fs_rm = client.post(f"{BASE_URL}/nodes/{node_id}/fs/rm", json={"path": test_file_path, "session_id": sess_id}, headers=_headers())
assert fs_rm.status_code == 200
client.post(f"{BASE_URL}/nodes/{node_id}/fs/rm", json={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
# Verify deletion with GET /fs/cat returning 404 (POLL)
deleted = False
last_err = ""
for _ in range(5):
fs_cat_404 = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": test_file_path, "session_id": sess_id}, headers=_headers())
if fs_cat_404.status_code == 404:
deleted = True
break
else:
last_err = f"Code: {fs_cat_404.status_code}, Text: {fs_cat_404.text}"
time.sleep(1)
assert deleted, f"File was not deleted. Last response: {last_err}"
# --- TEARDOWN ---
# DELETE /nodes/admin/{node_id}
del_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers())
assert del_r.status_code == 200
finally:
subprocess.run(["docker", "rm", "-f", node_id], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Re-map the user preferences back to test-node-1 which is the stable background runner
try:
with httpx.Client() as client:
client.patch(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, json={"default_node_ids": ["test-node-1"]}, headers=_headers())
except Exception:
pass