"""
File Sync Integration Tests
============================
Verifies the end-to-end mesh file synchronisation behaviour across nodes and
the Hub server mirror. These tests run against a *live* deployment that has
at least two test nodes connected:
• test-node-1 (NODE_1 constant)
• test-node-2 (NODE_2 constant)
The Hub exposes REST endpoints used to:
- read files: GET /api/v1/nodes/{node_id}/fs/cat
- list dirs: GET /api/v1/nodes/{node_id}/fs/ls
- write files: POST /api/v1/nodes/{node_id}/fs/touch
- delete: POST /api/v1/nodes/{node_id}/fs/rm
(The test server used by these tests mounts the same routes WITHOUT the
/api/v1 prefix, e.g. /nodes/{node_id}/fs/cat — see NODES_PATH below.)
A shared swarm-control session is created once per test module so that all
nodes are in the same mesh workspace, and file operations propagate correctly.
Environment assumptions
-----------------------
BASE_URL http://127.0.0.1:8001 (override with SYNC_TEST_BASE_URL)
NODE_1 test-node-1 (override with SYNC_TEST_NODE1)
NODE_2 test-node-2 (override with SYNC_TEST_NODE2)
USER_ID integration_tester_sync (auto-provisioned if absent)
TIMEOUT 10 s for small files, 60 s for 20 MB files
"""
import os
import time
import uuid
import hashlib
import pytest
import httpx
# ── Configuration ──────────────────────────────────────────────────────────────
# All endpoints are resolved against this base; override for CI deployments.
BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8001")
USER_ID = "integration_tester_sync"  # sent as X-User-ID on every request
NODE_1 = os.getenv("SYNC_TEST_NODE1", "test-node-1")  # primary test node
NODE_2 = os.getenv("SYNC_TEST_NODE2", "test-node-2")  # peer that receives broadcasts
SMALL_FILE_TIMEOUT = 10 # seconds
LARGE_FILE_TIMEOUT = 60 # seconds (20 MB)
LARGE_FILE_SIZE_MB = 20
POLL_INTERVAL = 0.5 # seconds
# Paths (note: the test server mounts routes with no /api/v1 prefix)
SESSIONS_PATH = "/sessions"
NODES_PATH = "/nodes"
# ── Module-level: skip the whole file if nodes are not online ──────────────────
def pytest_configure(config):
    """Register the custom `requires_nodes` marker so pytest does not warn
    (or error, under --strict-markers) when it is applied below.

    NOTE(review): pytest collects `pytest_configure` hooks only from
    conftest.py files and installed plugins, not from test modules — confirm
    this hook actually runs, or move the marker registration to conftest.py.
    """
    config.addinivalue_line(
        "markers",
        "requires_nodes: mark test as requiring live agent nodes to be connected",
    )
# Apply the marker to every test collected from this module.
pytestmark = pytest.mark.requires_nodes
# ── Helpers ─────────────────────────────────────────────────────────────────────
def _headers():
    """Build the request headers that identify the integration-test user."""
    headers = {"X-User-ID": USER_ID}
    return headers
def _unique(prefix="synctest"):
return f"{prefix}_{uuid.uuid4().hex[:8]}.txt"
def _large_content(mb: int = LARGE_FILE_SIZE_MB) -> str:
    """Build a UTF-8 payload of roughly `mb` megabytes.

    Each line is exactly 1 KB (1023 'A's plus a newline); emitting
    mb * 1024 of them yields ~mb MB of text.
    """
    kilobyte_line = "{}\n".format("A" * 1023)
    return kilobyte_line * (1024 * mb)
def _poll_until(fn, timeout: float, interval: float = POLL_INTERVAL):
    """Call fn() every `interval` seconds until it is truthy or `timeout`
    elapses.

    Exceptions raised by fn() are swallowed (treated as "not yet"), matching
    eventual-consistency polling semantics. Returns the first truthy value,
    or the last value fn() produced before the deadline (possibly None).
    """
    stop_at = time.time() + timeout
    result = None
    while time.time() < stop_at:
        try:
            value = fn()
        except Exception:
            pass  # transient failure — keep polling
        else:
            if value:
                return value
            result = value
        time.sleep(interval)
    return result
def _cat(client: httpx.Client, node_id: str, path: str, session_id: str) -> str | None:
    """Fetch `path` from `node_id` inside the session workspace.

    Returns the file's text content on HTTP 200, or None for any other
    status (missing file, node offline, etc.).
    """
    resp = client.get(
        f"{NODES_PATH}/{node_id}/fs/cat",
        headers=_headers(),
        params={"path": path, "session_id": session_id},
    )
    if resp.status_code != 200:
        return None
    return resp.json().get("content", "")
