diff --git a/agent-node/VERSION b/agent-node/VERSION index 9d8637c..1edd062 100644 --- a/agent-node/VERSION +++ b/agent-node/VERSION @@ -1 +1 @@ -1.0.70 +1.0.71 diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py index 0fa835f..0e84914 100644 --- a/agent-node/src/agent_node/core/sync.py +++ b/agent-node/src/agent_node/core/sync.py @@ -42,7 +42,7 @@ shutil.rmtree(path) print(f" [πŸ“πŸ§Ή] Proactively purged unused session directory: {session_id}") - def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest) -> list: + def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest, on_purge_callback=None) -> list: """Compares local files with the server manifest and returns paths needing update.""" session_dir = self.get_session_dir(session_id, create=True) print(f"[πŸ“] Reconciling Sync Directory: {session_dir}") @@ -59,6 +59,8 @@ if rel_path in [".cortexignore", ".gitignore"]: continue if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): try: + if on_purge_callback: + on_purge_callback(rel_path) os.remove(abs_path) print(f" [πŸ“πŸ—‘οΈ] Deleted extraneous local file: {rel_path}") except Exception as e: @@ -70,6 +72,8 @@ if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): try: if not os.listdir(abs_path): + if on_purge_callback: + on_purge_callback(rel_path) os.rmdir(abs_path) except Exception: pass diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index 5c8d6ee..8cd7fc9 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -45,6 +45,13 @@ def on_deleted(self, event): if not event.is_directory: + if self.session_id == "__fs_explorer__": + from agent_node.config import SYNC_DIR + real_sync = os.path.realpath(SYNC_DIR) + real_src = os.path.realpath(event.src_path) + if real_src.startswith(real_sync) or 
event.src_path.startswith(SYNC_DIR): + return + # Resolve real paths to handle symlinks (e.g. /tmp -> /private/tmp on macOS) real_src = os.path.realpath(event.src_path) rel_path = os.path.normpath(os.path.relpath(real_src, self.root_path)) @@ -53,6 +60,10 @@ # Without this filter, the watcher would relay a spurious DELETE to the Hub server. if rel_path.endswith(".cortex_tmp") or rel_path.endswith(".cortex_lock"): return + + if self.last_sync.get(rel_path) == "__DELETED__": + return + if not self.ignore_filter.is_ignored(rel_path): self.callback(self.session_id, agent_pb2.FileSyncMessage( session_id=self.session_id, @@ -68,6 +79,13 @@ if self.locked: return # Block all user edits when session is locked + if self.session_id == "__fs_explorer__": + from agent_node.config import SYNC_DIR + real_sync = os.path.realpath(SYNC_DIR) + real_abs_check = os.path.realpath(abs_path) + if real_abs_check.startswith(real_sync) or abs_path.startswith(SYNC_DIR): + return + # Resolve real paths to handle symlinks real_abs = os.path.realpath(abs_path) rel_path = os.path.normpath(os.path.relpath(real_abs, self.root_path)) @@ -220,6 +238,12 @@ _, handler = self.observers[session_id] handler.last_sync[rel_path] = file_hash + def acknowledge_remote_delete(self, session_id, rel_path): + """Updates the internal hash record to match a remote delete, preventing an echo-back.""" + if session_id in self.observers: + _, handler = self.observers[session_id] + handler.last_sync[rel_path] = "__DELETED__" + def suppress_path(self, session_id, rel_path): """Tells the watcher to ignore events for a specific path (e.g. 
during sync).""" if session_id in self.observers: diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index ddada51..53d2efa 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -400,7 +400,11 @@ print(f" [πŸ“] Sync MSG: {type_str} | Session: {sid}") if fs.HasField("manifest"): - needs_update = self.sync_mgr.handle_manifest(sid, fs.manifest) + needs_update = self.sync_mgr.handle_manifest( + sid, + fs.manifest, + on_purge_callback=lambda p: self.watcher.acknowledge_remote_delete(sid, p) + ) if needs_update: print(f" [πŸ“βš οΈ] Drift Detected for {sid}: {len(needs_update)} files need sync") self.task_queue.put(agent_pb2.ClientTaskMessage( @@ -656,6 +660,9 @@ if not os.path.exists(target_path): raise Exception("File not found") + # Acknowledge deletion to prevent watchdog echo loop + self.watcher.acknowledge_remote_delete(session_id, safe_rel) + import shutil if os.path.isdir(target_path): shutil.rmtree(target_path) diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index 1a2d8bd..abdc8cb 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -374,6 +374,7 @@ # M6: Only reconcile for real user sessions, not for the modular explorer. 
if fs.session_id != "__fs_explorer__": + drifts = None # Do not reconcile on shallow manifests triggered by interactive FS tools if task_id and any(task_id.startswith(p) for p in ("fs-ls-", "fs-write-", "fs-rm-")): pass diff --git a/ai-hub/integration_tests/test_file_sync.py b/ai-hub/integration_tests/test_file_sync.py index 077e959..bd02caf 100644 --- a/ai-hub/integration_tests/test_file_sync.py +++ b/ai-hub/integration_tests/test_file_sync.py @@ -444,6 +444,102 @@ # ══════════════════════════════════════════════════════════════════════════════ +# NODE RECONNECT / RESYNC TESTS +# ══════════════════════════════════════════════════════════════════════════════ + +# Docker container names for the test nodes on the production server +_NODE_CONTAINER = { + "test-node-1": "cortex-test-1", + "test-node-2": "cortex-test-2", +} + +_SSH_CMD = "sshpass -p 'a6163484a' ssh -o StrictHostKeyChecking=no axieyangb@192.168.68.113" + + +def _restart_test_node(node_id: str): + """ + Restart the named test-node Docker container on the production server. + This wipes /tmp/cortex-sync on the node, simulating a real reboot. + """ + import subprocess + container = _NODE_CONTAINER.get(node_id) + if not container: + pytest.skip(f"No container mapping for {node_id}") + cmd = ( + f"sshpass -p 'a6163484a' ssh -o StrictHostKeyChecking=no axieyangb@192.168.68.113 " + f"\"echo 'a6163484a' | sudo -S docker restart {container}\"" + ) + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) + if result.returncode != 0: + pytest.skip(f"Could not restart {container}: {result.stderr}") + + +class TestNodeResync: + """ + Case 10: node reconnect / workspace resync after container restart. + + Real-world scenario: a test node restarts (deploy, crash, reboot) and + /tmp/cortex-sync is wiped. The Hub must re-push the workspace to the + reconnected node via manifest-driven reconciliation. 
+ """ + + # ── Case 10: node-2 restart β†’ hub re-delivers workspace ──────────────── + def test_case10_node_resync_after_restart( + self, sync_client, swarm_session + ): + """ + 1. Write a file to node-1 and confirm node-2 received it. + 2. Restart the node-2 container (wipes /tmp/cortex-sync). + 3. Wait for node-2 to reconnect and receive the manifest from Hub. + 4. Assert that the file re-appears on node-2 within RESYNC_TIMEOUT. + + This guards against regressions in the push_workspace / manifest-driven + reconciliation loop that re-delivers Hub mirror contents to a freshly + reconnected node. + """ + RESYNC_TIMEOUT = 30 # seconds for node to reconnect + resync + RESTART_WAIT = 8 # seconds to allow container to come back up + + filename = _unique("case10_resync") + content = f"Case 10 β€” node resync after restart β€” {uuid.uuid4()}" + workspace = swarm_session + + # Setup: write from node-1, wait for node-2 to receive + r = _touch(sync_client, NODE_1, filename, content, workspace) + assert r.get("success"), f"[Case 10] Setup write failed: {r}" + + synced = _poll_until( + lambda: _cat(sync_client, NODE_2, filename, workspace), + timeout=SMALL_FILE_TIMEOUT, + ) + assert synced is not None, f"[Case 10] Setup: file did not reach {NODE_2} before restart." + print(f"\n[Case 10] File confirmed on {NODE_2}. Restarting container…") + + # Restart node-2 container β€” wipes /tmp/cortex-sync + _restart_test_node(NODE_2) + + # Brief pause to let the container fully stop, then wait for reconnect + time.sleep(RESTART_WAIT) + print(f"[Case 10] Container restarted. Waiting for {NODE_2} to reconnect and resync…") + + # After reconnect, node sends its (now-empty) manifest β†’ Hub sends back + # all missing files. Poll until the file reappears. 
+            resynced = _poll_until( +                lambda: _cat(sync_client, NODE_2, filename, workspace), +                timeout=RESYNC_TIMEOUT, +            ) +            assert resynced is not None, ( +                f"[Case 10] File '{filename}' did NOT re-appear on {NODE_2} within " +                f"{RESYNC_TIMEOUT}s after container restart. " +                f"Manifest-driven resync may be broken." +            ) +            assert content in resynced, ( +                f"[Case 10] Content mismatch on {NODE_2} after resync. Got: {resynced!r}" +            ) +            print(f"[Case 10] βœ… {NODE_2} resynced the file after container restart.") + + +# ══════════════════════════════════════════════════════════════════════════════ # LARGE FILE TESTS (20 MB, multi-chunk) # ══════════════════════════════════════════════════════════════════════════════ diff --git a/check_assistant.py b/check_assistant.py new file mode 100644 index 0000000..961a5a1 --- /dev/null +++ b/check_assistant.py @@ -0,0 +1,10 @@ +import re + +with open("ai-hub/app/core/grpc/services/grpc_server.py", "r") as f: + grpc_server = f.read() + +with open("ai-hub/app/core/grpc/services/assistant.py", "r") as f: + assistant = f.read() + +print(re.search(r'def _handle_client_message\([^)]+\):.*?def', grpc_server, re.DOTALL).group(0)[:500]) + diff --git a/frontend/src/components/ChatWindow.js b/frontend/src/components/ChatWindow.js index 4eaf1a5..1b34972 100644 --- a/frontend/src/components/ChatWindow.js +++ b/frontend/src/components/ChatWindow.js @@ -124,7 +124,7 @@
{message.reasoning}
@@ -132,7 +132,7 @@ )} -
+
{message.text}
diff --git a/frontend/src/index.css b/frontend/src/index.css index a90ea32..90d92f8 100644 --- a/frontend/src/index.css +++ b/frontend/src/index.css @@ -3,31 +3,31 @@ @tailwind utilities; .markdown-preview h1 { - @apply text-2xl font-bold mb-4 mt-6 text-gray-900 dark:text-white; + @apply text-xl font-bold mb-3 mt-4 text-gray-900 dark:text-white; } .markdown-preview h2 { - @apply text-xl font-bold mb-3 mt-5 text-gray-900 dark:text-white; + @apply text-lg font-bold mb-2 mt-3 text-gray-900 dark:text-white; } .markdown-preview h3 { - @apply text-lg font-bold mb-2 mt-4 text-gray-800 dark:text-gray-100; + @apply text-base font-bold mb-1 mt-2 text-gray-800 dark:text-gray-100; } .markdown-preview p { - @apply mb-4 leading-relaxed; + @apply mb-2 leading-normal; } .markdown-preview ul { - @apply list-disc list-inside mb-4 ml-4; + @apply list-disc list-inside mb-2 ml-4; } .markdown-preview ol { - @apply list-decimal list-inside mb-4 ml-4; + @apply list-decimal list-inside mb-2 ml-4; } .markdown-preview li { - @apply mb-1; + @apply mb-0.5 leading-normal; } .markdown-preview code { @@ -35,7 +35,7 @@ } .markdown-preview pre { - @apply bg-gray-900 border border-gray-800 rounded-xl p-4 mb-6 overflow-x-auto; + @apply bg-gray-900 border border-gray-800 rounded-xl p-3 mb-3 overflow-x-auto text-[13px] leading-snug; } .markdown-preview pre code { diff --git a/patch_broadcast.py b/patch_broadcast.py new file mode 100644 index 0000000..a9a48be --- /dev/null +++ b/patch_broadcast.py @@ -0,0 +1,8 @@ +with open("ai-hub/app/core/grpc/services/grpc_server.py", "r") as f: + content = f.read() + +target = "self.assistant.broadcast_file_chunk(fs.session_id, node_id, fs.file_data)" +if target in content: + print("grpc_server calls broadcast_file_chunk correctly.") +else: + print("grpc_server missing broadcast call!") diff --git a/patch_broadcast2.py b/patch_broadcast2.py new file mode 100644 index 0000000..8f27254 --- /dev/null +++ b/patch_broadcast2.py @@ -0,0 +1,9 @@ +import re + +with 
open("ai-hub/app/core/grpc/services/grpc_server.py", "r") as f: + grpc_server = f.read() + +lines = grpc_server.split("\n") +for i, line in enumerate(lines): + if "self.assistant.broadcast_file_chunk" in line: + print("\n".join(lines[max(0, i-5):min(len(lines), i+6)])) diff --git a/patch_grpc.py b/patch_grpc.py new file mode 100644 index 0000000..313c7e4 --- /dev/null +++ b/patch_grpc.py @@ -0,0 +1,19 @@ +with open("ai-hub/app/core/grpc/services/grpc_server.py", "r") as f: + content = f.read() + +target = """ if fs.session_id != "__fs_explorer__": + drifts = self.mirror.reconcile(fs.session_id, fs.manifest)""" + +replacement = """ if fs.session_id != "__fs_explorer__": + # Do not reconcile on shallow manifests triggered by interactive FS tools + if task_id and any(task_id.startswith(p) for p in ("fs-ls-", "fs-write-", "fs-rm-")): + pass + else: + drifts = self.mirror.reconcile(fs.session_id, fs.manifest)""" + +if target in content: + with open("ai-hub/app/core/grpc/services/grpc_server.py", "w") as f: + f.write(content.replace(target, replacement)) + print("Patched grpc_server.py") +else: + print("Target not found") diff --git a/patch_grpc_listener.py b/patch_grpc_listener.py new file mode 100644 index 0000000..d64f542 --- /dev/null +++ b/patch_grpc_listener.py @@ -0,0 +1,24 @@ +with open("ai-hub/app/core/grpc/services/grpc_server.py", "r") as f: + content = f.read() + +target = """ def _read_results(): + try: + for msg in request_iterator: + self._handle_client_message(msg, node_id, node) + except Exception as e:""" + +replacement = """ def _read_results(): + try: + for msg in request_iterator: + try: + self._handle_client_message(msg, node_id, node) + except Exception as inner_e: + logger.error(f"[!] 
Error processing task message from {node_id}: {inner_e}", exc_info=True) + except Exception as e:""" + +if target in content: + with open("ai-hub/app/core/grpc/services/grpc_server.py", "w") as f: + f.write(content.replace(target, replacement)) + print("Patched grpc_server.py listener") +else: + print("Target not found")