import time
import os
import hashlib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from protos import agent_pb2
class SyncEventHandler(FileSystemEventHandler):
    """Listens for FS events and triggers gRPC delta pushes.

    For every created, modified or moved file under ``root_path`` the file
    is read, hashed with SHA-256 and, when the hash differs from the last
    one successfully pushed for that path, split into chunks that are handed
    to ``callback(session_id, payload)`` as ``agent_pb2.FilePayload``
    messages.

    Attributes:
        session_id: Identifier passed back to ``callback`` with each payload.
        root_path: Directory that relative payload paths are computed against.
        callback: Callable invoked as ``callback(session_id, payload)``.
        last_sync: Maps relative path -> hex SHA-256 of the last content that
            was pushed in full.
    """

    # Maximum number of bytes carried by a single FilePayload message.
    CHUNK_SIZE = 64 * 1024
    # Path components that are never synchronized (in addition to dot-names).
    IGNORED_DIRS = frozenset({"node_modules"})

    def __init__(self, session_id, root_path, callback):
        super().__init__()
        self.session_id = session_id
        self.root_path = root_path
        self.callback = callback
        self.last_sync = {}  # path -> last_hash

    def on_modified(self, event):
        """Push the file at ``event.src_path`` unless the event is for a directory."""
        if not event.is_directory:
            self._process_change(event.src_path)

    def on_created(self, event):
        """Push the file at ``event.src_path`` unless the event is for a directory."""
        if not event.is_directory:
            self._process_change(event.src_path)

    def on_moved(self, event):
        """Push the file at its new location (``event.dest_path``).

        Simplification: only the destination is processed; no delete is
        signalled for the source path.
        """
        # Directories cannot be opened as files; only file moves are synced.
        if event.is_directory:
            return
        # The old path no longer holds this content, so forget its hash;
        # otherwise a file recreated there later would be skipped as unchanged.
        old_rel_path = os.path.relpath(event.src_path, self.root_path)
        self.last_sync.pop(old_rel_path, None)
        self._process_change(event.dest_path)

    def _is_ignored(self, rel_path):
        """Return True when any component of ``rel_path`` is hidden or ignored.

        Basic ignore filter (v1): components starting with ``.`` and
        components named exactly like an entry of ``IGNORED_DIRS``.
        """
        return any(
            part.startswith(".") or part in self.IGNORED_DIRS
            for part in rel_path.split(os.sep)
        )

    def _send_chunks(self, rel_path, content, file_hash):
        """Hand ``content`` to the callback in ``CHUNK_SIZE`` pieces.

        The last payload has ``is_final=True`` and carries ``file_hash``;
        earlier payloads carry an empty hash.
        """
        chunk_size = self.CHUNK_SIZE
        total = len(content)
        # max(total, 1) guarantees one iteration for an empty file, so that a
        # single empty, final payload is still emitted for it.
        for offset in range(0, max(total, 1), chunk_size):
            is_final = offset + chunk_size >= total
            payload = agent_pb2.FilePayload(
                path=rel_path,
                chunk=content[offset:offset + chunk_size],
                chunk_index=offset // chunk_size,
                is_final=is_final,
                hash=file_hash if is_final else "",
            )
            self.callback(self.session_id, payload)

    def _process_change(self, abs_path):
        """Read ``abs_path`` and push it if its content hash changed.

        Errors while reading or sending are reported on stdout and swallowed:
        this is a best-effort boundary so one failing file does not propagate
        an exception to the caller.
        """
        rel_path = os.path.relpath(abs_path, self.root_path)
        if self._is_ignored(rel_path):
            return
        try:
            with open(abs_path, "rb") as f:
                content = f.read()
            file_hash = hashlib.sha256(content).hexdigest()
            if self.last_sync.get(rel_path) == file_hash:
                return  # No actual change
            print(f" [📁📤] Detected Change: {rel_path}")
            self._send_chunks(rel_path, content, file_hash)
            # Record the hash only after every chunk was handed over, so a
            # failed push is retried on the next event for this file.
            self.last_sync[rel_path] = file_hash
        except Exception as e:
            print(f" [!] Watcher Error for {rel_path}: {e}")
class WorkspaceWatcher:
    """Keeps one running filesystem observer per session.

    ``callback`` is forwarded to every ``SyncEventHandler`` created by
    ``start_watching``; ``observers`` maps a session id to its observer.
    """

    def __init__(self, callback):
        self.observers = {}  # session_id -> observer
        self.callback = callback

    def start_watching(self, session_id, root_path):
        """Begin recursively watching ``root_path`` on behalf of ``session_id``.

        A watcher already registered for the same session is stopped first.
        """
        already_watching = session_id in self.observers
        if already_watching:
            self.stop_watching(session_id)

        print(f"[*] Starting Watcher for Session {session_id} at {root_path}")
        event_handler = SyncEventHandler(session_id, root_path, self.callback)
        fs_observer = Observer()
        fs_observer.schedule(event_handler, root_path, recursive=True)
        fs_observer.start()
        self.observers[session_id] = fs_observer

    def stop_watching(self, session_id):
        """Stop, join and unregister the observer of ``session_id``, if any."""
        if session_id not in self.observers:
            return

        print(f"[*] Stopping Watcher for Session {session_id}")
        self.observers[session_id].stop()
        self.observers[session_id].join()
        del self.observers[session_id]

    def shutdown(self):
        """Stop every registered observer."""
        # Iterate over a snapshot: stop_watching mutates self.observers.
        active_sessions = tuple(self.observers)
        for active_id in active_sessions:
            self.stop_watching(active_id)