import os
import time
import subprocess
import pytest
import httpx
from datetime import datetime
BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1")
ADMIN_EMAIL = os.getenv("SUPER_ADMINS", "admin@jerxie.com").split(',')[0]
ADMIN_PASSWORD = os.getenv("CORTEX_ADMIN_PASSWORD", "admin")
NODE_1 = os.getenv("SYNC_TEST_NODE1", "test-node-1")
NODE_2 = os.getenv("SYNC_TEST_NODE2", "test-node-2")
@pytest.fixture(scope="session")
def setup_mesh_environment():
"""
Simulates the CUJ:
1. Login as super admin.
2. Add API provider configurations (using env vars).
3. Create a group.
4. Register nodes and assign nodes to the group.
5. Spin up node docker containers with correct tokens.
"""
print("\n[conftest] Starting Mesh Integration Setup...")
client = httpx.Client(timeout=10.0)
# 1. Login
print(f"[conftest] Logging in as {ADMIN_EMAIL}...")
# NOTE: The Hub uses /users/login/local
login_data = {
"email": ADMIN_EMAIL,
"password": ADMIN_PASSWORD
}
r = client.post(f"{BASE_URL}/users/login/local", json=login_data)
assert r.status_code == 200, f"Login failed: {r.text}"
# After a successful login, we need to extract user_id directly from the response.
# The actual token and headers are no longer directly used in this setup block,
# as the goal is to only provide the user ID to tests.
user_id = r.json().get("user_id")
assert user_id, "No user_id found in local login response."
os.environ["SYNC_TEST_USER_ID"] = user_id
client.headers.update({"X-User-ID": user_id})
# 2. Add API Providers and Configure LLM RBAC
print("[conftest] Configuring LLM provider and grouping...")
# Enable Gemini securely
prefs_payload = {
"llm": {
"active_provider": "gemini",
"providers": {
"gemini": {
"api_key": os.getenv("GEMINI_API_KEY", ""),
"model": "gemini/gemini-3-flash-preview"
}
}
},
"tts": {}, "stt": {}, "statuses": {}
}
client.put(f"{BASE_URL}/users/me/config", json=prefs_payload)
# Establish a Group securely provisioned for AI Usage
group_payload = {
"name": "Integration Default Group",
"description": "Global RBAC group for all integration tasks",
"policy": {"llm": ["gemini"]}
}
r_group = client.post(f"{BASE_URL}/users/admin/groups", json=group_payload)
if r_group.status_code == 409:
r_groups = client.get(f"{BASE_URL}/users/admin/groups")
target_group = next(g for g in r_groups.json() if g["name"] == group_payload["name"])
group_id = target_group["id"]
# Update policy to ensure it remains clean
client.put(f"{BASE_URL}/users/admin/groups/{group_id}", json=group_payload)
else:
group_id = r_group.json().get("id")
# Bind the admin testing account into this fully capable RBAC group
client.put(f"{BASE_URL}/users/admin/users/{user_id}/group", json={"group_id": group_id})
# 3. Register Nodes
print("[conftest] Registering test nodes...")
tokens = {}
for node_id in [NODE_1, NODE_2]:
payload = {
"node_id": node_id,
"display_name": f"Integration {node_id}",
"is_active": True,
"skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}}
}
r_node = client.post(
f"{BASE_URL}/nodes/admin",
params={"admin_id": user_id},
json=payload
)
# If node already exists, let's grab it or regenerate its token
if r_node.status_code in (400, 409) and ("already registered" in r_node.text or "already exists" in r_node.text):
print(f"[conftest] Node {node_id} already registered. Assuming existing shared-key...")
tokens[node_id] = "cortex-secret-shared-key"
else:
assert r_node.status_code == 200, f"Node registration failed: {r_node.text}"
tokens[node_id] = r_node.json().get("invite_token")
# 4. Add Group & Assign Permission (optional - tests use the user_id that registered it for now,
# but per CUJ we can mimic group creation)
print("[conftest] Creating access group...")
# Note: Using /users/admin/groups if it exists...
group_r = client.post(f"{BASE_URL}/users/admin/groups", json={
"name": "Integration Test Group",
"description": "Integration Test Group"
})
if group_r.status_code == 200:
group_id = group_r.json().get("id")
# Give group access to nodes
for node_id in [NODE_1, NODE_2]:
client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", json={
"group_id": group_id,
"access_level": "use"
})
# 5. Start Node Processes
is_docker_disabled = os.getenv("SKIP_DOCKER_NODES", "true").lower() == "true"
node_processes = []
if not is_docker_disabled:
print("[conftest] Starting local docker node containers...")
network = "cortex-hub_default"
subprocess.run(["docker", "rm", "-f", NODE_1, NODE_2], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True)
image_id = image_proc.stdout.strip()
for node_id in [NODE_1, NODE_2]:
cmd = ["docker", "run", "-d", "--name", node_id, "--network", network, "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={tokens[node_id]}", "-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "HUB_URL=http://ai-hub:8000", "-e", "AGENT_TLS_ENABLED=false", image_id]
subprocess.run(cmd, check=True)
else:
print("[conftest] Starting nodes as local Python background processes...")
# Resolve URLs for local process tests (assuming hub runs on localhost outside docker)
grpc_ep = os.getenv("TEST_GRPC_ENDPOINT", "127.0.0.1:50051")
http_ep = os.getenv("TEST_HUB_URL", "http://127.0.0.1:8000")
# Determine the agent node source directory
agent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "agent-node"))
if not os.path.exists(agent_dir):
agent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "agent-node")) # Fallback
for node_id in [NODE_1, NODE_2]:
env = os.environ.copy()
env["AGENT_NODE_ID"] = node_id
env["AGENT_AUTH_TOKEN"] = tokens[node_id]
env["GRPC_ENDPOINT"] = grpc_ep
env["HUB_URL"] = http_ep
env["AGENT_TLS_ENABLED"] = "false"
env["PYTHONPATH"] = f"{agent_dir}/src"
proc = subprocess.Popen(
["python3", "-m", "agent_node.node"],
env=env,
cwd=agent_dir,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
node_processes.append(proc)
print("[conftest] Waiting for nodes to connect to mesh...")
time.sleep(5)
client.close()
yield node_processes
# 6. Teardown
print("\n[conftest] Tearing down nodes...")
if not is_docker_disabled:
subprocess.run(["docker", "rm", "-f", NODE_1, NODE_2], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:
for proc in node_processes:
proc.terminate()
proc.wait()
@pytest.fixture(autouse=True)
def run_around_tests(setup_mesh_environment):
"""Ensure setup runs for all tests."""
yield