Newer
Older
cortex-hub / agent-node / agent_node / core / watcher.py

import time
import os
import hashlib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from shared_core.ignore import CortexIgnore
from protos import agent_pb2

class SyncEventHandler(FileSystemEventHandler):
    """Listens for FS events and triggers gRPC delta pushes."""
    def __init__(self, session_id, root_path, callback):
        self.session_id = session_id
        self.root_path = root_path
        self.callback = callback
        self.ignore_filter = CortexIgnore(root_path)
        self.last_sync = {} # path -> last_hash
        self.locked = False

    def on_modified(self, event):
        if not event.is_directory:
            self._process_change(event.src_path)

    def on_created(self, event):
        if not event.is_directory:
            self._process_change(event.src_path)

    def on_moved(self, event):
        # Simplification: treat move as a delete and create, or just process the dest
        self._process_change(event.dest_path)

    def _process_change(self, abs_path):
        if self.locked:
            return # Block all user edits when session is locked

        rel_path = os.path.normpath(os.path.relpath(abs_path, self.root_path))
        
        # Phase 3: Dynamic reload if .cortexignore / .gitignore changed
        if rel_path in [".cortexignore", ".gitignore"]:
            print(f"    [*] Reloading Ignore Filter for {self.session_id}")
            self.ignore_filter = CortexIgnore(self.root_path)

        if self.ignore_filter.is_ignored(rel_path):
            return

        try:
            with open(abs_path, "rb") as f:
                content = f.read()
                file_hash = hashlib.sha256(content).hexdigest()
            
            if self.last_sync.get(rel_path) == file_hash:
                return # No actual change

            self.last_sync[rel_path] = file_hash
            print(f"    [📁📤] Detected Change: {rel_path}")
            
            # Chunk and Send
            chunk_size = 64 * 1024
            for i in range(0, len(content), chunk_size):
                chunk = content[i:i + chunk_size]
                is_final = i + chunk_size >= len(content)
                payload = agent_pb2.FilePayload(
                    path=rel_path,
                    chunk=chunk,
                    chunk_index=i // chunk_size,
                    is_final=is_final,
                    hash=file_hash if is_final else ""
                )
                self.callback(self.session_id, payload)
        except Exception as e:
            print(f"    [!] Watcher Error for {rel_path}: {e}")

class WorkspaceWatcher:
    """Manages FS observers for active synchronization."""
    def __init__(self, callback):
        self.callback = callback
        self.observers = {} # session_id -> (observer, handler)

    def set_lock(self, session_id, locked=True):
        if session_id in self.observers:
            print(f"[*] Workspace LOCK for {session_id}: {locked}")
            self.observers[session_id][1].locked = locked

    def start_watching(self, session_id, root_path):
        if session_id in self.observers:
            self.stop_watching(session_id)
        
        print(f"[*] Starting Watcher for Session {session_id} at {root_path}")
        handler = SyncEventHandler(session_id, root_path, self.callback)
        observer = Observer()
        observer.schedule(handler, root_path, recursive=True)
        observer.start()
        self.observers[session_id] = (observer, handler)

    def stop_watching(self, session_id):
        if session_id in self.observers:
            print(f"[*] Stopping Watcher for Session {session_id}")
            obs, _ = self.observers[session_id]
            obs.stop()
            obs.join()
            del self.observers[session_id]

    def get_watch_path(self, session_id):
        if session_id in self.observers:
            return self.observers[session_id][1].root_path
        return None

    def shutdown(self):
        for sid in list(self.observers.keys()):
            self.stop_watching(sid)