import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
import uuid
# Base URL of the API under test. Override it with the SYNC_TEST_BASE_URL
# environment variable; the default points at an instance on 127.0.0.1:8000.
BASE_URL = os.environ.get("SYNC_TEST_BASE_URL", "http://127.0.0.1:8000/api/v1")
def request(url, method="GET", data=None, headers=None, params=None, timeout=30):
    """Send an HTTP request and return ``(status_code, body)``.

    Args:
        url: Target URL.
        method: HTTP verb passed to ``urllib.request.Request``.
        data: JSON-serializable payload. When not ``None`` it is encoded as
            UTF-8 JSON, sent as the request body, and a
            ``Content-Type: application/json`` header is added.
        headers: Optional mapping of extra request headers.
        params: Optional mapping of query parameters; URL-encoded and
            appended to ``url``.
        timeout: Seconds to wait on the connection before giving up.

    Returns:
        A ``(status, body)`` tuple. For successful responses ``body`` is the
        decoded JSON value when the payload parses as JSON, otherwise the raw
        text. For HTTP error statuses ``body`` is always the raw text.

    Raises:
        urllib.error.URLError: Propagated from ``urlopen`` when the server
            cannot be reached (HTTP error statuses are returned, not raised).
    """
    if params:
        # Do not emit a second "?" when the URL already carries a query string.
        separator = "&" if "?" in url else "?"
        url += separator + urllib.parse.urlencode(params)

    # "is not None" so that an intentionally empty payload ({} or []) is
    # still sent as a JSON body instead of being silently dropped.
    payload = None if data is None else json.dumps(data).encode("utf-8")

    req = urllib.request.Request(url, data=payload, method=method)
    for key, value in (headers or {}).items():
        req.add_header(key, value)
    if payload is not None:
        req.add_header("Content-Type", "application/json")

    try:
        with urllib.request.urlopen(req, timeout=timeout) as response:
            status = response.getcode()
            text = response.read().decode("utf-8")
    except urllib.error.HTTPError as err:
        # 4xx/5xx: hand the status and raw body back to the caller.
        return err.code, err.read().decode("utf-8")

    # Endpoints may answer with JSON or with plain text (e.g. a script), no
    # matter whether the request carried a body, so parsing is best-effort.
    try:
        return status, json.loads(text)
    except ValueError:
        return status, text
def test_windows_paths():
    """Check that the PS1 provisioning script of a new node uses Windows paths.

    Registers a temporary node via ``POST {BASE_URL}/nodes/admin``, fetches
    ``{BASE_URL}/nodes/provision/ps1/{node_id}`` with the returned invite
    token, and looks for the expected ``sync_root`` and ``fs_root`` lines in
    the script. Once the node has been registered it is deleted again on
    every exit path.

    Returns:
        bool: ``True`` only when both path lines are present in the script;
        ``False`` on any failure, including an unreachable server.
    """
    print(f"Testing against {BASE_URL}...")

    # 1. Identity: the same fixed user ID is sent as the X-User-ID header and
    #    as the admin_id query parameter.
    user_id = "c4401d34-8784-4d6e-93a0-c702bd202b66"  # Common test ID in this repo
    headers = {"X-User-ID": user_id}
    admin_params = {"admin_id": user_id}

    # 2. Register Windows Node (random suffix keeps repeated runs from colliding)
    node_id = f"test-win-path-{uuid.uuid4().hex[:6]}"
    payload = {
        "node_id": node_id,
        "display_name": "Integration Test Windows",
        "is_active": True,
        "skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}},
    }

    expected_sync = "sync_root: C:\\CortexAgent\\sync"
    expected_fs = "fs_root: C:\\"

    try:
        code, res = request(
            f"{BASE_URL}/nodes/admin",
            method="POST",
            data=payload,
            headers=headers,
            params=admin_params,
        )
        if code != 200:
            print(f"❌ Failed to register node: {code} {res}")
            return False

        # The node exists from here on, so everything below runs under a
        # try/finally that removes it again even on an early return.
        try:
            if not isinstance(res, dict) or "invite_token" not in res:
                print(f"❌ Registration response has no invite_token: {res}")
                return False
            invite_token = res["invite_token"]
            print(f"✅ Node {node_id} registered.")

            # 3. Fetch PS1 Provisioning
            code, script = request(
                f"{BASE_URL}/nodes/provision/ps1/{node_id}",
                params={"token": invite_token},
            )
            if code != 200:
                print(f"❌ Failed to fetch PS1: {code} {script}")
                return False

            # 4. Assertions
            found_sync = expected_sync in script
            found_fs = expected_fs in script
            if found_sync and found_fs:
                print("✅ SUCCESS: Windows paths correctly generated in PS1 script.")
            else:
                print("❌ FAILURE: Windows paths NOT found in script.")
                if not found_sync:
                    print(" Missing: sync_root: C:\\CortexAgent\\sync")
                if not found_fs:
                    print(" Missing: fs_root: C:\\")
            return found_sync and found_fs
        finally:
            # Cleanup is best-effort: a failed delete is reported but does
            # not change the test verdict.
            cleanup_code, cleanup_res = request(
                f"{BASE_URL}/nodes/admin/{node_id}",
                method="DELETE",
                headers=headers,
                params=admin_params,
            )
            if isinstance(cleanup_code, int) and not 200 <= cleanup_code < 300:
                print(f"⚠️ Cleanup of node {node_id} failed: {cleanup_code} {cleanup_res}")
    except OSError as exc:
        # URLError (connection refused, DNS failure) and socket timeouts are
        # OSError subclasses: bail out cleanly instead of dumping a traceback.
        print(f"❌ Could not reach {BASE_URL}: {exc}")
        return False
if __name__ == "__main__":
    # The process exit status mirrors the test result (0 = pass, 1 = fail)
    # so shells and CI jobs can detect a failure.
    success = test_windows_paths()
    sys.exit(0 if success else 1)