diff --git a/agent-node/VERSION b/agent-node/VERSION index ea2f1d3..e9acec7 100644 --- a/agent-node/VERSION +++ b/agent-node/VERSION @@ -1 1 @@ -1.0.74 +1.0.75 diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index 56199a2..69dd04a 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -29,17 +29,23 @@ def on_modified(self, event): if not event.is_directory: + if event.src_path.endswith(".cortex_tmp") or event.src_path.endswith(".cortex_lock"): + return print(f" [📁👁️] Watcher: Modification detected: {event.src_path}") self._process_change(event.src_path) def on_created(self, event): if not event.is_directory: + if event.src_path.endswith(".cortex_tmp") or event.src_path.endswith(".cortex_lock"): + return print(f" [📁👁️] Watcher: Creation detected: {event.src_path}") self._process_change(event.src_path) def on_closed(self, event): # critical for large writes like 'dd' or 'cp' that trigger many modified events if not event.is_directory: + if event.src_path.endswith(".cortex_tmp") or event.src_path.endswith(".cortex_lock"): + return print(f" [📁👁️] Watcher: File closed (triggering immediate sync): {event.src_path}") self._process_change(event.src_path, force=True) @@ -148,12 +154,29 @@ if not os.path.exists(abs_path): return + # Step 1: Calculate hash first to prevent infinite echo loops + file_hash = "" + try: + h = hashlib.sha256() + with open(abs_path, "rb") as f: + while True: + block = f.read(1024 * 1024) + if not block: break + h.update(block) + file_hash = h.hexdigest() + except Exception as e: + print(f" [!] Watcher hashing error for {rel_path}: {e}") + return + + if self.last_sync.get(rel_path) == file_hash: + return # Already in sync + + self.last_sync[rel_path] = file_hash file_size = os.path.getsize(abs_path) chunk_size = 1024 * 1024 # 1MB buffer for hashing/stream total_chunks = (file_size + chunk_size - 1) // chunk_size if file_size > 0 else 1 - # Memory-safe incremental hashing - hasher = hashlib.sha256() + print(f" [📁📤] Streaming Sync Started: {rel_path} ({file_size} bytes)") with open(abs_path, "rb") as f: index = 0 @@ -162,7 +185,6 @@ if not chunk and index > 0: break - hasher.update(chunk) offset = f.tell() - len(chunk) is_final = f.tell() >= file_size @@ -176,7 +198,7 @@ "is_final": is_final, "offset": offset, "compressed": True, - "hash": hasher.hexdigest() if is_final else "", + "hash": file_hash if is_final else "", } if "total_chunks" in agent_pb2.FilePayload.DESCRIPTOR.fields_by_name: payload_fields["total_chunks"] = total_chunks @@ -190,15 +212,7 @@ if is_final: break index += 1 - file_hash = hasher.hexdigest() - - if self.last_sync.get(rel_path) == file_hash: - # Chunks were already sent, so we must send sentinel above, - # but we can skip the log message. - return - - self.last_sync[rel_path] = file_hash - print(f" [📁📤] Streaming Sync Complete: {rel_path} ({file_size} bytes)") + print(f" [📁📤] Streaming Sync Complete: {rel_path}") except Exception as e: print(f" [!] Watcher Error for {rel_path}: {e}")