Newer
Older
cortex-hub / mesh-sdk / examples / lite_node.py
@Antigravity AI Antigravity AI 17 days ago 2 KB half done refactoring
import sys
import os
import time
import grpc
import logging

# Add parent dir to path so we can import mesh_core
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from mesh_core.node_engine import MeshNodeCore
from mesh_core.transport_grpc import GrpcMeshTransport
from mesh_core import agent_pb2, agent_pb2_grpc

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("LiteNode")

def get_stub():
    """
    Example stub factory. 
    In production, this would return a secure channel with SSL certs.
    """
    target = os.getenv("MESH_HUB_URL", "localhost:50051")
    logger.info(f"Connecting to {target}...")
    channel = grpc.insecure_channel(target)
    stub = agent_pb2_grpc.AgentOrchestratorStub(channel)
    return stub, channel

class LiteNode(MeshNodeCore):
    """
    A minimal Mesh Node implementation using the Lite Kit.
    Demonstrates how to build a remote-controlled agent in < 50 lines of code.
    """
    def __init__(self, node_id: str, auth_token: str = ""):
        # 1. Choose a transport (gRPC) and provide the connection factory
        self.transport = GrpcMeshTransport(node_id, get_stub, auth_token=auth_token)
        
        # 2. Initialize the core engine
        super().__init__(node_id, self.transport)
        
        # 3. Bind your logic
        self.on_task = self.handle_task
        self.on_ready = lambda _: logger.info(f"[*] LiteNode '{node_id}' is AUTHORIZED and ONLINE.")
        self.on_disconnect = lambda: logger.warning("[!] Lost connection to Hub.")

    def handle_task(self, task):
        logger.info(f"[Task] Executing: {task.task_id}")
        
        # Simulate some work
        time.sleep(1)
        
        # Send result back via the transport-agnostic engine
        response = agent_pb2.TaskResponse(
            task_id=task.task_id, 
            stdout=f"LiteNode '{self.node_id}' successfully processed task {task.task_id}", 
            status=0
        )
        self.send_message(agent_pb2.ClientTaskMessage(task_response=response))

if __name__ == "__main__":
    node_id = sys.argv[1] if len(sys.argv) > 1 else "lite-demo-node"
    
    node = LiteNode(node_id)
    node.start()
    
    logger.info("LiteNode running. Press Ctrl+C to stop.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down...")
        node.stop()