diff --git a/ai-hub/integration_tests/test_mcp_file_ops_enhanced.py b/ai-hub/integration_tests/test_mcp_file_ops_enhanced.py new file mode 100644 index 0000000..df9dc18 --- /dev/null +++ b/ai-hub/integration_tests/test_mcp_file_ops_enhanced.py @@ -0,0 +1,152 @@ +import pytest +import os +import httpx +import json +import time +import jwt + +# Use the same BASE_URL as other integration tests +BASE_URL = os.getenv("HUB_API_URL", "http://localhost:8000/api/v1") +SECRET_KEY = os.getenv("SECRET_KEY", "aYc2j1lYUUZXkBFFUndnleZI") + +def _get_token(user_id): + # Generate an internal HS256 JWT + payload = { + "iss": "cortex-hub-internal", + "sub": user_id, + "exp": int(time.time()) + 3600 + } + return jwt.encode(payload, SECRET_KEY, algorithm="HS256") + +def _headers(user_id): + return { + "X-User-ID": user_id, + "Authorization": f"Bearer {_get_token(user_id)}" + } + +@pytest.mark.slow +def test_mcp_file_ops_enhanced(): + """ + Test calling 'move_file', 'copy_file', and 'get_file_stat' tools via MCP. + """ + node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1") + user_id = os.getenv("SYNC_TEST_USER_ID", "admin") + + headers = _headers(user_id) + + with httpx.Client(timeout=15.0) as client: + # 0. Check initial state + r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": ".", "session_id": "__fs_explorer__"}, headers=headers) + print(f"[test] Initial LS (.): {r_ls.json().get('files', [])}") + + # 1. Prepare: Write a test file + test_file = "mcp_test_orig.txt" + test_content = "Original Content" + payload_write = { + "jsonrpc": "2.0", + "id": "write-1", + "method": "tools/call", + "params": { + "name": "write_file", + "arguments": { + "node_id": node_id, + "path": test_file, + "content": test_content, + "session_id": "__fs_explorer__" + } + } + } + r = client.post(f"{BASE_URL}/mcp/", json=payload_write, headers=headers) + assert r.status_code == 200, f"MCP write failed: {r.text}" + + # 2. Test get_file_stat + payload_stat = { + "jsonrpc": "2.0", + "id": "stat-1", + "method": "tools/call", + "params": { + "name": "get_file_stat", + "arguments": { + "node_id": node_id, + "path": test_file, + "session_id": "__fs_explorer__" + } + } + } + r = client.post(f"{BASE_URL}/mcp/", json=payload_stat, headers=headers) + assert r.status_code == 200, f"MCP stat failed: {r.text}" + res_stat = r.json() + assert "result" in res_stat, f"Result missing in stat: {res_stat}" + text_val = res_stat["result"]["content"][0]["text"] + data = json.loads(text_val) + assert data.get("exists") is True, f"File does not exist: {data}" + assert data.get("size") == len(test_content), f"Size mismatch: {data}" + print(f"[test] get_file_stat success: {data}") + + # 3. Test copy_file + copy_file = "mcp_test_copy.txt" + payload_copy = { + "jsonrpc": "2.0", + "id": "copy-1", + "method": "tools/call", + "params": { + "name": "copy_file", + "arguments": { + "node_id": node_id, + "old_path": test_file, + "new_path": copy_file, + "session_id": "__fs_explorer__" + } + } + } + r = client.post(f"{BASE_URL}/mcp/", json=payload_copy, headers=headers) + assert r.status_code == 200, f"MCP copy failed: {r.text}" + print(f"[test] copy_file call success") + + # Verify copy exists + time.sleep(1) + r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": ".", "session_id": "__fs_explorer__"}, headers=headers) + assert r_ls.status_code == 200, f"LS failed: {r_ls.text}" + files = r_ls.json().get("files", []) + assert any(f["name"] == "mcp_test_copy.txt" for f in files), f"Copy missing: {files}" + + # 4. Test move_file + move_file = "mcp_test_moved.txt" + payload_move = { + "jsonrpc": "2.0", + "id": "move-1", + "method": "tools/call", + "params": { + "name": "move_file", + "arguments": { + "node_id": node_id, + "old_path": test_file, + "new_path": move_file, + "session_id": "__fs_explorer__" + } + } + } + r = client.post(f"{BASE_URL}/mcp/", json=payload_move, headers=headers) + assert r.status_code == 200, f"MCP move failed: {r.text}" + print(f"[test] move_file call success") + + # Verify move + time.sleep(1) + r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": ".", "session_id": "__fs_explorer__"}, headers=headers) + assert r_ls.status_code == 200, f"LS failed: {r_ls.text}" + files = r_ls.json().get("files", []) + assert any(f["name"] == "mcp_test_moved.txt" for f in files), f"Moved file missing: {files}" + assert not any(f["name"] == "mcp_test_orig.txt" for f in files), f"Original file still exists: {files}" + + # Cleanup + client.post(f"{BASE_URL}/mcp/", json={ + "jsonrpc": "2.0", "id": "clean-1", "method": "tools/call", + "params": {"name": "remove_file", "arguments": {"node_id": node_id, "path": move_file, "session_id": "__fs_explorer__"}} + }, headers=headers) + client.post(f"{BASE_URL}/mcp/", json={ + "jsonrpc": "2.0", "id": "clean-2", "method": "tools/call", + "params": {"name": "remove_file", "arguments": {"node_id": node_id, "path": copy_file, "session_id": "__fs_explorer__"}} + }, headers=headers) + +if __name__ == "__main__": + test_mcp_file_ops_enhanced()