diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py index 0e84914..9d4d6cf 100644 --- a/agent-node/src/agent_node/core/sync.py +++ b/agent-node/src/agent_node/core/sync.py @@ -12,6 +12,32 @@ self.base_sync_dir = base_sync_dir if not os.path.exists(self.base_sync_dir): os.makedirs(self.base_sync_dir, exist_ok=True) + self.hash_cache = {} # (abs_path) -> (size, mtime, hash) + + def get_file_hash(self, abs_path: str) -> str: + """Returns the SHA256 hash of a file, using a stat-based metadata cache to avoid I/O when unchanged.""" + try: + stats = os.stat(abs_path) + size = stats.st_size + mtime = stats.st_mtime + + cached = self.hash_cache.get(abs_path) + if cached and cached[0] == size and cached[1] == mtime: + return cached[2] + + # Memory-safe incremental hashing + h = hashlib.sha256() + with open(abs_path, "rb") as f: + while True: + chunk = f.read(1024 * 1024) + if not chunk: break + h.update(chunk) + + file_hash = h.hexdigest() + self.hash_cache[abs_path] = (size, mtime, file_hash) + return file_hash + except Exception: + return "" def get_session_dir(self, session_id: str, create: bool = False) -> str: """Returns the unique identifier directory for this session's sync.""" @@ -90,14 +116,23 @@ if not os.path.exists(target_path): needs_update.append(file_info.path) else: - # Memory-safe incremental hashing - h = hashlib.sha256() - with open(target_path, "rb") as f: - while True: - chunk = f.read(1024 * 1024) - if not chunk: break - h.update(chunk) - actual_hash = h.hexdigest() + # Fast path hash check vs manifest + try: + stats = os.stat(target_path) + if stats.st_size != file_info.size: + needs_update.append(file_info.path) + continue + + cached = self.hash_cache.get(target_path) + if cached and cached[0] == stats.st_size and cached[1] == stats.st_mtime: + if cached[2] != file_info.hash: + needs_update.append(file_info.path) + continue + except: + needs_update.append(file_info.path) + continue + + actual_hash = self.get_file_hash(target_path) if actual_hash != file_info.hash: print(f" [⚠️] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})") needs_update.append(file_info.path) @@ -157,28 +192,18 @@ lf.write(json.dumps({"ts": time.time(), "owner": "node", "path": payload.path})) except: pass - # 2. Initialize Shadow File (Truncate) - data = payload.chunk - if payload.compressed: - try: data = zlib.decompress(data) - except: pass + # 2. Write Data with random access support (handles out-of-order chunks) + data = payload.chunk + if payload.compressed: + try: data = zlib.decompress(data) + except: pass - with open(tmp_path, "wb") as f: - f.write(data) - else: - # Random access write to shadow file - if not os.path.exists(tmp_path): - with open(tmp_path, "wb") as f: pass - - data = payload.chunk - if payload.compressed: - try: data = zlib.decompress(data) - except: pass - - with open(tmp_path, "r+b") as f: - # Use offset directly. In proto3, it defaults to 0 if not set. - f.seek(payload.offset) - f.write(data) + # Open in r+b if exists to avoid truncating chunks that might have arrived out-of-order, + # otherwise create with wb. + mode = "r+b" if os.path.exists(tmp_path) else "wb" + with open(tmp_path, mode) as f: + f.seek(payload.offset) + f.write(data) if payload.is_final: # 3. Finalization: Verify and Swap diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index 354219a..8b54593 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -559,14 +559,10 @@ # r_path must be relative to base_dir so the server correctly joins it to the mirror root r_path = os.path.relpath(abs_path, base_dir) try: - # Memory-safe incremental hashing - h = hashlib.sha256() - with open(abs_path, "rb") as f: - while True: - chunk = f.read(1024 * 1024) # 1MB chunks - if not chunk: break - h.update(chunk) - file_hash = h.hexdigest() + # Memory-safe hashing with metadata cache + file_hash = self.sync_mgr.get_file_hash(abs_path) + if not file_hash: continue + files.append(agent_pb2.FileInfo( path=r_path, size=os.path.getsize(abs_path), diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py index b8d9bd5..7b5ad79 100644 --- a/ai-hub/app/core/grpc/core/mirror.py +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -14,6 +14,7 @@ self.storage_root = storage_root if not os.path.exists(self.storage_root): os.makedirs(self.storage_root, exist_ok=True) + self.hash_cache = {} # (abs_path) -> (size, mtime, hash) def get_ignore_filter(self, session_id: str) -> CortexIgnore: """Returns a CortexIgnore instance for a session.""" @@ -111,24 +112,18 @@ lf.truncate() except: pass - # 2. Decompress and Write Data + # 2. Decompress and Write Data (handles out-of-order chunks) data = file_payload.chunk if file_payload.compressed: try: data = zlib.decompress(data) except: pass - if file_payload.chunk_index == 0: - with open(tmp_path, "wb") as f: - f.write(data) - else: - if not os.path.exists(tmp_path): - # Ensure file exists even if chunk 0 was somehow bypassed (unlikely but safe) - with open(tmp_path, "wb") as f: pass - - with open(tmp_path, "r+b") as f: - # Use offset directly. In proto3, it defaults to 0 if not set. - f.seek(file_payload.offset) - f.write(data) + # Open in r+b if exists to avoid truncating chunks that might have arrived out-of-order, + # otherwise create with wb. + mode = "r+b" if os.path.exists(tmp_path) else "wb" + with open(tmp_path, mode) as f: + f.seek(file_payload.offset) + f.write(data) if file_payload.is_final: success = True @@ -188,20 +183,30 @@ continue raw_files.append((abs_path, rel_path)) - def _hash_file(paths): - abs_p, rel_p = paths try: - # Memory-safe incremental hashing - h = hashlib.sha256() - with open(abs_p, "rb") as f: - while True: - chunk = f.read(1024 * 1024) # 1MB chunks - if not chunk: break - h.update(chunk) + # Optimized metadata check + stats = os.stat(abs_p) + mtime = stats.st_mtime + size = stats.st_size + + cached = self.hash_cache.get(abs_p) + if cached and cached[0] == size and cached[1] == mtime: + file_hash = cached[2] + else: + # Memory-safe incremental hashing + h = hashlib.sha256() + with open(abs_p, "rb") as f: + while True: + chunk = f.read(1024 * 1024) # 1MB chunks + if not chunk: break + h.update(chunk) + file_hash = h.hexdigest() + self.hash_cache[abs_p] = (size, mtime, file_hash) + return agent_pb2.FileInfo( path=rel_p, - size=os.path.getsize(abs_p), - hash=h.hexdigest(), + size=size, + hash=file_hash, is_dir=False ) except Exception: @@ -273,8 +278,17 @@ # Metadata pre-check (Fast path) try: - if os.path.getsize(local_path) != remote_file.size: + stats = os.stat(local_path) + size = stats.st_size + mtime = stats.st_mtime + if size != remote_file.size: return remote_file.path + + cached = self.hash_cache.get(local_path) + if cached and cached[0] == size and cached[1] == mtime: + if cached[2] != remote_file.hash: + return remote_file.path + return None except: return remote_file.path # Hash check (Slow path) - Incremental @@ -285,7 +299,9 @@ chunk = f.read(1024 * 1024) if not chunk: break h.update(chunk) - if h.hexdigest() != remote_file.hash: + file_hash = h.hexdigest() + self.hash_cache[local_path] = (size, mtime, file_hash) + if file_hash != remote_file.hash: return remote_file.path except: return remote_file.path