# Newer
# Older
# cortex-hub / mesh-sdk / examples / robustness_demo.py
# @yangyang xie yangyang xie 17 days ago 2 KB refactor done
import sys
import os
import hashlib
import zlib
import time

# Add mesh_core dir directly to bypass __init__.py which requires protobuf
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "mesh_core"))
from utils import DataChunker, DataReassembler

def demo_chunking():
    """Show a payload being chunked, compressed, and reassembled with a hash check.

    A repetitive 23,000-byte payload is passed to ``DataChunker.chunk_bytes``
    with ``chunk_size=1024``. Every resulting chunk is zlib-compressed before
    being handed, with its index, to a ``DataReassembler`` that was constructed
    with the payload's SHA-256 hex digest. Results are printed to stdout. A
    ``ValueError`` raised inside the final ``try`` block is caught and reported
    instead of propagating. Presumably ``get_full_data`` raises it on a digest
    mismatch; confirm against ``utils.DataReassembler``.
    """
    print("=== Demo: Standardized Chunking & Reassembly ===")

    # -- Step 1: build the payload plus the digest the receiver will verify.
    payload = b"Cortex AI Mesh Payload " * 1000
    digest = hashlib.sha256(payload).hexdigest()
    print(f"[1] Original Data Size: {len(payload)} bytes")
    print(f"[1] Expected Hash: {digest}")

    # -- Step 2: sender side. Materialise the chunk iterator so it can be counted.
    print("\n[2] Chunking data into 1KB segments...")
    pieces = [*DataChunker.chunk_bytes(payload, chunk_size=1024)]
    print(f"[2] Produced {len(pieces)} chunks.")

    # -- Step 3: receiver side. Every chunk is compressed here
    # unconditionally, so each one is flagged is_compressed=True.
    print("\n[3] Reassembling chunks with hash verification...")
    reassembler = DataReassembler(expected_hash=digest)
    for index, piece in enumerate(pieces):
        reassembler.add_chunk(index, zlib.compress(piece), is_compressed=True)

    try:
        rebuilt = reassembler.get_full_data()
        print(f"[3] Success! Reassembled size: {len(rebuilt)} bytes")
        print(f"[3] Matches original: {rebuilt == payload}")
    except ValueError as err:
        print(f"[3] Verification Failed: {err}")

def demo_recovery_logic():
    """Print a scripted walkthrough of how the Hub reacts to a dropped node.

    Nothing here contacts a real Hub. The node id, the in-flight task ids and
    the idempotency decision are all hard-coded, so the output is fixed. Each
    task is reported as either re-queued (idempotent) or failed (not safe to
    retry).
    """
    print("\n=== Demo: Task Recovery Telemetry ===")
    print("[1] Simulating Node 'node-alpha' heartbeat timeout...")

    # Stand-ins for the state the Hub would look up when a node drops.
    offline_node = "node-alpha"
    pending = ["task-789", "task-101"]
    # Only task-789 is treated as safe to replay (think 'ls'); task-101 stands
    # in for a side-effecting 'write'.
    idempotent_ids = {"task-789"}

    print(f"[2] Hub Orchestrator detected {offline_node} is offline.")
    print(f"[3] Scanning Journal for in-flight tasks on {offline_node}: {pending}")

    for task_id in pending:
        if task_id in idempotent_ids:
            line = f"    [RESURRECT] Task {task_id} is IDEMPOTENT. Moving to Global Pool for retry."
        else:
            line = f"    [ABORT] Task {task_id} is NOT safe to retry automatically. Marking as FAILED."
        print(line)

if __name__ == "__main__":
    # Run both walkthroughs in order: chunking first, then recovery telemetry.
    for demo in (demo_chunking, demo_recovery_logic):
        demo()