# cortex-hub / ai-hub / integration_tests / conftest.py
import os
import time
import subprocess
import pytest
import httpx
from datetime import datetime

# Base URL of the hub's REST API; override for non-default host/port setups.
BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1")
# First entry of the comma-separated SUPER_ADMINS list is the login account.
ADMIN_EMAIL = os.getenv("SUPER_ADMINS", "admin@jerxie.com").split(',')[0]
# Password for the admin account above (defaults to the dev password).
ADMIN_PASSWORD = os.getenv("CORTEX_ADMIN_PASSWORD", "admin")
# Identifiers of the two mesh nodes the integration tests register and spawn.
NODE_1 = os.getenv("SYNC_TEST_NODE1", "test-node-1")
NODE_2 = os.getenv("SYNC_TEST_NODE2", "test-node-2")

@pytest.fixture(scope="session")
def setup_mesh_environment():
    """
    Simulates the CUJ:
    1. Login as super admin.
    2. Add API provider configurations (using env vars).
    3. Register nodes.
    4. Create a group and assign nodes to the group.
    5. Spin up node docker containers (or local processes) with correct tokens.

    Yields:
        list: ``subprocess.Popen`` handles for locally spawned node processes
        (empty when the nodes run as docker containers instead).
    """
    print("\n[conftest] Starting Mesh Integration Setup...")
    client = httpx.Client(timeout=10.0)
    try:
        user_id = _login_as_admin(client)          # step 1
        _configure_llm_rbac(client, user_id)       # step 2
        tokens = _register_nodes(client, user_id)  # step 3
        _grant_group_node_access(client)           # step 4
    finally:
        # Always release the HTTP client, even when a setup assertion fails
        # midway (previously the client leaked on any setup error).
        client.close()

    # 5. Start node processes — docker containers or local Python processes.
    is_docker_disabled = os.getenv("SKIP_DOCKER_NODES", "true").lower() == "true"
    if is_docker_disabled:
        node_processes = _start_local_nodes(tokens)
    else:
        _start_docker_nodes(tokens)
        node_processes = []

    print("[conftest] Waiting for nodes to connect to mesh...")
    # NOTE(review): fixed wait — no readiness endpoint is visible here to poll.
    time.sleep(5)

    yield node_processes

    # 6. Teardown: remove containers or terminate local background processes.
    print("\n[conftest] Tearing down nodes...")
    if not is_docker_disabled:
        subprocess.run(["docker", "rm", "-f", NODE_1, NODE_2],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    else:
        for proc in node_processes:
            proc.terminate()
            proc.wait()


def _login_as_admin(client):
    """Step 1: log in via the hub's local-auth endpoint and return the user_id.

    Also exports SYNC_TEST_USER_ID for the tests and attaches the X-User-ID
    header to the shared client for the subsequent admin calls.
    """
    print(f"[conftest] Logging in as {ADMIN_EMAIL}...")
    # NOTE: The Hub uses /users/login/local
    r = client.post(
        f"{BASE_URL}/users/login/local",
        json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD},
    )
    assert r.status_code == 200, f"Login failed: {r.text}"

    # Only the user_id is needed downstream; the session token is unused here.
    user_id = r.json().get("user_id")
    assert user_id, "No user_id found in local login response."

    os.environ["SYNC_TEST_USER_ID"] = user_id
    client.headers.update({"X-User-ID": user_id})
    return user_id


def _configure_llm_rbac(client, user_id):
    """Step 2: enable the Gemini provider and bind the admin user into an
    RBAC group whose policy permits it."""
    print("[conftest] Configuring LLM provider and grouping...")

    # Enable Gemini securely (API key comes from the environment, never hard-coded).
    prefs_payload = {
        "llm": {
            "active_provider": "gemini",
            "providers": {
                "gemini": {
                    "api_key": os.getenv("GEMINI_API_KEY", ""),
                    "model": "gemini/gemini-3-flash-preview"
                }
            }
        },
        "tts": {}, "stt": {}, "statuses": {}
    }
    client.put(f"{BASE_URL}/users/me/config", json=prefs_payload)

    # Establish a Group securely provisioned for AI Usage.
    group_payload = {
        "name": "Integration Default Group",
        "description": "Global RBAC group for all integration tasks",
        "policy": {"llm": ["gemini"]}
    }
    r_group = client.post(f"{BASE_URL}/users/admin/groups", json=group_payload)

    if r_group.status_code == 409:
        # Group survives from a previous run: look it up and refresh its policy.
        r_groups = client.get(f"{BASE_URL}/users/admin/groups")
        target_group = next(g for g in r_groups.json() if g["name"] == group_payload["name"])
        group_id = target_group["id"]
        client.put(f"{BASE_URL}/users/admin/groups/{group_id}", json=group_payload)
    else:
        group_id = r_group.json().get("id")

    # Bind the admin testing account into this fully capable RBAC group.
    client.put(f"{BASE_URL}/users/admin/users/{user_id}/group", json={"group_id": group_id})


