import os
import httpx
import pytest
import uuid
import yaml
BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8000/api/v1")
def _get_user_id(client) -> str:
"""Helper to login and get user_id."""
admin_email = os.getenv("SUPER_ADMINS", "admin@example.com").split(',')[0].strip()
admin_password = os.getenv("CORTEX_ADMIN_PASSWORD", "admin")
r = client.post(f"{BASE_URL}/users/login/local", json={
"email": admin_email,
"password": admin_password
})
if r.status_code != 200:
# Fallback for dev environments where user might already be bootstrapped
# but password might be different or OIDC enabled
return os.getenv("SYNC_TEST_USER_ID", "default-admin-id")
return r.json().get("user_id")
def test_windows_node_provisioning_paths():
"""
Verifies that provisioning a Windows node (via /provision/ps1)
correctly generates Windows-style paths in the configuration.
"""
with httpx.Client(timeout=10.0) as client:
# 1. Setup Admin
user_id = _get_user_id(client)
headers = {"X-User-ID": user_id}
# 2. Register a Windows Node
node_id = f"test-win-node-{uuid.uuid4().hex[:6]}"
payload = {
"node_id": node_id,
"display_name": "Test Windows Node",
"is_active": True,
"skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}}
}
reg_r = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, json=payload, headers=headers)
assert reg_r.status_code == 200
invite_token = reg_r.json()["invite_token"]
# 3. Fetch PS1 Provisioning Script
# This endpoint triggers generate_node_config_yaml(node, skill_overrides={"is_windows": True})
prov_r = client.get(f"{BASE_URL}/nodes/provision/ps1/{node_id}", params={"token": invite_token})
assert prov_r.status_code == 200
script_content = prov_r.text
# 4. Assert Windows-specific paths are present in the script's YAML block
assert "sync_root: C:\\CortexAgent\\sync" in script_content
assert "fs_root: C:\\" in script_content
# 5. Extract and verify YAML block integrity
# The script contains: $configContent = @" ... "@
# We look for the content between the heredoc markers
try:
yaml_block = script_content.split('@"')[1].split('"@')[0].strip()
# Clean up potential PowerShell escaping or formatting
# Note: The Hub might replace — with - as seen in the template
config = yaml.safe_load(yaml_block)
assert config["sync_root"] == "C:\\CortexAgent\\sync"
assert config["fs_root"] == "C:\\"
assert config["node_id"] == node_id
print(f"✅ Verified Windows paths for {node_id}")
except (IndexError, yaml.YAMLError) as e:
pytest.fail(f"Failed to parse YAML from PS1 script: {e}")
# Cleanup
client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=headers)
if __name__ == "__main__":
# Manual run support
test_windows_node_provisioning_paths()