# cortex-hub / agent-node / agent_node / core / sync.py
import os
import hashlib
from agent_node.config import SYNC_DIR
from protos import agent_pb2

class NodeSyncManager:
    """Handles local filesystem synchronization on the Agent Node.

    Every session owns one directory below ``base_sync_dir``. Relative paths
    received in manifests and file payloads are resolved against that
    directory and are only touched when they stay strictly inside it.
    """

    # Read size used while hashing so large files are never loaded whole.
    _HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, base_sync_dir=SYNC_DIR):
        """Store the base sync directory and create it if it is missing.

        Args:
            base_sync_dir: Root directory under which session directories live.
        """
        self.base_sync_dir = base_sync_dir
        # exist_ok makes a separate os.path.exists() pre-check unnecessary.
        os.makedirs(self.base_sync_dir, exist_ok=True)

    def get_session_dir(self, session_id: str) -> str:
        """Return the directory used for this session's sync, creating it if needed.

        Args:
            session_id: Identifier of the session; used as the directory name.

        Returns:
            ``base_sync_dir`` joined with ``session_id``.

        Raises:
            ValueError: If ``session_id`` resolves to a location outside
                ``base_sync_dir`` (e.g. contains ``..`` or is absolute).
        """
        path = os.path.join(self.base_sync_dir, session_id)
        base_root = os.path.abspath(self.base_sync_dir)
        # The session root anchors every later containment check, so it must
        # not be allowed to leave the base directory itself.
        if not self._is_within(base_root, os.path.abspath(path), allow_equal=True):
            raise ValueError(f"Session id escapes the sync directory: {session_id!r}")
        os.makedirs(path, exist_ok=True)
        return path

    def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest) -> list:
        """Compare local files with the server manifest.

        Directories listed in the manifest are created locally. Files that are
        missing locally, or whose SHA-256 differs from the manifest hash, are
        reported. Entries whose path escapes the session directory are skipped.

        Args:
            session_id: Session whose directory is reconciled.
            manifest: Manifest whose ``files`` entries expose ``path``,
                ``is_dir`` and ``hash``.

        Returns:
            The manifest paths (as given in the manifest) that need an update.
        """
        session_dir = self.get_session_dir(session_id)
        print(f"[📁] Reconciling Sync Directory: {session_dir}")

        needs_update = []
        for file_info in manifest.files:
            target_path = self._resolve_in_session(session_dir, file_info.path)
            if target_path is None:
                # Never create, read or request anything outside the session dir.
                print(f"    [⚠️] Skipping unsafe manifest path: {file_info.path}")
                continue

            if file_info.is_dir:
                os.makedirs(target_path, exist_ok=True)
                continue

            # Anything that is not a regular file locally cannot match the remote file.
            if not os.path.isfile(target_path):
                needs_update.append(file_info.path)
                continue

            actual_hash = self._hash_file(target_path)
            if actual_hash != file_info.hash:
                print(f"    [⚠️] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})")
                needs_update.append(file_info.path)

        return needs_update

    def write_chunk(self, session_id: str, payload: agent_pb2.FilePayload) -> bool:
        """Write a file chunk to the local session directory.

        Chunk index 0 truncates/creates the file; later chunks are appended.
        When the payload is marked final and carries a hash, the finished file
        is verified against it.

        Args:
            session_id: Session whose directory receives the file.
            payload: Chunk exposing ``path``, ``chunk``, ``chunk_index``,
                ``is_final`` and ``hash``.

        Returns:
            False if the path escapes the session directory or the final hash
            check fails; True otherwise.
        """
        session_dir = self.get_session_dir(session_id)
        target_path = self._resolve_in_session(session_dir, payload.path)
        if target_path is None:
            return False  # Path traversal guard

        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        mode = "ab" if payload.chunk_index > 0 else "wb"
        with open(target_path, mode) as f:
            f.write(payload.chunk)

        if payload.is_final and payload.hash:
            return self._verify(target_path, payload.hash)
        return True

    def _verify(self, path, expected_hash):
        """Return True if the SHA-256 hex digest of ``path`` equals ``expected_hash``."""
        if self._hash_file(path) != expected_hash:
            print(f"[⚠️] Sync Hash Mismatch for {path}")
            return False
        return True

    @staticmethod
    def _is_within(root, candidate, allow_equal=False):
        """Return True if ``candidate`` lies inside ``root``.

        Both arguments must already be absolute, normalized paths. The
        comparison is made on a separator-terminated prefix so that a sibling
        such as ``/base/s10`` is not mistaken for a child of ``/base/s1``.
        """
        if candidate == root:
            return allow_equal
        prefix = root if root.endswith(os.sep) else root + os.sep
        return candidate.startswith(prefix)

    def _resolve_in_session(self, session_dir, rel_path):
        """Resolve ``rel_path`` against ``session_dir``.

        Returns the absolute, normalized target path, or None when the result
        is the session directory itself or lies outside of it.
        """
        # Normalize both sides the same way; comparing a normalized target with
        # an un-normalized root would reject valid paths (e.g. root "./sync/x").
        root = os.path.abspath(session_dir)
        target = os.path.abspath(os.path.join(root, rel_path))
        return target if self._is_within(root, target) else None

    @classmethod
    def _hash_file(cls, path):
        """Return the SHA-256 hex digest of the file at ``path``, read in chunks."""
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(cls._HASH_CHUNK_SIZE), b""):
                digest.update(block)
        return digest.hexdigest()