diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py
index 08d7e31..0fa835f 100644
--- a/agent-node/src/agent_node/core/sync.py
+++ b/agent-node/src/agent_node/core/sync.py
@@ -110,6 +110,30 @@
 
         os.makedirs(os.path.dirname(target_path), exist_ok=True)
 
+        # 0. Fast-Path for single-chunk files
+        total_chunks = getattr(payload, "total_chunks", 0)
+        if total_chunks == 1 and payload.chunk_index == 0 and payload.is_final:
+            data = payload.chunk
+            if payload.compressed:
+                try: data = zlib.decompress(data)
+                except zlib.error: pass
+
+            with open(target_path, "wb") as f:
+                f.write(data)
+
+            success = True
+            if payload.hash:
+                success = self._verify(target_path, payload.hash)
+
+            if not success:
+                try: os.remove(target_path)
+                except OSError: pass
+                print(f"  [📁❌] Fast Sync failed hash check for {payload.path}")
+                return False
+
+            print(f"  [📁⚡] Fast Sync Complete: {payload.path}")
+            return True
+
         # We always write to a temporary "shadow" file during the sync
         tmp_path = target_path + ".cortex_tmp"
         lock_path = target_path + ".cortex_lock"
diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py
index 6a344ae..e0e7a4c 100644
--- a/agent-node/src/agent_node/core/watcher.py
+++ b/agent-node/src/agent_node/core/watcher.py
@@ -119,7 +119,6 @@
 
         hasher = hashlib.sha256()
         with open(abs_path, "rb") as f:
-            offset = 0
            index = 0
             while True:
                 chunk = f.read(chunk_size)
@@ -127,6 +126,8 @@
                     break
                 hasher.update(chunk)
+                offset = f.tell() - len(chunk)
+                is_final = f.tell() >= file_size
 
                 # Compress Chunk for transit
                 compressed_chunk = zlib.compress(chunk)
 
@@ -135,10 +136,10 @@
                     "path": rel_path,
                     "chunk": compressed_chunk,
                     "chunk_index": index,
-                    "is_final": False,
+                    "is_final": is_final,
                     "offset": offset,
                     "compressed": True,
-                    "hash": "",
+                    "hash": hasher.hexdigest() if is_final else "",
                 }
                 if hasattr(agent_pb2.FilePayload, "total_chunks"):
                     payload_fields["total_chunks"] = total_chunks
@@ -149,26 +150,10 @@
 
                 # Callback pushes to gRPC queue
                 self.callback(self.session_id, payload)
-                offset += len(chunk)
+                if is_final: break
                 index += 1
 
-            # Update internal tracking with the final hash
             file_hash = hasher.hexdigest()
-
-            # Signal completion with the hash to the server
-            # (The server uses this to verify integrity and perform the atomic swap)
-            sentinel_fields = {
-                "path": rel_path,
-                "is_final": True,
-                "hash": file_hash,
-                "chunk_index": index,
-                "offset": offset,
-            }
-            if hasattr(agent_pb2.FilePayload, "total_chunks"):
-                sentinel_fields["total_chunks"] = total_chunks
-                sentinel_fields["total_size"] = file_size
-
-            self.callback(self.session_id, agent_pb2.FilePayload(**sentinel_fields))
 
         if self.last_sync.get(rel_path) == file_hash:
             # Chunks were already sent, so we must send sentinel above,
diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py
index 42d36f5..b8d9bd5 100644
--- a/ai-hub/app/core/grpc/core/mirror.py
+++ b/ai-hub/app/core/grpc/core/mirror.py
@@ -52,6 +52,34 @@
             raise ValueError(f"Malicious path detected: {file_payload.path}")
 
         os.makedirs(os.path.dirname(safe_path), exist_ok=True)
+
+        # 0. Fast-Path for single-chunk files (Zero UX latency, no locking)
+        total_chunks = getattr(file_payload, "total_chunks", 0)
+        if total_chunks == 1 and file_payload.chunk_index == 0 and file_payload.is_final:
+            data = file_payload.chunk
+            if file_payload.compressed:
+                try: data = zlib.decompress(data)
+                except zlib.error: pass
+
+            with open(safe_path, "wb") as f:
+                f.write(data)
+
+            success = True
+            if file_payload.hash:
+                success = self._verify_hash(safe_path, file_payload.hash)
+
+            if not success:
+                try: os.remove(safe_path)
+                except OSError: pass
+                print(f"  [📁❌] Fast Sync failed hash check for {file_payload.path}")
+            else:
+                try:
+                    parent_stat = os.stat(os.path.dirname(safe_path))
+                    os.chown(safe_path, parent_stat.st_uid, parent_stat.st_gid)
+                except (OSError, AttributeError): pass
+                print(f"  [📁⚡] Fast Sync Complete: {file_payload.path}")
+            return
+
         tmp_path = safe_path + ".cortex_tmp"
         lock_path = safe_path + ".cortex_lock"
 
diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py
index c8a624d..7867de1 100644
--- a/ai-hub/app/core/grpc/services/assistant.py
+++ b/ai-hub/app/core/grpc/services/assistant.py
@@ -76,9 +76,11 @@
 
         try:
             with open(abs_path, "rb") as f:
+                chunk_size = 4 * 1024 * 1024  # 4MB chunks (optimal for gRPC)
+                total_chunks = (file_size + chunk_size - 1) // chunk_size if file_size > 0 else 1
                 index = 0
                 while True:
-                    chunk = f.read(4 * 1024 * 1024)  # 4MB chunks (optimal for gRPC)
+                    chunk = f.read(chunk_size)
                     if not chunk:
                         break
                     hasher.update(chunk)
@@ -87,20 +89,26 @@
 
                     # Compress Chunk for transit
                     compressed_chunk = zlib.compress(chunk)
+
+                    # Construct payload
+                    payload_fields = {
+                        "path": rel_path,
+                        "chunk": compressed_chunk,
+                        "chunk_index": index,
+                        "is_final": is_final,
+                        "hash": hasher.hexdigest() if is_final else "",
+                        "offset": offset,
+                        "compressed": True,
+                    }
+                    if hasattr(agent_pb2.FilePayload, "total_chunks"):
+                        payload_fields["total_chunks"] = total_chunks
+                        payload_fields["total_size"] = file_size
 
                     # Put into priority dispatcher (priority 2 for sync data)
                     node.send_message(agent_pb2.ServerTaskMessage(
                         file_sync=agent_pb2.FileSyncMessage(
                             session_id=session_id,
-                            file_data=agent_pb2.FilePayload(
-                                path=rel_path,
-                                chunk=compressed_chunk,
-                                chunk_index=index,
-                                is_final=is_final,
-                                hash=hasher.hexdigest() if is_final else "",
-                                offset=offset,
-                                compressed=True
-                            )
+                            file_data=agent_pb2.FilePayload(**payload_fields)
                         )
                     ), priority=2)
 