Newer
Older
cortex-hub / mesh-sdk / mesh_core / utils / data.py
@yangyang xie yangyang xie 17 days ago 1 KB refactor done
import hashlib
import zlib
import os
from typing import Generator, Optional, BinaryIO, Union

class DataChunker:
    """
    Standardizes how large payloads are split into segments for the Mesh.

    Both helpers are generators: they yield consecutive segments of at most
    ``chunk_size`` items each, in order, and yield nothing for empty input.
    ``chunk_bytes`` slices an in-memory sequence; ``chunk_file`` reads from a
    file-like object.
    """
    @staticmethod
    def chunk_bytes(data: bytes, chunk_size: int = 4 * 1024 * 1024) -> Generator[bytes, None, None]:
        """
        Yield successive slices of ``data`` that are at most ``chunk_size`` long.

        Args:
            data: The payload to split. Only ``len()`` and slicing are used.
            chunk_size: Maximum length of each yielded slice; must be positive.

        Raises:
            ValueError: If ``chunk_size`` is not positive. Because this is a
                generator, the error surfaces on first iteration.
        """
        # A negative step would make range() empty and silently drop the
        # payload, so reject every non-positive size explicitly.
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
        for start in range(0, len(data), chunk_size):
            yield data[start:start + chunk_size]

    @staticmethod
    def chunk_file(file_obj: BinaryIO, chunk_size: int = 4 * 1024 * 1024) -> Generator[bytes, None, None]:
        """
        Yield successive ``read(chunk_size)`` results from ``file_obj``.

        Reading starts at the object's current position and stops at the
        first empty (falsy) read. The file object is not closed.

        Args:
            file_obj: Object exposing ``read(size)``.
            chunk_size: Number of bytes requested per read; must be positive.

        Raises:
            ValueError: If ``chunk_size`` is not positive. Because this is a
                generator, the error surfaces on first iteration.
        """
        # read(0) returns an empty result (nothing would ever be yielded) and
        # a negative size reads the whole stream at once, defeating chunking.
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
        while True:
            chunk = file_obj.read(chunk_size)
            if not chunk:
                break
            yield chunk

class DataReassembler:
    """
    Standardizes how segmented data is reconstructed.

    Chunks are stored by index as they arrive (in any order) and are
    concatenated in ascending index order on request. If an expected SHA-256
    hex digest was supplied, the reassembled payload is verified against it.
    """
    def __init__(self, expected_hash: Optional[str] = None) -> None:
        """
        Args:
            expected_hash: Optional SHA-256 hex digest of the complete payload.
                When falsy (``None`` or empty), no verification is performed.
        """
        # Maps chunk index -> (already decompressed) chunk bytes.
        self.chunks = {}
        self.expected_hash = expected_hash
        # NOTE: not used by any method of this class as visible here; kept so
        # that code accessing the attribute keeps working.
        self.hasher = hashlib.sha256()

    def add_chunk(self, index: int, data: bytes, is_compressed: bool = False) -> None:
        """
        Store one chunk under ``index``.

        A chunk added under an index that is already present replaces the
        earlier one.

        Args:
            index: Position of the chunk; chunks are joined in ascending order.
            data: Chunk payload.
            is_compressed: If true, ``data`` is passed through
                ``zlib.decompress`` before being stored.

        Raises:
            zlib.error: If ``is_compressed`` is true and decompression fails.
        """
        chunk_data = zlib.decompress(data) if is_compressed else data
        self.chunks[index] = chunk_data

    def get_full_data(self) -> bytes:
        """
        Return all stored chunks concatenated in ascending index order.

        Gaps in the index sequence are not detected here; only the optional
        hash check can reveal a missing chunk.

        Raises:
            ValueError: If ``expected_hash`` is set and differs from the
                SHA-256 hex digest of the reassembled data.
        """
        sorted_indices = sorted(self.chunks.keys())
        full_data = b"".join(self.chunks[i] for i in sorted_indices)

        if self.expected_hash:
            actual_hash = hashlib.sha256(full_data).hexdigest()
            if actual_hash != self.expected_hash:
                raise ValueError(f"Hash mismatch: expected {self.expected_hash}, got {actual_hash}")

        return full_data

    def write_to_file(self, path: str) -> None:
        """
        Reassemble the data (verifying the hash if set) and write it to ``path``.

        Missing parent directories are created. The payload is validated
        before anything touches the filesystem.

        Args:
            path: Destination file path; an existing file is overwritten.

        Raises:
            ValueError: Propagated from ``get_full_data`` on hash mismatch.
        """
        full_data = self.get_full_data()
        parent_dir = os.path.dirname(path)
        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError, so only create directories when there is one.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(path, "wb") as f:
            f.write(full_data)