import hashlib
import zlib
import os
from typing import Generator, Optional, BinaryIO


class DataChunker:
    """
    Standardizes how large payloads are split into segments for the Mesh.
    Works with bytes, strings, or file-like objects.
    """

    @staticmethod
    def chunk_bytes(data: bytes, chunk_size: int = 4 * 1024 * 1024) -> Generator[bytes, None, None]:
        # Slice the payload into fixed-size segments; the final segment may be shorter.
        for i in range(0, len(data), chunk_size):
            yield data[i:i + chunk_size]

    @staticmethod
    def chunk_file(file_obj: BinaryIO, chunk_size: int = 4 * 1024 * 1024) -> Generator[bytes, None, None]:
        # Stream segments from a file-like object without loading it fully into memory.
        while True:
            chunk = file_obj.read(chunk_size)
            if not chunk:
                break
            yield chunk
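
    @staticmethod
    def chunk_string(data: str, chunk_size: int = 4 * 1024 * 1024, encoding: str = "utf-8") -> Generator[bytes, None, None]:
        # Minimal sketch of the string case mentioned in the class docstring, which the
        # original module did not implement. Assumption: text is encoded to bytes
        # (UTF-8 by default) before chunking, so boundaries are byte offsets, not
        # character offsets.
        yield from DataChunker.chunk_bytes(data.encode(encoding), chunk_size)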


class DataReassembler:
    """
    Standardizes how segmented data is reconstructed.
    """

    def __init__(self, expected_hash: Optional[str] = None):
        # Chunks are keyed by index so they can arrive and be stored out of order.
        self.chunks = {}
        self.expected_hash = expected_hash

    def add_chunk(self, index: int, data: bytes, is_compressed: bool = False):
        # Decompress on arrival so reassembly only has to concatenate.
        chunk_data = zlib.decompress(data) if is_compressed else data
        self.chunks[index] = chunk_data

    def get_full_data(self) -> bytes:
        # Concatenate chunks in index order, then verify integrity if a hash was provided.
        sorted_indices = sorted(self.chunks.keys())
        full_data = b"".join(self.chunks[i] for i in sorted_indices)
        if self.expected_hash:
            actual_hash = hashlib.sha256(full_data).hexdigest()
            if actual_hash != self.expected_hash:
                raise ValueError(f"Hash mismatch: expected {self.expected_hash}, got {actual_hash}")
        return full_data

    def write_to_file(self, path: str):
        full_data = self.get_full_data()
        # Only create parent directories when the path actually has one;
        # os.makedirs("") raises FileNotFoundError.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(path, "wb") as f:
            f.write(full_data)
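

# Minimal usage sketch (illustrative only, not part of the Mesh API): round-trips a
# payload through chunking, per-chunk zlib compression, and reassembly with hash
# verification. The 1 KiB chunk size and the random in-memory payload are arbitrary
# choices made for this example.
if __name__ == "__main__":
    payload = os.urandom(10_000)
    expected = hashlib.sha256(payload).hexdigest()

    reassembler = DataReassembler(expected_hash=expected)
    for index, chunk in enumerate(DataChunker.chunk_bytes(payload, chunk_size=1024)):
        # Compress each segment before "sending"; add_chunk reverses this on receipt.
        reassembler.add_chunk(index, zlib.compress(chunk), is_compressed=True)

    assert reassembler.get_full_data() == payload
    print("reassembled", len(payload), "bytes; hash verified")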