import time
import os
import hashlib
import zlib
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
HAS_WATCHDOG = True
except ImportError:
# Optional dependency: Only required for live file sync/push-to-node features.
Observer = object
FileSystemEventHandler = object
HAS_WATCHDOG = False
from shared_core.ignore import CortexIgnore
from protos import agent_pb2
class SyncEventHandler(FileSystemEventHandler):
    """Listens for FS events and triggers gRPC delta pushes.

    One handler is rooted at ``root_path`` per watched session.  Changed
    files are hashed, then streamed to the hub via ``callback`` as
    zlib-compressed chunks; deletions are relayed as SyncControl DELETE
    messages.  Most of the logic exists to suppress "echo" events that
    the sync engine's own writes generate (temp/lock files, os.replace
    delete events, remote writes landing locally).
    """

    # Artifacts written by the sync engine itself (atomic-replace temp files
    # and lock files).  Events for these must never be relayed, or they echo
    # back into the mesh as spurious syncs/deletes.
    _INTERNAL_SUFFIXES = (".cortex_tmp", ".cortex_lock")

    def __init__(self, session_id, root_path, callback):
        """
        Args:
            session_id: Owning session.  The special id "__fs_explorer__"
                additionally excludes the node's internal SYNC_DIR.
            root_path: Directory to watch; resolved through symlinks.
            callback: ``callback(session_id, message)`` pushing a proto
                message (FilePayload / FileSyncMessage) onto the gRPC queue.
        """
        self.session_id = session_id
        self.root_path = os.path.realpath(root_path)
        self.callback = callback
        self.ignore_filter = CortexIgnore(self.root_path)
        self.last_sync = {}  # rel_path -> last pushed hash, or "__DELETED__"
        self.locked = False  # When True, all user edits are dropped
        self.suppressed_paths = set()  # Paths currently being modified by the system
        self.syncing_paths = set()  # Paths currently being scanned/pushed

    def _is_internal(self, path):
        """Return True for the sync engine's own temp/lock artifacts."""
        # str.endswith accepts a tuple, so one call covers every suffix.
        return path.endswith(self._INTERNAL_SUFFIXES)

    def _inside_sync_dir(self, path):
        """Return True when *path* lies inside the node's internal SYNC_DIR.

        Only consulted for the "__fs_explorer__" session.  Bug fix: the
        previous bare ``startswith()`` prefix test also matched *sibling*
        directories (e.g. "/data/sync-old" matched "/data/sync"); the
        comparison is now anchored on a path separator.
        """
        from agent_node.config import SYNC_DIR
        # Check both the resolved and the raw spelling, since events may
        # report either form (symlinks, e.g. /tmp -> /private/tmp on macOS).
        checks = (
            (os.path.realpath(SYNC_DIR), os.path.realpath(path)),
            (SYNC_DIR, path),
        )
        for base, candidate in checks:
            base = base.rstrip(os.sep) or os.sep
            if candidate == base or candidate.startswith(base + os.sep):
                return True
        return False

    def on_modified(self, event):
        """Debounced sync of an in-place file modification."""
        if event.is_directory or self._is_internal(event.src_path):
            return
        print(f" [📁👁️] Watcher: Modification detected: {event.src_path}")
        self._process_change(event.src_path)

    def on_created(self, event):
        """Debounced sync of a newly created file."""
        if event.is_directory or self._is_internal(event.src_path):
            return
        print(f" [📁👁️] Watcher: Creation detected: {event.src_path}")
        self._process_change(event.src_path)

    def on_closed(self, event):
        # critical for large writes like 'dd' or 'cp' that trigger many
        # modified events: close means the writer is done, so sync now
        # (force=True skips the settle/debounce wait).
        if event.is_directory or self._is_internal(event.src_path):
            return
        print(f" [📁👁️] Watcher: File closed (triggering immediate sync): {event.src_path}")
        self._process_change(event.src_path, force=True)

    def on_deleted(self, event):
        """Relay a genuine file deletion to the hub, filtering echoes."""
        if event.is_directory:
            return
        if self.session_id == "__fs_explorer__" and self._inside_sync_dir(event.src_path):
            return
        # Resolve real paths to handle symlinks (e.g. /tmp -> /private/tmp on macOS)
        real_src = os.path.realpath(event.src_path)
        rel_path = os.path.normpath(os.path.relpath(real_src, self.root_path))
        # M6: Extremely Critical macOS FSEvent/inotify fix.
        # If a file is "deleted" but actually still exists on disk, it's either an os.replace()
        # overwrite firing a deleted event for the unlinked inode, or a highly delayed
        # FSEvent where a network-write (chunk) re-created a file that was deleted moments ago.
        # We MUST suppress this, or it creates an infinite Delete Echo loop in the mesh!
        if os.path.exists(real_src) or os.path.lexists(event.src_path):
            print(f" [📁🛑] Watcher Suppressing echo delete because file STILL EXISTS (delayed event / replace): {rel_path}", flush=True)
            return
        # Critical: Do NOT send DELETE for internal temp/lock files.
        # When sync does os.replace(tmp -> final), the OS fires a delete for .cortex_tmp.
        # Without this filter, the watcher would relay a spurious DELETE to the Hub server.
        if self._is_internal(rel_path):
            return
        print(f" [📁🤔] Watcher on_deleted eval: event.src={event.src_path}, real_src={real_src}, root={self.root_path}, rel={rel_path}, last_sync={self.last_sync.get(rel_path)}", flush=True)
        if self.last_sync.get(rel_path) == "__DELETED__":
            print(f" [📁🛑] Watcher Suppressing echo delete for: {rel_path}", flush=True)
            return
        if self.ignore_filter.is_ignored(rel_path):
            return
        print(f" [📁⚠️] Watcher EMITTING DELETE to SERVER for: {rel_path}", flush=True)
        self.callback(self.session_id, agent_pb2.FileSyncMessage(
            session_id=self.session_id,
            control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=rel_path)
        ))

    def on_moved(self, event):
        # Treat as delete of src and create of dest
        self.on_deleted(event)
        self._process_change(event.dest_path, force=True)

    def _process_change(self, abs_path, force=False):
        """Hash a changed file and, if the content is new, stream it out.

        Args:
            abs_path: Absolute path reported by the OS event.
            force: Skip the 1-second settle/debounce wait (used for
                close/move events where the write is known complete).
        """
        if self.locked:
            return  # Block all user edits when session is locked
        if self.session_id == "__fs_explorer__" and self._inside_sync_dir(abs_path):
            return
        # Resolve real paths to handle symlinks
        real_abs = os.path.realpath(abs_path)
        rel_path = os.path.normpath(os.path.relpath(real_abs, self.root_path))
        if rel_path in self.suppressed_paths:
            return  # Ignore changes coming from the sync manager
        # Phase 3: Dynamic reload if .cortexignore / .gitignore changed
        if rel_path in [".cortexignore", ".gitignore"]:
            print(f" [*] Reloading Ignore Filter for {self.session_id}")
            self.ignore_filter = CortexIgnore(self.root_path)
        if self.ignore_filter.is_ignored(rel_path):
            return
        # Critical: Ignore temporary and lock files to prevent infinite sync loops/echoes
        if self._is_internal(rel_path):
            return
        if rel_path in self.syncing_paths:
            return  # A push for this path is already in flight
        self.syncing_paths.add(rel_path)
        try:
            # Step 0: Settle Check (Debounce)
            if not force and not self._settled(abs_path):
                return
            if not os.path.exists(abs_path):
                return
            # Step 1: Calculate hash first to prevent infinite echo loops
            file_hash = self._hash_file(abs_path, rel_path)
            if file_hash is None:
                return
            if self.last_sync.get(rel_path) == file_hash:
                return  # Already in sync
            # Record BEFORE streaming so a remote echo of this same content is
            # recognized and suppressed even while the push is still in flight.
            self.last_sync[rel_path] = file_hash
            self._stream_file(abs_path, rel_path, file_hash)
        except Exception as e:
            print(f" [!] Watcher Error for {rel_path}: {e}")
        finally:
            self.syncing_paths.discard(rel_path)

    def _settled(self, abs_path):
        """Debounce: return True once the file has stopped changing.

        Waits one second and compares mtime/size.  This is critical for
        tools like 'dd' or 'cp' that write in bursts: if the file is still
        growing we skip this event and let the next 'modified' event trigger
        the actual sync.  Returns False on deletion or any OS error so the
        caller simply drops the event.
        NOTE(review): the sleep runs on the watchdog dispatch thread, so it
        also delays delivery of other events for this observer — confirm
        this latency is acceptable.
        """
        try:
            initial_mtime = os.path.getmtime(abs_path)
            initial_size = os.path.getsize(abs_path)
            time.sleep(1.0)
            if not os.path.exists(abs_path):
                return False  # File deleted during wait
            return (os.path.getmtime(abs_path) == initial_mtime
                    and os.path.getsize(abs_path) == initial_size)
        except OSError:  # includes FileNotFoundError
            return False

    def _hash_file(self, abs_path, rel_path):
        """Return the file's SHA-256 hex digest, or None on read error."""
        try:
            h = hashlib.sha256()
            with open(abs_path, "rb") as f:
                while True:
                    block = f.read(1024 * 1024)
                    if not block:
                        break
                    h.update(block)
            return h.hexdigest()
        except Exception as e:
            print(f" [!] Watcher hashing error for {rel_path}: {e}")
            return None

    def _stream_file(self, abs_path, rel_path, file_hash):
        """Push the file to the hub as zlib-compressed 1MB chunks."""
        file_size = os.path.getsize(abs_path)
        chunk_size = 1024 * 1024  # 1MB buffer for hashing/stream
        total_chunks = (file_size + chunk_size - 1) // chunk_size if file_size > 0 else 1
        print(f" [📁📤] Streaming Sync Started: {rel_path} ({file_size} bytes)")
        with open(abs_path, "rb") as f:
            index = 0
            while True:
                chunk = f.read(chunk_size)
                if not chunk and index > 0:
                    break  # EOF (an empty file still sends one empty chunk)
                offset = f.tell() - len(chunk)
                is_final = f.tell() >= file_size
                # Compress Chunk for transit
                compressed_chunk = zlib.compress(chunk)
                payload_fields = {
                    "path": rel_path,
                    "chunk": compressed_chunk,
                    "chunk_index": index,
                    "is_final": is_final,
                    "offset": offset,
                    "compressed": True,
                    # Only the final chunk carries the full-file hash.
                    "hash": file_hash if is_final else "",
                }
                # Older proto schemas may lack these fields; probe before setting.
                if "total_chunks" in agent_pb2.FilePayload.DESCRIPTOR.fields_by_name:
                    payload_fields["total_chunks"] = total_chunks
                    payload_fields["total_size"] = file_size
                payload = agent_pb2.FilePayload(**payload_fields)
                # Callback pushes to gRPC queue
                self.callback(self.session_id, payload)
                if is_final:
                    break
                index += 1
        print(f" [📁📤] Streaming Sync Complete: {rel_path}")
