Newer
Older
cortex-hub / poc-grpc-agent / server.py
import grpc
from concurrent import futures
import time
import agent_pb2
import agent_pb2_grpc
import queue
import threading

class AgentOrchestratorServicer(agent_pb2_grpc.AgentOrchestratorServicer):
    def __init__(self):
        self.nodes = {} # node_id -> queue for messages to node

    def Connect(self, request_iterator, context):
        node_id = None
        
        # We need a way to send messages to the client from another thread/input
        # In a real app, this would be triggered by an AI task planner
        send_queue = queue.Queue()

        def stream_messages():
            while context.is_active():
                try:
                    msg = send_queue.get(timeout=1.0)
                    yield msg
                except queue.Empty:
                    continue

        # Start a thread to handle incoming messages from the client
        incoming_thread = threading.Thread(target=self._handle_incoming, args=(request_iterator, context, send_queue))
        incoming_thread.start()

        # Yield messages from the queue to the client
        for msg in stream_messages():
            yield msg

    def _handle_incoming(self, request_iterator, context, send_queue):
        try:
            for message in request_iterator:
                payload_type = message.WhichOneof('payload')
                if payload_type == 'registration':
                    reg = message.registration
                    print(f"[*] Node Registered: {reg.node_id} (v{reg.version}) on {reg.platform}")
                    print(f"[*] Capabilities: {reg.capabilities}")
                    
                    # Send ACK
                    ack = agent_pb2.ServerMessage(
                        registration_ack=agent_pb2.RegistrationResponse(
                            success=True,
                            session_id="session-123"
                        )
                    )
                    send_queue.put(ack)

                    # For POC: Immediately dispatch a test task
                    test_task = agent_pb2.ServerMessage(
                        task_request=agent_pb2.TaskRequest(
                            task_id="task-001",
                            task_type="shell",
                            payload_json='{"command": "echo Hello from Cortex Server"}',
                            idempotency_key="ik-001"
                        )
                    )
                    print(f"[*] Dispatching test task to {reg.node_id}...")
                    send_queue.put(test_task)

                elif payload_type == 'heartbeat':
                    hb = message.heartbeat
                    # print(f"[+] Heartbeat from {hb.node_id}: CPU {hb.cpu_usage_percent}%")
                    pass

                elif payload_type == 'task_response':
                    res = message.task_response
                    print(f"[!] Task Finished: {res.task_id} | Status: {res.status}")
                    print(f"    Stdout: {res.stdout.strip()}")
        except Exception as e:
            print(f"[!] Error handling incoming stream: {e}")

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(AgentOrchestratorServicer(), server)
    server.add_insecure_port('[::]:50051')
    print("[*] Cortex Server POC listening on port 50051...")
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()