diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index 191d912..5c8d6ee 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -48,6 +48,11 @@ # Resolve real paths to handle symlinks (e.g. /tmp -> /private/tmp on macOS) real_src = os.path.realpath(event.src_path) rel_path = os.path.normpath(os.path.relpath(real_src, self.root_path)) + # Critical: Do NOT send DELETE for internal temp/lock files. + # When sync does os.replace(tmp -> final), the OS fires a delete for .cortex_tmp. + # Without this filter, the watcher would relay a spurious DELETE to the Hub server. + if rel_path.endswith(".cortex_tmp") or rel_path.endswith(".cortex_lock"): + return if not self.ignore_filter.is_ignored(rel_path): self.callback(self.session_id, agent_pb2.FileSyncMessage( session_id=self.session_id, diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 41257b4..9a6f94d 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -220,6 +220,30 @@ for nid in destinations: _send_to_node(nid) + def broadcast_delete(self, session_id: str, sender_node_id: str, rel_path: str): + """Broadcasts a file deletion from one node to all other nodes in the session mesh.""" + with self.membership_lock: + session_members = self.memberships.get(session_id, []) + destinations = [n for n in session_members if n != sender_node_id] + + if destinations: + print(f" [📁🗑️📢] Broadcasting DELETE {rel_path} from {sender_node_id} to: {', '.join(destinations)}") + + tid = f"fs-rm-bcast-{int(time.time()*1000)}" + for nid in destinations: + node = self.registry.get_node(nid) + if node: + node.send_message(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=tid, + control=agent_pb2.SyncControl( + action=agent_pb2.SyncControl.DELETE, + path=rel_path + ) + ) + ), priority=2) + def lock_workspace(self, node_id, session_id): """Disables user-side synchronization from a node during AI refactors.""" self.control_sync(node_id, session_id, action="LOCK") diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index 37a5546..1a2d8bd 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -422,9 +422,16 @@ elif fs.HasField("control"): ctrl = fs.control if ctrl.action == agent_pb2.SyncControl.DELETE: - print(f" [📁🗑️] Node requested DELETE on mirror: {ctrl.path}") - self.mirror.delete_file(fs.session_id, ctrl.path) - # Broadcast to other nodes (optional M7) + path_to_del = ctrl.path + # Guard: never process deletes for internal temp/lock files. + # Nodes may fire spurious DELETE events for these when os.replace() is used. + if path_to_del.endswith(".cortex_tmp") or path_to_del.endswith(".cortex_lock"): + print(f" [📁🚫] Ignored temp/lock DELETE from {node_id}: {path_to_del}") + else: + print(f" [📁🗑️] Node requested DELETE on mirror: {path_to_del}") + self.mirror.delete_file(fs.session_id, path_to_del) + # Broadcast delete to all other session nodes for mesh consistency + self.assistant.broadcast_delete(fs.session_id, node_id, path_to_del) def ReportHealth(self, request_iterator, context): """Collect Health Metrics and Feed Policy Updates."""