"""
Advanced Mesh File Operations Integration Tests
===============================================
Verifies move, copy, and stat operations across the mesh.
Uses test-node-1 and test-node-2.
"""
import os
import time
import uuid
import pytest
import httpx
# ── Configuration ──────────────────────────────────────────────────────────────
# All settings are overridable via environment variables so CI can retarget them.
BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1/")  # hub API root
USER_ID = os.getenv("SYNC_TEST_USER_ID", "c4401d34-8784-4d6e-93a0-c702bd202b66")  # test user's UUID
NODE_1 = os.getenv("SYNC_TEST_NODE1", "test-node-1")  # first mesh node under test
NODE_2 = os.getenv("SYNC_TEST_NODE2", "test-node-2")  # second mesh node under test
SMALL_FILE_TIMEOUT = 10  # max seconds to wait for a small file to propagate across the mesh
POLL_INTERVAL = 0.5  # seconds between retries when polling for eventual consistency
SESSIONS_PATH = "sessions"  # API route segment for session management
NODES_PATH = "nodes"  # API route segment for per-node filesystem operations
# ── Helpers ─────────────────────────────────────────────────────────────────────
def _headers():
    """Build the identity headers sent with every API request.

    Re-reads the environment on each call (so a test could override the
    user mid-run) but falls back to the module-level ``USER_ID`` instead
    of duplicating the default UUID literal a second time.
    """
    return {"X-User-ID": os.getenv("SYNC_TEST_USER_ID", USER_ID)}