class WorkspaceWatcher:
    """Manages FS observers for active synchronization.

    Keeps one (observer, handler) pair per session and exposes small
    pass-through helpers that the sync manager uses to acknowledge remote
    writes/deletes and to suppress echo events for in-flight paths.
    """

    def __init__(self, callback):
        # callback(session_id, message): pushes payloads to the gRPC layer.
        self.callback = callback
        # session_id -> (observer, handler)
        self.observers = {}

    def set_lock(self, session_id, locked=True):
        """Freeze or unfreeze local edit propagation for a session."""
        pair = self.observers.get(session_id)
        if pair is not None:
            print(f"[*] Workspace LOCK for {session_id}: {locked}")
            pair[1].locked = locked

    def start_watching(self, session_id, root_path):
        """(Re)start a recursive observer for the session's workspace."""
        if session_id in self.observers:
            self.stop_watching(session_id)
        if not HAS_WATCHDOG:
            print(f"[!] Warning: 'watchdog' not installed. File sync disabled for session {session_id}", flush=True)
            return
        print(f"[*] Starting Watcher for Session {session_id} at {root_path} (realpath: {os.path.realpath(root_path)})", flush=True)
        os.makedirs(root_path, exist_ok=True)
        event_handler = SyncEventHandler(session_id, root_path, self.callback)
        watcher = Observer()
        watcher.schedule(event_handler, root_path, recursive=True)
        watcher.start()
        self.observers[session_id] = (watcher, event_handler)

    def stop_watching(self, session_id):
        """Stop and join the session's observer, then drop its entry."""
        try:
            watcher, _handler = self.observers[session_id]
        except KeyError:
            return
        print(f"[*] Stopping Watcher for Session {session_id}")
        watcher.stop()
        watcher.join()
        del self.observers[session_id]

    def get_watch_path(self, session_id):
        """Return the watched root for a session, or None if not watching."""
        pair = self.observers.get(session_id)
        return pair[1].root_path if pair else None

    def acknowledge_remote_write(self, session_id, rel_path, file_hash):
        """Updates the internal hash record to match a remote write, preventing an echo-back."""
        pair = self.observers.get(session_id)
        if pair is not None:
            pair[1].last_sync[rel_path] = file_hash

    def acknowledge_remote_delete(self, session_id, rel_path):
        """Updates the internal hash record to match a remote delete, preventing an echo-back."""
        pair = self.observers.get(session_id)
        if pair is not None:
            print(f"[*] Watcher explicitly acknowledging REMOTE DELETE for: {rel_path} in session {session_id}", flush=True)
            pair[1].last_sync[rel_path] = "__DELETED__"

    def suppress_path(self, session_id, rel_path):
        """Tells the watcher to ignore events for a specific path (e.g. during sync)."""
        pair = self.observers.get(session_id)
        if pair is not None:
            pair[1].suppressed_paths.add(rel_path)

    def unsuppress_path(self, session_id, rel_path):
        """Resumes watching a specific path."""
        pair = self.observers.get(session_id)
        if pair is not None:
            # Use discard to avoid KeyError if it wasn't there
            pair[1].suppressed_paths.discard(rel_path)

    def shutdown(self):
        """Stop every active observer (snapshot keys: dict shrinks as we go)."""
        for session_id in list(self.observers):
            self.stop_watching(session_id)