# cortex-hub / agent-node / agent_node / core / sync.py
import os
import hashlib
from agent_node.config import SYNC_DIR
from protos import agent_pb2

class NodeSyncManager:
    """Handles local filesystem synchronization on the Agent Node.

    Every session is mirrored into its own directory beneath
    ``base_sync_dir``. The manager reconciles that directory against a
    server manifest, writes incoming file chunks into it, verifies completed
    files by SHA-256, and removes session directories on request.
    """

    # Read size used when hashing files, so large synced files are streamed
    # through SHA-256 instead of being loaded into memory in one piece.
    _HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, base_sync_dir=SYNC_DIR):
        """Create the manager and make sure ``base_sync_dir`` exists.

        Args:
            base_sync_dir: Root directory under which per-session sync
                directories live. Defaults to ``SYNC_DIR`` from the node config.
        """
        self.base_sync_dir = base_sync_dir
        if not os.path.exists(self.base_sync_dir):
            os.makedirs(self.base_sync_dir, exist_ok=True)

    @staticmethod
    def _resolve_inside(root: str, rel_path: str):
        """Join ``rel_path`` onto ``root`` only if the result stays strictly inside ``root``.

        Returns the normalized joined path, or ``None`` when it would resolve
        to ``root`` itself or to anywhere outside it (``..`` segments, absolute
        paths, or symlinks pointing elsewhere). Whole path components are
        compared, so a sibling such as ``<root>0`` is rejected, unlike with a
        plain string-prefix check.
        """
        candidate = os.path.normpath(os.path.join(root, rel_path))
        root_real = os.path.realpath(root)
        candidate_real = os.path.realpath(candidate)
        try:
            common = os.path.commonpath([root_real, candidate_real])
        except ValueError:
            # e.g. paths on different drives (Windows): not inside root.
            return None
        if common != root_real or candidate_real == root_real:
            return None
        return candidate

    def _hash_file(self, path: str) -> str:
        """Return the hex SHA-256 digest of the file at ``path``, read in chunks."""
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b""):
                digest.update(block)
        return digest.hexdigest()

    def get_session_dir(self, session_id: str, create: bool = False) -> str:
        """Return the sync directory path for ``session_id``.

        Args:
            session_id: Session identifier, joined onto ``base_sync_dir``.
            create: When True, create the directory (and parents) if missing.
        """
        path = os.path.join(self.base_sync_dir, session_id)
        if create:
            os.makedirs(path, exist_ok=True)
        return path

    def purge(self, session_id: str):
        """Completely remove a session's sync directory from the node.

        Session ids that would resolve to ``base_sync_dir`` itself or outside
        it (e.g. ``""`` or ``".."``) are refused, because ``rmtree`` on such a
        path would delete far more than a single session.
        """
        import shutil

        path = self._resolve_inside(self.base_sync_dir, session_id)
        if path is None:
            print(f"    [⚠️] Refusing to purge unsafe session id: {session_id!r}")
            return
        if os.path.exists(path):
            shutil.rmtree(path)
            print(f"    [๐Ÿ“๐Ÿงน] Node sync directory deleted: {session_id}")

    def cleanup_unused_sessions(self, active_session_ids: list):
        """Remove ``session-*`` directories whose ids are not in ``active_session_ids``.

        Args:
            active_session_ids: Session ids still active on the server; their
                directories are kept.

        Removal is best-effort: a directory that cannot be deleted is reported
        and skipped, so the remaining stale sessions are still cleaned up.
        """
        if not os.path.exists(self.base_sync_dir):
            return

        import shutil

        active_set = set(active_session_ids)
        for session_id in os.listdir(self.base_sync_dir):
            if not session_id.startswith("session-") or session_id in active_set:
                continue
            path = os.path.join(self.base_sync_dir, session_id)
            if not os.path.isdir(path):
                continue
            try:
                shutil.rmtree(path)
            except OSError as e:
                print(f"    [⚠️] Failed to purge session directory {session_id}: {e}")
                continue
            print(f"    [๐Ÿ“๐Ÿงน] Proactively purged unused session directory: {session_id}")

    @staticmethod
    def _purge_extraneous(session_dir: str, expected_paths: set, ignore_filter) -> None:
        """Delete local files and empty directories that the manifest does not list.

        The root-level ``.cortexignore``/``.gitignore`` files and anything that
        ``ignore_filter.is_ignored`` matches are left untouched. Deletion is
        best-effort: failures are reported and the walk continues.
        """
        # Bottom-up walk so a directory's children are handled before the
        # directory itself is checked for emptiness.
        for root, dirs, files in os.walk(session_dir, topdown=False):
            for name in files:
                abs_path = os.path.join(root, name)
                rel_path = os.path.relpath(abs_path, session_dir)
                if rel_path in (".cortexignore", ".gitignore"):
                    continue
                if rel_path in expected_paths or ignore_filter.is_ignored(rel_path):
                    continue
                try:
                    os.remove(abs_path)
                    print(f"    [๐Ÿ“๐Ÿ—‘๏ธ] Deleted extraneous local file: {rel_path}")
                except OSError as e:
                    print(f"    [โš ๏ธ] Failed to delete file {rel_path}: {e}")

            for name in dirs:
                abs_path = os.path.join(root, name)
                rel_path = os.path.relpath(abs_path, session_dir)
                if rel_path in expected_paths or ignore_filter.is_ignored(rel_path):
                    continue
                try:
                    # Only empty directories are removed; a directory still
                    # holding kept or ignored files stays in place.
                    if not os.listdir(abs_path):
                        os.rmdir(abs_path)
                except OSError as e:
                    print(f"    [⚠️] Failed to delete directory {rel_path}: {e}")

    def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest) -> list:
        """Reconcile the session directory with the server manifest.

        Local files and empty directories that are neither listed in the
        manifest nor ignored by ``CortexIgnore`` are deleted. Directories
        listed in the manifest are created. Each listed file is compared by
        SHA-256 against its manifest hash.

        Args:
            session_id: Session whose sync directory is reconciled (created if
                missing).
            manifest: Server manifest; entries expose ``path``, ``is_dir`` and,
                for files, ``hash`` (compared against a hex SHA-256 digest).

        Returns:
            Manifest paths of files that are missing locally (or not a regular
            file) or whose content hash differs from the manifest. Entries whose
            path would fall outside the session directory are skipped and are
            never returned.
        """
        session_dir = self.get_session_dir(session_id, create=True)
        print(f"[๐Ÿ“] Reconciling Sync Directory: {session_dir}")

        from shared_core.ignore import CortexIgnore
        ignore_filter = CortexIgnore(session_dir)
        expected_paths = {f.path for f in manifest.files}

        # 1. Purge extraneous local files and directories (handles Deletions)
        self._purge_extraneous(session_dir, expected_paths, ignore_filter)

        # 2. Create listed directories and collect files that need (re)download.
        needs_update = []
        for file_info in manifest.files:
            # Manifest paths come from the server: never let them address
            # anything outside this session's directory.
            target_path = self._resolve_inside(session_dir, file_info.path)
            if target_path is None:
                print(f"    [⚠️] Skipping manifest entry outside session directory: {file_info.path}")
                continue

            if file_info.is_dir:
                os.makedirs(target_path, exist_ok=True)
                continue

            # Missing, or occupied by something other than a regular file
            # (e.g. a directory, which cannot be opened for hashing).
            if not os.path.isfile(target_path):
                needs_update.append(file_info.path)
                continue

            actual_hash = self._hash_file(target_path)
            if actual_hash != file_info.hash:
                print(f"    [โš ๏ธ] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})")
                needs_update.append(file_info.path)

        return needs_update

    def write_chunk(self, session_id: str, payload: agent_pb2.FilePayload) -> bool:
        """Write one file chunk into the session's sync directory.

        Chunk 0 creates or truncates the file; later chunks are appended. When
        the payload is final and carries a hash, the completed file is verified.

        Returns:
            ``False`` if ``payload.path`` would land outside the session
            directory (nothing is written) or if the final hash check fails;
            ``True`` otherwise.
        """
        session_dir = self.get_session_dir(session_id, create=True)
        # Path traversal guard: compare whole, symlink-resolved path components.
        # A plain ``startswith`` check would also accept sibling directories
        # such as ``<session>0/...``.
        target_path = self._resolve_inside(session_dir, payload.path)
        if target_path is None:
            return False

        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        mode = "ab" if payload.chunk_index > 0 else "wb"
        with open(target_path, mode) as f:
            f.write(payload.chunk)

        if payload.is_final and payload.hash:
            return self._verify(target_path, payload.hash)
        return True

    def _verify(self, path, expected_hash):
        """Return True if the hex SHA-256 digest of ``path`` equals ``expected_hash``."""
        actual = self._hash_file(path)
        if actual != expected_hash:
            print(f"[โš ๏ธ] Sync Hash Mismatch for {path}")
            return False
        return True