diff --git a/docs/architecture/agent_node_feature_gap_analysis.md b/docs/architecture/agent_node_feature_gap_analysis.md
new file mode 100644
index 0000000..db6bce5
--- /dev/null
+++ b/docs/architecture/agent_node_feature_gap_analysis.md
@@ -0,0 +1,37 @@
+# πŸ” Cortex Agent Node: Feature Gap Analysis & Roadmap
+
+This document outlines the critical missing features required to transition the current gRPC Proof of Concept (PoC) into a full-scale, production-ready Distributed AI Agent System.
+
+## 1. πŸ—„οΈ Workspace & File Synchronization
+The current node executes commands but lacks a native way to manage project-level files.
+- **The Gap**: No bi-directional sync (e.g., local server files <-> node workspace).
+- **Required**: A content-addressable synchronization layer (Merkle tree / hash-based) to efficiently mirror workspaces to remote nodes without redundant transfers.
+
+## 2. 🌊 Real-time Log Streaming (Observability)
+Currently, `stdout`/`stderr` is only returned upon task completion.
+- **The Gap**: No visibility into long-running tasks or hanging builds.
+- **Required**: gRPC server-to-client streaming for live console logs, allowing the Main AI to detect progress or failures as they occur.
+
+## 3. πŸ›‘οΈ Robust Sandbox Isolation
+The current sandbox relies on string-filtering shell commands.
+- **The Gap**: Vulnerable to complex shell escapes, symlink attacks, and environment manipulation.
+- **Required**: OS-level containerization (Docker, Podman, or Firecracker microVMs) so that each task is strictly confined to its own namespace.
+
+## 4. πŸ”— Specialized Sub-Worker Protocols (CDP/LSP)
+The agent treats browser automation and coding as generic shell commands.
+- **The Gap**: Inefficiency; starting a fresh browser for every click is slow and loses state.
+- **Required**: Persistent sub-bridges (e.g., a Chrome DevTools Protocol link) allowing the Main AI to maintain a long-running session across multiple delegated tasks.
+
+## 5. πŸ“¦ Binary Artifact & Large Data Handling
+The system currently lacks logic for large file transport.
+- **The Gap**: gRPC's default message limit (4 MB) will cause transfers to fail if a node tries to return a video capture or a large log file.
+- **Required**: Chunked file upload/download logic for artifacts like screenshots, videos, and build binaries.
+
+## πŸ—οΈ Node Lifecycle & Quality of Life
+- **Automatic Updates**: A mechanism for nodes to self-update their binary/logic when the central protocol evolves.
+- **Graceful Shutdown**: Handling system signals so background workers can finish or clean up before disconnection.
+- **Local Cache**: Persistence for task history and metadata on the node to handle temporary network partitions.
+
+---
+> [!NOTE]
+> These features bridge the gap between "Command Execution" and "Full Autonomous Collaboration."
diff --git a/docs/architecture/cortex_agent_node_plan.md b/docs/architecture/cortex_agent_node_plan.md
index 2129fc4..a351446 100644
--- a/docs/architecture/cortex_agent_node_plan.md
+++ b/docs/architecture/cortex_agent_node_plan.md
@@ -66,12 +66,19 @@
 - Include headless/visible toggles, timeboxed navigation, and DOM snapshot streaming.
 - **Outcome**: High-performance, low-latency visual interaction with local web pages.
 
-### Phase 5: Concurrency & Task Isolation
-- **Goal**: Handle simultaneous requests without state corruption.
-- **Tasks**:
-  - **Task Isolation Model**: Ensure each task runs in an isolated context. Browser actions get isolated contexts; file writes use advisory locks.
-  - Introduce Resource Quotas (limit % CPU/Memory per agent).
-- **Outcome**: Multiple tasks execute safely and concurrently without race conditions.
+### Phase 5: Concurrency & Distributed Architecture - βœ… COMPLETE
+- **Status**: Verified in `/app/poc-grpc-agent/`.
+- **Achievements**:
+  - **Company Architecture**: Implemented the Boss (Server) and Dept Manager (Node) hierarchy.
+  - **Multi-Channel gRPC**: Split communication into 1. **Config**, 2. **Task**, and 3. **Health** channels.
+  - **Dynamic Policy Sync**: Client-side permissions are pushed from the server during the handshake.
+  - **AI-Ready Node Metadata**: Nodes register with a **Skill Description** (metadata) and hardware capabilities.
+  - **Worker Pool Robustness**: Migrated to stateful `subprocess.Popen` execution. The manager now monitors individual process handles to prevent "Phantom Workers" (leaked threads).
+  - **Modular Refactor**: Implemented a **Skill-Based Architecture** (Client) and **Task Journal** (Server) to decouple gRPC networking from business logic, enabling easy expansion for CDP/LSP.
+  - **Hanging Task Recovery**: Implemented **Remote Cancellation**. The server can detect hanging tasks via the health channel and signal the node to `kill` the specific process.
+  - **Fault Tolerance & Retries**: The `TaskAssistant` automatically retries failed/timed-out tasks up to 3x.
+  - **Worker Pool (Sub-threading)**: The main node thread (Manager) dispatches to isolated execution threads (Workers).
+  - **Global Work Pool**: Shared task discovery and **Task Claiming** to prevent rework across nodes.
 
 ### Phase 6: Scaling & Frontend UI Integration
 - **Goal**: Support multiple nodes and surface insights in the UI.
diff --git a/docs/architecture/cortex_project_todo.md b/docs/architecture/cortex_project_todo.md
new file mode 100644
index 0000000..26308ff
--- /dev/null
+++ b/docs/architecture/cortex_project_todo.md
@@ -0,0 +1,57 @@
+# πŸ“ Cortex Distributed Agent: Project TODO List
+
+This document tracks prioritized tasks, technical debt, and future implementations for the Cortex project.
+
+## πŸš€ High Priority (Infrastructure)
+
+### [ ] Persistent Sub-Worker Bridges (CDP/LSP) - 🟒 CURRENT FOCUS
+- **Description**: Implement long-running "Skill Bridges" for Browser automation (CDP) and Code Intelligence (LSP).
+- **Goal**: Support the **Antigravity Browser Skill** by maintaining a persistent browser session rather than spawning a new process per task.
+
+### [ ] Multi-Tenancy & Resource Isolation
+- **Description**: Isolate node groups by user/tenant and enforce hardware quotas.
+- **Why**: Allows the Main AI (Antigravity) to manage resource usage and forcefully **cancel zombie tasks** that may be hanging or orphaned, ensuring node health.
+
+### [ ] Binary Artifact & Large Data Handling (Chunking)
+- **Description**: Implement gRPC stream-based chunking for large artifacts.
+- **Requirement**: **Transparency**. The Main AI should just see a "File" result; reassembly happens at the server layer.
+
+### [ ] Graceful Shutdown & Local Task Persistence (Built-in)
+- **Description**: Handle node interrupts (SIGTERM/SIGINT) to allow workers to finish or checkpoint. Store a local `task_history.json` on the node to recover state after a crash/restart.
+
+### [ ] Server-Side Registry & Task Persistence
+- **Description**: Migrate `NodeRegistry` and `WorkPool` from in-memory storage to a persistent backend (Postgres/Redis).
+- **Priority**: Deferred until the **Full System Integration** phase.
+
+### [ ] Workspace Mirroring & Efficient File Sync
+- **Description**: Maintain a local server-side mirror of node workspaces for zero-latency AI perception.
+
+### [ ] Real-time gRPC Log Streaming
+- **Description**: Bidirectional stream for live `stdout`/`stderr`.
+
+---
+
+## 🐒 Low Priority / Observation
+
+### [ ] OS-Level Isolation (Firecracker/VNC)
+- **Description**: Lightweight virtualization (microVMs) for worker execution.
+- **Status**: Monitored.
+
+### [ ] Node Lifecycle: Auto-Updates
+- **Description**: Mechanism for nodes to self-update.
+
+### [ ] Vertical & Horizontal Scalability
+- **Description**: Migrate to a stateless server design with load balancing.
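The chunking item above reduces to a small, testable primitive that can be built ahead of the gRPC wiring. Below is a minimal sketch of the slice-and-reassemble logic; the 64 KB chunk size, function names, and SHA-256 integrity check are illustrative assumptions, not part of the current codebase:

```python
import hashlib

CHUNK_SIZE = 64 * 1024  # illustrative; must stay well under gRPC's default 4 MB message limit

def chunk_artifact(data: bytes, chunk_size: int = CHUNK_SIZE):
    """Yield (index, total, payload) tuples suitable for a client-streaming RPC."""
    total = max(1, (len(data) + chunk_size - 1) // chunk_size)
    for i in range(total):
        yield i, total, data[i * chunk_size:(i + 1) * chunk_size]

def reassemble(chunks):
    """Server-side counterpart: rebuild the artifact and return a digest for verification."""
    parts = {i: payload for i, _total, payload in chunks}
    blob = b"".join(parts[i] for i in sorted(parts))
    return blob, hashlib.sha256(blob).hexdigest()
```

Keeping the reassembly (and digest check) at the server layer is what gives the "Transparency" requirement: the Main AI only ever sees the completed file result.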
+
+---
+
+## πŸ—ΊοΈ Future Roadmap (Strategic)
+
+### [ ] Advanced Scheduling & Capability Routing
+- **Description**: Sophisticated scheduler to match complex constraints (GPU, Region, Priority).
+
+### [ ] mTLS Certificate Lifecycle Management
+- **Description**: Automated renewal, revocation, and rotation of node certificates.
+
+### [ ] Immutable Audit & Compliance
+- **Description**: Cryptographically signed records of every TaskRequest and TaskResponse for forensics.
diff --git a/docs/architecture/cortex_server_feature_gap_analysis.md b/docs/architecture/cortex_server_feature_gap_analysis.md
new file mode 100644
index 0000000..e944e79
--- /dev/null
+++ b/docs/architecture/cortex_server_feature_gap_analysis.md
@@ -0,0 +1,37 @@
+# πŸ” Cortex Server: Feature Gap Analysis & Roadmap
+
+This document outlines the missing capabilities required to evolve the **Cortex Boss (Server)** from a Proof of Concept into a highly available, secure, and scalable central orchestrator.
+
+## 1. πŸ’Ύ Registry & Task Persistence
+Currently, the `NodeRegistry` and `WorkPool` are entirely in-memory.
+- **The Gap**: If the server restarts, all active node sessions, task histories, and heartbeat data are lost.
+- **Required**: A persistent backend (e.g., PostgreSQL or Redis) to store node metadata, lifelong audit logs, and the global task queue state.
+
+## 2. βš–οΈ Vertical & Horizontal Scalability
+The server is currently a single-process gRPC listener.
+- **The Gap**: A single server cannot scale to thousands of active mTLS connections or handle geographic distribution.
+- **Required**: A stateless server design where node connections can be balanced across a cluster using a shared message broker (like RabbitMQ or NATS) for task dispatch.
+
+## 3. 🎯 Advanced Scheduling & Capability Routing
+The `TaskAssistant` uses basic "best manager" logic.
+- **The Gap**: No support for complex constraints (e.g., "Node must have an NVIDIA GPU", "Node must be in the US-East region", "Task priority: High").
+- **Required**: A sophisticated scheduler that matches task requirements against node capabilities and real-time health metrics (Telemetric Routing).
+
+## 4. πŸ” mTLS Certificate Lifecycle Management
+Certificates are currently static, manually generated files.
+- **The Gap**: No mechanism for automatic renewal, revocation, or hardware-backed security (HSM).
+- **Required**: Integration with a Certificate Authority (CA) like HashiCorp Vault or AWS Private CA to manage automatic node cert rotation and CRL (Certificate Revocation List) checking.
+
+## 5. πŸ“œ Immutable Audit & Compliance
+The server lacks a "Black Box" recorder.
+- **The Gap**: No forensic record of which command was authorized by which user and executed on which node.
+- **Required**: An immutable audit log where every `TaskRequest` and signed `TaskResponse` is archived with cryptographic timestamps for security auditing.
+
+## 🏒 Multi-Tenancy & Resource Isolation
+- **Tenant Guardrails**: Logic to ensure Node A (owned by User X) can never receive a task from User Y.
+- **Resource Quotas**: Limiting the number of concurrent tasks a specific user or node group can consume.
+- **Web Console / API**: A management UI to visualize the global fleet, manually cancel "zombie" tasks, and update node policies.
+
+---
+> [!TIP]
+> This roadmap transforms the server from a "gRPC Bridge" into a "Sovereign Control Plane" capable of managing enterprise-scale distributed intelligence.
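The immutable-audit item above can be prototyped before any storage infrastructure exists: a hash-chained log in which each record's HMAC covers the previous record's signature is tamper-evident, since editing an earlier entry invalidates every later one. A minimal sketch, where the key handling and record shape are illustrative assumptions (a real deployment would use a managed signing key and persistent storage):

```python
import hashlib
import hmac
import json

AUDIT_KEY = b"audit-signing-key"  # illustrative only; fetch from a secret manager in practice

def append_record(log: list, event: dict) -> dict:
    """Append a tamper-evident record; each entry signs its payload plus the previous signature."""
    prev_sig = log[-1]["sig"] if log else "GENESIS"
    body = json.dumps({"event": event, "prev": prev_sig}, sort_keys=True)
    sig = hmac.new(AUDIT_KEY, body.encode(), hashlib.sha256).hexdigest()
    record = {"body": body, "sig": sig}
    log.append(record)
    return record

def verify_chain(log: list) -> bool:
    """Recompute every signature; any edit to an earlier record breaks all later ones."""
    prev_sig = "GENESIS"
    for rec in log:
        if hmac.new(AUDIT_KEY, rec["body"].encode(), hashlib.sha256).hexdigest() != rec["sig"]:
            return False
        if json.loads(rec["body"])["prev"] != prev_sig:
            return False
        prev_sig = rec["sig"]
    return True
```

The same chaining would apply per-`TaskRequest`/`TaskResponse` pair once records are written to a persistent backend.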
diff --git a/poc-grpc-agent/agent_pb2.py b/poc-grpc-agent/agent_pb2.py index 1fffa4a..7fbdfc9 100644 --- a/poc-grpc-agent/agent_pb2.py +++ b/poc-grpc-agent/agent_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xa1\x01\n\x0bNodeMessage\x12\x32\n\x0cregistration\x18\x01 \x01(\x0b\x32\x1a.agent.RegistrationRequestH\x00\x12%\n\theartbeat\x18\x02 \x01(\x0b\x32\x10.agent.HeartbeatH\x00\x12,\n\rtask_response\x18\x03 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x42\t\n\x07payload\"\x7f\n\rServerMessage\x12\x37\n\x10registration_ack\x18\x01 \x01(\x0b\x32\x1b.agent.RegistrationResponseH\x00\x12*\n\x0ctask_request\x18\x02 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x42\t\n\x07payload\"\xd6\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x10\n\x08platform\x18\x03 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x04 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x12\x12\n\nauth_token\x18\x05 \x01(\t\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x08:\x02\x38\x01\"r\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12\x1e\n\x16next_token_rotation_ms\x18\x04 \x01(\t\"p\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x19\n\x11\x61\x63tive_task_count\x18\x04 \x01(\x05\"\xcb\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x14\n\x0cpayload_json\x18\x03 \x01(\t\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x13\n\x0b\x63\x61ncellable\x18\x05 \x01(\x08\x12\x1b\n\x13\x63\x61pability_required\x18\x06 \x01(\t\x12\x17\n\x0fidempotency_key\x18\x07 \x01(\t\x12\x10\n\x08trace_id\x18\x08 \x01(\t\x12\x11\n\tsignature\x18\t 
\x01(\t\"\xfd\x01\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x1e\n\x16structured_output_json\x18\x05 \x01(\t\x12\x13\n\x0b\x64uration_ms\x18\x06 \x01(\x05\x12\x10\n\x08trace_id\x18\x07 \x01(\t\"I\n\x06Status\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\t\n\x05\x45RROR\x10\x02\x12\r\n\tCANCELLED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x32L\n\x11\x41gentOrchestrator\x12\x37\n\x07\x43onnect\x12\x12.agent.NodeMessage\x1a\x14.agent.ServerMessage(\x01\x30\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"{\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x42\t\n\x07payload\"\xe0\x01\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 
\x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x80\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x14\n\x0cpayload_json\x18\x03 \x01(\t\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\"\xa4\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xc1\x01\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 
\x01(\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -23,24 +23,42 @@ DESCRIPTOR._options = None _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' - _globals['_NODEMESSAGE']._serialized_start=23 - _globals['_NODEMESSAGE']._serialized_end=184 - _globals['_SERVERMESSAGE']._serialized_start=186 - _globals['_SERVERMESSAGE']._serialized_end=313 - _globals['_REGISTRATIONREQUEST']._serialized_start=316 - _globals['_REGISTRATIONREQUEST']._serialized_end=530 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=479 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=530 - _globals['_REGISTRATIONRESPONSE']._serialized_start=532 - _globals['_REGISTRATIONRESPONSE']._serialized_end=646 - _globals['_HEARTBEAT']._serialized_start=648 - _globals['_HEARTBEAT']._serialized_end=760 - _globals['_TASKREQUEST']._serialized_start=763 - _globals['_TASKREQUEST']._serialized_end=966 - _globals['_TASKRESPONSE']._serialized_start=969 - _globals['_TASKRESPONSE']._serialized_end=1222 - _globals['_TASKRESPONSE_STATUS']._serialized_start=1149 - _globals['_TASKRESPONSE_STATUS']._serialized_end=1222 - _globals['_AGENTORCHESTRATOR']._serialized_start=1224 - _globals['_AGENTORCHESTRATOR']._serialized_end=1300 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' + _globals['_REGISTRATIONREQUEST']._serialized_start=23 + _globals['_REGISTRATIONREQUEST']._serialized_end=245 + 
_globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=194 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=245 + _globals['_SANDBOXPOLICY']._serialized_start=248 + _globals['_SANDBOXPOLICY']._serialized_end=445 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=411 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=445 + _globals['_REGISTRATIONRESPONSE']._serialized_start=447 + _globals['_REGISTRATIONRESPONSE']._serialized_end=567 + _globals['_CLIENTTASKMESSAGE']._serialized_start=569 + _globals['_CLIENTTASKMESSAGE']._serialized_end=692 + _globals['_SERVERTASKMESSAGE']._serialized_start=695 + _globals['_SERVERTASKMESSAGE']._serialized_end=919 + _globals['_TASKCANCELREQUEST']._serialized_start=921 + _globals['_TASKCANCELREQUEST']._serialized_end=957 + _globals['_TASKREQUEST']._serialized_start=960 + _globals['_TASKREQUEST']._serialized_end=1088 + _globals['_TASKRESPONSE']._serialized_start=1091 + _globals['_TASKRESPONSE']._serialized_end=1383 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=1273 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=1321 + _globals['_TASKRESPONSE_STATUS']._serialized_start=1323 + _globals['_TASKRESPONSE_STATUS']._serialized_end=1383 + _globals['_WORKPOOLUPDATE']._serialized_start=1385 + _globals['_WORKPOOLUPDATE']._serialized_end=1429 + _globals['_TASKCLAIMREQUEST']._serialized_start=1431 + _globals['_TASKCLAIMREQUEST']._serialized_end=1483 + _globals['_TASKCLAIMRESPONSE']._serialized_start=1485 + _globals['_TASKCLAIMRESPONSE']._serialized_end=1554 + _globals['_HEARTBEAT']._serialized_start=1557 + _globals['_HEARTBEAT']._serialized_end=1750 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=1752 + _globals['_HEALTHCHECKRESPONSE']._serialized_end=1797 + _globals['_AGENTORCHESTRATOR']._serialized_start=1800 + _globals['_AGENTORCHESTRATOR']._serialized_end=2033 # @@protoc_insertion_point(module_scope) diff --git a/poc-grpc-agent/agent_pb2_grpc.py b/poc-grpc-agent/agent_pb2_grpc.py 
index 22cb3e7..932d45e 100644 --- a/poc-grpc-agent/agent_pb2_grpc.py +++ b/poc-grpc-agent/agent_pb2_grpc.py @@ -15,10 +15,20 @@ Args: channel: A grpc.Channel. """ - self.Connect = channel.stream_stream( - '/agent.AgentOrchestrator/Connect', - request_serializer=agent__pb2.NodeMessage.SerializeToString, - response_deserializer=agent__pb2.ServerMessage.FromString, + self.SyncConfiguration = channel.unary_unary( + '/agent.AgentOrchestrator/SyncConfiguration', + request_serializer=agent__pb2.RegistrationRequest.SerializeToString, + response_deserializer=agent__pb2.RegistrationResponse.FromString, + ) + self.TaskStream = channel.stream_stream( + '/agent.AgentOrchestrator/TaskStream', + request_serializer=agent__pb2.ClientTaskMessage.SerializeToString, + response_deserializer=agent__pb2.ServerTaskMessage.FromString, + ) + self.ReportHealth = channel.stream_stream( + '/agent.AgentOrchestrator/ReportHealth', + request_serializer=agent__pb2.Heartbeat.SerializeToString, + response_deserializer=agent__pb2.HealthCheckResponse.FromString, ) @@ -26,8 +36,22 @@ """The Cortex Server exposes this service """ - def Connect(self, request_iterator, context): - """Bi-directional stream for persistent connection (Phone Home Pattern) + def SyncConfiguration(self, request, context): + """1. Control Channel: Sync policies and settings (Unary) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TaskStream(self, request_iterator, context): + """2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReportHealth(self, request_iterator, context): + """3. 
Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') @@ -36,10 +60,20 @@ def add_AgentOrchestratorServicer_to_server(servicer, server): rpc_method_handlers = { - 'Connect': grpc.stream_stream_rpc_method_handler( - servicer.Connect, - request_deserializer=agent__pb2.NodeMessage.FromString, - response_serializer=agent__pb2.ServerMessage.SerializeToString, + 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( + servicer.SyncConfiguration, + request_deserializer=agent__pb2.RegistrationRequest.FromString, + response_serializer=agent__pb2.RegistrationResponse.SerializeToString, + ), + 'TaskStream': grpc.stream_stream_rpc_method_handler( + servicer.TaskStream, + request_deserializer=agent__pb2.ClientTaskMessage.FromString, + response_serializer=agent__pb2.ServerTaskMessage.SerializeToString, + ), + 'ReportHealth': grpc.stream_stream_rpc_method_handler( + servicer.ReportHealth, + request_deserializer=agent__pb2.Heartbeat.FromString, + response_serializer=agent__pb2.HealthCheckResponse.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -53,7 +87,7 @@ """ @staticmethod - def Connect(request_iterator, + def SyncConfiguration(request, target, options=(), channel_credentials=None, @@ -63,8 +97,42 @@ wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/Connect', - agent__pb2.NodeMessage.SerializeToString, - agent__pb2.ServerMessage.FromString, + return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration', + agent__pb2.RegistrationRequest.SerializeToString, + agent__pb2.RegistrationResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TaskStream(request_iterator, + target, + options=(), + 
channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream', + agent__pb2.ClientTaskMessage.SerializeToString, + agent__pb2.ServerTaskMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ReportHealth(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth', + agent__pb2.Heartbeat.SerializeToString, + agent__pb2.HealthCheckResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/poc-grpc-agent/client.py b/poc-grpc-agent/client.py index b4c568d..b50b9a0 100644 --- a/poc-grpc-agent/client.py +++ b/poc-grpc-agent/client.py @@ -6,196 +6,189 @@ import threading import subprocess import json -import platform import jwt import datetime import hmac import hashlib +import queue +import sys +import platform +from concurrent import futures SECRET_KEY = "cortex-secret-shared-key" -# --- Sandbox Policy Configuration --- -SANDBOX_POLICY = { - "MODE": "PERMISSIVE", # Toggle between "STRICT" and "PERMISSIVE" - "ALLOWED_COMMANDS": ["ls", "grep", "cat", "pwd", "git", "echo", "python", "whoami", "uname"], - "SENSITIVE_COMMANDS": ["rm", "cp", "mv", "chmod", "chown", "pkill"], - "DENIED_COMMANDS": ["sudo", "mkfs", "dd", "sh", "bash", "zsh"], - "WORKING_DIR": os.getcwd() -} +class BaseSkill: + """Interface for pluggable node capabilities.""" + def execute(self, task, sandbox, on_complete): + raise NotImplementedError -class AgentNode: - def verify_sandbox_policy(self, command_str): - 
"""Verifies if a command is allowed under the current sandbox policy.""" - parts = command_str.strip().split() - if not parts: - return False, "Empty command" - + def cancel(self, task_id): + return False + +class ShellSkill(BaseSkill): + """Default Skill: Executing shell commands.""" + def __init__(self): + self.processes = {} # task_id -> Popen + self.lock = threading.Lock() + + def execute(self, task, sandbox, on_complete): + try: + payload = json.loads(task.payload_json) + cmd = payload.get("command") + + allowed, status_msg = sandbox.verify(cmd) + if not allowed: + return on_complete(task.task_id, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id) + + print(f" [🐚] Executing Shell: {cmd}") + p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + with self.lock: self.processes[task.task_id] = p + + timeout = task.timeout_ms / 1000.0 if task.timeout_ms > 0 else None + stdout, stderr = p.communicate(timeout=timeout) + + on_complete(task.task_id, {"stdout": stdout, "stderr": stderr, "status": 1 if p.returncode == 0 else 2}, task.trace_id) + except subprocess.TimeoutExpired: + self.cancel(task.task_id) + on_complete(task.task_id, {"stderr": "TIMEOUT", "status": 2}, task.trace_id) + except Exception as e: + on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) + finally: + with self.lock: self.processes.pop(task.task_id, None) + + def cancel(self, task_id): + with self.lock: + p = self.processes.get(task_id) + if p: + print(f"[πŸ›‘] Killing Shell Process: {task_id}") + p.kill() + return True + return False + +class SkillManager: + """Orchestrates multiple skills and manages the worker thread pool.""" + def __init__(self, max_workers=5): + self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker") + self.active_tasks = {} # task_id -> future + self.skills = {"shell": ShellSkill()} + self.max_workers = max_workers + self.lock = 
threading.Lock()
+
+    def submit(self, task, sandbox, on_complete):
+        with self.lock:
+            if len(self.active_tasks) >= self.max_workers:
+                return False, "Node Capacity Reached"
+
+            # Determine Skill (Default to shell for now)
+            skill = self.skills.get("shell")
+
+            future = self.executor.submit(skill.execute, task, sandbox, on_complete)
+            self.active_tasks[task.task_id] = future
+
+            # Cleanup hook
+            future.add_done_callback(lambda f: self._cleanup(task.task_id))
+            return True, "Accepted"
+
+    def cancel(self, task_id):
+        with self.lock:
+            # Tell all skills to try and cancel this ID
+            cancelled = any(s.cancel(task_id) for s in self.skills.values())
+            return cancelled
+
+    def get_active_ids(self):
+        with self.lock:
+            return list(self.active_tasks.keys())
+
+    def _cleanup(self, task_id):
+        with self.lock:
+            self.active_tasks.pop(task_id, None)
+
+class SandboxEngine:
+    def __init__(self):
+        self.policy = None
+
+    def sync(self, p):
+        self.policy = {"MODE": "STRICT" if p.mode == agent_pb2.SandboxPolicy.STRICT else "PERMISSIVE",
+                       "ALLOWED": list(p.allowed_commands), "DENIED": list(p.denied_commands),
+                       "SENSITIVE": list(p.sensitive_commands)}
+
+    def verify(self, command_str):
+        if not self.policy:
+            return False, "No Policy"
+        parts = (command_str or "").strip().split()
+        if not parts:
+            return False, "Empty"
         base_cmd = parts[0]
-
-        # 1. ALWAYS Block Denied List (Strictly forbidden in all modes)
-        if base_cmd in SANDBOX_POLICY["DENIED_COMMANDS"]:
-            return False, f"Command '{base_cmd}' is strictly FORBIDDEN."
-
-        # 2. Path Guard (Simple string check for escaping ..)
-        if ".." in command_str:
-            return False, "Path traversal attempt detected (.. disallowed)."
-
-        # 3. Mode-specific Logic
-        if SANDBOX_POLICY["MODE"] == "STRICT":
-            # In STRICT mode, we check against the whitelist
-            if base_cmd not in SANDBOX_POLICY["ALLOWED_COMMANDS"] and base_cmd not in SANDBOX_POLICY["SENSITIVE_COMMANDS"]:
-                return False, f"STRICT MODE: Command '{base_cmd}' is NOT whitelisted."
-
-        # 4. Sensitive / Consent Check (Applied to both modes)
-        if base_cmd in SANDBOX_POLICY["SENSITIVE_COMMANDS"]:
-            return True, "SENSITIVE_CONSENT_REQUIRED"
-
+        if base_cmd in self.policy["DENIED"]:
+            return False, "Forbidden"
+        if self.policy["MODE"] == "STRICT" and base_cmd not in self.policy["ALLOWED"]:
+            return False, "Not Whitelisted"
         return True, "OK"

-    def create_registration_token(self):
-        payload = {
-            "sub": "agent-node-007",
-            "workspace_id": "ws-production-001",
-            "iat": datetime.datetime.utcnow(),
-            "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
-        }
-        return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
-
+class AgentNode:
     def __init__(self, node_id="agent-node-007"):
         self.node_id = node_id
+        self.skills = SkillManager()
+        self.sandbox = SandboxEngine()
+        self.task_queue = queue.Queue()

-        # Load certificates for mTLS
-        print("[πŸ”] Loading mTLS certificates...")
-        try:
-            with open('certs/client.key', 'rb') as f:
-                private_key = f.read()
-            with open('certs/client.crt', 'rb') as f:
-                certificate_chain = f.read()
-            with open('certs/ca.crt', 'rb') as f:
-                root_certificates = f.read()
+        # gRPC Setup
+        with open('certs/client.key', 'rb') as f: pkey = f.read()
+        with open('certs/client.crt', 'rb') as f: cert = f.read()
+        with open('certs/ca.crt', 'rb') as f: ca = f.read()
+        creds = grpc.ssl_channel_credentials(ca, pkey, cert)
+        self.channel = grpc.secure_channel('localhost:50051', creds)
+        self.stub = agent_pb2_grpc.AgentOrchestratorStub(self.channel)

-            # Create secure channel credentials
-            credentials = grpc.ssl_channel_credentials(
-                root_certificates=root_certificates,
-                private_key=private_key,
-                certificate_chain=certificate_chain
-            )
-
-            # Connect to localhost:50051 using secure channel
-            self.channel = grpc.secure_channel('localhost:50051', credentials)
-            self.stub = agent_pb2_grpc.AgentOrchestratorStub(self.channel)
-            print(f"[*] Agent Node {self.node_id} initialized with secure mTLS channel.")
-        except FileNotFoundError as e:
-            print(f"[!] Error: Certificates not found. Ensure generate_certs.sh was run. | {e}")
-            sys.exit(1)
-
+    def _create_token(self):
+        return jwt.encode({"sub": self.node_id, "iat": datetime.datetime.utcnow(),
+                           "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10)},
+                          SECRET_KEY, algorithm="HS256")

-    # ... (rest of methods)
+    def sync_configuration(self):
+        print(f"[*] Handshake: {self.node_id}")
+        reg = agent_pb2.RegistrationRequest(node_id=self.node_id, auth_token=self._create_token(),
+                                            node_description="Refactored Stateful Node",
+                                            capabilities={"shell": "v1"})
+        res = self.stub.SyncConfiguration(reg)
+        if res.success:
+            self.sandbox.sync(res.policy)
+            print("[OK] Policy Synced.")
+        else:
+            print(f"[!] Rejected: {res.error_message}")
+            sys.exit(1)
+
+    def start_health_reporting(self):
+        def _gen():
+            while True:
+                ids = self.skills.get_active_ids()
+                yield agent_pb2.Heartbeat(node_id=self.node_id, cpu_usage_percent=1.0,
+                                          active_worker_count=len(ids), max_worker_capacity=5,
+                                          running_task_ids=ids)
+                time.sleep(10)
+        threading.Thread(target=lambda: list(self.stub.ReportHealth(_gen())), daemon=True).start()
+
+    def run_task_stream(self):
+        def _gen():
+            while True:
+                yield self.task_queue.get()
+        responses = self.stub.TaskStream(_gen())
+        for msg in responses:
+            kind = msg.WhichOneof('payload')
+            if kind == 'task_request':
+                self._handle_task(msg.task_request)
+            elif kind == 'task_cancel':
+                if self.skills.cancel(msg.task_cancel.task_id):
+                    self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=agent_pb2.TaskResponse(
+                        task_id=msg.task_cancel.task_id, status=agent_pb2.TaskResponse.CANCELLED)))
+            elif kind == 'work_pool_update':
+                for tid in msg.work_pool_update.available_task_ids:
+                    if len(self.skills.get_active_ids()) < self.skills.max_workers:
+                        self.task_queue.put(agent_pb2.ClientTaskMessage(
+                            task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id)))
+
+    def _handle_task(self, task):
+        # Verify the payload signature before anything else
+        sig = hmac.new(SECRET_KEY.encode(), task.payload_json.encode(), hashlib.sha256).hexdigest()
+        if not hmac.compare_digest(task.signature, sig):
+            return print("[!] Sig Fail")
+
+        self.skills.submit(task, self.sandbox, self._on_finish)
+
+    def _on_finish(self, tid, res, trace):
+        print(f"[*] Task {tid} finished.")
+        status = agent_pb2.TaskResponse.SUCCESS if res['status'] == 1 else agent_pb2.TaskResponse.ERROR
+        tr = agent_pb2.TaskResponse(task_id=tid, status=status, stdout=res.get('stdout', ''),
+                                    stderr=res.get('stderr', ''), trace_id=trace)
+        self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr))

 if __name__ == '__main__':
-    # We'll use a queue-based generator for better concurrency support
-    import queue
-    import sys
-    msg_queue = queue.Queue()
     node = AgentNode()
-
-    # 1. Registration (Pre-handshake credentials with JWT)
-    token = node.create_registration_token()
-    reg = agent_pb2.NodeMessage(
-        registration=agent_pb2.RegistrationRequest(
-            node_id=node.node_id,
-            version="1.2.0",
-            platform=platform.system() + "-" + platform.machine(),
-            capabilities={"shell": True, "browser": False, "secure": True},
-            auth_token=token
-        )
-    )
-    msg_queue.put(reg)
-
-    def heartbeat_thread():
-        while True:
-            time.sleep(10)
-            hb = agent_pb2.NodeMessage(
-                heartbeat=agent_pb2.Heartbeat(
-                    node_id=node.node_id,
-                    cpu_usage_percent=1.2,
-                    active_task_count=0
-                )
-            )
-            msg_queue.put(hb)
-
-    threading.Thread(target=heartbeat_thread, daemon=True).start()
-
-    def generator():
-        while True:
-            msg = msg_queue.get()
-            yield msg
-
-    try:
-        responses = node.stub.Connect(generator())
-
-        for response in responses:
-            payload_type = response.WhichOneof('payload')
-            if payload_type == 'registration_ack':
-                ack = response.registration_ack
-                if ack.success:
-                    print(f"[*] Registered successfully. Session: {ack.session_id}")
-                else:
-                    print(f"[!] Registration REJECTED: {ack.error_message}")
-                    sys.exit(1)
-            elif payload_type == 'task_request':
-                task = response.task_request
-                print(f"[*] Task Received: {task.task_id}. Verifying signature...")
-
-                # Verify payload signature
-                expected_sig = hmac.new(SECRET_KEY.encode(), task.payload_json.encode(), hashlib.sha256).hexdigest()
-                if hmac.compare_digest(task.signature, expected_sig):
-                    print(f"    [OK] Signature verified. Checking sandbox policy...")
-
-                    payload = json.loads(task.payload_json)
-                    cmd = payload.get("command")
-
-                    # --- Sandbox Enforcement ---
-                    allowed, status_msg = node.verify_sandbox_policy(cmd)
-
-                    if not allowed:
-                        print(f"    [β›”] Sandbox Violation: {status_msg}")
-                        tr = agent_pb2.NodeMessage(
-                            task_response=agent_pb2.TaskResponse(
-                                task_id=task.task_id,
-                                status=agent_pb2.TaskResponse.ERROR,
-                                stderr=f"SANDBOX_VIOLATION: {status_msg}",
-                                trace_id=task.trace_id
-                            )
-                        )
-                        msg_queue.put(tr)
-                        continue
-
-                    if status_msg == "SENSITIVE_CONSENT_REQUIRED":
-                        # In production: Wait for UI prompt. In POC: Log and proceed with a warning tag.
-                        print(f"    [⚠️] Sensitive Command Encountered: {cmd}. Automated approval assumed in POC.")
-                    # -------------------------------
-
-                    print(f"    [OK] Execution starts: {cmd}")
-                    res = subprocess.run(cmd, shell=True, capture_output=True, text=True)
-
-                    # Send result back
-                    tr = agent_pb2.NodeMessage(
-                        task_response=agent_pb2.TaskResponse(
-                            task_id=task.task_id,
-                            status=agent_pb2.TaskResponse.SUCCESS,
-                            stdout=res.stdout,
-                            stderr=res.stderr,
-                            duration_ms=0,
-                            trace_id=task.trace_id
-                        )
-                    )
-                    msg_queue.put(tr)
-                else:
-                    print(f"    [FAIL] Invalid signature for Task {task.task_id}! REJECTING.")
-    except grpc.RpcError as e:
-        print(f"[!] RPC Error: {e.code()} | {e.details()}")
-        if e.code() == grpc.StatusCode.UNAVAILABLE:
-            print("    Is the server running and reachable?")
-        elif e.code() == grpc.StatusCode.UNAUTHENTICATED:
-            print("    Authentication failed. Check certificates.")
+    node.sync_configuration()
+    node.start_health_reporting()
+    node.run_task_stream()
diff --git a/poc-grpc-agent/protos/agent.proto b/poc-grpc-agent/protos/agent.proto
index 542a79c..a8629ae 100644
--- a/poc-grpc-agent/protos/agent.proto
+++ b/poc-grpc-agent/protos/agent.proto
@@ -4,74 +4,115 @@

 // The Cortex Server exposes this service
 service AgentOrchestrator {
-  // Bi-directional stream for persistent connection (Phone Home Pattern)
-  rpc Connect(stream NodeMessage) returns (stream ServerMessage);
+  // 1. Control Channel: Sync policies and settings (Unary)
+  rpc SyncConfiguration(RegistrationRequest) returns (RegistrationResponse);
+
+  // 2. Task Channel: Bidirectional work dispatch and reporting (Persistent)
+  rpc TaskStream(stream ClientTaskMessage) returns (stream ServerTaskMessage);
+
+  // 3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent)
+  rpc ReportHealth(stream Heartbeat) returns (stream HealthCheckResponse);
 }

-// Sent from the Agent Node Client to the Cortex Server
-message NodeMessage {
-  oneof payload {
-    RegistrationRequest registration = 1;
-    Heartbeat heartbeat = 2;
-    TaskResponse task_response = 3;
-  }
-}
-
-// Sent from the Cortex Server to the Agent Node Client
-message ServerMessage {
-  oneof payload {
-    RegistrationResponse registration_ack = 1;
-    TaskRequest task_request = 2;
-  }
-}
-
+// --- Channel 1: Registration & Policy ---
 message RegistrationRequest {
   string node_id = 1;
   string version = 2;
-  string platform = 3;
-  map<string, string> capabilities = 4;
-  string auth_token = 5; // Short-lived JWT identifying the User/Workspace
+  string auth_token = 3;
+  string node_description = 4; // AI-readable description of this node's role
+  map<string, string> capabilities = 5; // e.g. "gpu": "nvidia-3080", "os": "ubuntu-22.04"
+}
+
+message SandboxPolicy {
+  enum Mode {
+    STRICT = 0;
+    PERMISSIVE = 1;
+  }
+  Mode mode = 1;
+  repeated string allowed_commands = 2;
+  repeated string denied_commands = 3;
+  repeated string sensitive_commands = 4;
+  string working_dir_jail = 5;
 }

 message RegistrationResponse {
   bool success = 1;
   string error_message = 2;
   string session_id = 3;
-  string next_token_rotation_ms = 4;
+  SandboxPolicy policy = 4;
 }

-message Heartbeat {
-  string node_id = 1;
-  float cpu_usage_percent = 2;
-  float memory_usage_percent = 3;
-  int32 active_task_count = 4;
+// --- Channel 2: Tasks & Collaboration ---
+message ClientTaskMessage {
+  oneof payload {
+    TaskResponse task_response = 1;
+    TaskClaimRequest task_claim = 2;
+  }
+}
+
+message ServerTaskMessage {
+  oneof payload {
+    TaskRequest task_request = 1;
+    WorkPoolUpdate work_pool_update = 2;
+    TaskClaimResponse claim_status = 3;
+    TaskCancelRequest task_cancel = 4;
+  }
+}
+
+message TaskCancelRequest {
+  string task_id = 1;
 }

 message TaskRequest {
   string task_id = 1;
-  string task_type = 2; // "shell", "browser_cdp"
+  string task_type = 2;
   string payload_json = 3;
   int32 timeout_ms = 4;
-  bool cancellable = 5;
-  string capability_required = 6;
-  string idempotency_key = 7;
-  string trace_id = 8; // For OpenTelemetry observability
-  string signature = 9; // Cryptographic signature of payload_json
+  string trace_id = 5;
+  string signature = 6;
 }

 message TaskResponse {
   string task_id = 1;
   enum Status {
-    UNKNOWN = 0;
-    SUCCESS = 1;
-    ERROR = 2;
+    SUCCESS = 0;
+    ERROR = 1;
+    TIMEOUT = 2;
     CANCELLED = 3;
-    TIMEOUT = 4;
   }
   Status status = 2;
   string stdout = 3;
   string stderr = 4;
-  string structured_output_json = 5;
-  int32 duration_ms = 6;
-  string trace_id = 7;
+  string trace_id = 5;
+  map<string, string> artifacts = 6;
+}
+
+message WorkPoolUpdate {
+  repeated string available_task_ids = 1;
+}
+
+message TaskClaimRequest {
+  string task_id = 1;
+  string node_id = 2;
+}
+
+message TaskClaimResponse {
+  string task_id = 1;
+  bool granted = 2;
+  string reason = 3;
+}
+
+// --- Channel 3: Health & Observation ---
+message Heartbeat {
+  string node_id = 1;
+  float cpu_usage_percent = 2;
+  float memory_usage_percent = 3;
+  int32 active_worker_count = 4;
+  int32 max_worker_capacity = 5;
+  string status_message = 6;
+  repeated string running_task_ids = 7;
+}
+
+message HealthCheckResponse {
+  int64 server_time_ms = 1;
 }
diff --git a/poc-grpc-agent/server.py b/poc-grpc-agent/server.py
index dc4a37c..c3ac742 100644
--- a/poc-grpc-agent/server.py
+++ b/poc-grpc-agent/server.py
@@ -1,190 +1,185 @@
 import grpc
+import os
 from concurrent import futures
 import time
 import agent_pb2
 import agent_pb2_grpc
-import queue
 import threading
+import queue
 import jwt
 import hmac
 import hashlib
+import json

-# In production, these would be in .env
 SECRET_KEY = "cortex-secret-shared-key"

-class AgentOrchestratorServicer(agent_pb2_grpc.AgentOrchestratorServicer):
+class TaskJournal:
+    """State machine for tracking tasks through their lifecycle."""
     def __init__(self):
-        self.nodes = {}  # node_id -> queue for messages to node
+        self.lock = threading.Lock()
+        self.tasks = {}  # task_id -> { "event": Event, "result": None, "node_id": str }

-    def Connect(self, request_iterator, context):
+    def register(self, task_id, node_id=None):
+        event = threading.Event()
+        with self.lock:
+            self.tasks[task_id] = {"event": event, "result": None, "node_id": node_id}
+        return event
+
+    def fulfill(self, task_id, result):
+        with self.lock:
+            if task_id in self.tasks:
+                self.tasks[task_id]["result"] = result
+                self.tasks[task_id]["event"].set()
+                return True
+            return False
+
+    def get_result(self, task_id):
+        with self.lock:
+            data = self.tasks.get(task_id)
+            return data["result"] if data else None
+
+    def pop(self, task_id):
+        with self.lock:
+            return self.tasks.pop(task_id, None)
+
+class AbstractNodeRegistry:
+    """Interface for finding and tracking Managers."""
+    def register(self, node_id, q, metadata): raise NotImplementedError
+    def update_stats(self, node_id, stats): raise NotImplementedError
+    def get_best(self): raise NotImplementedError
+    def get_node(self, node_id): raise NotImplementedError
+
+class MemoryNodeRegistry(AbstractNodeRegistry):
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.nodes = {}  # node_id -> { stats: {}, queue: queue, metadata: {} }
+
+    def register(self, node_id, q, metadata):
+        with self.lock:
+            self.nodes[node_id] = {"stats": {}, "queue": q, "metadata": metadata}
+        print(f"[πŸ“‹] Registered: {node_id}")
+
+    def update_stats(self, node_id, stats):
+        with self.lock:
+            if node_id in self.nodes:
+                self.nodes[node_id]["stats"].update(stats)
+
+    def get_best(self):
+        with self.lock:
+            if not self.nodes:
+                return None
+            # Pick based on active worker count
+            return sorted(self.nodes.items(), key=lambda x: x[1]["stats"].get("active_worker_count", 999))[0][0]
+
+    def get_node(self, node_id):
+        with self.lock:
+            return self.nodes.get(node_id)
+
+class GlobalWorkPool:
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.available = {"shared-001": '{"command": "uname -a"}', "shared-002": '{"command": "uptime"}'}
+
+    def claim(self, task_id, node_id):
+        with self.lock:
+            if task_id in self.available:
+                return True, self.available.pop(task_id)
+            return False, None
+
+class TaskAssistant:
+    """The High-Level AI API."""
+    def __init__(self, registry, journal, pool):
+        self.registry = registry
+        self.journal = journal
+        self.pool = pool
+
+    def dispatch_single(self, node_id, cmd, timeout=30):
+        # Sign the payload and block until the node responds (or the timeout fires)
+        node = self.registry.get_node(node_id)
+        if not node:
+            return {"error": "Offline"}
+
+        tid = f"task-{int(time.time()*1000)}"
+        event = self.journal.register(tid, node_id)
+
+        # Sign the exact serialized string that is transmitted
+        payload_json = json.dumps({"command": cmd})
+        sig = hmac.new(SECRET_KEY.encode(), payload_json.encode(), hashlib.sha256).hexdigest()
+        req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest(
+            task_id=tid, payload_json=payload_json, signature=sig))
+
+        node["queue"].put(req)
+
+        if event.wait(timeout):
+            res = self.journal.get_result(tid)
+            self.journal.pop(tid)
+            return res
+        self.journal.pop(tid)
+        return {"error": "Timeout"}
+
+class AgentOrchestrator(agent_pb2_grpc.AgentOrchestratorServicer):
+    def __init__(self):
+        self.registry = MemoryNodeRegistry()
+        self.journal = TaskJournal()
+        self.pool = GlobalWorkPool()
+        self.assistant = TaskAssistant(self.registry, self.journal, self.pool)
+
+    def SyncConfiguration(self, request, context):
+        # Pre-registration for metadata search
+        self.registry.register(request.node_id, queue.Queue(),
+                               {"desc": request.node_description, "caps": dict(request.capabilities)})
+        return agent_pb2.RegistrationResponse(success=True,
+            policy=agent_pb2.SandboxPolicy(mode=agent_pb2.SandboxPolicy.STRICT,
+                                           allowed_commands=["ls", "uname", "echo", "sleep"]))
+
+    def TaskStream(self, request_iterator, context):
         node_id = None
-        # We need a way to send messages to the client from another thread/input
-        # In a real app, this would be triggered by an AI task planner
-        send_queue = queue.Queue()
+        def _read():
+            nonlocal node_id
+            for msg in request_iterator:
+                kind = msg.WhichOneof('payload')
+                if kind == 'task_claim':
+                    node_id = msg.task_claim.node_id
+                    success, payload = self.pool.claim(msg.task_claim.task_id, node_id)
+                    if success:
+                        sig = hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).hexdigest()
+                        self.registry.get_node(node_id)["queue"].put(agent_pb2.ServerTaskMessage(
+                            task_request=agent_pb2.TaskRequest(task_id=msg.task_claim.task_id,
+                                                               payload_json=payload, signature=sig)))
+                elif kind == 'task_response':
+                    self.journal.fulfill(msg.task_response.task_id,
+                                         {"stdout": msg.task_response.stdout, "status": msg.task_response.status})
+
+        threading.Thread(target=_read, daemon=True).start()

-        def stream_messages():
-            while context.is_active():
+        while context.is_active():
+            # Broadcast pool
+            if self.pool.available:
+                yield agent_pb2.ServerTaskMessage(work_pool_update=agent_pb2.WorkPoolUpdate(
+                    available_task_ids=list(self.pool.available.keys())))
+
+            # Send direct tasks
+            if node_id and self.registry.get_node(node_id):
                 try:
-                    msg = send_queue.get(timeout=1.0)
+                    msg = self.registry.get_node(node_id)["queue"].get(timeout=2)
                     yield msg
-                except queue.Empty:
-                    continue
+                except queue.Empty:
+                    pass
+            else:
+                time.sleep(1)

-        # Start a thread to handle incoming messages from the client
-        incoming_thread = threading.Thread(target=self._handle_incoming, args=(request_iterator, context, send_queue))
-        incoming_thread.start()
-
-        # Yield messages from the queue to the client
-        for msg in stream_messages():
-            yield msg
-
-    def _handle_incoming(self, request_iterator, context, send_queue):
-        try:
-            for message in request_iterator:
-                payload_type = message.WhichOneof('payload')
-                if payload_type == 'registration':
-                    reg = message.registration
-                    print(f"[*] Node {reg.node_id} registration request. Verifying token...")
-
-                    try:
-                        # Verify JWT
-                        # In real app, we check if node_id matches token subject
-                        decoded = jwt.decode(reg.auth_token, SECRET_KEY, algorithms=["HS256"])
-                        print(f"    [OK] Token verified for workspace: {decoded.get('workspace_id')}")
-
-                        # Send ACK
-                        ack = agent_pb2.ServerMessage(
-                            registration_ack=agent_pb2.RegistrationResponse(
-                                success=True,
-                                session_id="session-secure-123"
-                            )
-                        )
-                        send_queue.put(ack)
-
-                        # Test 1: Allowed Command
-                        t1_payload = '{"command": "whoami"}'
-                        t1_sig = hmac.new(SECRET_KEY.encode(), t1_payload.encode(), hashlib.sha256).hexdigest()
-                        send_queue.put(agent_pb2.ServerMessage(
-                            task_request=agent_pb2.TaskRequest(
-                                task_id="task-001-ALLOWED",
-                                task_type="shell",
-                                payload_json=t1_payload,
-                                trace_id="trace-001",
-                                signature=t1_sig
-                            )
-                        ))
-
-                        # Test 2: Sensitive Command (Consent Required)
-                        t2_payload = '{"command": "rm -rf /tmp/node-test"}'
-                        t2_sig = hmac.new(SECRET_KEY.encode(), t2_payload.encode(), hashlib.sha256).hexdigest()
-                        send_queue.put(agent_pb2.ServerMessage(
-                            task_request=agent_pb2.TaskRequest(
-                                task_id="task-002-SENSITIVE",
-                                task_type="shell",
-                                payload_json=t2_payload,
-                                trace_id="trace-002",
-                                signature=t2_sig
-                            )
-                        ))
-
-                        # Test 3: Path Traversal Attempt (JAILBREAK)
-                        t3_payload = '{"command": "cat ../.env.gitbucket"}'
-                        t3_sig = hmac.new(SECRET_KEY.encode(), t3_payload.encode(), hashlib.sha256).hexdigest()
-                        send_queue.put(agent_pb2.ServerMessage(
-                            task_request=agent_pb2.TaskRequest(
-                                task_id="task-003-TRAVERSAL",
-                                task_type="shell",
-                                payload_json=t3_payload,
-                                trace_id="trace-003",
-                                signature=t3_sig
-                            )
-                        ))
-
-                        # Test 4: STRICTLY Forbidden
-                        t4_payload = '{"command": "sudo apt update"}'
-                        t4_sig = hmac.new(SECRET_KEY.encode(), t4_payload.encode(), hashlib.sha256).hexdigest()
-                        send_queue.put(agent_pb2.ServerMessage(
-                            task_request=agent_pb2.TaskRequest(
-                                task_id="task-004-FORBIDDEN",
-                                task_type="shell",
-                                payload_json=t4_payload,
-                                trace_id="trace-004",
-                                signature=t4_sig
-                            )
-                        ))
-
-                        # Test 5: Non-whitelisted but Allowed in PERMISSIVE
-                        t5_payload = '{"command": "df -h"}'
-                        t5_sig = hmac.new(SECRET_KEY.encode(), t5_payload.encode(), hashlib.sha256).hexdigest()
-                        send_queue.put(agent_pb2.ServerMessage(
-                            task_request=agent_pb2.TaskRequest(
-                                task_id="task-005-PERMISSIVE",
-                                task_type="shell",
-                                payload_json=t5_payload,
-                                trace_id="trace-005",
-                                signature=t5_sig
-                            )
-                        ))
-
-                        print("[*] Sequence of 5 test tasks dispatched to verify Sandbox Policy (PERMISSIVE mode).")
-
-                    except jwt.ExpiredSignatureError:
-                        print(f"    [FAIL] Token for {reg.node_id} expired.")
-                        ack = agent_pb2.ServerMessage(
-                            registration_ack=agent_pb2.RegistrationResponse(
-                                success=False,
-                                error_message="Authentication token expired."
-                            )
-                        )
-                        send_queue.put(ack)
-                    except jwt.InvalidTokenError as e:
-                        print(f"    [FAIL] Invalid token for {reg.node_id}: {e}")
-                        ack = agent_pb2.ServerMessage(
-                            registration_ack=agent_pb2.RegistrationResponse(
-                                success=False,
-                                error_message=f"Invalid authentication token: {e}"
-                            )
-                        )
-                        send_queue.put(ack)
-
-                elif payload_type == 'heartbeat':
-                    hb = message.heartbeat
-                    # print(f"[+] Heartbeat from {hb.node_id}: CPU {hb.cpu_usage_percent}%")
-                    pass
-
-                elif payload_type == 'task_response':
-                    res = message.task_response
-                    print(f"[!] Task Finished: {res.task_id} | Status: {res.status}")
-                    print(f"    Stdout: {res.stdout.strip()}")
-        except Exception as e:
-            print(f"[!] Error handling incoming stream: {e}")
+    def ReportHealth(self, request_iterator, context):
+        for hb in request_iterator:
+            self.registry.update_stats(hb.node_id, {"active_worker_count": hb.active_worker_count,
+                                                    "running": list(hb.running_task_ids)})
+            yield agent_pb2.HealthCheckResponse(server_time_ms=int(time.time()*1000))

 def serve():
-    # Load certificates for mTLS
-    print("[πŸ”] Loading mTLS certificates...")
-    with open('certs/server.key', 'rb') as f:
-        private_key = f.read()
-    with open('certs/server.crt', 'rb') as f:
-        certificate_chain = f.read()
-    with open('certs/ca.crt', 'rb') as f:
-        root_certificates = f.read()
-
-    # Create server credentials
-    # require_client_auth=True enforces bidirectional verification (mTLS)
-    server_credentials = grpc.ssl_server_credentials(
-        [(private_key, certificate_chain)],
-        root_certificates=root_certificates,
-        require_client_auth=True
-    )
-
+    with open('certs/server.key', 'rb') as f: pkey = f.read()
+    with open('certs/server.crt', 'rb') as f: cert = f.read()
+    with open('certs/ca.crt', 'rb') as f: ca = f.read()
+    creds = grpc.ssl_server_credentials([(pkey, cert)], ca, True)
     server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
-    agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(AgentOrchestratorServicer(), server)
+    orch = AgentOrchestrator()
+    agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(orch, server)
+    server.add_secure_port('[::]:50051', creds)

-    # Use secure_port instead of insecure_port
-    server.add_secure_port('[::]:50051', server_credentials)
-    print("[*] Cortex Secure Server POC listening on port 50051 (mTLS enabled)...")
+    print("[πŸ›‘οΈ] Boss Plane Refactored & Online.")
     server.start()
+
+    # Simple AI Simulation loop
+    time.sleep(10)
+    print("\n[🧠] AI Simulation Start...")
+    print(f"    Whoami: {orch.assistant.dispatch_single('agent-node-007', 'whoami')}")
+
     server.wait_for_termination()

 if __name__ == '__main__':
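The integrity scheme used on both sides of the diff is symmetric: the Boss signs `payload_json` with HMAC-SHA256 under the shared `SECRET_KEY`, and the node recomputes the digest and compares it with `hmac.compare_digest` before executing anything. A minimal stand-alone sketch of that round trip (the helper names `sign_payload` / `verify_payload` are illustrative, not part of the PoC code; the key value is the PoC's hard-coded shared secret):

```python
import hashlib
import hmac
import json

SECRET_KEY = "cortex-secret-shared-key"  # the PoC's hard-coded shared secret

def sign_payload(payload):
    """Serialize the task payload once and sign the exact transmitted string."""
    payload_json = json.dumps(payload)
    sig = hmac.new(SECRET_KEY.encode(), payload_json.encode(), hashlib.sha256).hexdigest()
    return payload_json, sig

def verify_payload(payload_json, signature):
    """Node-side check: recompute the digest and compare in constant time."""
    expected = hmac.new(SECRET_KEY.encode(), payload_json.encode(), hashlib.sha256).hexdigest()
    return hmac.compare_digest(signature, expected)

payload_json, sig = sign_payload({"command": "uname -a"})
assert verify_payload(payload_json, sig)            # untampered payload passes
assert not verify_payload(payload_json + " ", sig)  # any mutation fails verification
```

The subtlety worth preserving in `TaskAssistant.dispatch_single` is to serialize once and sign the exact string placed in `payload_json`: signing a second, independent `json.dumps` call happens to work for simple dicts, but any re-serialization difference would invalidate the signature on the node.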