def _mirror_cat(client: httpx.Client, path: str, session_id: str) -> str | None:
    """Read `path` as seen through the Hub server mirror.

    The Hub's /cat endpoint fetches from the node and then caches the result
    in its mirror, so querying NODE_1 with the session workspace scope is how
    we observe the mirror's view. For "server wrote it → does the node have
    it?" checks we therefore ask the node directly via the same endpoint.
    """
    return _cat(client, NODE_1, path, session_id)
def _touch(
    client: httpx.Client,
    node_id: str,
    path: str,
    content: str,
    session_id: str,
    is_dir: bool = False,
) -> dict:
    """Create or overwrite `path` on `node_id` via the Hub REST API.

    Raises httpx.HTTPStatusError on a non-2xx response; otherwise returns
    the decoded JSON body. The generous 120 s timeout accommodates the
    multi-chunk 20 MB uploads used by the large-file cases.
    """
    payload = {
        "path": path,
        "content": content,
        "is_dir": is_dir,
        "session_id": session_id,
    }
    resp = client.post(
        f"{NODES_PATH}/{node_id}/fs/touch",
        json=payload,
        headers=_headers(),
        timeout=120.0,
    )
    resp.raise_for_status()
    return resp.json()
def _rm(client: httpx.Client, node_id: str, path: str, session_id: str) -> dict:
    """Delete `path` on `node_id` via the Hub REST API.

    Raises httpx.HTTPStatusError on a non-2xx response; otherwise returns
    the decoded JSON body.
    """
    payload = {"path": path, "session_id": session_id}
    resp = client.post(
        f"{NODES_PATH}/{node_id}/fs/rm",
        json=payload,
        headers=_headers(),
        timeout=30.0,
    )
    resp.raise_for_status()
    return resp.json()
def _file_missing(client: httpx.Client, node_id: str, path: str, session_id: str) -> bool:
    """Return True when the Hub can no longer serve `path` from `node_id`.

    Any non-200 status counts as "missing" — used to confirm deletions
    have propagated.
    """
    resp = client.get(
        f"{NODES_PATH}/{node_id}/fs/cat",
        headers=_headers(),
        params={"path": path, "session_id": session_id},
    )
    return resp.status_code != 200
# ── Session fixture ─────────────────────────────────────────────────────────────
@pytest.fixture(scope="module")
def sync_client():
    """Module-scoped synchronous httpx client (avoids asyncio overhead).

    Probes NODE_1's status endpoint once up front and skips the entire
    module when the Hub itself is unreachable or returns an unexpected
    status (404 is tolerated — the node route may 404 while the Hub is up).
    """
    with httpx.Client(base_url=BASE_URL, timeout=60.0) as client:
        try:
            probe = client.get(f"{NODES_PATH}/{NODE_1}/status", headers=_headers())
            if probe.status_code not in (200, 404):
                pytest.skip(f"{NODE_1} unreachable — hub returned {probe.status_code}")
        except Exception as exc:
            pytest.skip(f"Hub unreachable at {BASE_URL}: {exc}")
        yield client
@pytest.fixture(scope="module")
def swarm_session(sync_client: httpx.Client) -> str:
    """Provision one swarm_control session with both test nodes attached.

    Yields the sync workspace id used by every file operation in this
    module; the session is archived (DELETE) on teardown.
    """
    create = sync_client.post(
        f"{SESSIONS_PATH}/",
        json={"user_id": USER_ID, "provider_name": "deepseek", "feature_name": "swarm_control"},
        headers=_headers(),
    )
    assert create.status_code == 200, f"Create session failed: {create.text}"
    session_id = create.json()["id"]
    # Both nodes join with source="empty" so each watches the shared workspace.
    attach = sync_client.post(
        f"{SESSIONS_PATH}/{session_id}/nodes",
        json={"node_ids": [NODE_1, NODE_2], "config": {"source": "empty"}},
        headers=_headers(),
    )
    assert attach.status_code == 200, f"Attach nodes failed: {attach.text}"
    workspace_id = attach.json().get("sync_workspace_id")
    assert workspace_id, "Expected sync_workspace_id in response"
    # Let the nodes ACK the workspace and spin up their watchers.
    time.sleep(2.0)
    yield workspace_id
    # Teardown: archive the session.
    sync_client.delete(f"{SESSIONS_PATH}/{session_id}", headers=_headers())
