import pytest
import os
import httpx
import json
import time
from conftest import BASE_URL
def _headers():
    """Build the HTTP headers that identify the test user.

    Returns:
        dict: A single ``X-User-ID`` entry whose value is the
        ``SYNC_TEST_USER_ID`` environment variable, or an empty string
        when that variable is not set.
    """
    user_id = os.getenv("SYNC_TEST_USER_ID", "")
    return {"X-User-ID": user_id}
def _call_mcp_tool(client, token, request_id, tool_name, arguments, label):
    """Invoke an MCP tool through JSON-RPC ``tools/call`` and decode its reply.

    Args:
        client: Open ``httpx.Client`` used to send the request.
        token: Value sent as the ``token`` query parameter of the MCP endpoint.
        request_id: JSON-RPC ``id`` placed in the request envelope.
        tool_name: Name of the MCP tool to call.
        arguments: Arguments object forwarded to the tool.
        label: Short operation name used in failure messages (e.g. ``"write"``).

    Returns:
        The object decoded from the JSON text carried by the first content
        block of the response's ``result``.

    Raises:
        AssertionError: If the HTTP status is not 200 or the JSON-RPC
            response carries no ``result`` member.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": tool_name,
            "arguments": arguments,
        },
    }
    # Hand the token to httpx via ``params`` so it is URL-encoded instead of
    # being spliced verbatim into the query string.
    response = client.post(
        f"{BASE_URL}/mcp/", params={"token": token}, json=payload
    )
    assert response.status_code == 200, f"MCP {label} failed: {response.text}"
    body = response.json()
    # Without this check a JSON-RPC error reply would only show up as a
    # KeyError on the indexing below.
    assert "result" in body, f"MCP {label} returned no result: {body}"
    return json.loads(body["result"]["content"][0]["text"])


@pytest.mark.slow
def test_mcp_fs_ops():
    """
    Test calling 'write_file' and 'delete_file' tools via MCP.

    Steps:
        1. Write a file on the node through the MCP ``write_file`` tool.
        2. Confirm the file appears in the REST directory listing.
        3. Delete the file through the MCP ``delete_file`` tool.
        4. Confirm the REST ``cat`` endpoint answers 404 for the file.

    The node and user come from the ``SYNC_TEST_NODE1`` and
    ``SYNC_TEST_USER_ID`` environment variables.
    """
    node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
    user_id = os.getenv("SYNC_TEST_USER_ID", "")
    session_id = "__fs_explorer__"
    test_file = "tmp/test_mcp_file.txt"
    test_content = "Hello from MCP File Ops"
    # Derive the listing directory and entry name from the path so the three
    # values cannot drift apart.
    test_dir, test_name = test_file.rsplit("/", 1)

    with httpx.Client(timeout=15.0) as client:
        # 1. Call tools/call for write_file
        data = _call_mcp_tool(
            client,
            user_id,
            "test-write-1",
            "write_file",
            {
                "node_id": node_id,
                "path": test_file,
                "content": test_content,
                "session_id": session_id,
            },
            "write",
        )
        # Require an explicit success flag; merely having the key present
        # would also accept {"success": false}.
        assert data.get("success") is True, (
            f"MCP write did not report success: {data}"
        )
        print(f"[test] MCP Write success: {data}")

        # 2. Verify file exists by listing directory via REST API
        # Fixed pause before listing; presumably gives the write time to
        # reach the node -- TODO confirm the required delay.
        time.sleep(2)
        r_ls = client.get(
            f"{BASE_URL}/nodes/{node_id}/fs/ls",
            params={"path": test_dir, "session_id": session_id},
            headers=_headers(),
        )
        assert r_ls.status_code == 200, f"Failed to list directory: {r_ls.text}"
        files = r_ls.json().get("files", [])
        assert any(f["name"] == test_name for f in files), (
            f"File not found in listing: {files}"
        )

        # 3. Call tools/call for delete_file
        data_del = _call_mcp_tool(
            client,
            user_id,
            "test-delete-1",
            "delete_file",
            {
                "node_id": node_id,
                "path": test_file,
                "session_id": session_id,
            },
            "delete",
        )
        assert data_del.get("success") is True, (
            f"MCP delete did not report success: {data_del}"
        )
        print(f"[test] MCP Delete success: {data_del}")

        # 4. Verify deletion
        r_cat_404 = client.get(
            f"{BASE_URL}/nodes/{node_id}/fs/cat",
            params={"path": test_file, "session_id": session_id},
            headers=_headers(),
        )
        assert r_cat_404.status_code == 404, (
            f"Expected 404 after delete, got {r_cat_404.status_code}: "
            f"{r_cat_404.text}"
        )