Newer
Older
cortex-hub / ai-hub / integration_tests / test_node_registration.py
import os
import httpx
import pytest
import uuid

BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1")

def _get_user_id() -> str:
    """Gets the active super-admin or test runner user ID."""
    return os.getenv("SYNC_TEST_USER_ID", "c4401d34-8784-4d6e-93a0-c702bd202b66")

def _headers():
    return {"X-User-ID": _get_user_id(), "Authorization": os.getenv("SYNC_TEST_AUTH_TOKEN", "")}

import subprocess
import time

def test_node_full_lifecycle_and_api_coverage():
    user_id = _get_user_id()
    node_id = f"test-integration-node-{uuid.uuid4().hex[:8]}"
    display_name = f"Integration Lifecycle {node_id}"

    payload = {
        "node_id": node_id,
        "display_name": display_name,
        "is_active": True,
        "skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}}
    }

    try:
        with httpx.Client(timeout=15.0) as client:
            # --- ADMIN ENDPOINTS ---
            # POST /nodes/admin
            r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, json=payload, headers=_headers())
            assert r_node.status_code == 200
            node_data = r_node.json()
            invite_token = node_data["invite_token"]

            # GET /nodes/admin
            r_list = client.get(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, headers=_headers())
            assert r_list.status_code == 200
            assert any(n["node_id"] == node_id for n in r_list.json())

            # GET /nodes/admin/{node_id}
            r_get = client.get(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers())
            assert r_get.status_code == 200

            # PATCH /nodes/admin/{node_id}
            r_patch = client.patch(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, json={"display_name": "Updated Name"}, headers=_headers())
            assert r_patch.status_code == 200

            # POST /nodes/admin/{node_id}/access
            group_r = client.post(f"{BASE_URL}/users/admin/groups", json={"name": f"Group for {node_id}"}, headers=_headers())
            group_id = group_r.json()["id"]
            client.put(f"{BASE_URL}/users/admin/users/{user_id}/group", json={"group_id": group_id}, headers=_headers())

            acc_r = client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers())
            assert acc_r.status_code == 200

            # DELETE /nodes/admin/{node_id}/access/{group_id} (Revoke and re-grant for test)
            rev_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}/access/{group_id}", params={"admin_id": user_id}, headers=_headers())
            assert rev_r.status_code == 200
            client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers())

            # GET /nodes/admin/{node_id}/config.yaml
            conf_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/config.yaml", params={"admin_id": user_id}, headers=_headers())
            assert conf_r.status_code == 200

            # GET /nodes/provision/{node_id}
            prov_r = client.get(f"{BASE_URL}/nodes/provision/{node_id}", params={"token": invite_token}, headers=_headers())
            assert prov_r.status_code == 200

            # GET /nodes/admin/{node_id}/download
            dl_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/download", params={"admin_id": user_id}, headers=_headers())
            assert dl_r.status_code == 200

            # POST /nodes/validate-token (Internal)
            val_r = client.post(f"{BASE_URL}/nodes/validate-token", params={"token": invite_token, "node_id": node_id}, headers=_headers())
            assert val_r.status_code == 200

            # --- SPAWN NODE IMPERATIVELY ---
            network = "cortex-hub_default"
            image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True)
            image_id = image_proc.stdout.strip()

            subprocess.run([
                "docker", "run", "-d", "--name", node_id, "--network", network,
                "-e", f"HUB_URL=http://ai-hub:8000", "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={invite_token}",
                "-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "AGENT_TLS_ENABLED=false",
                "-v", f"{node_id}_sync:/tmp/cortex-sync", image_id
            ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            # Wait for connection
            connected = False
            for _ in range(30):
                st = client.get(f"{BASE_URL}/nodes/{node_id}/status", headers=_headers())
                if st.status_code == 200 and st.json().get("status") == "online":
                    connected = True
                    break
                time.sleep(1)
            assert connected, "Node never successfully connected to Hub"

            # --- USER ENDPOINTS ---
            # GET /nodes/
            n_list = client.get(f"{BASE_URL}/nodes/", params={"user_id": user_id}, headers=_headers())
            assert n_list.status_code == 200

            # GET /nodes/{node_id}/status (Already tested above)

            # GET /nodes/{node_id}/terminal
            term_r = client.get(f"{BASE_URL}/nodes/{node_id}/terminal", headers=_headers())
            assert term_r.status_code == 200

            # POST /nodes/{node_id}/dispatch
            dp_r = client.post(f"{BASE_URL}/nodes/{node_id}/dispatch", params={"user_id": user_id}, json={"command": "ls -la /"}, headers=_headers())
            assert dp_r.status_code == 200
            task_id = dp_r.json().get("task_id")

            # POST /nodes/{node_id}/cancel
            can_r = client.post(f"{BASE_URL}/nodes/{node_id}/cancel", params={"task_id": task_id}, headers=_headers())
            assert can_r.status_code == 200

            # PATCH & GET /nodes/preferences
            pref_p = client.patch(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, json={"default_node_ids": [node_id]}, headers=_headers())
            assert pref_p.status_code == 200
            pref_g = client.get(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, headers=_headers())
            assert pref_g.status_code == 200

            # --- FILE NAVIGATOR ENDPOINTS ---
            r_sess = client.post(f"{BASE_URL}/sessions/", headers=_headers(), json={"user_id": user_id, "provider_name": "gemini", "feature_name": "swarm_control"})
            sess_id = str(r_sess.json().get("id"))

            # POST /fs/touch
            test_fname = f"test_file_{uuid.uuid4().hex[:6]}.txt"
            test_file_path = test_fname  # NO LEADING SLASH
            fs_touch = client.post(f"{BASE_URL}/nodes/{node_id}/fs/touch", json={"path": test_file_path, "is_dir": False, "session_id": sess_id}, headers=_headers())
            assert fs_touch.status_code == 200, fs_touch.text

            # GET /fs/ls (POLL)
            found = False
            for _ in range(5):
                fs_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": "/", "session_id": sess_id}, headers=_headers())
                assert fs_ls.status_code == 200
                items = fs_ls.json().get("files", [])
                if any(item.get("name") == test_fname for item in items):
                    found = True
                    break
                time.sleep(1)
            assert found, f"Expected {test_fname} not found in ls output: {items}"

            # POST /fs/upload
            files = {"file": ("test_file2.txt", b"Hello Cortex!")}
            fs_up = client.post(f"{BASE_URL}/nodes/{node_id}/fs/upload", params={"path": "/", "session_id": sess_id}, files=files, headers=_headers())
            assert fs_up.status_code == 200
            
            # GET /fs/cat (POLL)
            found_content = False
            for _ in range(5):
                fs_cat = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
                if fs_cat.status_code == 200 and getattr(fs_cat, "text", "") and "Hello Cortex!" in getattr(fs_cat, "text", ""):
                    found_content = True
                    break
                time.sleep(1)
            assert found_content, "Uploaded content not returned by cat"

            # GET /fs/download
            fs_dl = client.get(f"{BASE_URL}/nodes/{node_id}/fs/download", params={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
            assert fs_dl.status_code == 200
            assert fs_dl.content == b"Hello Cortex!"

            # POST /fs/rm (Delete both files)
            fs_rm = client.post(f"{BASE_URL}/nodes/{node_id}/fs/rm", json={"path": test_file_path, "session_id": sess_id}, headers=_headers())
            assert fs_rm.status_code == 200
            client.post(f"{BASE_URL}/nodes/{node_id}/fs/rm", json={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
            
            # Verify deletion with GET /fs/cat returning 404 (POLL)
            deleted = False
            last_err = ""
            for _ in range(5):
                fs_cat_404 = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": test_file_path, "session_id": sess_id}, headers=_headers())
                if fs_cat_404.status_code == 404:
                    deleted = True
                    break
                else:
                    last_err = f"Code: {fs_cat_404.status_code}, Text: {fs_cat_404.text}"
                time.sleep(1)
            assert deleted, f"File was not deleted. Last response: {last_err}"

            # --- TEARDOWN ---
            # DELETE /nodes/admin/{node_id}
            del_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers())
            assert del_r.status_code == 200

    finally:
        subprocess.run(["docker", "rm", "-f", node_id], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # Re-map the user preferences back to test-node-1 which is the stable background runner
        try:
            with httpx.Client() as client:
                client.patch(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, json={"default_node_ids": ["test-node-1"]}, headers=_headers())
        except Exception:
            pass