diff --git a/agent-node/VERSION b/agent-node/VERSION index c3ccccf..9972f12 100644 --- a/agent-node/VERSION +++ b/agent-node/VERSION @@ -1 +1 @@ -1.0.59 +1.0.61 diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py index 76f8ae8..d48fa03 100644 --- a/agent-node/src/agent_node/core/sync.py +++ b/agent-node/src/agent_node/core/sync.py @@ -140,7 +140,7 @@ else: # Random access write to shadow file if not os.path.exists(tmp_path): - with open(tmp_path, "ab") as f: pass + with open(tmp_path, "wb") as f: pass data = payload.chunk if payload.compressed: @@ -148,7 +148,8 @@ except: pass with open(tmp_path, "r+b") as f: - f.seek(payload.offset if payload.HasField("offset") else 0) + # Use offset directly. In proto3, it defaults to 0 if not set. + f.seek(payload.offset) f.write(data) if payload.is_final: diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index d755d33..0e24a66 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -29,15 +29,18 @@ def on_modified(self, event): if not event.is_directory: + print(f" [📁👁️] Watcher: Modification detected: {event.src_path}") self._process_change(event.src_path) def on_created(self, event): if not event.is_directory: + print(f" [📁👁️] Watcher: Creation detected: {event.src_path}") self._process_change(event.src_path) def on_closed(self, event): # critical for large writes like 'dd' or 'cp' that trigger many modified events if not event.is_directory: + print(f" [📁👁️] Watcher: File closed (triggering immediate sync): {event.src_path}") self._process_change(event.src_path, force=True) def on_deleted(self, event): @@ -71,6 +74,10 @@ if self.ignore_filter.is_ignored(rel_path): return + # Critical: Ignore temporary and lock files to prevent infinite sync loops/echoes + if rel_path.endswith(".cortex_tmp") or rel_path.endswith(".cortex_lock"): + return + if rel_path in self.syncing_paths: return diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py index 836f930..9836fd6 100644 --- a/ai-hub/app/core/grpc/core/mirror.py +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -42,6 +42,9 @@ print(f" [📁🚷] Ignoring write to {file_payload.path}") return + if file_payload.path.endswith(".cortex_tmp") or file_payload.path.endswith(".cortex_lock"): + return + # Prevent path traversal — strip leading slash to ensure it's relative to workspace path_safe = file_payload.path.lstrip("/") safe_path = os.path.normpath(os.path.join(workspace, path_safe)) @@ -54,19 +57,20 @@ # 1. Lock/Metadata initialization or update if file_payload.chunk_index == 0: + print(f" [📁📥] Sync Starting: {file_payload.path} ({file_payload.total_size} bytes, {file_payload.total_chunks} chunks)") try: metadata = { "ts": time.time(), "owner": "hub", "path": file_payload.path, - "total_chunks": file_payload.total_chunks, - "total_size": file_payload.total_size, + "total_chunks": getattr(file_payload, "total_chunks", 0), + "total_size": getattr(file_payload, "total_size", 0), "received_chunks": [0] } with open(lock_path, "w") as lf: lf.write(json.dumps(metadata)) except: pass - elif file_payload.chunk_index % 20 == 0: + elif file_payload.chunk_index % 10 == 0: try: if os.path.exists(lock_path): with open(lock_path, "r+") as lf: @@ -91,10 +95,11 @@ else: if not os.path.exists(tmp_path): # Ensure file exists even if chunk 0 was somehow bypassed (unlikely but safe) - with open(tmp_path, "ab") as f: pass + with open(tmp_path, "wb") as f: pass with open(tmp_path, "r+b") as f: - f.seek(file_payload.offset if file_payload.HasField("offset") else 0) + # Use offset directly. In proto3, it defaults to 0 if not set. + f.seek(file_payload.offset) f.write(data) if file_payload.is_final: @@ -104,6 +109,12 @@ if success: try: + # M6: Ownership Preservation + try: + parent_stat = os.stat(os.path.dirname(safe_path)) + os.chown(tmp_path, parent_stat.st_uid, parent_stat.st_gid) + except: pass + os.replace(tmp_path, safe_path) print(f" [📁✅] Sync Complete: {file_payload.path} (Swapped and verified)") except Exception as e: diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 024d5b3..09ac7b9 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -174,8 +174,13 @@ ), priority=0) for sid in active_sessions: - # Re-push manifest to trigger node-side drift check + # 1. Re-push manifest to trigger node-side drift check self.push_workspace(node_id, sid) + + # 2. PROACTIVE: Ensure the node starts watching this session + # This is critical for catching 'dd' or 'cp' writes on the node + self.control_sync(node_id, sid, action="START") + # Add a small delay to prevent saturating the gRPC stream for multiple sessions time.sleep(0.5)