import grpc
from concurrent import futures
import time
import agent_pb2
import agent_pb2_grpc
import queue
import threading
class AgentOrchestratorServicer(agent_pb2_grpc.AgentOrchestratorServicer):
    """gRPC servicer handling bidirectional ``Connect`` streams with agent nodes.

    Every ``Connect`` call owns one outbound queue. When the node on that
    stream registers, the queue is published in ``self.nodes`` under the
    node's id so that other threads can enqueue ``ServerMessage`` objects
    for it. The entry is removed again when the node's inbound stream ends.
    """

    def __init__(self):
        # node_id -> queue.Queue of messages waiting to be sent to that node.
        self.nodes = {}
        # Guards self.nodes, which is mutated from per-connection reader
        # threads that run concurrently.
        self._nodes_lock = threading.Lock()

    def Connect(self, request_iterator, context):
        """Serve one bidirectional stream.

        Incoming messages are consumed on a separate thread so that this
        generator is free to push outbound messages whenever they are queued.

        Args:
            request_iterator: Iterator over the messages sent by the client.
            context: Servicer context of this call; polled via ``is_active()``.

        Yields:
            Messages taken from this connection's outbound queue, for as long
            as ``context.is_active()`` returns true.
        """
        # Outbound mailbox for this connection. In a real app it would be fed
        # by an AI task planner; here the reader thread fills it.
        send_queue = queue.Queue()

        # daemon=True: a reader stuck on the inbound iterator must not keep
        # the interpreter alive at shutdown.
        incoming_thread = threading.Thread(
            target=self._handle_incoming,
            args=(request_iterator, context, send_queue),
            daemon=True,
        )
        incoming_thread.start()

        while context.is_active():
            try:
                # Short timeout so the is_active() check above is re-evaluated
                # regularly even when nothing is queued.
                msg = send_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            yield msg

    def _handle_incoming(self, request_iterator, context, send_queue):
        """Consume client messages and react to each payload type.

        Args:
            request_iterator: Iterator over the messages sent by the client.
            context: Servicer context of the call (currently unused here).
            send_queue: Outbound queue of the connection; replies are put on it.
        """
        try:
            for message in request_iterator:
                payload_type = message.WhichOneof('payload')

                if payload_type == 'registration':
                    reg = message.registration
                    # Publish the outbound queue so other threads can reach
                    # this node by id.
                    with self._nodes_lock:
                        self.nodes[reg.node_id] = send_queue
                    print(f"[*] Node Registered: {reg.node_id} (v{reg.version}) on {reg.platform}")
                    print(f"[*] Capabilities: {reg.capabilities}")

                    # Send ACK
                    ack = agent_pb2.ServerMessage(
                        registration_ack=agent_pb2.RegistrationResponse(
                            success=True,
                            session_id="session-123",
                        )
                    )
                    send_queue.put(ack)

                    # For POC: immediately dispatch a test task
                    test_task = agent_pb2.ServerMessage(
                        task_request=agent_pb2.TaskRequest(
                            task_id="task-001",
                            task_type="shell",
                            payload_json='{"command": "echo Hello from Cortex Server"}',
                            idempotency_key="ik-001",
                        )
                    )
                    print(f"[*] Dispatching test task to {reg.node_id}...")
                    send_queue.put(test_task)

                elif payload_type == 'heartbeat':
                    # Heartbeats are accepted but not processed in this POC.
                    continue

                elif payload_type == 'task_response':
                    res = message.task_response
                    print(f"[!] Task Finished: {res.task_id} | Status: {res.status}")
                    print(f" Stdout: {res.stdout.strip()}")
        except Exception as e:
            # Boundary of the reader thread: report the failure instead of
            # letting the thread die with an unhandled traceback.
            print(f"[!] Error handling incoming stream: {e}")
        finally:
            # The inbound stream is over: drop every registry entry that still
            # points at this connection's queue. Matching on queue identity
            # keeps a newer connection that reused the same node id intact.
            with self._nodes_lock:
                stale_ids = [
                    known_id
                    for known_id, known_queue in self.nodes.items()
                    if known_queue is send_queue
                ]
                for known_id in stale_ids:
                    del self.nodes[known_id]
def serve(port: int = 50051, max_workers: int = 10) -> None:
    """Start the orchestrator gRPC server and block until it terminates.

    Args:
        port: TCP port to listen on, bound on ``[::]`` without TLS.
        max_workers: Size of the thread pool that runs the RPC handlers.
            NOTE(review): each ``Connect`` stream stays inside its handler
            while the call is active, so this presumably also caps the number
            of simultaneously connected nodes - confirm before scaling.

    Raises:
        RuntimeError: If the listening port could not be bound.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(
        AgentOrchestratorServicer(), server
    )

    # add_insecure_port returns the bound port; some grpcio versions signal a
    # failed bind by returning 0 rather than raising, so check explicitly.
    bound_port = server.add_insecure_port(f'[::]:{port}')
    if bound_port == 0:
        raise RuntimeError(f"Failed to bind gRPC server to port {port}")

    print(f"[*] Cortex Server POC listening on port {bound_port}...")
    server.start()
    server.wait_for_termination()
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    serve()