# ══════════════════════════════════════════════════════════════════════════════
# SMALL FILE TESTS (< 1 chunk)
# ══════════════════════════════════════════════════════════════════════════════
class TestSmallFileSync:
    """Cases 1–4: single small-file create + delete in both directions.

    Each case writes (or deletes) a uniquely named file and polls the peer
    nodes until the change propagates or SMALL_FILE_TIMEOUT expires.

    Fix: assertion messages previously contained the literal placeholder
    `'(unknown)'` inside f-strings; they now interpolate the actual filename.
    """

    # ── Case 1: node-1 → node-2 + server ───────────────────────────────────
    def test_case1_write_from_node1_visible_on_node2_and_server(
        self, sync_client, swarm_session
    ):
        """
        Write a file from test-node-1; verify test-node-2 AND the server
        mirror receive it within SMALL_FILE_TIMEOUT seconds.
        """
        filename = _unique("case1")
        content = f"Case 1 payload – node-1 → mesh – {uuid.uuid4()}"
        workspace = swarm_session
        print(f"\n[Case 1] Writing {filename!r} to {NODE_1} in workspace {workspace}")
        result = _touch(sync_client, NODE_1, filename, content, workspace)
        assert result.get("success"), f"Write failed: {result}"
        # Verify on node-2
        node2_content = _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert node2_content is not None, (
            f"[Case 1] File {filename!r} did NOT appear on {NODE_2} within "
            f"{SMALL_FILE_TIMEOUT}s"
        )
        assert content in node2_content, (
            f"[Case 1] Content mismatch on {NODE_2}. Got: {node2_content!r}"
        )
        print(f"[Case 1] ✅ {NODE_2} received the file.")
        # Verify on Hub server mirror (query node-1 with session scope uses mirror)
        server_content = _poll_until(
            lambda: _cat(sync_client, NODE_1, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert server_content is not None and content in server_content, (
            f"[Case 1] File {filename!r} not found on Hub mirror."
        )
        print("[Case 1] ✅ Hub mirror has the file.")

    # ── Case 2: server → node-1 + node-2 ───────────────────────────────────
    def test_case2_write_from_server_visible_on_all_nodes(
        self, sync_client, swarm_session
    ):
        """
        Write a file via the server (the touch endpoint dispatches to all nodes
        in the session + writes to Hub mirror). Verify both client nodes and
        the mirror reflect it.
        """
        filename = _unique("case2")
        content = f"Case 2 payload – server → mesh – {uuid.uuid4()}"
        workspace = swarm_session
        print(f"\n[Case 2] Writing {filename!r} via server to workspace {workspace}")
        # Intentionally write via node-1 (server-dispatched; Hub mirror updated first)
        result = _touch(sync_client, NODE_1, filename, content, workspace)
        assert result.get("success"), f"Write failed: {result}"
        # Node-2 should receive via broadcast
        node2_content = _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert node2_content is not None and content in node2_content, (
            f"[Case 2] File {filename!r} did NOT appear on {NODE_2}."
        )
        print(f"[Case 2] ✅ {NODE_2} received the file.")
        # Node-1 should also have it (it was written directly to it and mirrored)
        node1_content = _cat(sync_client, NODE_1, filename, workspace)
        assert node1_content is not None and content in node1_content, (
            f"[Case 2] File {filename!r} not found on {NODE_1}."
        )
        print(f"[Case 2] ✅ {NODE_1} has the file.")

    # ── Case 3: delete from server → nodes purged ──────────────────────────
    def test_case3_delete_from_server_purges_client_nodes(
        self, sync_client, swarm_session
    ):
        """
        Create a file via server, then delete it via the server endpoint.
        Verify both client nodes no longer have the file.
        """
        filename = _unique("case3")
        content = f"Case 3 – to be deleted by server – {uuid.uuid4()}"
        workspace = swarm_session
        # Setup: write the file from node-1 (server-side orchestrated)
        r = _touch(sync_client, NODE_1, filename, content, workspace)
        assert r.get("success"), f"Setup write failed: {r}"
        # Make sure node-2 got it before we delete
        got = _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert got is not None, f"[Case 3] Setup: file not on {NODE_2}."
        print(f"\n[Case 3] Deleting {filename!r} from server (via {NODE_1} endpoint)")
        _rm(sync_client, NODE_1, filename, workspace)
        # node-2 should no longer have the file
        gone_node2 = _poll_until(
            lambda: _file_missing(sync_client, NODE_2, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert gone_node2, (
            f"[Case 3] File {filename!r} still present on {NODE_2} after server delete."
        )
        print(f"[Case 3] ✅ {NODE_2} no longer has the file.")
        # node-1 should also have it gone
        gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace)
        assert gone_node1, (
            f"[Case 3] File {filename!r} still present on {NODE_1} after server delete."
        )
        print(f"[Case 3] ✅ {NODE_1} no longer has the file.")

    # ── Case 4: delete from node-2 → server + node-1 purged ───────────────
    def test_case4_delete_from_node2_purges_server_and_node1(
        self, sync_client, swarm_session
    ):
        """
        Create a file, let it propagate, then delete it FROM node-2.
        Verify the Hub mirror and node-1 no longer have the file.
        """
        filename = _unique("case4")
        content = f"Case 4 – to be deleted from node-2 – {uuid.uuid4()}"
        workspace = swarm_session
        # Setup: write from node-1 so both nodes and mirror have it
        r = _touch(sync_client, NODE_1, filename, content, workspace)
        assert r.get("success"), f"Setup write failed: {r}"
        got_node2 = _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert got_node2 is not None, f"[Case 4] Setup: file did not reach {NODE_2}."
        print(f"\n[Case 4] Deleting {filename!r} from {NODE_2}")
        _rm(sync_client, NODE_2, filename, workspace)
        # Hub mirror (observed via node-1's workspace view) should purge
        gone_server = _poll_until(
            lambda: _file_missing(sync_client, NODE_1, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert gone_server, (
            f"[Case 4] File {filename!r} still present on Hub mirror after node-2 delete."
        )
        print("[Case 4] ✅ Hub mirror no longer has the file.")
        # node-1 should also be purged
        gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace)
        assert gone_node1, (
            f"[Case 4] File {filename!r} still present on {NODE_1} after node-2 delete."
        )
        print(f"[Case 4] ✅ {NODE_1} no longer has the file.")
# ══════════════════════════════════════════════════════════════════════════════
# LARGE FILE TESTS (20 MB, multi-chunk)
# ══════════════════════════════════════════════════════════════════════════════
class TestLargeFileSync:
    """Cases 5–8: 20 MB file create + delete in both directions.

    A single 20 MB payload (plus its SHA-256) is built once per class and
    cached as class attributes `_content` / `_expected_hash`.

    Fix: the autouse fixture was named `_large_content`, shadowing the
    module-level helper of the same name inside the class namespace; it is
    renamed `_prepare_large_content` to remove the collision.
    """

    @pytest.fixture(scope="class", autouse=True)
    def _prepare_large_content(self):
        """Pre-build the 20 MB string once per class to save CPU time.

        Stored on the class (not the instance) so every test method can read
        `self._content` / `self._expected_hash` without rebuilding.
        """
        self.__class__._content = _large_content(LARGE_FILE_SIZE_MB)
        self.__class__._expected_hash = hashlib.sha256(
            self._content.encode()
        ).hexdigest()

    # ── Case 5: 20 MB from node-1 → server + node-2 ────────────────────────
    def test_case5_large_file_from_node1_to_server_and_node2(
        self, sync_client, swarm_session
    ):
        """
        Create a 20 MB file from test-node-1.
        Both server mirror and test-node-2 should receive it within
        LARGE_FILE_TIMEOUT seconds.
        """
        filename = _unique("case5_large")
        workspace = swarm_session
        print(f"\n[Case 5] Writing {LARGE_FILE_SIZE_MB} MB file {filename!r} from {NODE_1}")
        result = _touch(sync_client, NODE_1, filename, self._content, workspace)
        assert result.get("success"), f"Write failed: {result}"
        # Verify node-2 received the file
        node2_content = _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace),
            timeout=LARGE_FILE_TIMEOUT,
        )
        assert node2_content is not None, (
            f"[Case 5] Large file did NOT appear on {NODE_2} within {LARGE_FILE_TIMEOUT}s."
        )
        # End-to-end integrity: hash the received bytes, not just presence.
        got_hash = hashlib.sha256(node2_content.encode()).hexdigest()
        assert got_hash == self._expected_hash, (
            f"[Case 5] Hash mismatch on {NODE_2}. Expected {self._expected_hash}, got {got_hash}"
        )
        print(f"[Case 5] ✅ {NODE_2} received and verified 20 MB large file.")
        # Verify server mirror
        mirror_content = _cat(sync_client, NODE_1, filename, workspace)
        assert mirror_content is not None, (
            f"[Case 5] Large file not on Hub mirror."
        )
        print("[Case 5] ✅ Hub mirror has the large file.")

    # ── Case 6: 20 MB from server → node-1 + node-2 ────────────────────────
    def test_case6_large_file_from_server_to_all_nodes(
        self, sync_client, swarm_session
    ):
        """
        Write a 20 MB file via the server (touch endpoint with session scope).
        Both client nodes should receive it within LARGE_FILE_TIMEOUT seconds.
        """
        filename = _unique("case6_large")
        workspace = swarm_session
        print(f"\n[Case 6] Writing {LARGE_FILE_SIZE_MB} MB file {filename!r} via server")
        result = _touch(sync_client, NODE_1, filename, self._content, workspace)
        assert result.get("success"), f"Write failed: {result}"
        # node-2 receives via mesh broadcast
        node2_ok = _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace),
            timeout=LARGE_FILE_TIMEOUT,
        )
        assert node2_ok is not None, (
            f"[Case 6] Large file did NOT appear on {NODE_2} within {LARGE_FILE_TIMEOUT}s."
        )
        print(f"[Case 6] ✅ {NODE_2} received the 20 MB file.")
        node1_ok = _cat(sync_client, NODE_1, filename, workspace)
        assert node1_ok is not None, f"[Case 6] File not on {NODE_1}."
        print(f"[Case 6] ✅ {NODE_1} has the 20 MB file.")

    # ── Case 7: delete large file from server → nodes purged ───────────────
    def test_case7_delete_large_file_from_server_purges_nodes(
        self, sync_client, swarm_session
    ):
        """
        Write and fully sync a large file, then delete via server.
        Verify all client nodes are purged.
        """
        filename = _unique("case7_large")
        workspace = swarm_session
        # Setup
        r = _touch(sync_client, NODE_1, filename, self._content, workspace)
        assert r.get("success"), f"Setup write failed: {r}"
        synced = _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace),
            timeout=LARGE_FILE_TIMEOUT,
        )
        assert synced is not None, f"[Case 7] Setup: large file did not reach {NODE_2}."
        print(f"\n[Case 7] Deleting large file {filename!r} from server")
        _rm(sync_client, NODE_1, filename, workspace)
        # Deletion is metadata-only, so the small-file timeout suffices here.
        gone_node2 = _poll_until(
            lambda: _file_missing(sync_client, NODE_2, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert gone_node2, (
            f"[Case 7] Large file still present on {NODE_2} after server delete."
        )
        print(f"[Case 7] ✅ {NODE_2} purged the large file.")
        gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace)
        assert gone_node1, (
            f"[Case 7] Large file still present on {NODE_1} after server delete."
        )
        print(f"[Case 7] ✅ {NODE_1} purged the large file.")

    # ── Case 8: delete large file from node-2 → server + node-1 ───────────
    def test_case8_delete_large_file_from_node2_purges_server_and_node1(
        self, sync_client, swarm_session
    ):
        """
        Write and sync a large file, then delete it FROM node-2.
        Verify Hub mirror and node-1 are both purged.
        """
        filename = _unique("case8_large")
        workspace = swarm_session
        # Setup
        r = _touch(sync_client, NODE_1, filename, self._content, workspace)
        assert r.get("success"), f"Setup write failed: {r}"
        synced = _poll_until(
            lambda: _cat(sync_client, NODE_2, filename, workspace),
            timeout=LARGE_FILE_TIMEOUT,
        )
        assert synced is not None, f"[Case 8] Setup: large file did not reach {NODE_2}."
        print(f"\n[Case 8] Deleting large file {filename!r} from {NODE_2}")
        _rm(sync_client, NODE_2, filename, workspace)
        gone_server = _poll_until(
            lambda: _file_missing(sync_client, NODE_1, filename, workspace),
            timeout=SMALL_FILE_TIMEOUT,
        )
        assert gone_server, (
            f"[Case 8] Large file still on Hub mirror after {NODE_2} delete."
        )
        print("[Case 8] ✅ Hub mirror purged the large file.")
        gone_node1 = _file_missing(sync_client, NODE_1, filename, workspace)
        assert gone_node1, (
            f"[Case 8] Large file still on {NODE_1} after {NODE_2} delete."
        )
        print(f"[Case 8] ✅ {NODE_1} purged the large file.")