def _register_nodes(client, user_id):
    """Step 3: register both test nodes and return {node_id: auth_token}."""
    print("[conftest] Registering test nodes...")
    tokens = {}
    for node_id in [NODE_1, NODE_2]:
        payload = {
            "node_id": node_id,
            "display_name": f"Integration {node_id}",
            "is_active": True,
            "skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}}
        }
        r_node = client.post(
            f"{BASE_URL}/nodes/admin",
            params={"admin_id": user_id},
            json=payload
        )

        # If node already exists, fall back to the known shared key.
        if r_node.status_code in (400, 409) and ("already registered" in r_node.text or "already exists" in r_node.text):
            print(f"[conftest] Node {node_id} already registered. Assuming existing shared-key...")
            tokens[node_id] = "cortex-secret-shared-key"
        else:
            assert r_node.status_code == 200, f"Node registration failed: {r_node.text}"
            token = r_node.json().get("invite_token")
            # Fail fast here rather than launching a node whose auth token
            # would literally be the string "None".
            assert token, f"No invite_token in registration response for {node_id}: {r_node.text}"
            tokens[node_id] = token
    return tokens


def _grant_group_node_access(client):
    """Step 4: create an access group and grant it 'use' access to both nodes.

    Best-effort per the CUJ — when the group endpoint rejects the request,
    tests fall back to the registering admin's own access.
    """
    print("[conftest] Creating access group...")
    # Note: Using /users/admin/groups if it exists...
    group_r = client.post(f"{BASE_URL}/users/admin/groups", json={
        "name": "Integration Test Group",
        "description": "Integration Test Group"
    })
    if group_r.status_code == 200:
        group_id = group_r.json().get("id")
        # Give group access to nodes.
        for node_id in [NODE_1, NODE_2]:
            client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", json={
                "group_id": group_id,
                "access_level": "use"
            })


def _start_docker_nodes(tokens):
    """Step 5 (docker variant): build the agent image and run one container
    per node on the hub's compose network."""
    print("[conftest] Starting local docker node containers...")
    network = "cortex-hub_default"
    # Remove stale containers from a previous run (best-effort, output ignored).
    subprocess.run(["docker", "rm", "-f", NODE_1, NODE_2],
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"],
                                capture_output=True, text=True)
    image_id = image_proc.stdout.strip()
    # A failed build yields an empty id; fail fast with the build error instead
    # of passing an empty image name to `docker run`.
    assert image_id, f"docker build failed: {image_proc.stderr}"
    for node_id in [NODE_1, NODE_2]:
        cmd = [
            "docker", "run", "-d", "--name", node_id, "--network", network,
            "-e", f"AGENT_NODE_ID={node_id}",
            "-e", f"AGENT_AUTH_TOKEN={tokens[node_id]}",
            "-e", "GRPC_ENDPOINT=ai-hub:50051",
            "-e", "HUB_URL=http://ai-hub:8000",
            "-e", "AGENT_TLS_ENABLED=false",
            image_id,
        ]
        subprocess.run(cmd, check=True)


def _start_local_nodes(tokens):
    """Step 5 (local variant): spawn each node as a background Python process
    and return the list of Popen handles for teardown."""
    print("[conftest] Starting nodes as local Python background processes...")
    # Resolve URLs for local process tests (assuming hub runs on localhost outside docker)
    grpc_ep = os.getenv("TEST_GRPC_ENDPOINT", "127.0.0.1:50051")
    http_ep = os.getenv("TEST_HUB_URL", "http://127.0.0.1:8000")

    # Determine the agent node source directory
    agent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "agent-node"))
    if not os.path.exists(agent_dir):
        agent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "agent-node"))  # Fallback

    node_processes = []
    for node_id in [NODE_1, NODE_2]:
        env = os.environ.copy()
        env["AGENT_NODE_ID"] = node_id
        env["AGENT_AUTH_TOKEN"] = tokens[node_id]
        env["GRPC_ENDPOINT"] = grpc_ep
        env["HUB_URL"] = http_ep
        env["AGENT_TLS_ENABLED"] = "false"
        # Prepend (not clobber) PYTHONPATH so the agent package resolves
        # without hiding anything already on the caller's path; use os.pathsep
        # and os.path.join for portability.
        src_path = os.path.join(agent_dir, "src")
        existing = env.get("PYTHONPATH")
        env["PYTHONPATH"] = f"{src_path}{os.pathsep}{existing}" if existing else src_path

        proc = subprocess.Popen(
            ["python3", "-m", "agent_node.node"],
            env=env,
            cwd=agent_dir,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
        node_processes.append(proc)
    return node_processes

@pytest.fixture(autouse=True)
def run_around_tests(setup_mesh_environment):
    """Force the session-scoped mesh setup to run before every test.

    Depending on ``setup_mesh_environment`` makes pytest resolve (and thus
    execute) the mesh bootstrap, without each test requesting it explicitly.
    """
    yield