diff --git a/agent-node/VERSION b/agent-node/VERSION index 9d8637c..1edd062 100644 --- a/agent-node/VERSION +++ b/agent-node/VERSION @@ -1 +1 @@ -1.0.70 +1.0.71 diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py index 0fa835f..0e84914 100644 --- a/agent-node/src/agent_node/core/sync.py +++ b/agent-node/src/agent_node/core/sync.py @@ -42,7 +42,7 @@ shutil.rmtree(path) print(f" [ππ§Ή] Proactively purged unused session directory: {session_id}") - def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest) -> list: + def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest, on_purge_callback=None) -> list: """Compares local files with the server manifest and returns paths needing update.""" session_dir = self.get_session_dir(session_id, create=True) print(f"[π] Reconciling Sync Directory: {session_dir}") @@ -59,6 +59,8 @@ if rel_path in [".cortexignore", ".gitignore"]: continue if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): try: + if on_purge_callback: + on_purge_callback(rel_path) os.remove(abs_path) print(f" [πποΈ] Deleted extraneous local file: {rel_path}") except Exception as e: @@ -70,6 +72,8 @@ if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): try: if not os.listdir(abs_path): + if on_purge_callback: + on_purge_callback(rel_path) os.rmdir(abs_path) except Exception: pass diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index 5c8d6ee..8cd7fc9 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -45,6 +45,13 @@ def on_deleted(self, event): if not event.is_directory: + if self.session_id == "__fs_explorer__": + from agent_node.config import SYNC_DIR + real_sync = os.path.realpath(SYNC_DIR) + real_src = os.path.realpath(event.src_path) + if real_src.startswith(real_sync) or event.src_path.startswith(SYNC_DIR): + return + # Resolve real paths to handle symlinks (e.g. /tmp -> /private/tmp on macOS) real_src = os.path.realpath(event.src_path) rel_path = os.path.normpath(os.path.relpath(real_src, self.root_path)) @@ -53,6 +60,10 @@ # Without this filter, the watcher would relay a spurious DELETE to the Hub server. if rel_path.endswith(".cortex_tmp") or rel_path.endswith(".cortex_lock"): return + + if self.last_sync.get(rel_path) == "__DELETED__": + return + if not self.ignore_filter.is_ignored(rel_path): self.callback(self.session_id, agent_pb2.FileSyncMessage( session_id=self.session_id, @@ -68,6 +79,13 @@ if self.locked: return # Block all user edits when session is locked + if self.session_id == "__fs_explorer__": + from agent_node.config import SYNC_DIR + real_sync = os.path.realpath(SYNC_DIR) + real_abs_check = os.path.realpath(abs_path) + if real_abs_check.startswith(real_sync) or abs_path.startswith(SYNC_DIR): + return + # Resolve real paths to handle symlinks real_abs = os.path.realpath(abs_path) rel_path = os.path.normpath(os.path.relpath(real_abs, self.root_path)) @@ -220,6 +238,12 @@ _, handler = self.observers[session_id] handler.last_sync[rel_path] = file_hash + def acknowledge_remote_delete(self, session_id, rel_path): + """Updates the internal hash record to match a remote delete, preventing an echo-back.""" + if session_id in self.observers: + _, handler = self.observers[session_id] + handler.last_sync[rel_path] = "__DELETED__" + def suppress_path(self, session_id, rel_path): """Tells the watcher to ignore events for a specific path (e.g. during sync).""" if session_id in self.observers: diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index ddada51..53d2efa 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -400,7 +400,11 @@ print(f" [π] Sync MSG: {type_str} | Session: {sid}") if fs.HasField("manifest"): - needs_update = self.sync_mgr.handle_manifest(sid, fs.manifest) + needs_update = self.sync_mgr.handle_manifest( + sid, + fs.manifest, + on_purge_callback=lambda p: self.watcher.acknowledge_remote_delete(sid, p) + ) if needs_update: print(f" [πβ οΈ] Drift Detected for {sid}: {len(needs_update)} files need sync") self.task_queue.put(agent_pb2.ClientTaskMessage( @@ -656,6 +660,9 @@ if not os.path.exists(target_path): raise Exception("File not found") + # Acknowledge deletion to prevent watchdog echo loop + self.watcher.acknowledge_remote_delete(session_id, safe_rel) + import shutil if os.path.isdir(target_path): shutil.rmtree(target_path) diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index 1a2d8bd..abdc8cb 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -374,6 +374,7 @@ # M6: Only reconcile for real user sessions, not for the modular explorer. if fs.session_id != "__fs_explorer__": + drifts = None # Do not reconcile on shallow manifests triggered by interactive FS tools if task_id and any(task_id.startswith(p) for p in ("fs-ls-", "fs-write-", "fs-rm-")): pass diff --git a/ai-hub/integration_tests/test_file_sync.py b/ai-hub/integration_tests/test_file_sync.py index 077e959..bd02caf 100644 --- a/ai-hub/integration_tests/test_file_sync.py +++ b/ai-hub/integration_tests/test_file_sync.py @@ -444,6 +444,102 @@ # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ +# NODE RECONNECT / RESYNC TESTS +# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ + +# Docker container names for the test nodes on the production server +_NODE_CONTAINER = { + "test-node-1": "cortex-test-1", + "test-node-2": "cortex-test-2", +} + +_SSH_CMD = "sshpass -p 'a6163484a' ssh -o StrictHostKeyChecking=no axieyangb@192.168.68.113" + + +def _restart_test_node(node_id: str): + """ + Restart the named test-node Docker container on the production server. + This wipes /tmp/cortex-sync on the node, simulating a real reboot. + """ + import subprocess + container = _NODE_CONTAINER.get(node_id) + if not container: + pytest.skip(f"No container mapping for {node_id}") + cmd = ( + f"sshpass -p 'a6163484a' ssh -o StrictHostKeyChecking=no axieyangb@192.168.68.113 " + f"\"echo 'a6163484a' | sudo -S docker restart {container}\"" + ) + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) + if result.returncode != 0: + pytest.skip(f"Could not restart {container}: {result.stderr}") + + +class TestNodeResync: + """ + Case 10: node reconnect / workspace resync after container restart. + + Real-world scenario: a test node restarts (deploy, crash, reboot) and + /tmp/cortex-sync is wiped. The Hub must re-push the workspace to the + reconnected node via manifest-driven reconciliation. + """ + + # ββ Case 10: node-2 restart β hub re-delivers workspace ββββββββββββββββ + def test_case10_node_resync_after_restart( + self, sync_client, swarm_session + ): + """ + 1. Write a file to node-1 and confirm node-2 received it. + 2. Restart the node-2 container (wipes /tmp/cortex-sync). + 3. Wait for node-2 to reconnect and receive the manifest from Hub. + 4. Assert that the file re-appears on node-2 within RESYNC_TIMEOUT. + + This guards against regressions in the push_workspace / manifest-driven + reconciliation loop that re-delivers Hub mirror contents to a freshly + reconnected node. + """ + RESYNC_TIMEOUT = 30 # seconds for node to reconnect + resync + RESTART_WAIT = 8 # seconds to allow container to come back up + + filename = _unique("case10_resync") + content = f"Case 10 β node resync after restart β {uuid.uuid4()}" + workspace = swarm_session + + # Setup: write from node-1, wait for node-2 to receive + r = _touch(sync_client, NODE_1, filename, content, workspace) + assert r.get("success"), f"[Case 10] Setup write failed: {r}" + + synced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert synced is not None, f"[Case 10] Setup: file did not reach {NODE_2} before restart." + print(f"\n[Case 10] File confirmed on {NODE_2}. Restarting containerβ¦") + + # Restart node-2 container β wipes /tmp/cortex-sync + _restart_test_node(NODE_2) + + # Brief pause to let the container fully stop, then wait for reconnect + time.sleep(RESTART_WAIT) + print(f"[Case 10] Container restarted. Waiting for {NODE_2} to reconnect and resyncβ¦") + + # After reconnect, node sends its (now-empty) manifest β Hub sends back + # all missing files. Poll until the file reappears. + resynced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=RESYNC_TIMEOUT, + ) + assert resynced is not None, ( + f"[Case 10] File '{filename}' did NOT re-appear on {NODE_2} within " + f"{RESYNC_TIMEOUT}s after container restart. " + f"Manifest-driven resync may be broken." + ) + assert content in resynced, ( + f"[Case 10] Content mismatch on {NODE_2} after resync. Got: {resynced!r}" + ) + print(f"[Case 10] β {NODE_2} resynced the file after container restart.") + + +# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ # LARGE FILE TESTS (20 MB, multi-chunk) # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ diff --git a/check_assistant.py b/check_assistant.py new file mode 100644 index 0000000..961a5a1 --- /dev/null +++ b/check_assistant.py @@ -0,0 +1,10 @@ +import re + +with open("ai-hub/app/core/grpc/services/grpc_server.py", "r") as f: + grpc_server = f.read() + +with open("ai-hub/app/core/grpc/services/assistant.py", "r") as f: + assistant = f.read() + +print(re.search(r'def _handle_client_message\([^)]+\):.*?def', grpc_server, re.DOTALL).group(0)[:500]) + diff --git a/frontend/src/components/ChatWindow.js b/frontend/src/components/ChatWindow.js index 4eaf1a5..1b34972 100644 --- a/frontend/src/components/ChatWindow.js +++ b/frontend/src/components/ChatWindow.js @@ -124,7 +124,7 @@