diff --git a/agent-node/requirements.txt b/agent-node/requirements.txt index ee9cb24..0376e24 100644 --- a/agent-node/requirements.txt +++ b/agent-node/requirements.txt @@ -1,5 +1,5 @@ -grpcio>=1.48.0 -grpcio-tools>=1.48.0 +grpcio +grpcio-tools PyJWT==2.8.0 playwright>=1.47.0 watchdog>=3.0.0 diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py index 3855222..a02b15e 100644 --- a/agent-node/src/agent_node/core/sync.py +++ b/agent-node/src/agent_node/core/sync.py @@ -142,9 +142,10 @@ def write_chunk(self, session_id: str, payload: agent_pb2.FilePayload) -> bool: """Writes a file chunk to a shadow file and swaps to target on completion.""" session_dir = self.get_session_dir(session_id, create=True) - target_path = os.path.normpath(os.path.join(session_dir, payload.path.lstrip("/"))) + abs_session_dir = os.path.abspath(session_dir) + target_path = os.path.abspath(os.path.join(abs_session_dir, payload.path.lstrip("/"))) - if not target_path.startswith(session_dir): + if not target_path.startswith(abs_session_dir): return False # Path traversal guard os.makedirs(os.path.dirname(target_path), exist_ok=True) diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index fe12411..9923523 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -463,8 +463,12 @@ zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: # 1. Add Agent Node files (source, scripts, and future binary) - # Try production mount first, then local fallback - source_dirs = ["/app/agent-node-source", "/app/agent-node"] + # Try production mount first, then environment overrides, then local fallback + source_dirs = [ + "/app/agent-node-source", + "/app/agent-node", + os.environ.get("AGENT_NODE_SRC_DIR", "../agent-node") + ] found_dir = None for sd in source_dirs: if os.path.exists(sd): @@ -485,8 +489,11 @@ rel_path = os.path.relpath(file_path, found_dir) zip_file.write(file_path, rel_path) - # 2. Add skills from /app/skills - skills_dir = "/app/skills" + # 2. Add skills from mapped directory or fallback + skills_dir = os.environ.get("SKILLS_SRC_DIR", "/app/skills") + if not os.path.exists(skills_dir) and os.path.exists("../skills"): + skills_dir = "../skills" + if os.path.exists(skills_dir): for root, dirs, files in os.walk(skills_dir): dirs[:] = [d for d in dirs if d != "__pycache__"] diff --git a/ai-hub/app/api/routes/sessions.py b/ai-hub/app/api/routes/sessions.py index 6b18317..2e6fab9 100644 --- a/ai-hub/app/api/routes/sessions.py +++ b/ai-hub/app/api/routes/sessions.py @@ -262,7 +262,8 @@ logging.exception(f"[📁⚠️] Failed to send PURGE to node {node.node_id}: {e}") # Hub local purge - mirror_path = os.path.join("/app/data/mirrors", sync_workspace_id) + from app.config import settings + mirror_path = os.path.join(settings.DATA_DIR, "mirrors", sync_workspace_id) if os.path.exists(mirror_path): shutil.rmtree(mirror_path) except Exception as e: @@ -312,8 +313,9 @@ ), priority=0) # Hub local purge + from app.config import settings for wid in workspaces_to_purge: - mirror_path = os.path.join("/app/data/mirrors", wid) + mirror_path = os.path.join(settings.DATA_DIR, "mirrors", wid) if os.path.exists(mirror_path): shutil.rmtree(mirror_path) except Exception as e: @@ -332,7 +334,8 @@ raise HTTPException(status_code=404, detail="Message not found.") # Create data directory if not exists - audio_dir = "/app/data/audio" + from app.config import settings + audio_dir = os.path.join(settings.DATA_DIR, "audio") os.makedirs(audio_dir, exist_ok=True) # Save file diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py index 0369d34..73731ab 100644 --- a/ai-hub/app/core/grpc/core/mirror.py +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -49,9 +49,10 @@ # Prevent path traversal — strip leading slash to ensure it's relative to workspace path_safe = file_payload.path.lstrip("/") - safe_path = os.path.normpath(os.path.join(workspace, path_safe)) - if not safe_path.startswith(workspace): - raise ValueError(f"Malicious path detected: {file_payload.path}") + abs_workspace = os.path.abspath(workspace) + safe_path = os.path.abspath(os.path.join(abs_workspace, path_safe)) + if not safe_path.startswith(abs_workspace): + raise ValueError(f"Malicious path detected: file={file_payload.path} safe_path={safe_path} workspace={abs_workspace}") # --- SYSTEM SKILL IMMUTABILITY LOCK --- # Prevent any AI agent from modifying strict system skills across the mesh @@ -241,9 +242,10 @@ return # Parent folder is already gone path_safe = rel_path.lstrip("/") - safe_path = os.path.normpath(os.path.join(workspace, path_safe)) + abs_workspace = os.path.abspath(workspace) + safe_path = os.path.abspath(os.path.join(abs_workspace, path_safe)) - if not safe_path.startswith(workspace): + if not safe_path.startswith(abs_workspace): raise ValueError(f"Malicious path detected: {rel_path}") # --- SYSTEM SKILL IMMUTABILITY LOCK --- diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 6353403..0f4aa86 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -212,13 +212,10 @@ ) ), priority=2) - # M6: Use bounded registry executor for parallel mesh broadcast. - # Max queue size on the BoundedThreadPoolExecutor ensures backpressure, preventing Hub OOM on 2GB+ meshes. + # M6 Fix: Do not use BoundedThreadPoolExecutor for chunks because it scrambles order! + # node.queue limits provide the backpressure. We iterate and put sequentially. for nid in destinations: - if getattr(self.registry, 'executor', None): - self.registry.executor.submit(_send_to_node, nid) - else: - _send_to_node(nid) + _send_to_node(nid) def broadcast_delete(self, session_id: str, sender_node_id: str, rel_path: str): """Broadcasts a file deletion from one node to all other nodes in the session mesh.""" with self.membership_lock: diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index e741fbc..b8d299c 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -27,7 +27,7 @@ self.registry = registry # Injected NodeRegistryService self.journal = TaskJournal() self.pool = GlobalWorkPool() - self.mirror = GhostMirrorManager() + self.mirror = GhostMirrorManager(storage_root=os.path.join(settings.DATA_DIR, "mirrors")) self.io_locks = {} # key -> threading.Lock self.io_locks_lock = threading.Lock() self.assistant = TaskAssistant(self.registry, self.journal, self.pool, self.mirror) diff --git a/ai-hub/integration_tests/conftest.py b/ai-hub/integration_tests/conftest.py index cff9628..d6fa26f 100644 --- a/ai-hub/integration_tests/conftest.py +++ b/ai-hub/integration_tests/conftest.py @@ -162,13 +162,17 @@ env["HUB_URL"] = http_ep env["AGENT_TLS_ENABLED"] = "false" env["PYTHONPATH"] = f"{agent_dir}/src" + env["PYTHONUNBUFFERED"] = "1" + env["CORTEX_SYNC_DIR"] = f"/tmp/cortex-sync-{node_id}" + log_file = open(f"/tmp/{node_id}_pytest.log", "w") + import sys proc = subprocess.Popen( - ["python3", "-m", "agent_node.node"], + [sys.executable, "-m", "agent_node.main"], env=env, cwd=agent_dir, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stdout=log_file, + stderr=log_file ) node_processes.append(proc) diff --git a/ai-hub/integration_tests/test_file_sync.py b/ai-hub/integration_tests/test_file_sync.py index 2ccb35b..1a4cdbf 100644 --- a/ai-hub/integration_tests/test_file_sync.py +++ b/ai-hub/integration_tests/test_file_sync.py @@ -740,8 +740,9 @@ # Create a 1GB file consisting of zeros (highly compressible over the network) on NODE_1 directly. # This will trigger the Inotify watcher to push chunks back up to the Hub. # We output to the active session workspace path on the node. - # In the agent container, the workspace is at /tmp/cortex-sync/{swarm_session} - dd_command = f"dd if=/dev/zero of=/tmp/cortex-sync/{workspace}/{filename} bs=1M count=1000" + is_native = os.environ.get("SKIP_DOCKER_NODES") == "true" + sync_dir = f"/tmp/cortex-sync-{NODE_1}" if is_native else "/tmp/cortex-sync" + dd_command = f"dd if=/dev/zero of={sync_dir}/{workspace}/{filename} bs=1M count=1000" r_disp = sync_client.post( f"{NODES_PATH}/{NODE_1}/dispatch", @@ -861,22 +862,25 @@ time.sleep(3.0) # 5. Check client-side folders are purged using DISPATCH to run "ls" + is_native = os.environ.get("SKIP_DOCKER_NODES") == "true" + n1_dir = f"/tmp/cortex-sync-{NODE_1}" if is_native else "/tmp/cortex-sync" # Node 1 r_d1 = sync_client.post( f"{NODES_PATH}/{NODE_1}/dispatch", params={"user_id": _get_user_id()}, - json={"command": f"stat /tmp/cortex-sync/{workspace_id}"}, + json={"command": f"stat {n1_dir}/{workspace_id}"}, headers=_headers() ) assert "No such file or directory" in r_d1.json().get("stderr", "") or r_d1.json().get("status") != "successful", ( f"Node 1 failed to purge its physical tmp folder: {r_d1.text}" ) + n2_dir = f"/tmp/cortex-sync-{NODE_2}" if is_native else "/tmp/cortex-sync" # Node 2 r_d2 = sync_client.post( f"{NODES_PATH}/{NODE_2}/dispatch", params={"user_id": _get_user_id()}, - json={"command": f"stat /tmp/cortex-sync/{workspace_id}"}, + json={"command": f"stat {n2_dir}/{workspace_id}"}, headers=_headers() ) assert "No such file or directory" in r_d2.json().get("stderr", "") or r_d2.json().get("status") != "successful", ( @@ -885,12 +889,16 @@ print("[Case Purge] ✅ Physical client-side (`/tmp/cortex-sync/...`) folders proactively erased on all nodes") # 6. Check server-side folder - # (Since the test runner is executed on host but ai_hub is Docker container, we can use docker exec) - cmd = ["docker", "exec", "ai_hub_service", "stat", f"/app/data/mirrors/{workspace_id}"] - # This should fail if it doesn't exist. - res_hub = subprocess.run(cmd, capture_output=True, text=True) - assert res_hub.returncode != 0, f"Server mirror folder still physically exists! stat matched: {res_hub.stdout}" - assert "No such file or directory" in res_hub.stderr, f"Unexpected error during server stat: {res_hub.stderr}" + if is_native: + mirror_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "ai-hub/data/mirrors", workspace_id) + assert not os.path.exists(mirror_path), f"Server mirror folder still physically exists! stat matched: {mirror_path}" + else: + # (Since the test runner is executed on host but ai_hub is Docker container, we can use docker exec) + cmd = ["docker", "exec", "ai_hub_service", "stat", f"/app/data/mirrors/{workspace_id}"] + # This should fail if it doesn't exist. + res_hub = subprocess.run(cmd, capture_output=True, text=True) + assert res_hub.returncode != 0, f"Server mirror folder still physically exists! stat matched: {res_hub.stdout}" + assert "No such file or directory" in res_hub.stderr, f"Unexpected error during server stat: {res_hub.stderr}" print("[Case Purge] ✅ Server-side physical mirror folder proactively erased") diff --git a/ai-hub/requirements.txt b/ai-hub/requirements.txt index 6d2c2d0..6c678f6 100644 --- a/ai-hub/requirements.txt +++ b/ai-hub/requirements.txt @@ -20,7 +20,7 @@ tenacity litellm tiktoken -grpcio==1.62.1 -grpcio-tools==1.62.1 -grpcio-reflection==1.62.1 +grpcio +grpcio-tools +grpcio-reflection croniter diff --git a/run_integration_tests.sh b/run_integration_tests.sh index 735031a..74eebaa 100755 --- a/run_integration_tests.sh +++ b/run_integration_tests.sh @@ -27,12 +27,13 @@ NO_REBUILD=true fi -# Check if docker daemon is reachable (i.e., not inside DevContainer without DIND) +# Check if docker daemon is reachable (i.e., not inside DevContainer without DIND) or if specifically skipped DOCKER_AVAILABLE=false -if docker info >/dev/null 2>&1; then +if docker info >/dev/null 2>&1 && [ "$SKIP_DOCKER_NODES" != "true" ]; then DOCKER_AVAILABLE=true + export SKIP_DOCKER_NODES=false else - echo "Docker daemon not reachable (likely running in a Dev Container). Switching to Native Python mode..." + echo "Docker skipping or not reachable (likely Native mode). Switching to Native Python mode..." export SKIP_DOCKER_NODES=true export SYNC_TEST_BASE_URL="http://127.0.0.1:8000/api/v1" fi @@ -74,13 +75,17 @@ mkdir -p ai-hub/data pkill -f uvicorn || true - pkill -f agent_node.node || true + pkill -f agent_node.main || true sleep 1 - cd /app/ai-hub - uvicorn app.main:app --host 0.0.0.0 --port 8000 > native_hub.log 2>&1 & + if [ -d "ai-hub" ]; then + cd ai-hub + elif [ -d "/app/ai-hub" ]; then + cd /app/ai-hub + fi + DATA_DIR=./data DATABASE_URL=sqlite:///./test.db AGENT_NODE_SRC_DIR=../agent-node SKILLS_SRC_DIR=../skills /tmp/venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8000 > native_hub.log 2>&1 & HUB_PID=$! - cd /app + cd - > /dev/null # Wait for healthy echo "Waiting for AI Hub to be ready..." @@ -116,7 +121,6 @@ if [ "$NO_REBUILD" = false ] || [ "$IS_RUNNING" = false ]; then echo "==========================================" echo " TEARING DOWN INTEGRATION ENVIRONMENT " - echo "==========================================" if [ "$DOCKER_AVAILABLE" = true ]; then docker compose down -v else