diff --git a/agent-node/src/agent_node/skills/shell_bridge.py b/agent-node/src/agent_node/skills/shell_bridge.py index c67d455..9f84ec5 100644 --- a/agent-node/src/agent_node/skills/shell_bridge.py +++ b/agent-node/src/agent_node/skills/shell_bridge.py @@ -329,7 +329,6 @@ # Input injection: execute command via sourced script to hide plumbing from ZLE/Readline echo try: - import tempfile script_path = os.path.join(tempfile.gettempdir(), f"ctx_{marker_id}.sh") script_content = f"{cmd}\n__ctx_exit=$?\necho \"__CORTEX_FIN_SH_{marker_id}__\" $__ctx_exit\nrm -f {script_path}\n" try: diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index 479b7a0..fe12411 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -847,7 +847,7 @@ raise HTTPException(status_code=500, detail="Node returned an empty response.") if isinstance(res, dict) and "error" in res: - status_code = 404 if res["error"] == "Offline" else 500 + status_code = 404 if "not found" in res["error"].lower() or res["error"] == "Offline" else 500 logger.warning(f"[FS] Explorer Error for {node_id}: {res['error']}") raise HTTPException(status_code=status_code, detail=res["error"]) @@ -885,8 +885,11 @@ if not res: raise HTTPException(status_code=500, detail="Node returned an empty response.") if isinstance(res, dict) and "error" in res: - raise HTTPException(status_code=500, detail=res["error"]) + status_code = 404 if "not found" in res["error"].lower() else 500 + raise HTTPException(status_code=status_code, detail=res["error"]) return res # Expecting {"content": "..."} + except HTTPException: + raise except AttributeError: raise HTTPException(status_code=500, detail="Orchestrator unavailable.") except Exception as e: @@ -918,6 +921,8 @@ if isinstance(res, dict) and "error" in res: raise HTTPException(status_code=500, detail=res["error"]) return res # Expecting {"success": bool, "message": str} + except HTTPException: + raise except AttributeError: raise HTTPException(status_code=500, detail="Orchestrator unavailable.") except Exception as e: @@ -993,6 +998,8 @@ session_id=session_id ) return res + except HTTPException: + raise except Exception as e: logger.error(f"[FS] Upload error: {e}") raise HTTPException(status_code=500, detail=str(e)) @@ -1014,6 +1021,8 @@ if not res: raise HTTPException(status_code=500, detail="Node returned an empty response.") return res + except HTTPException: + raise except Exception as e: logger.error(f"[FS] Delete error: {e}") raise HTTPException(status_code=500, detail=str(e)) diff --git a/ai-hub/app/api/routes/sessions.py b/ai-hub/app/api/routes/sessions.py index e4ab25c..6b18317 100644 --- a/ai-hub/app/api/routes/sessions.py +++ b/ai-hub/app/api/routes/sessions.py @@ -247,15 +247,19 @@ import app.protos.agent_pb2 as agent_pb2 live_nodes = orchestrator.registry.list_nodes() for node in live_nodes: - node.send_message(agent_pb2.ServerTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=sync_workspace_id, - control=agent_pb2.SyncControl( - action=agent_pb2.SyncControl.PURGE, - path="" + try: + node.send_message(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=sync_workspace_id, + control=agent_pb2.SyncControl( + action=agent_pb2.SyncControl.PURGE, + path="" + ) ) - ) - ), priority=0) + ), priority=0) + except Exception as e: + import logging + logging.exception(f"[πŸ“βš οΈ] Failed to send PURGE to node {node.node_id}: {e}") # Hub local purge mirror_path = os.path.join("/app/data/mirrors", sync_workspace_id) @@ -264,7 +268,6 @@ except Exception as e: import logging logging.exception(f"[πŸ“βš οΈ] Fast local purge failed for {sync_workspace_id}: {e}") - return {"message": "Session deleted successfully."} except HTTPException: raise diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py index 4104d10..0369d34 100644 --- a/ai-hub/app/core/grpc/core/mirror.py +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -20,10 +20,11 @@ """Returns a CortexIgnore instance for a session.""" return CortexIgnore(self.get_workspace_path(session_id)) - def get_workspace_path(self, session_id: str) -> str: + def get_workspace_path(self, session_id: str, create: bool = True) -> str: """Returns the local absolute path for a session's mirror.""" path = os.path.join(self.storage_root, session_id) - os.makedirs(path, exist_ok=True) + if create: + os.makedirs(path, exist_ok=True) return path def purge(self, session_id: str): @@ -235,7 +236,10 @@ def delete_file(self, session_id: str, rel_path: str): """Deletes a file or directory from the local mirror.""" - workspace = self.get_workspace_path(session_id) + workspace = self.get_workspace_path(session_id, create=False) + if not os.path.exists(workspace): + return # Parent folder is already gone + path_safe = rel_path.lstrip("/") safe_path = os.path.normpath(os.path.join(workspace, path_safe)) diff --git a/ai-hub/integration_tests/API_COVERAGE.md b/ai-hub/integration_tests/API_COVERAGE.md new file mode 100644 index 0000000..5ce45d4 --- /dev/null +++ b/ai-hub/integration_tests/API_COVERAGE.md @@ -0,0 +1,61 @@ +# Integration Test API Coverage & Todo List + +This document tracks which core system backend endpoints are covered natively by the E2E Integration Test Suite (`run_integration_tests.sh`). + +## Todo Works +- [ ] Parse mapping of remaining APIs from all markdown files in `docs/api_reference/` to complete the checklist. +- [ ] Build a generic `test_stt_tts.py` integration spec (Targeting `/audio` transcode routing or native gemini). +- [ ] Build a `test_agent_nodes_sandbox.py` spec for deep sandbox edge cases to ensure `workspace` restrictions work. +- [ ] Fully map coverage for the `/models` endpoints to `test_provider_config.py`. +- [ ] Write missing tests for `/users/me/profile` endpoints. +- [ ] Review `/skills` and `/knowledge` mapping against `test_browser_llm.py` which only touches standard capability mapping. + +## Covered APIs + +### Agent Nodes (`/nodes` & `/fs`) +Covered by: +- `integration_tests/test_node_registration.py` (Node Lifecycle) +- `integration_tests/test_file_sync.py` (Cross-Nodes File System Integrity Validation) +- [x] `POST /nodes/admin` (Register Node) +- [x] `GET /nodes/admin` (List All Nodes) +- [x] `GET /nodes/admin/{node_id}` (Get Node Detail) +- [x] `PATCH /nodes/admin/{node_id}` (Update Node Config) +- [x] `POST /nodes/admin/{node_id}/access` (Grant Group Access) +- [x] `DELETE /nodes/admin/{node_id}/access/{group_id}` (Revoke Group Access) +- [x] `GET /nodes/admin/{node_id}/config.yaml` (Download config) +- [x] `GET /nodes/provision/{node_id}` (Fetch provision script) +- [x] `GET /nodes/admin/{node_id}/download` (Download ZIP) +- [x] `DELETE /nodes/admin/{node_id}` (Deregister Node) +- [x] `POST /nodes/admin/mesh/reset` (Emergency Reset) +- [x] `GET /nodes/` (List Accessible Nodes) +- [x] `GET /nodes/{node_id}/status` (Online Check) +- [x] `GET /nodes/{node_id}/terminal` (Terminal History) +- [x] `POST /nodes/{node_id}/dispatch` (Dispatch Command) +- [x] `POST /nodes/{node_id}/cancel` (Cancel Task) +- [x] `PATCH /nodes/preferences` (Update User DB Prefs) +- [x] `GET /nodes/preferences` (Get User DB Prefs) +- [x] `POST /nodes/validate-token` (Internal Validator) +- [x] `GET /nodes/{node_id}/fs/ls` (List Dir) +- [x] `GET /nodes/{node_id}/fs/cat` (Read File) +- [x] `POST /nodes/{node_id}/fs/touch` (Create File/Dir) +- [x] `GET /nodes/{node_id}/fs/download` (Download File stream) +- [x] `POST /nodes/{node_id}/fs/upload` (Upload File stream) +- [x] `POST /nodes/{node_id}/fs/rm` (Delete File/Dir) + +### Users / Auth (`/users`) +Covered roughly by `integration_tests/test_login.py` +- [x] `POST /users/login/local` (Perform Day 1 Local Login) +- [ ] Check full coverage against `api_reference/users.md` for remaining methods + +### Sessions (`/sessions`) +Covered roughly by `integration_tests/test_llm_chat.py` and `test_browser_llm.py` +- [x] `POST /sessions` (Create Inference Session) +- [x] `POST /sessions/{id}/chat` (Inference Streaming Chat via Server-Sent Events) +- [x] `DELETE /sessions/{id}` (Archive Session and Wipe Linked Storage) +- [ ] Check full coverage against `api_reference/sessions.md` + +### LLM / Provider Config (`/users/me/config` & `/models`) +Covered roughly by `integration_tests/test_provider_config.py` +- [x] `POST /users/me/config/verify_llm` (Config validation proxy ping before save) +- [x] `PUT /users/me/config` (Apply Provider Key Update) +- [ ] Check full coverage against `api_reference/models.md` diff --git a/ai-hub/integration_tests/conftest.py b/ai-hub/integration_tests/conftest.py index 4b33842..f45d069 100644 --- a/ai-hub/integration_tests/conftest.py +++ b/ai-hub/integration_tests/conftest.py @@ -34,14 +34,13 @@ r = client.post(f"{BASE_URL}/users/login/local", json=login_data) assert r.status_code == 200, f"Login failed: {r.text}" - # After a successful login, we need to extract user_id directly from the response. - # The actual token and headers are no longer directly used in this setup block, - # as the goal is to only provide the user ID to tests. user_id = r.json().get("user_id") assert user_id, "No user_id found in local login response." os.environ["SYNC_TEST_USER_ID"] = user_id - client.headers.update({"X-User-ID": user_id}) + client.headers.update({ + "X-User-ID": user_id + }) # 2. Add API Providers and Configure LLM RBAC print("[conftest] Configuring LLM provider and grouping...") @@ -59,7 +58,8 @@ }, "tts": {}, "stt": {}, "statuses": {} } - client.put(f"{BASE_URL}/users/me/config", json=prefs_payload) + r_config = client.put(f"{BASE_URL}/users/me/config", json=prefs_payload) + assert r_config.status_code == 200, f"Failed to configure LLM provider: {r_config.text}" # Establish a Group securely provisioned for AI Usage group_payload = { @@ -97,13 +97,13 @@ json=payload ) - # If node already exists, let's grab it or regenerate its token - if r_node.status_code in (400, 409) and ("already registered" in r_node.text or "already exists" in r_node.text): - print(f"[conftest] Node {node_id} already registered. Assuming existing shared-key...") - tokens[node_id] = "cortex-secret-shared-key" - else: - assert r_node.status_code == 200, f"Node registration failed: {r_node.text}" - tokens[node_id] = r_node.json().get("invite_token") + # If node already exists, delete it and recreate to obtain a fresh token + if r_node.status_code in (400, 409): + client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}) + r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, json=payload) + + assert r_node.status_code == 200, f"Node registration failed: {r_node.text}" + tokens[node_id] = r_node.json().get("invite_token") # 4. Add Group & Assign Permission (optional - tests use the user_id that registered it for now, # but per CUJ we can mimic group creation) @@ -133,7 +133,7 @@ image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True) image_id = image_proc.stdout.strip() for node_id in [NODE_1, NODE_2]: - cmd = ["docker", "run", "-d", "--name", node_id, "--network", network, "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={tokens[node_id]}", "-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "HUB_URL=http://ai-hub:8000", "-e", "AGENT_TLS_ENABLED=false", image_id] + cmd = ["docker", "run", "-d", "--name", node_id, "--network", network, "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={tokens[node_id]}", "-e", f"AGENT_SECRET_KEY={os.getenv('SECRET_KEY')}", "-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "HUB_URL=http://ai-hub:8000", "-e", "AGENT_TLS_ENABLED=false", image_id] subprocess.run(cmd, check=True) else: print("[conftest] Starting nodes as local Python background processes...") @@ -150,6 +150,7 @@ env = os.environ.copy() env["AGENT_NODE_ID"] = node_id env["AGENT_AUTH_TOKEN"] = tokens[node_id] + env["AGENT_SECRET_KEY"] = os.getenv('SECRET_KEY', 'integration-secret-key-123') env["GRPC_ENDPOINT"] = grpc_ep env["HUB_URL"] = http_ep env["AGENT_TLS_ENABLED"] = "false" diff --git a/ai-hub/integration_tests/test_file_sync.py b/ai-hub/integration_tests/test_file_sync.py new file mode 100644 index 0000000..2ccb35b --- /dev/null +++ b/ai-hub/integration_tests/test_file_sync.py @@ -0,0 +1,896 @@ +""" +File Sync Integration Tests +============================ +Verifies the end-to-end mesh file synchronisation behaviour across nodes and +the Hub server mirror. These tests run against a *live* deployment that has +at least two test nodes connected: + + β€’ test-node-1 (NODE_1 constant) + β€’ test-node-2 (NODE_2 constant) + +The Hub exposes REST endpoints used to: + - read files: GET /api/v1/nodes/{node_id}/fs/cat + - list dirs: GET /api/v1/nodes/{node_id}/fs/ls + - write files: POST /api/v1/nodes/{node_id}/fs/touch + - delete: POST /api/v1/nodes/{node_id}/fs/rm + +A shared swarm-control session is created once per test module so that all +nodes are in the same mesh workspace, and file operations propagate correctly. + +Environment assumptions +----------------------- + BASE_URL http://127.0.0.1:8000 (inside container) or http://192.168.68.113 (from outside) + NODE_1 test-node-1 + NODE_2 test-node-2 + USER_ID SYNC_TEST_USER_ID env var (required for node access checks) + TIMEOUT 10 s for small files, 60 s for 20 MB files +""" + +import os +import time +import uuid +import hashlib +import pytest +import httpx + +# ── Configuration ────────────────────────────────────────────────────────────── +BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1") +USER_ID = os.getenv("SYNC_TEST_USER_ID", "c4401d34-8784-4d6e-93a0-c702bd202b66") +NODE_1 = os.getenv("SYNC_TEST_NODE1", "test-node-1") +NODE_2 = os.getenv("SYNC_TEST_NODE2", "test-node-2") + +SMALL_FILE_TIMEOUT = 10 # seconds +LARGE_FILE_TIMEOUT = 60 # seconds (20 MB) +LARGE_FILE_SIZE_MB = 20 +POLL_INTERVAL = 0.5 # seconds + +# Paths β€” relative to BASE_URL +SESSIONS_PATH = "/sessions" +NODES_PATH = "/nodes" + + +# ── Module-level: skip the whole file if nodes are not online ────────────────── +def pytest_configure(config): + config.addinivalue_line( + "markers", + "requires_nodes: mark test as requiring live agent nodes to be connected", + ) + + +pytestmark = pytest.mark.requires_nodes + + +# ── Helpers ───────────────────────────────────────────────────────────────────── + +def _get_user_id() -> str: + return os.getenv("SYNC_TEST_USER_ID", "integration_tester_sync") + +def _headers(): + return {"X-User-ID": _get_user_id()} + + +def _unique(prefix="synctest"): + return f"{prefix}_{uuid.uuid4().hex[:8]}.txt" + + +def _large_content(mb: int = LARGE_FILE_SIZE_MB) -> str: + """Return a UTF-8 string of approximately `mb` megabytes.""" + line = "A" * 1023 + "\n" # 1 KB per line + return line * (mb * 1024) # mb * 1024 lines β‰ˆ mb MB + + +def _poll_until(fn, timeout: float, interval: float = POLL_INTERVAL): + """ + Repeatedly call fn() until it returns a truthy value or timeout expires. + Returns the last return value of fn(). + """ + deadline = time.time() + timeout + last = None + while time.time() < deadline: + try: + last = fn() + if last: + return last + except Exception: + pass + time.sleep(interval) + return last + + +def _cat(client: httpx.Client, node_id: str, path: str, session_id: str) -> str | None: + """Read a file from a node; return its text content or None on error.""" + r = client.get( + f"{NODES_PATH}/{node_id}/fs/cat", + params={"path": path, "session_id": session_id}, + headers=_headers(), + ) + if r.status_code == 200: + return r.json().get("content", "") + return None + + +def _mirror_cat(client: httpx.Client, path: str, session_id: str) -> str | None: + """ + Read a file from the Hub server mirror directly by asking node-1 for it + using the session_id workspace (the Hub mirror is queried when the node + reflects a workspace file). + + For the server-side write tests we can call the same endpoint but the + source is the Hub mirror, not the live node FS. + """ + # The Hub's /cat endpoint fetches from node, then caches in mirror. + # For "server wrote it β†’ does node have it?" we ask the node directly. + return _cat(client, NODE_1, path, session_id) + + +def _touch( + client: httpx.Client, + node_id: str, + path: str, + content: str, + session_id: str, + is_dir: bool = False, +) -> dict: + """Write a file to a node via the REST API.""" + r = client.post( + f"{NODES_PATH}/{node_id}/fs/touch", + json={"path": path, "content": content, "is_dir": is_dir, "session_id": session_id}, + headers=_headers(), + timeout=120.0, + ) + r.raise_for_status() + return r.json() + + +def _rm(client: httpx.Client, node_id: str, path: str, session_id: str) -> dict: + """Delete a file from a node via the REST API.""" + r = client.post( + f"{NODES_PATH}/{node_id}/fs/rm", + json={"path": path, "session_id": session_id}, + headers=_headers(), + timeout=30.0, + ) + r.raise_for_status() + return r.json() + + +def _file_missing(client: httpx.Client, node_id: str, path: str, session_id: str) -> bool: + """Return True if the file does NOT exist on the node.""" + r = client.get( + f"{NODES_PATH}/{node_id}/fs/cat", + params={"path": path, "session_id": session_id}, + headers=_headers(), + ) + return r.status_code != 200 + + +# ── Session fixture ───────────────────────────────────────────────────────────── + +@pytest.fixture(scope="module") +def sync_client(): + """Synchronous httpx client for the whole module (avoids asyncio overhead).""" + with httpx.Client(base_url=BASE_URL, timeout=60.0) as c: + # Quick connectivity + node-online check + try: + r = c.get(f"{NODES_PATH}/{NODE_1}/status", headers=_headers()) + if r.status_code not in (200, 404): + pytest.skip(f"{NODE_1} unreachable β€” hub returned {r.status_code}") + except Exception as exc: + pytest.skip(f"Hub unreachable at {BASE_URL}: {exc}") + yield c + + +@pytest.fixture(scope="module") +def swarm_session(sync_client: httpx.Client) -> str: + """ + Create (or reuse) one swarm_control session that has both test nodes + attached. Returned value is the workspace_id string used by all sync ops. + """ + # Create the session + r = sync_client.post( + f"{SESSIONS_PATH}/", + json={"user_id": _get_user_id(), "provider_name": "gemini", "feature_name": "swarm_control"}, + headers=_headers(), + ) + assert r.status_code == 200, f"Create session failed: {r.text}" + session_id = r.json()["id"] + + # Attach both nodes with source="empty" so they both watch the workspace + r2 = sync_client.post( + f"{SESSIONS_PATH}/{session_id}/nodes", + json={"node_ids": [NODE_1, NODE_2], "config": {"source": "empty"}}, + headers=_headers(), + ) + assert r2.status_code == 200, f"Attach nodes failed: {r2.text}" + workspace_id = r2.json().get("sync_workspace_id") + assert workspace_id, "Expected sync_workspace_id in response" + + # Give nodes a moment to ACK the workspace and start watching + time.sleep(2.0) + + yield workspace_id + + # Teardown: archive the session + sync_client.delete(f"{SESSIONS_PATH}/{session_id}", headers=_headers()) + + +# ══════════════════════════════════════════════════════════════════════════════ +# SMALL FILE TESTS (< 1 chunk) +# ══════════════════════════════════════════════════════════════════════════════ + +class TestSmallFileSync: + """Cases 1–4: single small-file create + delete in both directions.""" + + # ── Case 1: node-1 β†’ node-2 + server ─────────────────────────────────── + def test_case1_write_from_node1_visible_on_node2_and_server( + self, sync_client, swarm_session + ): + """ + Write a file from test-node-1; verify test-node-2 AND the server + mirror receive it within SMALL_FILE_TIMEOUT seconds. + """ + filename = _unique("case1") + content = f"Case 1 payload – node-1 β†’ mesh – {uuid.uuid4()}" + workspace = swarm_session + + print(f"\n[Case 1] Writing {filename!r} to {NODE_1} in workspace {workspace}") + result = _touch(sync_client, NODE_1, filename, content, workspace) + assert result.get("success"), f"Write failed: {result}" + + # Verify on node-2 + node2_content = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert node2_content is not None, ( + f"[Case 1] File '{filename}' did NOT appear on {NODE_2} within " + f"{SMALL_FILE_TIMEOUT}s" + ) + assert content in node2_content, ( + f"[Case 1] Content mismatch on {NODE_2}. Got: {node2_content!r}" + ) + print(f"[Case 1] βœ… {NODE_2} received the file.") + + # Verify on Hub server mirror (query node-1 with session scope uses mirror) + server_content = _poll_until( + lambda: _cat(sync_client, NODE_1, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert server_content is not None and content in server_content, ( + f"[Case 1] File '{filename}' not found on Hub mirror." + ) + print(f"[Case 1] βœ… Hub mirror has the file.") + + # ── Case 2: server β†’ node-1 + node-2 ─────────────────────────────────── + def test_case2_write_from_server_visible_on_all_nodes( + self, sync_client, swarm_session + ): + """ + Write a file via the server (the touch endpoint dispatches to all nodes + in the session + writes to Hub mirror). Verify both client nodes and + the mirror reflect it. + """ + filename = _unique("case2") + content = f"Case 2 payload – server β†’ mesh – {uuid.uuid4()}" + workspace = swarm_session + + print(f"\n[Case 2] Writing {filename!r} via server to workspace {workspace}") + # Intentionally write via node-1 (server-dispatched; Hub mirror updated first) + result = _touch(sync_client, NODE_1, filename, content, workspace) + assert result.get("success"), f"Write failed: {result}" + + # Node-2 should receive via broadcast + node2_content = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert node2_content is not None and content in node2_content, ( + f"[Case 2] File '{filename}' did NOT appear on {NODE_2}." + ) + print(f"[Case 2] βœ… {NODE_2} received the file.") + + # Node-1 should also have it (it was written directly to it and mirrored) + node1_content = _cat(sync_client, NODE_1, filename, workspace) + assert node1_content is not None and content in node1_content, ( + f"[Case 2] File '{filename}' not found on {NODE_1}." + ) + print(f"[Case 2] βœ… {NODE_1} has the file.") + + # ── Case 3: delete from server β†’ nodes purged ────────────────────────── + def test_case3_delete_from_server_purges_client_nodes( + self, sync_client, swarm_session + ): + """ + Create a file via server, then delete it via the server endpoint. + Verify both client nodes no longer have the file. + """ + filename = _unique("case3") + content = f"Case 3 – to be deleted by server – {uuid.uuid4()}" + workspace = swarm_session + + # Setup: write the file from node-1 (server-side orchestrated) + r = _touch(sync_client, NODE_1, filename, content, workspace) + assert r.get("success"), f"Setup write failed: {r}" + + # Make sure node-2 got it before we delete + got = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert got is not None, f"[Case 3] Setup: file not on {NODE_2}." + + print(f"\n[Case 3] Deleting {filename!r} from server (via {NODE_1} endpoint)") + _rm(sync_client, NODE_1, filename, workspace) + + # node-2 should no longer have the file + gone_node2 = _poll_until( + lambda: _file_missing(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert gone_node2, ( + f"[Case 3] File '{filename}' still present on {NODE_2} after server delete." + ) + print(f"[Case 3] βœ… {NODE_2} no longer has the file.") + + # node-1 should also have it gone + gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) + assert gone_node1, ( + f"[Case 3] File '{filename}' still present on {NODE_1} after server delete." + ) + print(f"[Case 3] βœ… {NODE_1} no longer has the file.") + + # ── Case 4: delete from node-2 β†’ server + node-1 purged ─────────────── + def test_case4_delete_from_node2_purges_server_and_node1( + self, sync_client, swarm_session + ): + """ + Create a file, let it propagate, then delete it FROM node-2. + Verify the Hub mirror and node-1 no longer have the file. + """ + filename = _unique("case4") + content = f"Case 4 – to be deleted from node-2 – {uuid.uuid4()}" + workspace = swarm_session + + # Setup: write from node-1 so both nodes and mirror have it + r = _touch(sync_client, NODE_1, filename, content, workspace) + assert r.get("success"), f"Setup write failed: {r}" + + got_node2 = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert got_node2 is not None, f"[Case 4] Setup: file did not reach {NODE_2}." + + print(f"\n[Case 4] Deleting {filename!r} from {NODE_2}") + _rm(sync_client, NODE_2, filename, workspace) + + # Hub mirror (observed via node-1's workspace view) should purge + gone_server = _poll_until( + lambda: _file_missing(sync_client, NODE_1, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert gone_server, ( + f"[Case 4] File '{filename}' still present on Hub mirror after node-2 delete." + ) + print(f"[Case 4] βœ… Hub mirror no longer has the file.") + + # node-1 should also be purged + gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) + assert gone_node1, ( + f"[Case 4] File '{filename}' still present on {NODE_1} after node-2 delete." + ) + print(f"[Case 4] βœ… {NODE_1} no longer has the file.") + + # ── Case 9: cat on deleted file returns quickly, not after timeout ────── + def test_case9_cat_deleted_file_returns_quickly_not_timeout( + self, sync_client, swarm_session + ): + """ + Regression test for the silent-return bug in _push_file (node side) + and the missing mirror short-circuit in cat() (hub side). + + Before the fix, reading a deleted file would stall for the full 15s + journal timeout because the node returned nothing and the hub just sat + waiting. After the fix: + - hub: cat() checks the mirror first; file absent β†’ instant "File not found" + - node: _push_file sends an ERROR SyncStatus immediately when file missing + + This test enforces that a cat call on a deleted file resolves in under + MAX_LATENCY_S seconds on BOTH nodes. + """ + MAX_LATENCY_S = 3.0 # well below the 15s journal timeout + filename = _unique("case9_latency") + content = f"Case 9 β€” delete latency probe β€” {uuid.uuid4()}" + workspace = swarm_session + + # Setup: write the file and wait for full propagation + r = _touch(sync_client, NODE_1, filename, content, workspace) + assert r.get("success"), f"[Case 9] Setup write failed: {r}" + synced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert synced is not None, f"[Case 9] Setup: file did not propagate to {NODE_2}." + + # Delete from server + print(f"\n[Case 9] Deleting {filename!r}, then timing cat() on both nodes") + _rm(sync_client, NODE_1, filename, workspace) + + # Give delete broadcast a moment to reach nodes (but not the full poll timeout) + time.sleep(1.5) + + # Measure cat latency on node-1 (hub mirror path β€” should be instant) + t0 = time.time() + res1 = _cat(sync_client, NODE_1, filename, workspace) + latency_node1 = time.time() - t0 + assert res1 is None, ( + f"[Case 9] {NODE_1} still returned content after delete: {res1!r}" + ) + assert latency_node1 < MAX_LATENCY_S, ( + f"[Case 9] cat() on {NODE_1} took {latency_node1:.1f}s β€” expected < {MAX_LATENCY_S}s. " + f"Hub mirror short-circuit may be broken." + ) + print(f"[Case 9] βœ… {NODE_1} cat returned in {latency_node1:.2f}s (file absent, fast-fail).") + + # Measure cat latency on node-2 (hub mirror path β€” should also be instant) + t0 = time.time() + res2 = _cat(sync_client, NODE_2, filename, workspace) + latency_node2 = time.time() - t0 + assert res2 is None, ( + f"[Case 9] {NODE_2} still returned content after delete: {res2!r}" + ) + assert latency_node2 < MAX_LATENCY_S, ( + f"[Case 9] cat() on {NODE_2} took {latency_node2:.1f}s β€” expected < {MAX_LATENCY_S}s. " + f"Node _push_file may not be sending error status on missing file." + ) + print(f"[Case 9] βœ… {NODE_2} cat returned in {latency_node2:.2f}s (file absent, fast-fail).") + + +# ══════════════════════════════════════════════════════════════════════════════ +# NODE RECONNECT / RESYNC TESTS +# ══════════════════════════════════════════════════════════════════════════════ + +# Docker container names for the test nodes on the production server +_NODE_CONTAINER = { + "test-node-1": "cortex-test-1", + "test-node-2": "cortex-test-2", +} + +import subprocess +import os + +def _get_remote_env(): + try: + script_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.agent/utils/env_loader.sh")) + if os.path.exists(script_path): + cmd = f"source {script_path} >/dev/null 2>&1 && echo \"${{REMOTE_PASSWORD}}|${{REMOTE_USER}}|${{REMOTE_HOST}}\"" + res = subprocess.run(["bash", "-c", cmd], capture_output=True, text=True, check=True) + parts = res.stdout.strip().split("|") + if len(parts) == 3 and parts[0]: + return parts[0], parts[1], parts[2] + except Exception: + pass + return os.environ.get("REMOTE_PASSWORD", ""), os.environ.get("REMOTE_USER", "axieyangb"), os.environ.get("REMOTE_HOST", "192.168.68.113") + +_REMOTE_PASSWORD, _REMOTE_USER, _REMOTE_HOST = _get_remote_env() +_SSH_CMD = f"sshpass -p '{_REMOTE_PASSWORD}' ssh -o StrictHostKeyChecking=no {_REMOTE_USER}@{_REMOTE_HOST}" + + +def _restart_test_node(node_id: str): + """ + Restart the named test-node Docker container on the production server. + This wipes /tmp/cortex-sync on the node, simulating a real reboot. + """ + import subprocess + container = _NODE_CONTAINER.get(node_id) + if not container: + pytest.skip(f"No container mapping for {node_id}") + cmd = ( + f"sshpass -p '{_REMOTE_PASSWORD}' ssh -o StrictHostKeyChecking=no {_REMOTE_USER}@{_REMOTE_HOST} " + f"\"echo '{_REMOTE_PASSWORD}' | sudo -S docker restart {container}\"" + ) + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) + if result.returncode != 0: + pytest.skip(f"Could not restart {container}: {result.stderr}") + + +class TestNodeResync: + """ + Case 10: node reconnect / workspace resync after container restart. + + Real-world scenario: a test node restarts (deploy, crash, reboot) and + /tmp/cortex-sync is wiped. The Hub must re-push the workspace to the + reconnected node via manifest-driven reconciliation. + """ + + # ── Case 10: node-2 restart β†’ hub re-delivers workspace ──────────────── + def test_case10_node_resync_after_restart( + self, sync_client, swarm_session + ): + """ + 1. Write a file to node-1 and confirm node-2 received it. + 2. Restart the node-2 container (wipes /tmp/cortex-sync). + 3. Wait for node-2 to reconnect and receive the manifest from Hub. + 4. Assert that the file re-appears on node-2 within RESYNC_TIMEOUT. + + This guards against regressions in the push_workspace / manifest-driven + reconciliation loop that re-delivers Hub mirror contents to a freshly + reconnected node. + """ + RESYNC_TIMEOUT = 30 # seconds for node to reconnect + resync + RESTART_WAIT = 8 # seconds to allow container to come back up + + filename = _unique("case10_resync") + content = f"Case 10 β€” node resync after restart β€” {uuid.uuid4()}" + workspace = swarm_session + + # Setup: write from node-1, wait for node-2 to receive + r = _touch(sync_client, NODE_1, filename, content, workspace) + assert r.get("success"), f"[Case 10] Setup write failed: {r}" + + synced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert synced is not None, f"[Case 10] Setup: file did not reach {NODE_2} before restart." + print(f"\n[Case 10] File confirmed on {NODE_2}. Restarting container…") + + # Restart node-2 container β€” wipes /tmp/cortex-sync + _restart_test_node(NODE_2) + + # Brief pause to let the container fully stop, then wait for reconnect + time.sleep(RESTART_WAIT) + print(f"[Case 10] Container restarted. Waiting for {NODE_2} to reconnect and resync…") + + # After reconnect, node sends its (now-empty) manifest β†’ Hub sends back + # all missing files. Poll until the file reappears. + resynced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=RESYNC_TIMEOUT, + ) + assert resynced is not None, ( + f"[Case 10] File '{filename}' did NOT re-appear on {NODE_2} within " + f"{RESYNC_TIMEOUT}s after container restart. " + f"Manifest-driven resync may be broken." + ) + assert content in resynced, ( + f"[Case 10] Content mismatch on {NODE_2} after resync. Got: {resynced!r}" + ) + print(f"[Case 10] βœ… {NODE_2} resynced the file after container restart.") + + +# ══════════════════════════════════════════════════════════════════════════════ +# LARGE FILE TESTS (20 MB, multi-chunk) +# ══════════════════════════════════════════════════════════════════════════════ + +class TestLargeFileSync: + """Cases 5–8: 20 MB file create + delete in both directions.""" + + @pytest.fixture(scope="class", autouse=True) + def _large_content(self): + """Pre-build the 20 MB string once per class to save CPU time.""" + self.__class__._content = _large_content(LARGE_FILE_SIZE_MB) + self.__class__._expected_hash = hashlib.sha256( + self._content.encode() + ).hexdigest() + + # ── Case 5: 20 MB from node-1 β†’ server + node-2 ──────────────────────── + def test_case5_large_file_from_node1_to_server_and_node2( + self, sync_client, swarm_session + ): + """ + Create a 20 MB file from test-node-1. + Both server mirror and test-node-2 should receive it within + LARGE_FILE_TIMEOUT seconds. + """ + filename = _unique("case5_large") + workspace = swarm_session + + print(f"\n[Case 5] Writing {LARGE_FILE_SIZE_MB} MB file {filename!r} from {NODE_1}") + result = _touch(sync_client, NODE_1, filename, self._content, workspace) + assert result.get("success"), f"Write failed: {result}" + + # Verify node-2 received the file + node2_content = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=LARGE_FILE_TIMEOUT, + ) + assert node2_content is not None, ( + f"[Case 5] Large file did NOT appear on {NODE_2} within {LARGE_FILE_TIMEOUT}s." + ) + got_hash = hashlib.sha256(node2_content.encode()).hexdigest() + assert got_hash == self._expected_hash, ( + f"[Case 5] Hash mismatch on {NODE_2}. Expected {self._expected_hash}, got {got_hash}" + ) + print(f"[Case 5] βœ… {NODE_2} received and verified 20 MB large file.") + + # Verify server mirror + mirror_content = _cat(sync_client, NODE_1, filename, workspace) + assert mirror_content is not None, ( + f"[Case 5] Large file not on Hub mirror." + ) + print(f"[Case 5] βœ… Hub mirror has the large file.") + + # ── Case 6: 20 MB from server β†’ node-1 + node-2 ──────────────────────── + def test_case6_large_file_from_server_to_all_nodes( + self, sync_client, swarm_session + ): + """ + Write a 20 MB file via the server (touch endpoint with session scope). + Both client nodes should receive it within LARGE_FILE_TIMEOUT seconds. + """ + filename = _unique("case6_large") + workspace = swarm_session + + print(f"\n[Case 6] Writing {LARGE_FILE_SIZE_MB} MB file {filename!r} via server") + result = _touch(sync_client, NODE_1, filename, self._content, workspace) + assert result.get("success"), f"Write failed: {result}" + + # node-2 receives via mesh broadcast + node2_ok = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=LARGE_FILE_TIMEOUT, + ) + assert node2_ok is not None, ( + f"[Case 6] Large file did NOT appear on {NODE_2} within {LARGE_FILE_TIMEOUT}s." + ) + print(f"[Case 6] βœ… {NODE_2} received the 20 MB file.") + + node1_ok = _cat(sync_client, NODE_1, filename, workspace) + assert node1_ok is not None, f"[Case 6] File not on {NODE_1}." + print(f"[Case 6] βœ… {NODE_1} has the 20 MB file.") + + # ── Case 7: delete large file from server β†’ nodes purged ─────────────── + def test_case7_delete_large_file_from_server_purges_nodes( + self, sync_client, swarm_session + ): + """ + Write and fully sync a large file, then delete via server. + Verify all client nodes are purged. + """ + filename = _unique("case7_large") + workspace = swarm_session + + # Setup + r = _touch(sync_client, NODE_1, filename, self._content, workspace) + assert r.get("success"), f"Setup write failed: {r}" + + synced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=LARGE_FILE_TIMEOUT, + ) + assert synced is not None, f"[Case 7] Setup: large file did not reach {NODE_2}." + + print(f"\n[Case 7] Deleting large file {filename!r} from server") + _rm(sync_client, NODE_1, filename, workspace) + + gone_node2 = _poll_until( + lambda: _file_missing(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert gone_node2, ( + f"[Case 7] Large file still present on {NODE_2} after server delete." + ) + print(f"[Case 7] βœ… {NODE_2} purged the large file.") + + gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) + assert gone_node1, ( + f"[Case 7] Large file still present on {NODE_1} after server delete." + ) + print(f"[Case 7] βœ… {NODE_1} purged the large file.") + + # ── Case 8: delete large file from node-2 β†’ server + node-1 ─────────── + def test_case8_delete_large_file_from_node2_purges_server_and_node1( + self, sync_client, swarm_session + ): + """ + Write and sync a large file, then delete it FROM node-2. + Verify Hub mirror and node-1 are both purged. + """ + filename = _unique("case8_large") + workspace = swarm_session + + # Setup + r = _touch(sync_client, NODE_1, filename, self._content, workspace) + assert r.get("success"), f"Setup write failed: {r}" + + synced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=LARGE_FILE_TIMEOUT, + ) + assert synced is not None, f"[Case 8] Setup: large file did not reach {NODE_2}." + + print(f"\n[Case 8] Deleting large file {filename!r} from {NODE_2}") + _rm(sync_client, NODE_2, filename, workspace) + + gone_server = _poll_until( + lambda: _file_missing(sync_client, NODE_1, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert gone_server, ( + f"[Case 8] Large file still on Hub mirror after {NODE_2} delete." + ) + print(f"[Case 8] βœ… Hub mirror purged the large file.") + + gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) + assert gone_node1, ( + f"[Case 8] Large file still on {NODE_1} after {NODE_2} delete." + ) + print(f"[Case 8] βœ… {NODE_1} purged the large file.") + +# ══════════════════════════════════════════════════════════════════════════════ +# GIGABYTE FILE TEST (1000 MB) +# ══════════════════════════════════════════════════════════════════════════════ + +class TestGigabyteFileSync: + """Tests synchronizing a 1GB file across the mesh via DD CLI tool.""" + + def test_case_1gb_sync_from_client_to_server_and_node( + self, sync_client, swarm_session + ): + """ + Creates a 1 GB file on test-node-1 using the shell command `dd`. + Verifies that it syncs to both the server mirror and test-node-2. + """ + filename = _unique("gigabyte") + workspace = swarm_session + + print(f"\\n[Case 1GB] Disabling memory limit checks and triggering 1GB creation on {NODE_1}...") + + # Create a 1GB file consisting of zeros (highly compressible over the network) on NODE_1 directly. + # This will trigger the Inotify watcher to push chunks back up to the Hub. + # We output to the active session workspace path on the node. + # In the agent container, the workspace is at /tmp/cortex-sync/{swarm_session} + dd_command = f"dd if=/dev/zero of=/tmp/cortex-sync/{workspace}/{filename} bs=1M count=1000" + + r_disp = sync_client.post( + f"{NODES_PATH}/{NODE_1}/dispatch", + params={"user_id": _get_user_id()}, + json={"command": dd_command}, + headers=_headers(), + timeout=180.0 + ) + assert r_disp.status_code == 200, f"Failed to dispatch 1GB write to {NODE_1}" + + # Give the agent node ample time to write to disk and push chunks over gRPC. + # Wait up to 180 seconds. + def _check_node2_ls(): + r = sync_client.get( + f"{NODES_PATH}/{NODE_2}/fs/ls", + params={"path": ".", "session_id": workspace}, + headers=_headers(), + timeout=30.0 + ) + if r.status_code != 200: + return False + for f in r.json().get("files", []): + # Only return true when size is fully 1 GB (1000 * 1024 * 1024 = 1048576000) + if f.get("name") == filename and f.get("size", 0) >= 1048576000: + return f + return False + + print(f"[Case 1GB] Polling {NODE_2} for the file...") + node2_file = _poll_until(_check_node2_ls, timeout=180) + assert node2_file, f"1GB Large file {filename} did not reach {NODE_2} within 180s in full 1GB size." + print(f"[Case 1GB] βœ… {NODE_2} verified 1GB file sync with correct size.") + + # Verify Server Mirror also saw it and recorded 1GB size + def _check_server_ls(): + r = sync_client.get( + f"{NODES_PATH}/{NODE_1}/fs/ls", + params={"path": ".", "session_id": workspace}, + headers=_headers(), + timeout=30.0 + ) + if r.status_code != 200: + return False + for f in r.json().get("files", []): + if f.get("name") == filename and f.get("size", 0) >= 1048576000: + return f + return False + + server_file = _check_server_ls() + assert server_file, f"1GB Large file {filename} did not appear with 1GB size on Server Mirror." + print(f"[Case 1GB] βœ… Hub mirror successfully verified 1GB file sync with correct size.") + + # Cleanup + _rm(sync_client, NODE_1, filename, workspace) + + +# ══════════════════════════════════════════════════════════════════════════════ +# SESSION AUTO-PURGE TEST +# ══════════════════════════════════════════════════════════════════════════════ + +class TestSessionAutoPurge: + """Verifies that deleting a session purges the physical file system mirrors completely.""" + + def test_session_lifecycle_cleanup(self, sync_client): + """ + Creates a session, touches a file inside it, then deletes the session via API. + Verifies that both the server-side mirror folder and client-side tmp folders + are definitively purged and removed from the physical disk logic. + """ + import subprocess + + print("\n[Case Purge] Starting session cleanup lifecycle test...") + # 1. Create a throwaway session + r = sync_client.post( + f"{SESSIONS_PATH}/", + json={"user_id": _get_user_id(), "provider_name": "gemini", "feature_name": "auto-purge-test"}, + headers=_headers(), + ) + assert r.status_code == 200 + session_id = r.json()["id"] + + # Attach nodes + r2 = sync_client.post( + f"{SESSIONS_PATH}/{session_id}/nodes", + json={"node_ids": [NODE_1, NODE_2], "config": {"source": "empty"}}, + headers=_headers(), + ) + assert r2.status_code == 200 + workspace_id = r2.json().get("sync_workspace_id") + + # Give nodes a moment to ACK the workspace and create folders + time.sleep(2.0) + + # 2. Write a file + filename = _unique("autopurge") + res = _touch(sync_client, NODE_1, filename, "garbage payload", workspace_id) + assert res.get("success"), "Failed to write setup file for auto-purge test" + + # 3. Verify it reached Node 2 (assumes the filesystem structures were physically booted) + node2_ok = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace_id), + timeout=SMALL_FILE_TIMEOUT, + ) + assert node2_ok is not None, "Auto-purge setup file did not sync correctly to node 2" + print("[Case Purge] βœ… Session folders dynamically booted across the mesh") + + # 4. DELETE the Session + # Wait for the watcher to debounce (1s) and push the chunks + print("[Case Purge] Waiting 2 seconds to let the dog flush the chunks...") + time.sleep(2.0) + + print("[Case Purge] Calling API DELETE on the session...") + r_del = sync_client.delete(f"{SESSIONS_PATH}/{session_id}", headers=_headers()) + assert r_del.status_code == 200 + + # Wait a bit for PURGE propagation + print("[Case Purge] Waiting 3 seconds for propagation...") + time.sleep(3.0) + + # 5. Check client-side folders are purged using DISPATCH to run "ls" + # Node 1 + r_d1 = sync_client.post( + f"{NODES_PATH}/{NODE_1}/dispatch", + params={"user_id": _get_user_id()}, + json={"command": f"stat /tmp/cortex-sync/{workspace_id}"}, + headers=_headers() + ) + assert "No such file or directory" in r_d1.json().get("stderr", "") or r_d1.json().get("status") != "successful", ( + f"Node 1 failed to purge its physical tmp folder: {r_d1.text}" + ) + + # Node 2 + r_d2 = sync_client.post( + f"{NODES_PATH}/{NODE_2}/dispatch", + params={"user_id": _get_user_id()}, + json={"command": f"stat /tmp/cortex-sync/{workspace_id}"}, + headers=_headers() + ) + assert "No such file or directory" in r_d2.json().get("stderr", "") or r_d2.json().get("status") != "successful", ( + f"Node 2 failed to purge its physical tmp folder: {r_d2.text}" + ) + print("[Case Purge] βœ… Physical client-side (`/tmp/cortex-sync/...`) folders proactively erased on all nodes") + + # 6. Check server-side folder + # (Since the test runner is executed on host but ai_hub is Docker container, we can use docker exec) + cmd = ["docker", "exec", "ai_hub_service", "stat", f"/app/data/mirrors/{workspace_id}"] + # This should fail if it doesn't exist. + res_hub = subprocess.run(cmd, capture_output=True, text=True) + assert res_hub.returncode != 0, f"Server mirror folder still physically exists! stat matched: {res_hub.stdout}" + assert "No such file or directory" in res_hub.stderr, f"Unexpected error during server stat: {res_hub.stderr}" + + print("[Case Purge] βœ… Server-side physical mirror folder proactively erased") + diff --git a/ai-hub/integration_tests/test_file_sync.py.disabled b/ai-hub/integration_tests/test_file_sync.py.disabled deleted file mode 100644 index 831f890..0000000 --- a/ai-hub/integration_tests/test_file_sync.py.disabled +++ /dev/null @@ -1,719 +0,0 @@ -""" -File Sync Integration Tests -============================ -Verifies the end-to-end mesh file synchronisation behaviour across nodes and -the Hub server mirror. These tests run against a *live* deployment that has -at least two test nodes connected: - - β€’ test-node-1 (NODE_1 constant) - β€’ test-node-2 (NODE_2 constant) - -The Hub exposes REST endpoints used to: - - read files: GET /api/v1/nodes/{node_id}/fs/cat - - list dirs: GET /api/v1/nodes/{node_id}/fs/ls - - write files: POST /api/v1/nodes/{node_id}/fs/touch - - delete: POST /api/v1/nodes/{node_id}/fs/rm - -A shared swarm-control session is created once per test module so that all -nodes are in the same mesh workspace, and file operations propagate correctly. - -Environment assumptions ------------------------ - BASE_URL http://127.0.0.1:8000 (inside container) or http://192.168.68.113 (from outside) - NODE_1 test-node-1 - NODE_2 test-node-2 - USER_ID SYNC_TEST_USER_ID env var (required for node access checks) - TIMEOUT 10 s for small files, 60 s for 20 MB files -""" - -import os -import time -import uuid -import hashlib -import pytest -import httpx - -# ── Configuration ────────────────────────────────────────────────────────────── -BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1") -USER_ID = os.getenv("SYNC_TEST_USER_ID", "c4401d34-8784-4d6e-93a0-c702bd202b66") -NODE_1 = os.getenv("SYNC_TEST_NODE1", "test-node-1") -NODE_2 = os.getenv("SYNC_TEST_NODE2", "test-node-2") - -SMALL_FILE_TIMEOUT = 10 # seconds -LARGE_FILE_TIMEOUT = 60 # seconds (20 MB) -LARGE_FILE_SIZE_MB = 20 -POLL_INTERVAL = 0.5 # seconds - -# Paths β€” relative to BASE_URL -SESSIONS_PATH = "/sessions" -NODES_PATH = "/nodes" - - -# ── Module-level: skip the whole file if nodes are not online ────────────────── -def pytest_configure(config): - config.addinivalue_line( - "markers", - "requires_nodes: mark test as requiring live agent nodes to be connected", - ) - - -pytestmark = pytest.mark.requires_nodes - - -# ── Helpers ───────────────────────────────────────────────────────────────────── - -def _get_user_id() -> str: - return os.getenv("SYNC_TEST_USER_ID", "integration_tester_sync") - -def _headers(): - return {"X-User-ID": _get_user_id(), "Authorization": os.getenv("SYNC_TEST_AUTH_TOKEN", "")} - - -def _unique(prefix="synctest"): - return f"{prefix}_{uuid.uuid4().hex[:8]}.txt" - - -def _large_content(mb: int = LARGE_FILE_SIZE_MB) -> str: - """Return a UTF-8 string of approximately `mb` megabytes.""" - line = "A" * 1023 + "\n" # 1 KB per line - return line * (mb * 1024) # mb * 1024 lines β‰ˆ mb MB - - -def _poll_until(fn, timeout: float, interval: float = POLL_INTERVAL): - """ - Repeatedly call fn() until it returns a truthy value or timeout expires. - Returns the last return value of fn(). - """ - deadline = time.time() + timeout - last = None - while time.time() < deadline: - try: - last = fn() - if last: - return last - except Exception: - pass - time.sleep(interval) - return last - - -def _cat(client: httpx.Client, node_id: str, path: str, session_id: str) -> str | None: - """Read a file from a node; return its text content or None on error.""" - r = client.get( - f"{NODES_PATH}/{node_id}/fs/cat", - params={"path": path, "session_id": session_id}, - headers=_headers(), - ) - if r.status_code == 200: - return r.json().get("content", "") - return None - - -def _mirror_cat(client: httpx.Client, path: str, session_id: str) -> str | None: - """ - Read a file from the Hub server mirror directly by asking node-1 for it - using the session_id workspace (the Hub mirror is queried when the node - reflects a workspace file). - - For the server-side write tests we can call the same endpoint but the - source is the Hub mirror, not the live node FS. - """ - # The Hub's /cat endpoint fetches from node, then caches in mirror. - # For "server wrote it β†’ does node have it?" we ask the node directly. - return _cat(client, NODE_1, path, session_id) - - -def _touch( - client: httpx.Client, - node_id: str, - path: str, - content: str, - session_id: str, - is_dir: bool = False, -) -> dict: - """Write a file to a node via the REST API.""" - r = client.post( - f"{NODES_PATH}/{node_id}/fs/touch", - json={"path": path, "content": content, "is_dir": is_dir, "session_id": session_id}, - headers=_headers(), - timeout=120.0, - ) - r.raise_for_status() - return r.json() - - -def _rm(client: httpx.Client, node_id: str, path: str, session_id: str) -> dict: - """Delete a file from a node via the REST API.""" - r = client.post( - f"{NODES_PATH}/{node_id}/fs/rm", - json={"path": path, "session_id": session_id}, - headers=_headers(), - timeout=30.0, - ) - r.raise_for_status() - return r.json() - - -def _file_missing(client: httpx.Client, node_id: str, path: str, session_id: str) -> bool: - """Return True if the file does NOT exist on the node.""" - r = client.get( - f"{NODES_PATH}/{node_id}/fs/cat", - params={"path": path, "session_id": session_id}, - headers=_headers(), - ) - return r.status_code != 200 - - -# ── Session fixture ───────────────────────────────────────────────────────────── - -@pytest.fixture(scope="module") -def sync_client(): - """Synchronous httpx client for the whole module (avoids asyncio overhead).""" - with httpx.Client(base_url=BASE_URL, timeout=60.0) as c: - # Quick connectivity + node-online check - try: - r = c.get(f"{NODES_PATH}/{NODE_1}/status", headers=_headers()) - if r.status_code not in (200, 404): - pytest.skip(f"{NODE_1} unreachable β€” hub returned {r.status_code}") - except Exception as exc: - pytest.skip(f"Hub unreachable at {BASE_URL}: {exc}") - yield c - - -@pytest.fixture(scope="module") -def swarm_session(sync_client: httpx.Client) -> str: - """ - Create (or reuse) one swarm_control session that has both test nodes - attached. Returned value is the workspace_id string used by all sync ops. - """ - # Create the session - r = sync_client.post( - f"{SESSIONS_PATH}/", - json={"user_id": _get_user_id(), "provider_name": "gemini", "feature_name": "swarm_control"}, - headers=_headers(), - ) - assert r.status_code == 200, f"Create session failed: {r.text}" - session_id = r.json()["id"] - - # Attach both nodes with source="empty" so they both watch the workspace - r2 = sync_client.post( - f"{SESSIONS_PATH}/{session_id}/nodes", - json={"node_ids": [NODE_1, NODE_2], "config": {"source": "empty"}}, - headers=_headers(), - ) - assert r2.status_code == 200, f"Attach nodes failed: {r2.text}" - workspace_id = r2.json().get("sync_workspace_id") - assert workspace_id, "Expected sync_workspace_id in response" - - # Give nodes a moment to ACK the workspace and start watching - time.sleep(2.0) - - yield workspace_id - - # Teardown: archive the session - sync_client.delete(f"{SESSIONS_PATH}/{session_id}", headers=_headers()) - - -# ══════════════════════════════════════════════════════════════════════════════ -# SMALL FILE TESTS (< 1 chunk) -# ══════════════════════════════════════════════════════════════════════════════ - -class TestSmallFileSync: - """Cases 1–4: single small-file create + delete in both directions.""" - - # ── Case 1: node-1 β†’ node-2 + server ─────────────────────────────────── - def test_case1_write_from_node1_visible_on_node2_and_server( - self, sync_client, swarm_session - ): - """ - Write a file from test-node-1; verify test-node-2 AND the server - mirror receive it within SMALL_FILE_TIMEOUT seconds. - """ - filename = _unique("case1") - content = f"Case 1 payload – node-1 β†’ mesh – {uuid.uuid4()}" - workspace = swarm_session - - print(f"\n[Case 1] Writing {filename!r} to {NODE_1} in workspace {workspace}") - result = _touch(sync_client, NODE_1, filename, content, workspace) - assert result.get("success"), f"Write failed: {result}" - - # Verify on node-2 - node2_content = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert node2_content is not None, ( - f"[Case 1] File '{filename}' did NOT appear on {NODE_2} within " - f"{SMALL_FILE_TIMEOUT}s" - ) - assert content in node2_content, ( - f"[Case 1] Content mismatch on {NODE_2}. Got: {node2_content!r}" - ) - print(f"[Case 1] βœ… {NODE_2} received the file.") - - # Verify on Hub server mirror (query node-1 with session scope uses mirror) - server_content = _poll_until( - lambda: _cat(sync_client, NODE_1, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert server_content is not None and content in server_content, ( - f"[Case 1] File '{filename}' not found on Hub mirror." - ) - print(f"[Case 1] βœ… Hub mirror has the file.") - - # ── Case 2: server β†’ node-1 + node-2 ─────────────────────────────────── - def test_case2_write_from_server_visible_on_all_nodes( - self, sync_client, swarm_session - ): - """ - Write a file via the server (the touch endpoint dispatches to all nodes - in the session + writes to Hub mirror). Verify both client nodes and - the mirror reflect it. - """ - filename = _unique("case2") - content = f"Case 2 payload – server β†’ mesh – {uuid.uuid4()}" - workspace = swarm_session - - print(f"\n[Case 2] Writing {filename!r} via server to workspace {workspace}") - # Intentionally write via node-1 (server-dispatched; Hub mirror updated first) - result = _touch(sync_client, NODE_1, filename, content, workspace) - assert result.get("success"), f"Write failed: {result}" - - # Node-2 should receive via broadcast - node2_content = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert node2_content is not None and content in node2_content, ( - f"[Case 2] File '{filename}' did NOT appear on {NODE_2}." - ) - print(f"[Case 2] βœ… {NODE_2} received the file.") - - # Node-1 should also have it (it was written directly to it and mirrored) - node1_content = _cat(sync_client, NODE_1, filename, workspace) - assert node1_content is not None and content in node1_content, ( - f"[Case 2] File '{filename}' not found on {NODE_1}." - ) - print(f"[Case 2] βœ… {NODE_1} has the file.") - - # ── Case 3: delete from server β†’ nodes purged ────────────────────────── - def test_case3_delete_from_server_purges_client_nodes( - self, sync_client, swarm_session - ): - """ - Create a file via server, then delete it via the server endpoint. - Verify both client nodes no longer have the file. - """ - filename = _unique("case3") - content = f"Case 3 – to be deleted by server – {uuid.uuid4()}" - workspace = swarm_session - - # Setup: write the file from node-1 (server-side orchestrated) - r = _touch(sync_client, NODE_1, filename, content, workspace) - assert r.get("success"), f"Setup write failed: {r}" - - # Make sure node-2 got it before we delete - got = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert got is not None, f"[Case 3] Setup: file not on {NODE_2}." - - print(f"\n[Case 3] Deleting {filename!r} from server (via {NODE_1} endpoint)") - _rm(sync_client, NODE_1, filename, workspace) - - # node-2 should no longer have the file - gone_node2 = _poll_until( - lambda: _file_missing(sync_client, NODE_2, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert gone_node2, ( - f"[Case 3] File '{filename}' still present on {NODE_2} after server delete." - ) - print(f"[Case 3] βœ… {NODE_2} no longer has the file.") - - # node-1 should also have it gone - gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) - assert gone_node1, ( - f"[Case 3] File '{filename}' still present on {NODE_1} after server delete." - ) - print(f"[Case 3] βœ… {NODE_1} no longer has the file.") - - # ── Case 4: delete from node-2 β†’ server + node-1 purged ─────────────── - def test_case4_delete_from_node2_purges_server_and_node1( - self, sync_client, swarm_session - ): - """ - Create a file, let it propagate, then delete it FROM node-2. - Verify the Hub mirror and node-1 no longer have the file. - """ - filename = _unique("case4") - content = f"Case 4 – to be deleted from node-2 – {uuid.uuid4()}" - workspace = swarm_session - - # Setup: write from node-1 so both nodes and mirror have it - r = _touch(sync_client, NODE_1, filename, content, workspace) - assert r.get("success"), f"Setup write failed: {r}" - - got_node2 = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert got_node2 is not None, f"[Case 4] Setup: file did not reach {NODE_2}." - - print(f"\n[Case 4] Deleting {filename!r} from {NODE_2}") - _rm(sync_client, NODE_2, filename, workspace) - - # Hub mirror (observed via node-1's workspace view) should purge - gone_server = _poll_until( - lambda: _file_missing(sync_client, NODE_1, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert gone_server, ( - f"[Case 4] File '{filename}' still present on Hub mirror after node-2 delete." - ) - print(f"[Case 4] βœ… Hub mirror no longer has the file.") - - # node-1 should also be purged - gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) - assert gone_node1, ( - f"[Case 4] File '{filename}' still present on {NODE_1} after node-2 delete." - ) - print(f"[Case 4] βœ… {NODE_1} no longer has the file.") - - # ── Case 9: cat on deleted file returns quickly, not after timeout ────── - def test_case9_cat_deleted_file_returns_quickly_not_timeout( - self, sync_client, swarm_session - ): - """ - Regression test for the silent-return bug in _push_file (node side) - and the missing mirror short-circuit in cat() (hub side). - - Before the fix, reading a deleted file would stall for the full 15s - journal timeout because the node returned nothing and the hub just sat - waiting. After the fix: - - hub: cat() checks the mirror first; file absent β†’ instant "File not found" - - node: _push_file sends an ERROR SyncStatus immediately when file missing - - This test enforces that a cat call on a deleted file resolves in under - MAX_LATENCY_S seconds on BOTH nodes. - """ - MAX_LATENCY_S = 3.0 # well below the 15s journal timeout - filename = _unique("case9_latency") - content = f"Case 9 β€” delete latency probe β€” {uuid.uuid4()}" - workspace = swarm_session - - # Setup: write the file and wait for full propagation - r = _touch(sync_client, NODE_1, filename, content, workspace) - assert r.get("success"), f"[Case 9] Setup write failed: {r}" - synced = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert synced is not None, f"[Case 9] Setup: file did not propagate to {NODE_2}." - - # Delete from server - print(f"\n[Case 9] Deleting {filename!r}, then timing cat() on both nodes") - _rm(sync_client, NODE_1, filename, workspace) - - # Give delete broadcast a moment to reach nodes (but not the full poll timeout) - time.sleep(1.5) - - # Measure cat latency on node-1 (hub mirror path β€” should be instant) - t0 = time.time() - res1 = _cat(sync_client, NODE_1, filename, workspace) - latency_node1 = time.time() - t0 - assert res1 is None, ( - f"[Case 9] {NODE_1} still returned content after delete: {res1!r}" - ) - assert latency_node1 < MAX_LATENCY_S, ( - f"[Case 9] cat() on {NODE_1} took {latency_node1:.1f}s β€” expected < {MAX_LATENCY_S}s. " - f"Hub mirror short-circuit may be broken." - ) - print(f"[Case 9] βœ… {NODE_1} cat returned in {latency_node1:.2f}s (file absent, fast-fail).") - - # Measure cat latency on node-2 (hub mirror path β€” should also be instant) - t0 = time.time() - res2 = _cat(sync_client, NODE_2, filename, workspace) - latency_node2 = time.time() - t0 - assert res2 is None, ( - f"[Case 9] {NODE_2} still returned content after delete: {res2!r}" - ) - assert latency_node2 < MAX_LATENCY_S, ( - f"[Case 9] cat() on {NODE_2} took {latency_node2:.1f}s β€” expected < {MAX_LATENCY_S}s. " - f"Node _push_file may not be sending error status on missing file." - ) - print(f"[Case 9] βœ… {NODE_2} cat returned in {latency_node2:.2f}s (file absent, fast-fail).") - - -# ══════════════════════════════════════════════════════════════════════════════ -# NODE RECONNECT / RESYNC TESTS -# ══════════════════════════════════════════════════════════════════════════════ - -# Docker container names for the test nodes on the production server -_NODE_CONTAINER = { - "test-node-1": "cortex-test-1", - "test-node-2": "cortex-test-2", -} - -import subprocess -import os - -def _get_remote_env(): - try: - script_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.agent/utils/env_loader.sh")) - if os.path.exists(script_path): - cmd = f"source {script_path} >/dev/null 2>&1 && echo \"${{REMOTE_PASSWORD}}|${{REMOTE_USER}}|${{REMOTE_HOST}}\"" - res = subprocess.run(["bash", "-c", cmd], capture_output=True, text=True, check=True) - parts = res.stdout.strip().split("|") - if len(parts) == 3 and parts[0]: - return parts[0], parts[1], parts[2] - except Exception: - pass - return os.environ.get("REMOTE_PASSWORD", ""), os.environ.get("REMOTE_USER", "axieyangb"), os.environ.get("REMOTE_HOST", "192.168.68.113") - -_REMOTE_PASSWORD, _REMOTE_USER, _REMOTE_HOST = _get_remote_env() -_SSH_CMD = f"sshpass -p '{_REMOTE_PASSWORD}' ssh -o StrictHostKeyChecking=no {_REMOTE_USER}@{_REMOTE_HOST}" - - -def _restart_test_node(node_id: str): - """ - Restart the named test-node Docker container on the production server. - This wipes /tmp/cortex-sync on the node, simulating a real reboot. - """ - import subprocess - container = _NODE_CONTAINER.get(node_id) - if not container: - pytest.skip(f"No container mapping for {node_id}") - cmd = ( - f"sshpass -p '{_REMOTE_PASSWORD}' ssh -o StrictHostKeyChecking=no {_REMOTE_USER}@{_REMOTE_HOST} " - f"\"echo '{_REMOTE_PASSWORD}' | sudo -S docker restart {container}\"" - ) - result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) - if result.returncode != 0: - pytest.skip(f"Could not restart {container}: {result.stderr}") - - -class TestNodeResync: - """ - Case 10: node reconnect / workspace resync after container restart. - - Real-world scenario: a test node restarts (deploy, crash, reboot) and - /tmp/cortex-sync is wiped. The Hub must re-push the workspace to the - reconnected node via manifest-driven reconciliation. - """ - - # ── Case 10: node-2 restart β†’ hub re-delivers workspace ──────────────── - def test_case10_node_resync_after_restart( - self, sync_client, swarm_session - ): - """ - 1. Write a file to node-1 and confirm node-2 received it. - 2. Restart the node-2 container (wipes /tmp/cortex-sync). - 3. Wait for node-2 to reconnect and receive the manifest from Hub. - 4. Assert that the file re-appears on node-2 within RESYNC_TIMEOUT. - - This guards against regressions in the push_workspace / manifest-driven - reconciliation loop that re-delivers Hub mirror contents to a freshly - reconnected node. - """ - RESYNC_TIMEOUT = 30 # seconds for node to reconnect + resync - RESTART_WAIT = 8 # seconds to allow container to come back up - - filename = _unique("case10_resync") - content = f"Case 10 β€” node resync after restart β€” {uuid.uuid4()}" - workspace = swarm_session - - # Setup: write from node-1, wait for node-2 to receive - r = _touch(sync_client, NODE_1, filename, content, workspace) - assert r.get("success"), f"[Case 10] Setup write failed: {r}" - - synced = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert synced is not None, f"[Case 10] Setup: file did not reach {NODE_2} before restart." - print(f"\n[Case 10] File confirmed on {NODE_2}. Restarting container…") - - # Restart node-2 container β€” wipes /tmp/cortex-sync - _restart_test_node(NODE_2) - - # Brief pause to let the container fully stop, then wait for reconnect - time.sleep(RESTART_WAIT) - print(f"[Case 10] Container restarted. Waiting for {NODE_2} to reconnect and resync…") - - # After reconnect, node sends its (now-empty) manifest β†’ Hub sends back - # all missing files. Poll until the file reappears. - resynced = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=RESYNC_TIMEOUT, - ) - assert resynced is not None, ( - f"[Case 10] File '{filename}' did NOT re-appear on {NODE_2} within " - f"{RESYNC_TIMEOUT}s after container restart. " - f"Manifest-driven resync may be broken." - ) - assert content in resynced, ( - f"[Case 10] Content mismatch on {NODE_2} after resync. Got: {resynced!r}" - ) - print(f"[Case 10] βœ… {NODE_2} resynced the file after container restart.") - - -# ══════════════════════════════════════════════════════════════════════════════ -# LARGE FILE TESTS (20 MB, multi-chunk) -# ══════════════════════════════════════════════════════════════════════════════ - -class TestLargeFileSync: - """Cases 5–8: 20 MB file create + delete in both directions.""" - - @pytest.fixture(scope="class", autouse=True) - def _large_content(self): - """Pre-build the 20 MB string once per class to save CPU time.""" - self.__class__._content = _large_content(LARGE_FILE_SIZE_MB) - self.__class__._expected_hash = hashlib.sha256( - self._content.encode() - ).hexdigest() - - # ── Case 5: 20 MB from node-1 β†’ server + node-2 ──────────────────────── - def test_case5_large_file_from_node1_to_server_and_node2( - self, sync_client, swarm_session - ): - """ - Create a 20 MB file from test-node-1. - Both server mirror and test-node-2 should receive it within - LARGE_FILE_TIMEOUT seconds. - """ - filename = _unique("case5_large") - workspace = swarm_session - - print(f"\n[Case 5] Writing {LARGE_FILE_SIZE_MB} MB file {filename!r} from {NODE_1}") - result = _touch(sync_client, NODE_1, filename, self._content, workspace) - assert result.get("success"), f"Write failed: {result}" - - # Verify node-2 received the file - node2_content = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=LARGE_FILE_TIMEOUT, - ) - assert node2_content is not None, ( - f"[Case 5] Large file did NOT appear on {NODE_2} within {LARGE_FILE_TIMEOUT}s." - ) - got_hash = hashlib.sha256(node2_content.encode()).hexdigest() - assert got_hash == self._expected_hash, ( - f"[Case 5] Hash mismatch on {NODE_2}. Expected {self._expected_hash}, got {got_hash}" - ) - print(f"[Case 5] βœ… {NODE_2} received and verified 20 MB large file.") - - # Verify server mirror - mirror_content = _cat(sync_client, NODE_1, filename, workspace) - assert mirror_content is not None, ( - f"[Case 5] Large file not on Hub mirror." - ) - print(f"[Case 5] βœ… Hub mirror has the large file.") - - # ── Case 6: 20 MB from server β†’ node-1 + node-2 ──────────────────────── - def test_case6_large_file_from_server_to_all_nodes( - self, sync_client, swarm_session - ): - """ - Write a 20 MB file via the server (touch endpoint with session scope). - Both client nodes should receive it within LARGE_FILE_TIMEOUT seconds. - """ - filename = _unique("case6_large") - workspace = swarm_session - - print(f"\n[Case 6] Writing {LARGE_FILE_SIZE_MB} MB file {filename!r} via server") - result = _touch(sync_client, NODE_1, filename, self._content, workspace) - assert result.get("success"), f"Write failed: {result}" - - # node-2 receives via mesh broadcast - node2_ok = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=LARGE_FILE_TIMEOUT, - ) - assert node2_ok is not None, ( - f"[Case 6] Large file did NOT appear on {NODE_2} within {LARGE_FILE_TIMEOUT}s." - ) - print(f"[Case 6] βœ… {NODE_2} received the 20 MB file.") - - node1_ok = _cat(sync_client, NODE_1, filename, workspace) - assert node1_ok is not None, f"[Case 6] File not on {NODE_1}." - print(f"[Case 6] βœ… {NODE_1} has the 20 MB file.") - - # ── Case 7: delete large file from server β†’ nodes purged ─────────────── - def test_case7_delete_large_file_from_server_purges_nodes( - self, sync_client, swarm_session - ): - """ - Write and fully sync a large file, then delete via server. - Verify all client nodes are purged. - """ - filename = _unique("case7_large") - workspace = swarm_session - - # Setup - r = _touch(sync_client, NODE_1, filename, self._content, workspace) - assert r.get("success"), f"Setup write failed: {r}" - - synced = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=LARGE_FILE_TIMEOUT, - ) - assert synced is not None, f"[Case 7] Setup: large file did not reach {NODE_2}." - - print(f"\n[Case 7] Deleting large file {filename!r} from server") - _rm(sync_client, NODE_1, filename, workspace) - - gone_node2 = _poll_until( - lambda: _file_missing(sync_client, NODE_2, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert gone_node2, ( - f"[Case 7] Large file still present on {NODE_2} after server delete." - ) - print(f"[Case 7] βœ… {NODE_2} purged the large file.") - - gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) - assert gone_node1, ( - f"[Case 7] Large file still present on {NODE_1} after server delete." - ) - print(f"[Case 7] βœ… {NODE_1} purged the large file.") - - # ── Case 8: delete large file from node-2 β†’ server + node-1 ─────────── - def test_case8_delete_large_file_from_node2_purges_server_and_node1( - self, sync_client, swarm_session - ): - """ - Write and sync a large file, then delete it FROM node-2. - Verify Hub mirror and node-1 are both purged. - """ - filename = _unique("case8_large") - workspace = swarm_session - - # Setup - r = _touch(sync_client, NODE_1, filename, self._content, workspace) - assert r.get("success"), f"Setup write failed: {r}" - - synced = _poll_until( - lambda: _cat(sync_client, NODE_2, filename, workspace), - timeout=LARGE_FILE_TIMEOUT, - ) - assert synced is not None, f"[Case 8] Setup: large file did not reach {NODE_2}." - - print(f"\n[Case 8] Deleting large file {filename!r} from {NODE_2}") - _rm(sync_client, NODE_2, filename, workspace) - - gone_server = _poll_until( - lambda: _file_missing(sync_client, NODE_1, filename, workspace), - timeout=SMALL_FILE_TIMEOUT, - ) - assert gone_server, ( - f"[Case 8] Large file still on Hub mirror after {NODE_2} delete." - ) - print(f"[Case 8] βœ… Hub mirror purged the large file.") - - gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) - assert gone_node1, ( - f"[Case 8] Large file still on {NODE_1} after {NODE_2} delete." - ) - print(f"[Case 8] βœ… {NODE_1} purged the large file.") diff --git a/ai-hub/integration_tests/test_node_registration.py b/ai-hub/integration_tests/test_node_registration.py index 7c812ae..6c2c50a 100644 --- a/ai-hub/integration_tests/test_node_registration.py +++ b/ai-hub/integration_tests/test_node_registration.py @@ -12,16 +12,13 @@ def _headers(): return {"X-User-ID": _get_user_id(), "Authorization": os.getenv("SYNC_TEST_AUTH_TOKEN", "")} -def test_node_registration_flow(): - """ - Tests the Node Registration API flow from an Administrator standpoint. - 1. Registers a new test node with a unique ID and proper skill configurations. - 2. Verifies an invite token is generated strictly successfully. - 3. Confirms the Node metadata correctly persists and is accessible via the admin listing endpoint. - """ +import subprocess +import time + +def test_node_full_lifecycle_and_api_coverage(): user_id = _get_user_id() node_id = f"test-integration-node-{uuid.uuid4().hex[:8]}" - display_name = f"Integration {node_id}" + display_name = f"Integration Lifecycle {node_id}" payload = { "node_id": node_id, @@ -30,34 +27,173 @@ "skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}} } - with httpx.Client(timeout=15.0) as client: - # Step 1: Register New Node - r_node = client.post( - f"{BASE_URL}/nodes/admin", - params={"admin_id": user_id}, - json=payload, - headers=_headers() - ) - - assert r_node.status_code == 200, f"Node registration failed unexpectedly: {r_node.text}" - - node_data = r_node.json() - assert node_data["node_id"] == node_id, "Payload collision: Returned node ID parameter does not match registration generation." - assert "invite_token" in node_data, "Critcal field drop: Invite token completely absent in valid creation serialization response" - assert len(node_data["invite_token"]) > 0, "Invite token string resolved fully blank." + try: + with httpx.Client(timeout=15.0) as client: + # --- ADMIN ENDPOINTS --- + # POST /nodes/admin + r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, json=payload, headers=_headers()) + assert r_node.status_code == 200 + node_data = r_node.json() + invite_token = node_data["invite_token"] - # Step 2: Ensure Node exists natively in the database via the admin nodes listing - r_list = client.get( - f"{BASE_URL}/nodes/admin", - params={"admin_id": user_id}, - headers=_headers() - ) - assert r_list.status_code == 200, "Node listing retrieval critically failed." - - node_list = r_list.json() - # Find our freshly pushed node in the complete node listing array map - matched_node = next((n for n in node_list if n["node_id"] == node_id), None) - - assert matched_node is not None, "Node completely failed to persist across DB contexts despite positive returned 200 serialization creation code." - assert matched_node["display_name"] == display_name, "Node display name persisted to DB unaligned." - assert matched_node["is_active"] is True, "Node default is_active flag failed persistence" + # GET /nodes/admin + r_list = client.get(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, headers=_headers()) + assert r_list.status_code == 200 + assert any(n["node_id"] == node_id for n in r_list.json()) + + # GET /nodes/admin/{node_id} + r_get = client.get(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers()) + assert r_get.status_code == 200 + + # PATCH /nodes/admin/{node_id} + r_patch = client.patch(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, json={"display_name": "Updated Name"}, headers=_headers()) + assert r_patch.status_code == 200 + + # POST /nodes/admin/{node_id}/access + group_r = client.post(f"{BASE_URL}/users/admin/groups", json={"name": f"Group for {node_id}"}, headers=_headers()) + group_id = group_r.json()["id"] + client.put(f"{BASE_URL}/users/admin/users/{user_id}/group", json={"group_id": group_id}, headers=_headers()) + + acc_r = client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers()) + assert acc_r.status_code == 200 + + # DELETE /nodes/admin/{node_id}/access/{group_id} (Revoke and re-grant for test) + rev_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}/access/{group_id}", params={"admin_id": user_id}, headers=_headers()) + assert rev_r.status_code == 200 + client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers()) + + # GET /nodes/admin/{node_id}/config.yaml + conf_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/config.yaml", params={"admin_id": user_id}, headers=_headers()) + assert conf_r.status_code == 200 + + # GET /nodes/provision/{node_id} + prov_r = client.get(f"{BASE_URL}/nodes/provision/{node_id}", params={"token": invite_token}, headers=_headers()) + assert prov_r.status_code == 200 + + # GET /nodes/admin/{node_id}/download + dl_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/download", params={"admin_id": user_id}, headers=_headers()) + assert dl_r.status_code == 200 + + # POST /nodes/validate-token (Internal) + val_r = client.post(f"{BASE_URL}/nodes/validate-token", params={"token": invite_token, "node_id": node_id}, headers=_headers()) + assert val_r.status_code == 200 + + # --- SPAWN NODE IMPERATIVELY --- + network = "cortex-hub_default" + image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True) + image_id = image_proc.stdout.strip() + + subprocess.run([ + "docker", "run", "-d", "--name", node_id, "--network", network, + "-e", f"HUB_URL=http://ai-hub:8000", "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={invite_token}", + "-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "AGENT_TLS_ENABLED=false", + "-v", f"{node_id}_sync:/tmp/cortex-sync", image_id + ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # Wait for connection + connected = False + for _ in range(30): + st = client.get(f"{BASE_URL}/nodes/{node_id}/status", headers=_headers()) + if st.status_code == 200 and st.json().get("status") == "online": + connected = True + break + time.sleep(1) + assert connected, "Node never successfully connected to Hub" + + # --- USER ENDPOINTS --- + # GET /nodes/ + n_list = client.get(f"{BASE_URL}/nodes/", params={"user_id": user_id}, headers=_headers()) + assert n_list.status_code == 200 + + # GET /nodes/{node_id}/status (Already tested above) + + # GET /nodes/{node_id}/terminal + term_r = client.get(f"{BASE_URL}/nodes/{node_id}/terminal", headers=_headers()) + assert term_r.status_code == 200 + + # POST /nodes/{node_id}/dispatch + dp_r = client.post(f"{BASE_URL}/nodes/{node_id}/dispatch", params={"user_id": user_id}, json={"command": "ls -la /"}, headers=_headers()) + assert dp_r.status_code == 200 + task_id = dp_r.json().get("task_id") + + # POST /nodes/{node_id}/cancel + can_r = client.post(f"{BASE_URL}/nodes/{node_id}/cancel", params={"task_id": task_id}, headers=_headers()) + assert can_r.status_code == 200 + + # PATCH & GET /nodes/preferences + pref_p = client.patch(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, json={"default_node_ids": [node_id]}, headers=_headers()) + assert pref_p.status_code == 200 + pref_g = client.get(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, headers=_headers()) + assert pref_g.status_code == 200 + + # --- FILE NAVIGATOR ENDPOINTS --- + r_sess = client.post(f"{BASE_URL}/sessions/", headers=_headers(), json={"user_id": user_id, "provider_name": "gemini", "feature_name": "swarm_control"}) + sess_id = str(r_sess.json().get("id")) + + # POST /fs/touch + test_fname = f"test_file_{uuid.uuid4().hex[:6]}.txt" + test_file_path = test_fname # NO LEADING SLASH + fs_touch = client.post(f"{BASE_URL}/nodes/{node_id}/fs/touch", json={"path": test_file_path, "is_dir": False, "session_id": sess_id}, headers=_headers()) + assert fs_touch.status_code == 200, fs_touch.text + + # GET /fs/ls (POLL) + found = False + for _ in range(5): + fs_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": "/", "session_id": sess_id}, headers=_headers()) + assert fs_ls.status_code == 200 + items = fs_ls.json().get("files", []) + if any(item.get("name") == test_fname for item in items): + found = True + break + time.sleep(1) + assert found, f"Expected {test_fname} not found in ls output: {items}" + + # POST /fs/upload + files = {"file": ("test_file2.txt", b"Hello Cortex!")} + fs_up = client.post(f"{BASE_URL}/nodes/{node_id}/fs/upload", params={"path": "/", "session_id": sess_id}, files=files, headers=_headers()) + assert fs_up.status_code == 200 + + # GET /fs/cat (POLL) + found_content = False + for _ in range(5): + fs_cat = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers()) + if fs_cat.status_code == 200 and getattr(fs_cat, "text", "") and "Hello Cortex!" in getattr(fs_cat, "text", ""): + found_content = True + break + time.sleep(1) + assert found_content, "Uploaded content not returned by cat" + + # GET /fs/download + fs_dl = client.get(f"{BASE_URL}/nodes/{node_id}/fs/download", params={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers()) + assert fs_dl.status_code == 200 + assert fs_dl.content == b"Hello Cortex!" + + # POST /fs/rm (Delete both files) + fs_rm = client.post(f"{BASE_URL}/nodes/{node_id}/fs/rm", json={"path": test_file_path, "session_id": sess_id}, headers=_headers()) + assert fs_rm.status_code == 200 + client.post(f"{BASE_URL}/nodes/{node_id}/fs/rm", json={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers()) + + # Verify deletion with GET /fs/cat returning 404 (POLL) + deleted = False + last_err = "" + for _ in range(5): + fs_cat_404 = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": test_file_path, "session_id": sess_id}, headers=_headers()) + if fs_cat_404.status_code == 404: + deleted = True + break + else: + last_err = f"Code: {fs_cat_404.status_code}, Text: {fs_cat_404.text}" + time.sleep(1) + assert deleted, f"File was not deleted. Last response: {last_err}" + + # --- TEARDOWN --- + # DELETE /nodes/admin/{node_id} + del_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers()) + assert del_r.status_code == 200 + + # POST /nodes/admin/mesh/reset + reset_r = client.post(f"{BASE_URL}/nodes/admin/mesh/reset", params={"admin_id": user_id}, headers=_headers()) + assert reset_r.status_code == 200 + + finally: + subprocess.run(["docker", "rm", "-f", node_id], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) diff --git a/run_integration_tests.sh b/run_integration_tests.sh index d169837..735031a 100755 --- a/run_integration_tests.sh +++ b/run_integration_tests.sh @@ -100,7 +100,18 @@ echo "==========================================" source /tmp/venv/bin/activate || echo "No venv found, hoping pytest is in global PATH." -pytest ai-hub/integration_tests/ -v +TEST_TARGETS=() +for arg in "$@"; do + if [[ "$arg" != "--no-rebuild" ]]; then + TEST_TARGETS+=("$arg") + fi +done + +if [ ${#TEST_TARGETS[@]} -eq 0 ]; then + TEST_TARGETS=("ai-hub/integration_tests/") +fi + +pytest "${TEST_TARGETS[@]}" -v if [ "$NO_REBUILD" = false ] || [ "$IS_RUNNING" = false ]; then echo "=========================================="