import logging
import os
import queue
import sys
import threading
import time
from concurrent import futures

import grpc
# Add parent dir to path so we can import mesh_core
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from mesh_core.engines import MeshServerCore
from mesh_core.transport import GrpcServerTransport
from mesh_core import agent_pb2, agent_pb2_grpc
# Module-wide logger used by every handler below; the root logger is configured
# at INFO so the demo's status messages are visible without extra setup.
logger = logging.getLogger("LiteServer")
logging.basicConfig(level=logging.INFO)
class LiteServer(agent_pb2_grpc.AgentOrchestratorServicer):
    """
    A minimal Mesh Hub implementation using the Lite Kit.
    Demonstrates how to build a remote-control hub in < 100 lines of code.

    The servicer owns a single ``MeshServerCore`` and wires its node
    online/offline and message callbacks to the module logger. It implements
    three RPCs: ``SyncConfiguration`` (unary), ``TaskStream`` and
    ``ReportHealth`` (both written as generators, i.e. response-streaming).
    """

    def __init__(self):
        """Create the mesh core and attach the logging event callbacks."""
        self.mesh_core = MeshServerCore()
        # Bind events
        self.mesh_core.on_node_online = lambda node: logger.info(f"🟢 Node Online: {node.node_id}")
        self.mesh_core.on_node_offline = lambda node: logger.warning(f"🔴 Node Offline: {node.node_id}")
        self.mesh_core.on_message_received = self.handle_message

    def handle_message(self, node_id, msg):
        """Log an inbound message and, for task responses, its outcome.

        Args:
            node_id: Identifier of the node the message came from.
            msg: Message exposing a ``payload`` oneof (queried via ``WhichOneof``).
        """
        kind = msg.WhichOneof('payload')
        logger.info(f"[*] Message from {node_id}: {kind}")
        if kind == 'task_response':
            logger.info(f"✅ Task {msg.task_response.task_id} completed with status {msg.task_response.status}")

    def SyncConfiguration(self, request, context):
        """Answer a handshake request; this demo accepts every node.

        Returns:
            A ``RegistrationResponse`` with ``success=True`` and a
            ``PERMISSIVE`` sandbox policy.
        """
        logger.info(f"Handshake request from {request.node_id}")
        # Always allow in this demo
        return agent_pb2.RegistrationResponse(
            success=True,
            policy=agent_pb2.SandboxPolicy(mode=agent_pb2.SandboxPolicy.PERMISSIVE)
        )

    def TaskStream(self, request_iterator, context):
        """Run the bidirectional task stream for one node.

        The first inbound message must carry ``announce``; its ``node_id``
        identifies the node. Remaining inbound messages are forwarded to the
        mesh core from a background reader thread, while this generator
        yields whatever is placed on the transport's ``send_queue``.

        Args:
            request_iterator: Iterator of inbound stream messages.
            context: gRPC servicer context for this call.

        Yields:
            Outbound messages taken from ``transport.send_queue``.
        """
        # 1. Identify node from first message (announce)
        try:
            first_msg = next(request_iterator)
        except StopIteration:
            # Client closed the stream without sending anything.
            logger.error("Handshake error: stream closed before 'announce' was received")
            return
        except Exception as e:
            logger.error(f"Handshake error: {e}")
            return

        if not first_msg.HasField('announce'):
            # abort() terminates the RPC by raising, so it is deliberately kept
            # outside the try/except above: catching that exception would
            # mislog a protocol violation as a generic handshake error.
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "First message must be 'announce'")
        node_id = first_msg.announce.node_id

        # 2. Setup Transport
        transport = GrpcServerTransport(context)
        self.mesh_core.register_node(node_id, "default-user", {}, transport)

        # 3. Start Reader Thread
        def _read():
            try:
                for msg in request_iterator:
                    self.mesh_core.handle_inbound(node_id, msg)
            except Exception as e:
                logger.warning(f"Reader for {node_id} closed: {e}")
            finally:
                # Runs whether the inbound stream ended cleanly or with an error.
                self.mesh_core.deregister_node(node_id)

        threading.Thread(target=_read, daemon=True).start()

        # 4. Main Writer Loop
        logger.info(f"Stream established for {node_id}")
        while context.is_active():
            # In a real app, you'd dispatch tasks here.
            # For demo, we just wait for outbound messages.
            try:
                # The 1s timeout lets the loop re-check context.is_active().
                # NOTE(review): assumes send_queue follows the queue.Queue API
                # and raises queue.Empty on timeout - confirm in GrpcServerTransport.
                msg = transport.send_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            # The yield lives outside the try block so that GeneratorExit
            # (raised here when gRPC closes the generator) is never swallowed.
            yield msg

    def ReportHealth(self, request_iterator, context):
        """Log each heartbeat and reply with the current server time.

        Yields:
            One ``HealthCheckResponse`` per heartbeat, with ``server_time_ms``
            set to the current wall-clock time in milliseconds.
        """
        for hb in request_iterator:
            logger.info(f"💓 Heartbeat from {hb.node_id}: CPU {hb.cpu_usage_percent}%")
            yield agent_pb2.HealthCheckResponse(server_time_ms=int(time.time()*1000))
def serve(port=50051, max_workers=10):
    """Start the LiteServer gRPC hub and block until it terminates.

    Args:
        port: TCP port to listen on (all interfaces, insecure channel).
        max_workers: Size of the handler thread pool. With the synchronous
            gRPC server each in-flight RPC, including every long-lived
            stream, holds a worker for its duration, so this bounds the
            number of concurrently connected streams.

    Raises:
        RuntimeError: If the port could not be bound.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(LiteServer(), server)

    # Depending on the grpcio version a failed bind either raises or returns 0;
    # treat 0 as a failure too so the server never "starts" without a listener.
    bound_port = server.add_insecure_port(f'[::]:{port}')
    if bound_port == 0:
        raise RuntimeError(f"LiteServer could not bind to port {port}")

    logger.info(f"LiteServer starting on port {port}...")
    server.start()
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl-C: give in-flight RPCs a short grace period instead of dying
        # with a traceback.
        logger.info("LiteServer shutting down...")
        server.stop(grace=5).wait()
# Script entry point: run the hub with its default settings when executed
# directly; importing this module does not start a server.
if __name__ == "__main__":
    serve()