def _unique(prefix="advfs"):
return f"{prefix}_{uuid.uuid4().hex[:8]}.txt"
def _poll_until(fn, timeout: float, interval: float = POLL_INTERVAL):
    """Call *fn* repeatedly until it returns a truthy value or *timeout* expires.

    Exceptions from *fn* are treated as "not ready yet" — expected while the
    mesh converges — but only ``Exception`` subclasses are swallowed, so
    KeyboardInterrupt/SystemExit still propagate (the original bare ``except:``
    trapped those too). Uses a monotonic clock so wall-clock adjustments
    cannot stretch or shrink the deadline.

    Returns the truthy result, or ``None`` on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            result = fn()
            if result:
                return result
        except Exception:
            # Transient failures (connection refused, 404 mid-sync, ...) are
            # part of normal eventual consistency; keep polling.
            pass
        time.sleep(interval)
    return None
def _cat(client, node_id, path, session_id):
    """Read a file's content via the node's ``fs/cat`` endpoint.

    Returns the content string on HTTP 200, otherwise ``None`` (best-effort,
    suitable for polling an eventually-consistent mesh).
    """
    response = client.get(
        f"{NODES_PATH}/{node_id}/fs/cat",
        params={"path": path, "session_id": session_id},
        headers=_headers(),
    )
    if response.status_code != 200:
        return None
    return response.json().get("content")
def _stat(client, node_id, path, session_id):
    """Fetch file metadata from the node's ``fs/stat`` endpoint.

    Raises on any non-2xx response; returns the parsed JSON metadata dict.
    """
    response = client.get(
        f"{NODES_PATH}/{node_id}/fs/stat",
        params={"path": path, "session_id": session_id},
        headers=_headers(),
    )
    response.raise_for_status()
    return response.json()
def _touch(client, node_id, path, content, session_id):
    """Create or overwrite a file on *node_id* via ``fs/touch``.

    Raises on any non-2xx response; returns the endpoint's JSON reply.
    """
    payload = {"path": path, "content": content, "session_id": session_id}
    response = client.post(
        f"{NODES_PATH}/{node_id}/fs/touch",
        json=payload,
        headers=_headers(),
    )
    response.raise_for_status()
    return response.json()
def _move(client, node_id, session_id, old_path, new_path):
    """Rename a file mesh-wide via the node's ``fs/move`` endpoint.

    Raises on any non-2xx response; returns the endpoint's JSON reply.
    """
    payload = {"old_path": old_path, "new_path": new_path, "session_id": session_id}
    response = client.post(f"{NODES_PATH}/{node_id}/fs/move", json=payload, headers=_headers())
    response.raise_for_status()
    return response.json()
def _copy(client, node_id, session_id, old_path, new_path):
    """Duplicate a file mesh-wide via the node's ``fs/copy`` endpoint.

    Raises on any non-2xx response; returns the endpoint's JSON reply.
    """
    payload = {"old_path": old_path, "new_path": new_path, "session_id": session_id}
    response = client.post(f"{NODES_PATH}/{node_id}/fs/copy", json=payload, headers=_headers())
    response.raise_for_status()
    return response.json()
# ── Fixtures ────────────────────────────────────────────────────────────────────
@pytest.fixture(scope="module")
def sync_client():
    """Module-scoped HTTP client pointed at the hub API; closed on teardown."""
    client = httpx.Client(base_url=BASE_URL, timeout=30.0)
    try:
        yield client
    finally:
        client.close()
@pytest.fixture(scope="module")
def swarm_session(sync_client):
    """Create a two-node swarm session and yield its sync workspace id.

    Both setup POSTs now call ``raise_for_status()`` — consistent with the
    request helpers — so an HTTP failure surfaces as a clear error instead
    of an opaque ``KeyError`` on the missing JSON field. The session is
    deleted on teardown.
    """
    r = sync_client.post(
        f"{SESSIONS_PATH}/",
        json={"user_id": USER_ID, "provider_name": "gemini", "feature_name": "swarm_control"},
        headers=_headers(),
    )
    r.raise_for_status()
    session_id = r.json()["id"]
    r2 = sync_client.post(
        f"{SESSIONS_PATH}/{session_id}/nodes",
        json={"node_ids": [NODE_1, NODE_2], "config": {"source": "empty"}},
        headers=_headers(),
    )
    r2.raise_for_status()
    workspace = r2.json()["sync_workspace_id"]
    time.sleep(2)  # give the mesh a moment to initialize the workspace on both nodes
    yield workspace
    sync_client.delete(f"{SESSIONS_PATH}/{session_id}", headers=_headers())
# ── Tests ───────────────────────────────────────────────────────────────────────
@pytest.mark.requires_nodes
class TestAdvancedFS:
    """Mesh-wide move/copy/stat behavior across NODE_1 and NODE_2.

    Fixes vs. the previous revision: the [Move]/[Copy]/[Stat] progress
    prints interpolated the literal ``(unknown)`` instead of the source
    filename, and the 404 check wrapped a plain assert in a dead
    try/except (a raw ``client.get`` never raises ``HTTPStatusError``).
    """

    def test_mesh_move_atomic(self, sync_client, swarm_session):
        """move file on hub -> gone from old, present on new across all nodes."""
        filename = _unique("move_src")
        destname = _unique("move_dst")
        content = f"Move Payload {uuid.uuid4()}"
        workspace = swarm_session
        # 1. Write to Node 1.
        _touch(sync_client, NODE_1, filename, content, workspace)
        # 2. Wait until the file has replicated to Node 2.
        assert _poll_until(lambda: _cat(sync_client, NODE_2, filename, workspace) == content, SMALL_FILE_TIMEOUT)
        # 3. Perform the move (issued via Node 1's endpoint, but it is a session-wide mesh op).
        print(f"\n[Move] Renaming {filename} -> {destname}")
        _move(sync_client, NODE_1, workspace, filename, destname)
        # 4. The old path must disappear on both nodes...
        assert _poll_until(lambda: _cat(sync_client, NODE_1, filename, workspace) is None, SMALL_FILE_TIMEOUT)
        assert _poll_until(lambda: _cat(sync_client, NODE_2, filename, workspace) is None, SMALL_FILE_TIMEOUT)
        # 5. ...and the new path must appear with identical content.
        assert _poll_until(lambda: _cat(sync_client, NODE_1, destname, workspace) == content, SMALL_FILE_TIMEOUT)
        assert _poll_until(lambda: _cat(sync_client, NODE_2, destname, workspace) == content, SMALL_FILE_TIMEOUT)
        print("✅ Atomic Move synchronized across mesh.")

    def test_mesh_copy_atomic(self, sync_client, swarm_session):
        """copy file on hub -> original stays, new appears across all nodes."""
        filename = _unique("copy_src")
        destname = _unique("copy_dst")
        content = f"Copy Payload {uuid.uuid4()}"
        workspace = swarm_session
        # 1. Write to Node 1 and wait for replication to Node 2.
        _touch(sync_client, NODE_1, filename, content, workspace)
        assert _poll_until(lambda: _cat(sync_client, NODE_2, filename, workspace) == content, SMALL_FILE_TIMEOUT)
        # 2. Perform the copy.
        print(f"\n[Copy] Duplicating {filename} -> {destname}")
        _copy(sync_client, NODE_1, workspace, filename, destname)
        # 3. BOTH the original and the duplicate must exist on Node 2.
        assert _poll_until(lambda: _cat(sync_client, NODE_2, filename, workspace) == content, SMALL_FILE_TIMEOUT)
        assert _poll_until(lambda: _cat(sync_client, NODE_2, destname, workspace) == content, SMALL_FILE_TIMEOUT)
        print("✅ Atomic Copy synchronized across mesh.")

    def test_mesh_stat_speed(self, sync_client, swarm_session):
        """stat file -> returns metadata instantly from hub mirror."""
        filename = _unique("stat_test")
        content = "Stat content"
        workspace = swarm_session
        _touch(sync_client, NODE_1, filename, content, workspace)
        # Stat via the hub endpoint.
        print(f"\n[Stat] Checking metadata for {filename}")
        info = _stat(sync_client, NODE_1, filename, workspace)
        assert info["exists"] is True
        assert info["size"] == len(content)  # ASCII content: char count == byte count
        assert info["is_file"] is True
        assert info["is_dir"] is False
        # Stat a non-existent path: the endpoint reports 404 in the response
        # status; the raw GET never raises, so no try/except is needed.
        r = sync_client.get(
            f"{NODES_PATH}/{NODE_1}/fs/stat",
            params={"path": "non-existent_file.txt", "session_id": workspace},
            headers=_headers(),
        )
        assert r.status_code == 404
        print("✅ Stat returned correct metadata and handled missing files.")