import time
import os
import hashlib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from shared_core.ignore import CortexIgnore
from protos import agent_pb2
class SyncEventHandler(FileSystemEventHandler):
    """Listens for FS events and triggers gRPC delta pushes.

    For every created, modified or moved file below ``root_path`` the file is
    read, hashed with SHA-256 and, when that hash differs from the one last
    pushed for the same relative path, split into ``agent_pb2.FilePayload``
    chunks that are passed one by one to ``callback(session_id, payload)``.

    Args:
        session_id: Identifier forwarded unchanged as the callback's first
            argument.
        root_path: Directory being watched; payload paths are relative to it.
        callback: Callable invoked as ``callback(session_id, payload)`` for
            every chunk.
    """

    # Maximum number of content bytes carried by a single FilePayload.
    CHUNK_SIZE = 64 * 1024
    # Root-level files whose change triggers a rebuild of the ignore filter.
    IGNORE_FILES = (".cortexignore", ".gitignore")

    def __init__(self, session_id, root_path, callback):
        super().__init__()
        self.session_id = session_id
        self.root_path = root_path
        self.callback = callback
        self.ignore_filter = CortexIgnore(root_path)
        self.last_sync = {}  # rel_path -> sha256 hex digest of last pushed content
        self.locked = False  # while True, every change is dropped unprocessed

    def on_modified(self, event):
        """Push the modified file; directory events are skipped."""
        if not event.is_directory:
            self._process_change(event.src_path)

    def on_created(self, event):
        """Push the newly created file; directory events are skipped."""
        if not event.is_directory:
            self._process_change(event.src_path)

    def on_moved(self, event):
        """Push the file found at the destination of a move.

        Simplification: the move is handled as a change of the destination
        path only; nothing is sent for the source path. Directory moves are
        skipped like in the other event hooks, because a directory cannot be
        opened and hashed as a file.
        """
        if not event.is_directory:
            self._process_change(event.dest_path)

    def _process_change(self, abs_path):
        """Read ``abs_path`` and push it when its content hash has changed.

        Does nothing while the handler is locked, when the path is matched by
        the ignore filter, or when the SHA-256 of the content equals the hash
        recorded for the last successful push of that path. Any exception
        raised while reading or pushing is printed, not propagated.
        """
        if self.locked:
            return  # Block all user edits when session is locked

        rel_path = os.path.normpath(os.path.relpath(abs_path, self.root_path))

        # Phase 3: Dynamic reload if .cortexignore / .gitignore changed
        if rel_path in self.IGNORE_FILES:
            print(f" [*] Reloading Ignore Filter for {self.session_id}")
            self.ignore_filter = CortexIgnore(self.root_path)

        if self.ignore_filter.is_ignored(rel_path):
            return

        try:
            with open(abs_path, "rb") as f:
                content = f.read()

            file_hash = hashlib.sha256(content).hexdigest()
            if self.last_sync.get(rel_path) == file_hash:
                return  # No actual change

            print(f" [📁📤] Detected Change: {rel_path}")
            self._send_chunks(rel_path, content, file_hash)

            # Record the hash only after every chunk went out. If the push
            # failed part-way, the next event for this path must not be
            # deduplicated away, otherwise the file would never be re-sent.
            self.last_sync[rel_path] = file_hash
        except Exception as e:
            # Best effort: a failed read or push is reported and left to be
            # retried by the next event for this path.
            print(f" [!] Watcher Error for {rel_path}: {e}")

    def _send_chunks(self, rel_path, content, file_hash):
        """Split ``content`` into FilePayload chunks and hand each to the callback.

        Only the final chunk carries ``file_hash``; all others carry an empty
        hash string. Exceptions raised by the callback propagate to the caller.
        """
        total = len(content)
        # max(total, 1) forces exactly one iteration for an empty file, so a
        # zero-byte file still yields a single final payload with an empty chunk.
        for offset in range(0, max(total, 1), self.CHUNK_SIZE):
            is_final = offset + self.CHUNK_SIZE >= total
            payload = agent_pb2.FilePayload(
                path=rel_path,
                chunk=content[offset:offset + self.CHUNK_SIZE],
                chunk_index=offset // self.CHUNK_SIZE,
                is_final=is_final,
                hash=file_hash if is_final else "",
            )
            self.callback(self.session_id, payload)
class WorkspaceWatcher:
    """Registry of filesystem observers, keyed by session id.

    Each watched session owns one ``(observer, handler)`` pair stored in
    ``self.observers``. The ``callback`` given at construction is passed on to
    every ``SyncEventHandler`` this watcher creates.
    """

    def __init__(self, callback):
        # session_id -> (observer, handler)
        self.observers = {}
        self.callback = callback

    def set_lock(self, session_id, locked=True):
        """Set the ``locked`` flag on a session's handler.

        Sessions that are not being watched are silently left alone.
        """
        if session_id not in self.observers:
            return
        print(f"[*] Workspace LOCK for {session_id}: {locked}")
        _, event_handler = self.observers[session_id]
        event_handler.locked = locked

    def start_watching(self, session_id, root_path):
        """Start a recursive observer on ``root_path`` for ``session_id``.

        A watcher already registered under the same session id is stopped
        first, so each session has at most one observer.
        """
        already_watched = session_id in self.observers
        if already_watched:
            self.stop_watching(session_id)

        print(f"[*] Starting Watcher for Session {session_id} at {root_path}")
        event_handler = SyncEventHandler(session_id, root_path, self.callback)
        fs_observer = Observer()
        fs_observer.schedule(event_handler, root_path, recursive=True)
        fs_observer.start()
        self.observers[session_id] = (fs_observer, event_handler)

    def stop_watching(self, session_id):
        """Stop and join the session's observer, then drop its registry entry.

        Unknown session ids are ignored.
        """
        if session_id not in self.observers:
            return
        print(f"[*] Stopping Watcher for Session {session_id}")
        fs_observer = self.observers[session_id][0]
        fs_observer.stop()
        fs_observer.join()
        del self.observers[session_id]

    def get_watch_path(self, session_id):
        """Return the root path watched for ``session_id``, or ``None`` if unwatched."""
        if session_id not in self.observers:
            return None
        _, event_handler = self.observers[session_id]
        return event_handler.root_path

    def shutdown(self):
        """Stop every registered watcher."""
        # Iterate over a snapshot: stop_watching deletes entries as it goes.
        for active_id in tuple(self.observers):
            self.stop_watching(active_id)