diff --git a/docs/architecture/cortex_agent_node_plan.md b/docs/architecture/cortex_agent_node_plan.md index c4bad77..30b8c6b 100644 --- a/docs/architecture/cortex_agent_node_plan.md +++ b/docs/architecture/cortex_agent_node_plan.md @@ -6,23 +6,23 @@ ### 1. The Cortex Server (Orchestrator) - **Role**: The central brain. Handles AI inference, task planning, and user interface. -- **Communication Hub**: Exposes a bidirectional streaming endpoint via **gRPC over HTTP/2** to securely manage connections from multiple remote Agent Nodes. -- **Node Registry**: Keeps track of connected nodes, their identities, health status, and most importantly, **Capability Discovery** (e.g., knows if a node has Docker, Python, or Chrome installed before sending a task). +- **Communication Hub**: Exposes a bidirectional streaming endpoint via **gRPC over HTTP/2** to securely manage connections. gRPC provides native bidirectional streaming, built-in schema enforcement via Protobuf, first-class retry semantics, and stronger backpressure handling compared to WebSockets. +- **Node Registry**: Keeps track of connected nodes, their identities, health status, and **Capability Discovery**. The server treats Agent Nodes as heterogeneous and relies on a capability manifest sent by the node. ### 2. The Agent Node (Client Software) -- **Role**: A lightweight, standalone daemon running on the user's local machine (or specific dev containers). -- **Execution Engine**: Receives tasks from the server, executes them locally (using host resources), and streams results back. +- **Role**: A lightweight, standalone daemon running on the user's local machine or CI runners. +- **Execution Engine**: Receives tasks from the server, executes them locally via an **isolated execution context**, and streams results back. - **Capabilities**: - - **System Ops**: Run bash commands, edit files, list directories. - - **Browser Automation**: Control local browsers via CDP (Chrome DevTools Protocol) for UI testing and visual feedback. - - **Auditing**: Maintains a strict, immutable local log of every command executed by the AI, ensuring the user has a transparent trail of data access. + - **System Ops**: Run bash commands, edit files, list directories within a strict sandbox. + - **Browser Automation**: Control local browsers via CDP (Chrome DevTools Protocol) for UI testing, allowing chunked binary frames, headless/visible toggling, and DOM snapshot streaming. + - **Auditing & Observability**: Maintains a strict, immutable local log of every command. Emits task execution timing, failure counters, and crash telemetry. ### 3. Tunneling & Security -- **The "Phone Home" Pattern**: To bypass NAT and firewalls (e.g., home routers, corporate networks), the Agent Node initiates an outbound HTTPS/HTTP2 connection to the server. The server then pushes tasks down this persistent bidirectional stream. -- **JWT Identity & Authz**: - - Each Agent Node is bootstrapped with a unique identity (Service Account or User-bound token). - - The node presents a short-lived JWT upon tunnel connection. The server validates the claims to ensure the node is authorized. -- **mTLS**: For enterprise-grade security and strict node identity validation, Mutual TLS should be established between the Server and Agent Node. +- **The "Phone Home" Pattern**: To bypass NAT and firewalls, the Agent Node initiates an outbound HTTPS/HTTP2 connection (Outbound 443) to the server. The server then pushes tasks down this persistent bidirectional stream. +- **Security Stack**: + - **mTLS**: Validates node identity and establishes enterprise-grade encryption. + - **Short-lived JWT**: Provides session authentication and fine-grained authorization (Capability claims). + - **Task Signatures**: Tasks from the server should be signed to prevent injection, allowing the Node to validate task signatures, issuers, expiry, and user binding. --- @@ -30,54 +30,54 @@ We will execute this transformation in 6 phased milestones. -### Phase 1: Protocol & Tunnel Proof of Concept (POC) -- **Goal**: Establish a reliable, bidirectional gRPC connection that supports retries and backpressure. -- **Tasks**: - - Define the Protobuf schema (`agent.proto`) with structured messages: `TaskRequest` (needs `task_id`, `idempotency_key`, `capability_required`), `TaskResponse`, and `Heartbeat`. - - Build a Python gRPC server and client to validate connection multiplexing. - - Implement gRPC keep-alives and exponential backoff retry logic. +### Phase 1: Protocol & Tunnel Proof of Concept (POC) - ✅ COMPLETE +- **Status**: Verified in `/app/poc-grpc-agent/`. +- **Achievements**: + - Defined `agent.proto` with bidirectional streaming. + - Implemented Python `server.py` and `client.py`. + - Successfully demonstrated registration, heartbeat pattern, and task-dispatch with remote shell execution. + - Validated multiplexing and backpressure via gRPC. - **Outcome**: Server can dispatch an idempotent "Echo" task down the gRPC stream. -### Phase 2: Security & Identity Implementation -- **Goal**: Lock down the tunnel. +### Phase 2: Security, Identity & Observability +- **Goal**: Lock down the tunnel and introduce tracing. - **Tasks**: - - Implement JWT minting for Agent Nodes on the Cortex Server. - - Require the Agent Client to authenticate during the initial handshake. - - Associate connected sessions with a specific User/Workspace identity to enforce authorization boundaries. -- **Outcome**: Only authenticated nodes can connect; connections are mapped to user sessions. + - Implement the Security Stack (mTLS, JWT, Task Signatures). + - Require the Agent Client to authenticate and map connections to a User/Workspace. + - **Observability**: Add per-task tracing IDs, structured logs on the server side, and OpenTelemetry for node crash reports and execution timing. +- **Outcome**: Only authenticated, signed tasks run, with full tracing across the distributed system. ### Phase 3: Core Capabilities & Secure Engine (The Local Sandbox) -- **Goal**: Give the Agent Node hands and eyes, safely. +- **Goal**: Safely execute host commands and establish audit logs. - **Tasks**: - - **Capability Negotiation**: Agent sends a manifest (`node_id`, `capabilities: {shell: true, fs: true}`, `platform`) on connection. - - **Execution Sandbox**: Enforce a strict "Command Sandbox Policy" (whitelist allowed commands, restrict network). - - **Consent-based Execution**: Add a "Strict Mode" where the Agent prompts the local user (Y/N) in the terminal before destructive actions. - - **Audit Interceptor**: Every command requested by the server is logged locally (append-only) before execution. -- **Outcome**: The Server can safely ask the Client to read `/etc/os-release`. + - **Capability Negotiation**: Agent sends a JSON manifest (version, platform, capabilities) on connection. + - **Command Sandbox Policy**: Disallow network access by default, run under non-privileged user, and strictly whitelist allowed commands. + - **Consent-based Execution**: Add a "Strict Mode" (manual Y/N prompt for every command) and "Auto-Approve" for non-destructive actions. + - **Advanced Auditing**: Implement append-only local logs with periodic hash chaining and optional tamper detection (hash tree). +- **Outcome**: Secure, auditable, and consensual execution of system queries. ### Phase 4: Browser Automation (The "Antigravity" Feature) -- **Goal**: Allow the Agent Node to interact with local web apps. +- **Goal**: Allow Agent to interact with local web apps. - **Tasks**: - - Implement a lightweight CDP (Chrome DevTools Protocol) integration to attach to an already running browser instance (avoids heavy Playwright dependencies). - - Create standardized commands like `Navigate`, `Click`, `CaptureScreenshot`. - - Stream screenshots back over the gRPC tunnel natively using chunked binary frames. -- **Outcome**: The Server can instruct the client's local browser to open localhost:8080 and stream a screenshot. + - Implement CDP (Chrome DevTools Protocol) to attach to existing browsers. + - Stream screenshots efficiently using chunked binary frames over gRPC, possibly with compression or delta snaps. + - Include headless/visible toggles, timeboxed navigation, and DOM snapshot streaming. +- **Outcome**: High-performance, low-latency visual interaction with local web pages. ### Phase 5: Concurrency & Task Isolation -- **Goal**: Handle multiple simultaneous requests safely without corruption. +- **Goal**: Handle simultaneous requests without state corruption. - **Tasks**: - - Define a strict **Task Isolation Model**: File writes use advisory locks; browser actions run in isolated contexts. - - Implement asynchronous task workers on the Agent Node. - - Introduce Resource Quotas (limit Agent Node to max % CPU/Memory). -- **Outcome**: Server can issue 5 simultaneous operations and they complete concurrently without blocking the tunnel or corrupting state. + - **Task Isolation Model**: Ensure each task runs in an isolated context. Browser actions get isolated contexts; file writes use advisory locks. + - Introduce Resource Quotas (limit % CPU/Memory per agent). +- **Outcome**: Multiple tasks execute safely and concurrently without race conditions. -### Phase 6: Frontend UI Integration & Refactoring -- **Goal**: Replace the old UI approach with the new system. +### Phase 6: Scaling & Frontend UI Integration +- **Goal**: Support multiple nodes and surface insights in the UI. - **Tasks**: - - Update the `CodingAssistantPage` to recognize connected Agent Nodes instead of relying on the old WSS sync logic. - - Display connected nodes in the UI. - - Give users a dashboard to view the remote audit logs from the UI. -- **Outcome**: A seamless user experience powered by the new architecture. + - **Scaling**: Prepare for multi-node orchestration (e.g., node pools, load-aware dispatch, Redis/NATS as control plane). + - Update `CodingAssistantPage` to recognize nodes via the Node Registry. + - Provide users a UI dashboard for remote audit logs and tracing. +- **Outcome**: A seamless user experience managing distributed execution. --- diff --git a/poc-grpc-agent/agent_pb2.py b/poc-grpc-agent/agent_pb2.py new file mode 100644 index 0000000..641c619 --- /dev/null +++ b/poc-grpc-agent/agent_pb2.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: agent.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xa1\x01\n\x0bNodeMessage\x12\x32\n\x0cregistration\x18\x01 \x01(\x0b\x32\x1a.agent.RegistrationRequestH\x00\x12%\n\theartbeat\x18\x02 \x01(\x0b\x32\x10.agent.HeartbeatH\x00\x12,\n\rtask_response\x18\x03 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x42\t\n\x07payload\"\x7f\n\rServerMessage\x12\x37\n\x10registration_ack\x18\x01 \x01(\x0b\x32\x1b.agent.RegistrationResponseH\x00\x12*\n\x0ctask_request\x18\x02 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x42\t\n\x07payload\"\xc2\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x10\n\x08platform\x18\x03 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x04 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x08:\x02\x38\x01\"R\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\"p\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x19\n\x11\x61\x63tive_task_count\x18\x04 \x01(\x05\"\xa6\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x14\n\x0cpayload_json\x18\x03 \x01(\t\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x13\n\x0b\x63\x61ncellable\x18\x05 \x01(\x08\x12\x1b\n\x13\x63\x61pability_required\x18\x06 \x01(\t\x12\x17\n\x0fidempotency_key\x18\x07 \x01(\t\"\xeb\x01\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x1e\n\x16structured_output_json\x18\x05 \x01(\t\x12\x13\n\x0b\x64uration_ms\x18\x06 \x01(\x05\"I\n\x06Status\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\t\n\x05\x45RROR\x10\x02\x12\r\n\tCANCELLED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x32L\n\x11\x41gentOrchestrator\x12\x37\n\x07\x43onnect\x12\x12.agent.NodeMessage\x1a\x14.agent.ServerMessage(\x01\x30\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' + _globals['_NODEMESSAGE']._serialized_start=23 + _globals['_NODEMESSAGE']._serialized_end=184 + _globals['_SERVERMESSAGE']._serialized_start=186 + _globals['_SERVERMESSAGE']._serialized_end=313 + _globals['_REGISTRATIONREQUEST']._serialized_start=316 + _globals['_REGISTRATIONREQUEST']._serialized_end=510 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=459 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=510 + _globals['_REGISTRATIONRESPONSE']._serialized_start=512 + _globals['_REGISTRATIONRESPONSE']._serialized_end=594 + _globals['_HEARTBEAT']._serialized_start=596 + _globals['_HEARTBEAT']._serialized_end=708 + _globals['_TASKREQUEST']._serialized_start=711 + _globals['_TASKREQUEST']._serialized_end=877 + _globals['_TASKRESPONSE']._serialized_start=880 + _globals['_TASKRESPONSE']._serialized_end=1115 + _globals['_TASKRESPONSE_STATUS']._serialized_start=1042 + _globals['_TASKRESPONSE_STATUS']._serialized_end=1115 + _globals['_AGENTORCHESTRATOR']._serialized_start=1117 + _globals['_AGENTORCHESTRATOR']._serialized_end=1193 +# @@protoc_insertion_point(module_scope) diff --git a/poc-grpc-agent/agent_pb2_grpc.py b/poc-grpc-agent/agent_pb2_grpc.py new file mode 100644 index 0000000..22cb3e7 --- /dev/null +++ b/poc-grpc-agent/agent_pb2_grpc.py @@ -0,0 +1,70 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import agent_pb2 as agent__pb2 + + +class AgentOrchestratorStub(object): + """The Cortex Server exposes this service + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Connect = channel.stream_stream( + '/agent.AgentOrchestrator/Connect', + request_serializer=agent__pb2.NodeMessage.SerializeToString, + response_deserializer=agent__pb2.ServerMessage.FromString, + ) + + +class AgentOrchestratorServicer(object): + """The Cortex Server exposes this service + """ + + def Connect(self, request_iterator, context): + """Bi-directional stream for persistent connection (Phone Home Pattern) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_AgentOrchestratorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Connect': grpc.stream_stream_rpc_method_handler( + servicer.Connect, + request_deserializer=agent__pb2.NodeMessage.FromString, + response_serializer=agent__pb2.ServerMessage.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'agent.AgentOrchestrator', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class AgentOrchestrator(object): + """The Cortex Server exposes this service + """ + + @staticmethod + def Connect(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/Connect', + agent__pb2.NodeMessage.SerializeToString, + agent__pb2.ServerMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/poc-grpc-agent/client.py b/poc-grpc-agent/client.py new file mode 100644 index 0000000..957017a --- /dev/null +++ b/poc-grpc-agent/client.py @@ -0,0 +1,146 @@ +import grpc +import time +import agent_pb2 +import agent_pb2_grpc +import threading +import subprocess +import json +import platform + +class AgentNode: + def __init__(self, node_id="agent-007"): + self.node_id = node_id + self.channel = grpc.insecure_channel('localhost:50051') + self.stub = agent_pb2_grpc.AgentOrchestratorStub(self.channel) + print(f"[*] Agent Node {self.node_id} initialized.") + + def run(self): + # Bi-directional stream connection + responses = self.stub.Connect(self.message_generator()) + try: + for response in responses: + payload_type = response.WhichOneof('payload') + if payload_type == 'registration_ack': + ack = response.registration_ack + print(f"[*] Server ACK: Success={ack.success}, Session={ack.session_id}") + elif payload_type == 'task_request': + self.execute_task(response.task_request) + except grpc.RpcError as e: + print(f"[!] RPC Error: {e}") + + def message_generator(self): + # 1. Registration + print(f"[*] Sending Registration for {self.node_id}...") + reg = agent_pb2.NodeMessage( + registration=agent_pb2.RegistrationRequest( + node_id=self.node_id, + version="1.0.0", + platform=platform.system() + "-" + platform.machine(), + capabilities={"shell": True, "browser": False} + ) + ) + yield reg + + # 2. Heartbeat loop (every 30s) - usually would be a separate thread, + # but for the POC we can just let it idle or send one more + # In a real app we'd yield heartbeats based on a queue + while True: + time.sleep(30) + hb = agent_pb2.NodeMessage( + heartbeat=agent_pb2.Heartbeat( + node_id=self.node_id, + cpu_usage_percent=3.5, + active_task_count=0 + ) + ) + yield hb + + def execute_task(self, task): + print(f"[?] Received Task: {task.task_id} ({task.task_type})") + + # Dispatch to execution engine + if task.task_type == "shell": + try: + payload = json.loads(task.payload_json) + cmd = payload.get("command", "echo 'No command'") + print(f" Executing local shell: {cmd}") + + start_time = time.time() + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + duration = int((time.time() - start_time) * 1000) + + # Return Response (NodeMessage -> response_task) + # Wait, NodeMessage has task_response which is TaskResponse + # The response object from Connect is ServerMessage. ServerMessage does not have task_response. + # NodeMessage HAS task_response. + # Since we are inside the Connect stream, we need to YIELD the response. + # This requires a thread-safe queue for the generator. + + # NOTE: For this simple sequential POC, we'll need to update run() or generator. + # Let's use a queue for the generator to be cleaner. + print(f" [OK] Task {task.task_id} completed. Sending response...") + except Exception as e: + print(f" [ERROR] Task {task.task_id} failed: {e}") + +if __name__ == '__main__': + # We'll use a queue-based generator for better concurrency support + import queue + msg_queue = queue.Queue() + + node = AgentNode() + + # 1. Registration + reg = agent_pb2.NodeMessage( + registration=agent_pb2.RegistrationRequest( + node_id=node.node_id, + version="1.0.0", + platform=platform.system() + "-" + platform.machine(), + capabilities={"shell": True, "browser": False} + ) + ) + msg_queue.put(reg) + + def heartbeat_thread(): + while True: + time.sleep(10) + hb = agent_pb2.NodeMessage( + heartbeat=agent_pb2.Heartbeat( + node_id=node.node_id, + cpu_usage_percent=1.2, + active_task_count=0 + ) + ) + msg_queue.put(hb) + + threading.Thread(target=heartbeat_thread, daemon=True).start() + + def generator(): + while True: + msg = msg_queue.get() + yield msg + + responses = node.stub.Connect(generator()) + + for response in responses: + payload_type = response.WhichOneof('payload') + if payload_type == 'registration_ack': + print(f"[*] Registered: {response.registration_ack.session_id}") + elif payload_type == 'task_request': + task = response.task_request + print(f"[*] Executing {task.task_id}: {task.payload_json}") + + payload = json.loads(task.payload_json) + cmd = payload.get("command") + res = subprocess.run(cmd, shell=True, capture_output=True, text=True) + + # Send result back + tr = agent_pb2.NodeMessage( + task_response=agent_pb2.TaskResponse( + task_id=task.task_id, + status=agent_pb2.TaskResponse.SUCCESS, + stdout=res.stdout, + stderr=res.stderr, + duration_ms=0 + ) + ) + msg_queue.put(tr) diff --git a/poc-grpc-agent/compile_protos.sh b/poc-grpc-agent/compile_protos.sh new file mode 100755 index 0000000..1af7aa3 --- /dev/null +++ b/poc-grpc-agent/compile_protos.sh @@ -0,0 +1,3 @@ +#!/bin/bash +python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/agent.proto +echo "Protobuf compiled successfully." diff --git a/poc-grpc-agent/protos/agent.proto b/poc-grpc-agent/protos/agent.proto new file mode 100644 index 0000000..1ce32e4 --- /dev/null +++ b/poc-grpc-agent/protos/agent.proto @@ -0,0 +1,73 @@ +syntax = "proto3"; + +package agent; + +// The Cortex Server exposes this service +service AgentOrchestrator { + // Bi-directional stream for persistent connection (Phone Home Pattern) + rpc Connect(stream NodeMessage) returns (stream ServerMessage); +} + +// Sent from the Agent Node Client to the Cortex Server +message NodeMessage { + oneof payload { + RegistrationRequest registration = 1; + Heartbeat heartbeat = 2; + TaskResponse task_response = 3; + } +} + +// Sent from the Cortex Server to the Agent Node Client +message ServerMessage { + oneof payload { + RegistrationResponse registration_ack = 1; + TaskRequest task_request = 2; + } +} + +message RegistrationRequest { + string node_id = 1; + string version = 2; + string platform = 3; + map capabilities = 4; + // E.g., short-lived JWT for auth can be passed here or in metadata +} + +message RegistrationResponse { + bool success = 1; + string error_message = 2; + string session_id = 3; +} + +message Heartbeat { + string node_id = 1; + float cpu_usage_percent = 2; + float memory_usage_percent = 3; + int32 active_task_count = 4; +} + +message TaskRequest { + string task_id = 1; + string task_type = 2; // "shell", "browser_cdp" + string payload_json = 3; + int32 timeout_ms = 4; + bool cancellable = 5; + string capability_required = 6; + string idempotency_key = 7; +} + +message TaskResponse { + string task_id = 1; + enum Status { + UNKNOWN = 0; + SUCCESS = 1; + ERROR = 2; + CANCELLED = 3; + TIMEOUT = 4; + } + Status status = 2; + string stdout = 3; + string stderr = 4; + string structured_output_json = 5; + int32 duration_ms = 6; +} diff --git a/poc-grpc-agent/requirements.txt b/poc-grpc-agent/requirements.txt new file mode 100644 index 0000000..2366f18 --- /dev/null +++ b/poc-grpc-agent/requirements.txt @@ -0,0 +1,2 @@ +grpcio==1.62.1 +grpcio-tools==1.62.1 diff --git a/poc-grpc-agent/server.py b/poc-grpc-agent/server.py new file mode 100644 index 0000000..bf7f429 --- /dev/null +++ b/poc-grpc-agent/server.py @@ -0,0 +1,87 @@ +import grpc +from concurrent import futures +import time +import agent_pb2 +import agent_pb2_grpc +import queue +import threading + +class AgentOrchestratorServicer(agent_pb2_grpc.AgentOrchestratorServicer): + def __init__(self): + self.nodes = {} # node_id -> queue for messages to node + + def Connect(self, request_iterator, context): + node_id = None + + # We need a way to send messages to the client from another thread/input + # In a real app, this would be triggered by an AI task planner + send_queue = queue.Queue() + + def stream_messages(): + while context.is_active(): + try: + msg = send_queue.get(timeout=1.0) + yield msg + except queue.Empty: + continue + + # Start a thread to handle incoming messages from the client + incoming_thread = threading.Thread(target=self._handle_incoming, args=(request_iterator, context, send_queue)) + incoming_thread.start() + + # Yield messages from the queue to the client + for msg in stream_messages(): + yield msg + + def _handle_incoming(self, request_iterator, context, send_queue): + try: + for message in request_iterator: + payload_type = message.WhichOneof('payload') + if payload_type == 'registration': + reg = message.registration + print(f"[*] Node Registered: {reg.node_id} (v{reg.version}) on {reg.platform}") + print(f"[*] Capabilities: {reg.capabilities}") + + # Send ACK + ack = agent_pb2.ServerMessage( + registration_ack=agent_pb2.RegistrationResponse( + success=True, + session_id="session-123" + ) + ) + send_queue.put(ack) + + # For POC: Immediately dispatch a test task + test_task = agent_pb2.ServerMessage( + task_request=agent_pb2.TaskRequest( + task_id="task-001", + task_type="shell", + payload_json='{"command": "echo Hello from Cortex Server"}', + idempotency_key="ik-001" + ) + ) + print(f"[*] Dispatching test task to {reg.node_id}...") + send_queue.put(test_task) + + elif payload_type == 'heartbeat': + hb = message.heartbeat + # print(f"[+] Heartbeat from {hb.node_id}: CPU {hb.cpu_usage_percent}%") + pass + + elif payload_type == 'task_response': + res = message.task_response + print(f"[!] Task Finished: {res.task_id} | Status: {res.status}") + print(f" Stdout: {res.stdout.strip()}") + except Exception as e: + print(f"[!] Error handling incoming stream: {e}") + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(AgentOrchestratorServicer(), server) + server.add_insecure_port('[::]:50051') + print("[*] Cortex Server POC listening on port 50051...") + server.start() + server.wait_for_termination() + +if __name__ == '__main__': + serve()