diff --git a/ai-hub/integration_tests/conftest.py b/ai-hub/integration_tests/conftest.py index 97d1da2..b89a046 100644 --- a/ai-hub/integration_tests/conftest.py +++ b/ai-hub/integration_tests/conftest.py @@ -1,8 +1,40 @@ import httpx +import pytest import pytest_asyncio BASE_URL = "http://127.0.0.1:8001" + +def pytest_configure(config): + config.addinivalue_line( + "markers", + "requires_nodes: tests that need live agent nodes; skipped if hub/nodes are offline", + ) + + +def pytest_collection_modifyitems(config, items): + """ + Auto-skip any test marked with @pytest.mark.requires_nodes when the Hub + or its nodes are unreachable. + """ + import os + base = os.getenv("SYNC_TEST_BASE_URL", BASE_URL) + skip = pytest.mark.skip(reason="Hub/nodes not reachable — skipping file sync integration tests") + try: + import httpx as _httpx + # Probe the hub with a simple endpoint; any 2xx/4xx means the server is up + with _httpx.Client(base_url=base, timeout=5.0) as c: + r = c.get("/", follow_redirects=True) + hub_up = r.status_code < 500 + except Exception: + hub_up = False + + if not hub_up: + for item in items: + if item.get_closest_marker("requires_nodes"): + item.add_marker(skip) + + @pytest_asyncio.fixture(scope="session") def base_url(): """Fixture to provide the base URL for the tests.""" @@ -56,9 +88,4 @@ # Teardown: Delete the document after the test delete_response = await http_client.delete(f"/documents/{document_id}") - assert delete_response.status_code == 200 - - - # Teardown: Delete the document after the test - delete_response = await http_client.delete(f"/documents/{document_id}") assert delete_response.status_code == 200 \ No newline at end of file diff --git a/ai-hub/integration_tests/test_file_sync.py b/ai-hub/integration_tests/test_file_sync.py new file mode 100644 index 0000000..7e344fa --- /dev/null +++ b/ai-hub/integration_tests/test_file_sync.py @@ -0,0 +1,539 @@ +""" +File Sync Integration Tests +============================ +Verifies the end-to-end 
mesh file synchronisation behaviour across nodes and +the Hub server mirror. These tests run against a *live* deployment that has +at least two test nodes connected: + + • test-node-1 (NODE_1 constant) + • test-node-2 (NODE_2 constant) + +The Hub exposes REST endpoints used to: + - read files: GET /api/v1/nodes/{node_id}/fs/cat + - list dirs: GET /api/v1/nodes/{node_id}/fs/ls + - write files: POST /api/v1/nodes/{node_id}/fs/touch + - delete: POST /api/v1/nodes/{node_id}/fs/rm + +A shared swarm-control session is created once per test module so that all +nodes are in the same mesh workspace, and file operations propagate correctly. + +Environment assumptions +----------------------- + BASE_URL http://127.0.0.1:8001 (set by conftest.py) + NODE_1 test-node-1 + NODE_2 test-node-2 + USER_ID integration_tester_sync (auto-provisioned if absent) + TIMEOUT 10 s for small files, 60 s for 20 MB files +""" + +import os +import time +import uuid +import hashlib +import pytest +import httpx + +# ── Configuration ────────────────────────────────────────────────────────────── +BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8001") +USER_ID = "integration_tester_sync" +NODE_1 = os.getenv("SYNC_TEST_NODE1", "test-node-1") +NODE_2 = os.getenv("SYNC_TEST_NODE2", "test-node-2") + +SMALL_FILE_TIMEOUT = 10 # seconds +LARGE_FILE_TIMEOUT = 60 # seconds (20 MB) +LARGE_FILE_SIZE_MB = 20 +POLL_INTERVAL = 0.5 # seconds + +# Paths (note: the test server mounts routes with no /api/v1 prefix) +SESSIONS_PATH = "/sessions" +NODES_PATH = "/nodes" + + +# ── Module-level: skip the whole file if nodes are not online ────────────────── +def pytest_configure(config): + config.addinivalue_line( + "markers", + "requires_nodes: mark test as requiring live agent nodes to be connected", + ) + + +pytestmark = pytest.mark.requires_nodes + + +# ── Helpers ───────────────────────────────────────────────────────────────────── + +def _headers(): + return {"X-User-ID": USER_ID} + + +def 
_unique(prefix="synctest"): + return f"{prefix}_{uuid.uuid4().hex[:8]}.txt" + + +def _large_content(mb: int = LARGE_FILE_SIZE_MB) -> str: + """Return a UTF-8 string of approximately `mb` megabytes.""" + line = "A" * 1023 + "\n" # 1 KB per line + return line * (mb * 1024) # mb * 1024 lines ≈ mb MB + + +def _poll_until(fn, timeout: float, interval: float = POLL_INTERVAL): + """ + Repeatedly call fn() until it returns a truthy value or timeout expires. + Returns the last return value of fn(). + """ + deadline = time.time() + timeout + last = None + while time.time() < deadline: + try: + last = fn() + if last: + return last + except Exception: + pass + time.sleep(interval) + return last + + +def _cat(client: httpx.Client, node_id: str, path: str, session_id: str) -> str | None: + """Read a file from a node; return its text content or None on error.""" + r = client.get( + f"{NODES_PATH}/{node_id}/fs/cat", + params={"path": path, "session_id": session_id}, + headers=_headers(), + ) + if r.status_code == 200: + return r.json().get("content", "") + return None + + +def _mirror_cat(client: httpx.Client, path: str, session_id: str) -> str | None: + """ + Read a file from the Hub server mirror directly by asking node-1 for it + using the session_id workspace (the Hub mirror is queried when the node + reflects a workspace file). + + For the server-side write tests we can call the same endpoint but the + source is the Hub mirror, not the live node FS. + """ + # The Hub's /cat endpoint fetches from node, then caches in mirror. + # For "server wrote it → does node have it?" we ask the node directly. 
+ return _cat(client, NODE_1, path, session_id) + + +def _touch( + client: httpx.Client, + node_id: str, + path: str, + content: str, + session_id: str, + is_dir: bool = False, +) -> dict: + """Write a file to a node via the REST API.""" + r = client.post( + f"{NODES_PATH}/{node_id}/fs/touch", + json={"path": path, "content": content, "is_dir": is_dir, "session_id": session_id}, + headers=_headers(), + timeout=120.0, + ) + r.raise_for_status() + return r.json() + + +def _rm(client: httpx.Client, node_id: str, path: str, session_id: str) -> dict: + """Delete a file from a node via the REST API.""" + r = client.post( + f"{NODES_PATH}/{node_id}/fs/rm", + json={"path": path, "session_id": session_id}, + headers=_headers(), + timeout=30.0, + ) + r.raise_for_status() + return r.json() + + +def _file_missing(client: httpx.Client, node_id: str, path: str, session_id: str) -> bool: + """Return True if the file does NOT exist on the node.""" + r = client.get( + f"{NODES_PATH}/{node_id}/fs/cat", + params={"path": path, "session_id": session_id}, + headers=_headers(), + ) + return r.status_code != 200 + + +# ── Session fixture ───────────────────────────────────────────────────────────── + +@pytest.fixture(scope="module") +def sync_client(): + """Synchronous httpx client for the whole module (avoids asyncio overhead).""" + with httpx.Client(base_url=BASE_URL, timeout=60.0) as c: + # Quick connectivity + node-online check + try: + r = c.get(f"{NODES_PATH}/{NODE_1}/status", headers=_headers()) + if r.status_code not in (200, 404): + pytest.skip(f"{NODE_1} unreachable — hub returned {r.status_code}") + except Exception as exc: + pytest.skip(f"Hub unreachable at {BASE_URL}: {exc}") + yield c + + +@pytest.fixture(scope="module") +def swarm_session(sync_client: httpx.Client) -> str: + """ + Create (or reuse) one swarm_control session that has both test nodes + attached. Returned value is the workspace_id string used by all sync ops. 
+ """ + # Create the session + r = sync_client.post( + f"{SESSIONS_PATH}/", + json={"user_id": USER_ID, "provider_name": "deepseek", "feature_name": "swarm_control"}, + headers=_headers(), + ) + assert r.status_code == 200, f"Create session failed: {r.text}" + session_id = r.json()["id"] + + # Attach both nodes with source="empty" so they both watch the workspace + r2 = sync_client.post( + f"{SESSIONS_PATH}/{session_id}/nodes", + json={"node_ids": [NODE_1, NODE_2], "config": {"source": "empty"}}, + headers=_headers(), + ) + assert r2.status_code == 200, f"Attach nodes failed: {r2.text}" + workspace_id = r2.json().get("sync_workspace_id") + assert workspace_id, "Expected sync_workspace_id in response" + + # Give nodes a moment to ACK the workspace and start watching + time.sleep(2.0) + + yield workspace_id + + # Teardown: archive the session + sync_client.delete(f"{SESSIONS_PATH}/{session_id}", headers=_headers()) + + +# ══════════════════════════════════════════════════════════════════════════════ +# SMALL FILE TESTS (< 1 chunk) +# ══════════════════════════════════════════════════════════════════════════════ + +class TestSmallFileSync: + """Cases 1–4: single small-file create + delete in both directions.""" + + # ── Case 1: node-1 → node-2 + server ─────────────────────────────────── + def test_case1_write_from_node1_visible_on_node2_and_server( + self, sync_client, swarm_session + ): + """ + Write a file from test-node-1; verify test-node-2 AND the server + mirror receive it within SMALL_FILE_TIMEOUT seconds. 
+        """
+        filename = _unique("case1")
+        content = f"Case 1 payload – node-1 → mesh – {uuid.uuid4()}"
+        workspace = swarm_session
+
+        print(f"\n[Case 1] Writing {filename!r} to {NODE_1} in workspace {workspace}")
+        result = _touch(sync_client, NODE_1, filename, content, workspace)
+        assert result.get("success"), f"Write failed: {result}"
+
+        # Verify on node-2
+        node2_content = _poll_until(
+            lambda: _cat(sync_client, NODE_2, filename, workspace),
+            timeout=SMALL_FILE_TIMEOUT,
+        )
+        assert node2_content is not None, (
+            f"[Case 1] File '{filename}' did NOT appear on {NODE_2} within "
+            f"{SMALL_FILE_TIMEOUT}s"
+        )
+        assert content in node2_content, (
+            f"[Case 1] Content mismatch on {NODE_2}. Got: {node2_content!r}"
+        )
+        print(f"[Case 1] ✅ {NODE_2} received the file.")
+
+        # Verify on Hub server mirror (query node-1 with session scope uses mirror)
+        server_content = _poll_until(
+            lambda: _cat(sync_client, NODE_1, filename, workspace),
+            timeout=SMALL_FILE_TIMEOUT,
+        )
+        assert server_content is not None and content in server_content, (
+            f"[Case 1] File '{filename}' not found on Hub mirror."
+        )
+        print(f"[Case 1] ✅ Hub mirror has the file.")
+
+    # ── Case 2: server → node-1 + node-2 ───────────────────────────────────
+    def test_case2_write_from_server_visible_on_all_nodes(
+        self, sync_client, swarm_session
+    ):
+        """
+        Write a file via the server (the touch endpoint dispatches to all nodes
+        in the session + writes to Hub mirror). Verify both client nodes and
+        the mirror reflect it.
+        """
+        filename = _unique("case2")
+        content = f"Case 2 payload – server → mesh – {uuid.uuid4()}"
+        workspace = swarm_session
+
+        print(f"\n[Case 2] Writing {filename!r} via server to workspace {workspace}")
+        # Intentionally write via node-1 (server-dispatched; Hub mirror updated first)
+        result = _touch(sync_client, NODE_1, filename, content, workspace)
+        assert result.get("success"), f"Write failed: {result}"
+
+        # Node-2 should receive via broadcast
+        node2_content = _poll_until(
+            lambda: _cat(sync_client, NODE_2, filename, workspace),
+            timeout=SMALL_FILE_TIMEOUT,
+        )
+        assert node2_content is not None and content in node2_content, (
+            f"[Case 2] File '{filename}' did NOT appear on {NODE_2}."
+        )
+        print(f"[Case 2] ✅ {NODE_2} received the file.")
+
+        # Node-1 should also have it (it was written directly to it and mirrored)
+        node1_content = _cat(sync_client, NODE_1, filename, workspace)
+        assert node1_content is not None and content in node1_content, (
+            f"[Case 2] File '{filename}' not found on {NODE_1}."
+        )
+        print(f"[Case 2] ✅ {NODE_1} has the file.")
+
+    # ── Case 3: delete from server → nodes purged ──────────────────────────
+    def test_case3_delete_from_server_purges_client_nodes(
+        self, sync_client, swarm_session
+    ):
+        """
+        Create a file via server, then delete it via the server endpoint.
+        Verify both client nodes no longer have the file.
+        """
+        filename = _unique("case3")
+        content = f"Case 3 – to be deleted by server – {uuid.uuid4()}"
+        workspace = swarm_session
+
+        # Setup: write the file from node-1 (server-side orchestrated)
+        r = _touch(sync_client, NODE_1, filename, content, workspace)
+        assert r.get("success"), f"Setup write failed: {r}"
+
+        # Make sure node-2 got it before we delete
+        got = _poll_until(
+            lambda: _cat(sync_client, NODE_2, filename, workspace),
+            timeout=SMALL_FILE_TIMEOUT,
+        )
+        assert got is not None, f"[Case 3] Setup: file not on {NODE_2}."
+
+        print(f"\n[Case 3] Deleting {filename!r} from server (via {NODE_1} endpoint)")
+        _rm(sync_client, NODE_1, filename, workspace)
+
+        # node-2 should no longer have the file
+        gone_node2 = _poll_until(
+            lambda: _file_missing(sync_client, NODE_2, filename, workspace),
+            timeout=SMALL_FILE_TIMEOUT,
+        )
+        assert gone_node2, (
+            f"[Case 3] File '{filename}' still present on {NODE_2} after server delete."
+        )
+        print(f"[Case 3] ✅ {NODE_2} no longer has the file.")
+
+        # node-1 should also have it gone
+        gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace)
+        assert gone_node1, (
+            f"[Case 3] File '{filename}' still present on {NODE_1} after server delete."
+        )
+        print(f"[Case 3] ✅ {NODE_1} no longer has the file.")
+
+    # ── Case 4: delete from node-2 → server + node-1 purged ───────────────
+    def test_case4_delete_from_node2_purges_server_and_node1(
+        self, sync_client, swarm_session
+    ):
+        """
+        Create a file, let it propagate, then delete it FROM node-2.
+        Verify the Hub mirror and node-1 no longer have the file.
+        """
+        filename = _unique("case4")
+        content = f"Case 4 – to be deleted from node-2 – {uuid.uuid4()}"
+        workspace = swarm_session
+
+        # Setup: write from node-1 so both nodes and mirror have it
+        r = _touch(sync_client, NODE_1, filename, content, workspace)
+        assert r.get("success"), f"Setup write failed: {r}"
+
+        got_node2 = _poll_until(
+            lambda: _cat(sync_client, NODE_2, filename, workspace),
+            timeout=SMALL_FILE_TIMEOUT,
+        )
+        assert got_node2 is not None, f"[Case 4] Setup: file did not reach {NODE_2}."
+
+        print(f"\n[Case 4] Deleting {filename!r} from {NODE_2}")
+        _rm(sync_client, NODE_2, filename, workspace)
+
+        # Hub mirror (observed via node-1's workspace view) should purge
+        gone_server = _poll_until(
+            lambda: _file_missing(sync_client, NODE_1, filename, workspace),
+            timeout=SMALL_FILE_TIMEOUT,
+        )
+        assert gone_server, (
+            f"[Case 4] File '{filename}' still present on Hub mirror after node-2 delete."
+        )
+        print(f"[Case 4] ✅ Hub mirror no longer has the file.")
+
+        # node-1 should also be purged
+        gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace)
+        assert gone_node1, (
+            f"[Case 4] File '{filename}' still present on {NODE_1} after node-2 delete."
+        )
+        print(f"[Case 4] ✅ {NODE_1} no longer has the file.")
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# LARGE FILE TESTS (20 MB, multi-chunk)
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestLargeFileSync:
+    """Cases 5–8: 20 MB file create + delete in both directions."""
+
+    @pytest.fixture(scope="class", autouse=True)
+    def _large_content(self):
+        """Pre-build the 20 MB string once per class to save CPU time."""
+        self.__class__._content = _large_content(LARGE_FILE_SIZE_MB)
+        self.__class__._expected_hash = hashlib.sha256(
+            self._content.encode()
+        ).hexdigest()
+
+    # ── Case 5: 20 MB from node-1 → server + node-2 ────────────────────────
+    def test_case5_large_file_from_node1_to_server_and_node2(
+        self, sync_client, swarm_session
+    ):
+        """
+        Create a 20 MB file from test-node-1.
+        Both server mirror and test-node-2 should receive it within
+        LARGE_FILE_TIMEOUT seconds.
+        """
+        filename = _unique("case5_large")
+        workspace = swarm_session
+
+        print(f"\n[Case 5] Writing {LARGE_FILE_SIZE_MB} MB file {filename!r} from {NODE_1}")
+        result = _touch(sync_client, NODE_1, filename, self._content, workspace)
+        assert result.get("success"), f"Write failed: {result}"
+
+        # Verify node-2 received the file
+        node2_content = _poll_until(
+            lambda: _cat(sync_client, NODE_2, filename, workspace),
+            timeout=LARGE_FILE_TIMEOUT,
+        )
+        assert node2_content is not None, (
+            f"[Case 5] Large file did NOT appear on {NODE_2} within {LARGE_FILE_TIMEOUT}s."
+        )
+        got_hash = hashlib.sha256(node2_content.encode()).hexdigest()
+        assert got_hash == self._expected_hash, (
+            f"[Case 5] Hash mismatch on {NODE_2}. 
Expected {self._expected_hash}, got {got_hash}" + ) + print(f"[Case 5] ✅ {NODE_2} received and verified 20 MB large file.") + + # Verify server mirror + mirror_content = _cat(sync_client, NODE_1, filename, workspace) + assert mirror_content is not None, ( + f"[Case 5] Large file not on Hub mirror." + ) + print(f"[Case 5] ✅ Hub mirror has the large file.") + + # ── Case 6: 20 MB from server → node-1 + node-2 ──────────────────────── + def test_case6_large_file_from_server_to_all_nodes( + self, sync_client, swarm_session + ): + """ + Write a 20 MB file via the server (touch endpoint with session scope). + Both client nodes should receive it within LARGE_FILE_TIMEOUT seconds. + """ + filename = _unique("case6_large") + workspace = swarm_session + + print(f"\n[Case 6] Writing {LARGE_FILE_SIZE_MB} MB file {filename!r} via server") + result = _touch(sync_client, NODE_1, filename, self._content, workspace) + assert result.get("success"), f"Write failed: {result}" + + # node-2 receives via mesh broadcast + node2_ok = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=LARGE_FILE_TIMEOUT, + ) + assert node2_ok is not None, ( + f"[Case 6] Large file did NOT appear on {NODE_2} within {LARGE_FILE_TIMEOUT}s." + ) + print(f"[Case 6] ✅ {NODE_2} received the 20 MB file.") + + node1_ok = _cat(sync_client, NODE_1, filename, workspace) + assert node1_ok is not None, f"[Case 6] File not on {NODE_1}." + print(f"[Case 6] ✅ {NODE_1} has the 20 MB file.") + + # ── Case 7: delete large file from server → nodes purged ─────────────── + def test_case7_delete_large_file_from_server_purges_nodes( + self, sync_client, swarm_session + ): + """ + Write and fully sync a large file, then delete via server. + Verify all client nodes are purged. 
+ """ + filename = _unique("case7_large") + workspace = swarm_session + + # Setup + r = _touch(sync_client, NODE_1, filename, self._content, workspace) + assert r.get("success"), f"Setup write failed: {r}" + + synced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=LARGE_FILE_TIMEOUT, + ) + assert synced is not None, f"[Case 7] Setup: large file did not reach {NODE_2}." + + print(f"\n[Case 7] Deleting large file {filename!r} from server") + _rm(sync_client, NODE_1, filename, workspace) + + gone_node2 = _poll_until( + lambda: _file_missing(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert gone_node2, ( + f"[Case 7] Large file still present on {NODE_2} after server delete." + ) + print(f"[Case 7] ✅ {NODE_2} purged the large file.") + + gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) + assert gone_node1, ( + f"[Case 7] Large file still present on {NODE_1} after server delete." + ) + print(f"[Case 7] ✅ {NODE_1} purged the large file.") + + # ── Case 8: delete large file from node-2 → server + node-1 ─────────── + def test_case8_delete_large_file_from_node2_purges_server_and_node1( + self, sync_client, swarm_session + ): + """ + Write and sync a large file, then delete it FROM node-2. + Verify Hub mirror and node-1 are both purged. + """ + filename = _unique("case8_large") + workspace = swarm_session + + # Setup + r = _touch(sync_client, NODE_1, filename, self._content, workspace) + assert r.get("success"), f"Setup write failed: {r}" + + synced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=LARGE_FILE_TIMEOUT, + ) + assert synced is not None, f"[Case 8] Setup: large file did not reach {NODE_2}." 
+ + print(f"\n[Case 8] Deleting large file {filename!r} from {NODE_2}") + _rm(sync_client, NODE_2, filename, workspace) + + gone_server = _poll_until( + lambda: _file_missing(sync_client, NODE_1, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert gone_server, ( + f"[Case 8] Large file still on Hub mirror after {NODE_2} delete." + ) + print(f"[Case 8] ✅ Hub mirror purged the large file.") + + gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace) + assert gone_node1, ( + f"[Case 8] Large file still on {NODE_1} after {NODE_2} delete." + ) + print(f"[Case 8] ✅ {NODE_1} purged the large file.")