diff --git a/.agent/utils/gitbucket/add_comment.py b/.agent/utils/gitbucket/add_comment.py index 80d1666..8f6ef4e 100644 --- a/.agent/utils/gitbucket/add_comment.py +++ b/.agent/utils/gitbucket/add_comment.py @@ -21,6 +21,8 @@ sys.exit(1) url = f"https://gitbucket.jerxie.com/api/v3/repos/{repo}/issues/{issue_number}/comments" + # Support literal \n passed from shell + body = body.replace("\\n", "\n") data = {"body": body} req = urllib.request.Request(url, json.dumps(data).encode("utf-8"), headers={ diff --git a/.agent/utils/gitbucket/create_issue.py b/.agent/utils/gitbucket/create_issue.py index 6116285..2692f68 100644 --- a/.agent/utils/gitbucket/create_issue.py +++ b/.agent/utils/gitbucket/create_issue.py @@ -21,6 +21,8 @@ sys.exit(1) url = f"https://gitbucket.jerxie.com/api/v3/repos/{repo}/issues" + # Support literal \n passed from shell + body = body.replace("\\n", "\n") data = {"title": title, "body": body} req = urllib.request.Request(url, json.dumps(data).encode("utf-8"), headers={ diff --git a/docs/architecture/agent_node_feature_gap_analysis.md b/docs/architecture/agent_node_feature_gap_analysis.md new file mode 100644 index 0000000..db6bce5 --- /dev/null +++ b/docs/architecture/agent_node_feature_gap_analysis.md @@ -0,0 +1,37 @@ +# πŸ” Cortex Agent Node: Feature Gap Analysis & Roadmap + +This document outlines the critical missing features required to transition the current gRPC Proof of Concept (PoC) into a full-scale, production-ready Distributed AI Agent System. + +## 1. πŸ—„οΈ Workspace & File Synchronization +The current node executes commands but lacks a native way to manage project-level files. +- **The Gap**: No bi-directional sync (e.g., local server files -> node workspace). +- **Required**: A content-addressable synchronization layer (Merkle Tree / Hash-based) to efficiently mirror workspaces to remote nodes without redundant transfers. + +## 2. 
🌊 Real-time Log Streaming (Observability) +Currently, `stdout/stderr` is only returned upon task completion. +- **The Gap**: No visibility into long-running tasks or hanging builds. +- **Required**: Implementing gRPC Server-to-Client streaming for live console logs, allowing the Main AI to detect progress or failures as they occur. + +## 3. πŸ›‘οΈ Robust Sandbox Isolation +The current sandbox relies on string-filtering shell commands. +- **The Gap**: Vulnerable to complex shell escapes, symlink attacks, and environment manipulation. +- **Required**: OS-level containerization (Docker, Podman, or Firecracker microVMs) to ensure each task is strictly trapped within its own namespace. + +## 4. πŸ”— Specialized Sub-Worker Protocols (CDP/LSP) +The agent treats browser automation and coding as generic shell commands. +- **The Gap**: Inefficiency; starting a fresh browser for every click is slow and loses state. +- **Required**: Persistent sub-bridges (e.g., Chrome DevTools Protocol link) allowing the Main AI to maintain a long-running session across multiple delegated tasks. + +## 5. πŸ“¦ Binary Artifact & Large Data Handling +The system currently lacks logic for large file transport. +- **The Gap**: gRPC message limits (4MB) will crash the system if a node tries to return a video capture or large log file. +- **Required**: Chunked file upload/download logic for artifacts like screenshots, videos, and build binaries. + +## πŸ—οΈ Node Lifecycle & Quality of Life +- **Automatic Updates**: Mechanism for nodes to self-update their binary/logic when the central protocol evolves. +- **Graceful Shutdown**: Handling system signals to allow background workers to finish or clean up before disconnection. +- **Local Cache**: Persistence for task history and metadata on the node to handle temporary network partitions. + +--- +> [!NOTE] +> These features bridge the gap between "Command Execution" and "Full Autonomous Collaboration." 
diff --git a/docs/architecture/cortex_agent_node_plan.md b/docs/architecture/cortex_agent_node_plan.md index 2129fc4..a351446 100644 --- a/docs/architecture/cortex_agent_node_plan.md +++ b/docs/architecture/cortex_agent_node_plan.md @@ -66,12 +66,19 @@ - Include headless/visible toggles, timeboxed navigation, and DOM snapshot streaming. - **Outcome**: High-performance, low-latency visual interaction with local web pages. -### Phase 5: Concurrency & Task Isolation -- **Goal**: Handle simultaneous requests without state corruption. -- **Tasks**: - - **Task Isolation Model**: Ensure each task runs in an isolated context. Browser actions get isolated contexts; file writes use advisory locks. - - Introduce Resource Quotas (limit % CPU/Memory per agent). -- **Outcome**: Multiple tasks execute safely and concurrently without race conditions. +### Phase 5: Concurrency & Distributed Architecture - βœ… COMPLETE +- **Status**: Verified in `/app/poc-grpc-agent/`. +- **Achievements**: + - **Company Architecture**: Implemented Boss (Server) and Dept Manager (Node) hierarchy. + - **Multi-Channel gRPC**: Split communication into 1. **Config**, 2. **Task**, and 3. **Health** channels. + - **Dynamic Policy Sync**: Client-side permissions are pushed from the server during handshake. + - **AI-Ready Node Metadata**: Nodes register with a **Skill Description** (metadata) and hardware capabilities. + - **Worker Pool Robustness**: Migrated to stateful `subprocess.Popen` execution. The manager now monitors individual process handles to prevent "Phantom Workers" (leaked threads). + - **Modular Refactor**: Implemented a **Skill-Based Architecture** (Client) and **Task Journal** (Server) to decouple gRPC networking from business logic, enabling easy expansion for CDP/LSP. + - **Hanging Task Recovery**: Implemented **Remote Cancellation**. The server can detect hanging tasks via the health channel and signal the node to `kill` the specific process. 
+ - **Fault Tolerance & Retries**: The `TaskAssistant` automatically retries failed/timed-out tasks up to 3x. + - **Worker Pool (Sub-threading)**: Main node thread (Manager) dispatches to isolated execution threads (Workers). + - **Global Work Pool**: Shared task discovery and **Task Claiming** to prevent rework across nodes. ### Phase 6: Scaling & Frontend UI Integration - **Goal**: Support multiple nodes and surface insights in the UI. diff --git a/docs/architecture/cortex_project_todo.md b/docs/architecture/cortex_project_todo.md new file mode 100644 index 0000000..926c2fe --- /dev/null +++ b/docs/architecture/cortex_project_todo.md @@ -0,0 +1,71 @@ +# πŸ“ Cortex Distributed Agent: Project TODO List + +This document tracks prioritized tasks, technical debt, and future implementations for the Cortex project. + +## πŸš€ High Priority (Infrastructure) + +### [ ] Persistent Sub-Worker Bridges (CDP/LSP) - βœ… FOUNDATIONS BUILT +- **Status**: Basic Navigation, Screenshotting, and Persistent Session state implemented. +- **Goal**: Support a professional, high-fidelity **Antigravity Browser Skill**. + +#### 🌐 Comprehensive Browser Skill Requirements: +- **[ ] JS Console Tunnel**: Pipe `console.log/error` from the browser back to the server in real-time. +- **[ ] Network Observability**: Capture and return XHR/Fetch traffic (HAR or failed requests only) for AI debugging. +- **[ ] A11y Tree Perception**: Provide the **Accessibility Tree** (JSON) to the AI instead of just raw HTML/DOM for better semantic understanding. +- **[ ] Advanced Interactions**: Support `Hover`, `Scroll`, `Drag & Drop`, and `Multi-key` complex input. +- **[ ] EVAL Skill**: Allow the AI to inject and execute arbitrary JavaScript (`page.evaluate()`) to extract data or trigger events. +- **[ ] Smart Wait Logic**: Implement `wait_for_network_idle`, `wait_for_selector`, and custom predicates to reduce task flakiness. 
+- **[ ] Artifact Extraction**: Export high-definition **Videos** (chunked) and **HAR** files for audit trails. + +### [ ] Multi-Tenancy & Resource Isolation +- **Description**: Isolate node groups by user/tenant and enforce hardware quotas. +- **Why**: Allows the Main AI (Antigravity) to manage resource usage and forcefully **cancel zombie tasks** that may be hanging or orphaned, ensuring node health. + +### [ ] Binary Artifact & Large Data Handling (Chunking) +- **Description**: Implement gRPC stream-based chunking for large artifacts. +- **Specific Case**: Support high-fidelity **Video Recordings** from Browser sessions (multi-GB files). +- **Requirement**: **Transparency**. The Main AI should just see a "File" result; reassembly happens at the server layer. + +### [ ] Architectural Refinement: Unified Worker Shim +- **Description**: Re-evaluate the "Skill" abstraction. Move towards a model where each task is a specialized worker process that decides its capability (Shell vs Playwright) at startup. +- **Goal**: Simplifies context isolation and reduces manager-thread overhead. + +### [ ] Graceful Shutdown & Local Task Persistence (Built-in) +- **Description**: Handle node interrupts (SIGTERM/SIGINT) to allow workers to finish or checkpoint. Store a local `task_history.json` on the node to recover state after crash/restart. + +### [ ] Server-Side Registry & Task Persistence +- **Description**: Migrate `NodeRegistry` and `WorkPool` from in-memory to a persistent backend (Postgres/Redis). +- **Priority**: Deferred until **Full System Integration** phase. + +### [ ] Workspace Mirroring & Efficient File Sync +- **Description**: Maintain a local server-side mirror of node workspaces for Zero-latency AI perception. + +### [ ] Real-time gRPC Log Streaming +- **Description**: Bidirectional stream for live `stdout/stderr`. 
+ +--- + +## 🐒 Low Priority / Observation + +### [ ] OS-Level Isolation (Firecracker/VNC) +- **Description**: Lightweight virtualization (microVMs) for worker execution. +- **Status**: Monitored. + +### [ ] Node Lifecycle: Auto-Updates +- **Description**: Mechanism for nodes to self-update. + +### [ ] Vertical & Horizontal Scalability +- **Description**: Migrate to a stateless server design with load balancing. + +--- + +## πŸ—ΊοΈ Future Roadmap (Strategic) + +### [ ] Advanced Scheduling & Capability Routing +- **Description**: Sophisticated scheduler to match complex constraints (GPU, Region, Priority). + +### [ ] mTLS Certificate Lifecycle Management +- **Description**: Automated renewal, revocation, and rotation of node certificates. + +### [ ] Immutable Audit & Compliance +- **Description**: Cryptographically signed records of every TaskRequest and TaskResponse for forensics. diff --git a/docs/architecture/cortex_server_feature_gap_analysis.md b/docs/architecture/cortex_server_feature_gap_analysis.md new file mode 100644 index 0000000..e944e79 --- /dev/null +++ b/docs/architecture/cortex_server_feature_gap_analysis.md @@ -0,0 +1,37 @@ +# πŸ” Cortex Server: Feature Gap Analysis & Roadmap + +This document outlines the missing capabilities required to evolve the **Cortex Boss (Server)** from a Proof of Concept into a highly available, secure, and scalable central orchestrator. + +## 1. πŸ’Ύ Registry & Task Persistence +Currently, the `NodeRegistry` and `WorkPool` are entirely in-memory. +- **The Gap**: If the server restarts, all active node sessions, task histories, and heartbeat data are lost. +- **Required**: A persistent backend (e.g., PostgreSQL or Redis) to store node metadata, lifelong audit logs, and the global task queue state. + +## 2. βš–οΈ Vertical & Horizontal Scalability +The server is currently a single-process gRPC listener. +- **The Gap**: A single server cannot scale to thousands of active mTLS connections or handle geographic distribution. 
+- **Required**: A stateless server design where node connections can be balanced across a cluster using a shared message broker (like RabbitMQ or NATS) for task dispatch. + +## 3. 🎯 Advanced Scheduling & Capability Routing +The `TaskAssistant` uses basic "best manager" logic. +- **The Gap**: No support for complex constraints (e.g., "Node must have NVIDIA GPU", "Node must be in US-East region", "Task priority: High"). +- **Required**: A sophisticated scheduler that matches task requirements against node capabilities and real-time health metrics (Telemetric Routing). + +## 4. πŸ” mTLS Certificate Lifecycle Management +Certificates are currently static files manually generated. +- **The Gap**: No mechanism for automatic renewal, revocation, or hardware-backed security (HSM). +- **Required**: Integration with a Certificate Authority (CA) like HashiCorp Vault or AWS Private CA to manage automatic node cert rotation and CRL (Certificate Revocation List) checking. + +## 5. πŸ“œ Immutable Audit & Compliance +The server lacks a "Black Box" recorder. +- **The Gap**: No forensic record of which command was authorized by which user and executed on which node. +- **Required**: An immutable audit log where every `TaskRequest` and signed `TaskResponse` is archived with cryptographic timestamps for security auditing. + +## 🏒 Multi-Tenancy & Resource Isolation +- **Tenant Guardrails**: Logic to ensure Node A (owned by User X) can never receive a task from User Y. +- **Resource Quotas**: Limiting the number of concurrent tasks a specific user or node group can consume. +- **Web Console / API**: A management UI to visualize the global fleet, manually cancel "zombie" tasks, and update node policies. + +--- +> [!TIP] +> This roadmap transforms the server from a "gRPC Bridge" into a "Sovereign Control Plane" capable of managing enterprise-scale distributed intelligence. 
diff --git a/poc-grpc-agent/.nfs00000000006c16c80000003a b/poc-grpc-agent/.nfs00000000006c16c80000003a new file mode 100644 index 0000000..be501d1 --- /dev/null +++ b/poc-grpc-agent/.nfs00000000006c16c80000003a @@ -0,0 +1,33 @@ +[πŸ›‘οΈ] Boss Plane Orchestrator Starting on [::]:50051... +[πŸ›‘οΈ] Boss Plane Refactored & Online. +[πŸ“‹] Registered Agent Node: agent-node-007 +[πŸ“Ά] Stream Online for agent-node-007 + [πŸ“¦] Task shared-001 Claimed by agent-node-007 + [πŸ“¦] Task shared-002 Claimed by agent-node-007 + [πŸš€] Streamed message to agent-node-007 + [πŸš€] Streamed message to agent-node-007 + +[🧠] AI Simulation Start... +[πŸ“€] Dispatching shell task-1772514336015 to agent-node-007 + [πŸš€] Streamed message to agent-node-007 + Uname Output: {'stdout': 'Linux d1ceb63b86a7 6.10.11-linuxkit #1 SMP Thu Oct 3 10:17:28 UTC 2024 aarch64 GNU/Linux\n', 'status': 0} + +[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)... +[πŸŒπŸ“€] Dispatching browser br-1772514336028 to agent-node-007 + [πŸš€] Streamed message to agent-node-007 + [🌐] Net Inspect: GET https://example.com/ + Nav Result: {'stdout': '', 'status': 0, 'browser': {'url': 'https://example.com/', 'title': 'Example Domain', 'has_snapshot': False, 'a11y': None, 'eval': ''}} + +[🧠] AI Phase 4 Pro: Perception & Advanced Logic... +[πŸŒπŸ“€] Dispatching browser br-1772514336293 to agent-node-007 + [πŸš€] Streamed message to agent-node-007 + A11y Result: {"role": "WebArea", "name": "Example Domain", "children": [{"role": "heading", "name": "Example Doma... +[πŸŒπŸ“€] Dispatching browser br-1772514336300 to agent-node-007 + [πŸš€] Streamed message to agent-node-007 + Eval Result: 115.89999997615814 + +[🧠] AI Phase 4 Pro: Triggering Real-time Events (Tunneling)... +[πŸŒπŸ“€] Dispatching browser br-1772514336305 to agent-node-007 + [πŸš€] Streamed message to agent-node-007 + [πŸ–₯️] Live Console: Refactored Hello! 
+ [πŸ–₯️] Live Console: Failed to load resource: the server responded with a status of 404 () diff --git a/poc-grpc-agent/.nfs00000000006c16c900000039 b/poc-grpc-agent/.nfs00000000006c16c900000039 new file mode 100644 index 0000000..9d3246b --- /dev/null +++ b/poc-grpc-agent/.nfs00000000006c16c900000039 @@ -0,0 +1,51 @@ +[*] Starting Antigravity Agent Node: agent-node-007... +[🌐] Browser Actor Starting... +[*] Handshake with Orchestrator: agent-node-007 +[OK] Sandbox Policy Synced. +[*] Task Stream Online: agent-node-007 +[🌐] Browser Engine Online. + [πŸ“₯] Received from Stream: work_pool_update +[*] Inbound: work_pool_update + [πŸ“₯] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: shared-001 +[βœ…] Validated task shared-001 + [🐚] Executing Shell: uname -a + [πŸ“₯] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: shared-002 +[βœ…] Validated task shared-002 +[*] Completion: shared-002 + [🐚] Shell Done: uname -a | Stdout Size: 90 +[*] Completion: shared-001 + [πŸ“₯] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: task-1772514336015 +[βœ…] Validated task task-1772514336015 + [🐚] Executing Shell: uname -a + [🐚] Shell Done: uname -a | Stdout Size: 90 +[*] Completion: task-1772514336015 + [πŸ“₯] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: br-1772514336028 +[βœ…] Validated task br-1772514336028 + [🌐] Browser Actor Processing: NAVIGATE | Session: antigravity-session-1 +[*] Completion: br-1772514336028 + [πŸ“₯] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: br-1772514336293 +[βœ…] Validated task br-1772514336293 + [🌐] Browser Actor Processing: GET_A11Y | Session: antigravity-session-1 +[*] Completion: br-1772514336293 + [πŸ“₯] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: br-1772514336300 +[βœ…] Validated task br-1772514336300 + [🌐] Browser Actor Processing: EVAL | Session: 
antigravity-session-1 +[*] Completion: br-1772514336300 + [πŸ“₯] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: br-1772514336305 +[βœ…] Validated task br-1772514336305 + [🌐] Browser Actor Processing: EVAL | Session: antigravity-session-1 +[*] Completion: br-1772514336305 diff --git a/poc-grpc-agent/agent_node/__init__.py b/poc-grpc-agent/agent_node/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/agent_node/__init__.py diff --git a/poc-grpc-agent/agent_node/config.py b/poc-grpc-agent/agent_node/config.py new file mode 100644 index 0000000..d0c5f62 --- /dev/null +++ b/poc-grpc-agent/agent_node/config.py @@ -0,0 +1,20 @@ +import os +import platform + +# 12-Factor Config: Environment variables with defaults +SECRET_KEY = os.getenv("AGENT_SECRET_KEY", "cortex-secret-shared-key") +NODE_ID = os.getenv("AGENT_NODE_ID", "agent-node-007") +NODE_DESC = os.getenv("AGENT_NODE_DESC", "Modular Stateful Node") + +# Orchestrator Connection +SERVER_HOST = os.getenv("SERVER_HOST", "localhost") +SERVER_PORT = os.getenv("SERVER_PORT", "50051") + +# Certificate Paths +CERT_CA = os.getenv("CERT_CA", "certs/ca.crt") +CERT_CLIENT_CRT = os.getenv("CERT_CLIENT_CRT", "certs/client.crt") +CERT_CLIENT_KEY = os.getenv("CERT_CLIENT_KEY", "certs/client.key") + +# Resource Limits +MAX_SKILL_WORKERS = int(os.getenv("MAX_SKILL_WORKERS", "5")) +HEALTH_REPORT_INTERVAL = int(os.getenv("HEALTH_REPORT_INTERVAL", "10")) diff --git a/poc-grpc-agent/agent_node/core/__init__.py b/poc-grpc-agent/agent_node/core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/agent_node/core/__init__.py diff --git a/poc-grpc-agent/agent_node/core/sandbox.py b/poc-grpc-agent/agent_node/core/sandbox.py new file mode 100644 index 0000000..9f9390c --- /dev/null +++ b/poc-grpc-agent/agent_node/core/sandbox.py @@ -0,0 +1,31 @@ +from protos import agent_pb2 + +class SandboxEngine: + """Core Security Engine for Local 
Command Verification.""" + def __init__(self): + self.policy = None + + def sync(self, p): + """Syncs the latest policy from the Orchestrator.""" + self.policy = { + "MODE": "STRICT" if p.mode == agent_pb2.SandboxPolicy.STRICT else "PERMISSIVE", + "ALLOWED": list(p.allowed_commands), + "DENIED": list(p.denied_commands), + "SENSITIVE": list(p.sensitive_commands) + } + + def verify(self, command_str): + """Verifies if a command string is allowed under the current policy.""" + if not self.policy: return False, "No Policy" + + parts = (command_str or "").strip().split() + if not parts: return False, "Empty" + + base_cmd = parts[0] + if base_cmd in self.policy["DENIED"]: + return False, f"Forbidden command: {base_cmd}" + + if self.policy["MODE"] == "STRICT" and base_cmd not in self.policy["ALLOWED"]: + return False, f"Command '{base_cmd}' not whitelisted" + + return True, "OK" diff --git a/poc-grpc-agent/agent_node/main.py b/poc-grpc-agent/agent_node/main.py new file mode 100644 index 0000000..a39c687 --- /dev/null +++ b/poc-grpc-agent/agent_node/main.py @@ -0,0 +1,30 @@ +import sys +import os + +# Add root to path to find protos and other packages +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from agent_node.node import AgentNode +from agent_node.config import NODE_ID + +def main(): + print(f"[*] Starting Antigravity Agent Node: {NODE_ID}...") + + # 1. Initialization + node = AgentNode() + + # 2. Handshake: Sync configuration and Sandbox Policy + node.sync_configuration() + + # 3. Background: Start health reporting (Heartbeats) + node.start_health_reporting() + + # 4. 
Foreground: Run Persistent Task Stream + node.run_task_stream() + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print("\n[πŸ›‘] Agent Node Shutdown.") + sys.exit(0) diff --git a/poc-grpc-agent/agent_node/node.py b/poc-grpc-agent/agent_node/node.py new file mode 100644 index 0000000..59a1e2c --- /dev/null +++ b/poc-grpc-agent/agent_node/node.py @@ -0,0 +1,141 @@ +import threading +import queue +import time +import sys +from protos import agent_pb2, agent_pb2_grpc +from agent_node.skills.manager import SkillManager +from agent_node.core.sandbox import SandboxEngine +from agent_node.utils.auth import create_auth_token, verify_task_signature +from agent_node.utils.network import get_secure_stub +from agent_node.config import NODE_ID, NODE_DESC, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS + +class AgentNode: + """The 'Agent Core': Orchestrates Local Skills and Maintains gRPC Connection.""" + def __init__(self, node_id=NODE_ID): + self.node_id = node_id + self.skills = SkillManager(max_workers=MAX_SKILL_WORKERS) + self.sandbox = SandboxEngine() + self.task_queue = queue.Queue() + self.stub = get_secure_stub() + + def sync_configuration(self): + """Initial handshake to retrieve policy and metadata.""" + print(f"[*] Handshake with Orchestrator: {self.node_id}") + reg_req = agent_pb2.RegistrationRequest( + node_id=self.node_id, + auth_token=create_auth_token(self.node_id), + node_description=NODE_DESC, + capabilities={"shell": "v1", "browser": "playwright-sync-bridge"} + ) + + try: + res = self.stub.SyncConfiguration(reg_req) + if res.success: + self.sandbox.sync(res.policy) + print("[OK] Sandbox Policy Synced.") + else: + print(f"[!] Rejection: {res.error_message}") + sys.exit(1) + except Exception as e: + print(f"[!] 
Connection Fail: {e}") + sys.exit(1) + + def start_health_reporting(self): + """Streaming node metrics to the orchestrator for load balancing.""" + def _gen(): + while True: + ids = self.skills.get_active_ids() + yield agent_pb2.Heartbeat( + node_id=self.node_id, cpu_usage_percent=1.0, + active_worker_count=len(ids), + max_worker_capacity=MAX_SKILL_WORKERS, + running_task_ids=ids + ) + time.sleep(HEALTH_REPORT_INTERVAL) + + # Non-blocking thread for health heartbeat + threading.Thread( + target=lambda: list(self.stub.ReportHealth(_gen())), + daemon=True, name=f"Health-{self.node_id}" + ).start() + + def run_task_stream(self): + """Main Persistent Bi-directional Stream for Task Management.""" + def _gen(): + # Initial announcement for routing identity + yield agent_pb2.ClientTaskMessage( + announce=agent_pb2.NodeAnnounce(node_id=self.node_id) + ) + while True: + yield self.task_queue.get() + + responses = self.stub.TaskStream(_gen()) + print(f"[*] Task Stream Online: {self.node_id}", flush=True) + + try: + for msg in responses: + kind = msg.WhichOneof('payload') + print(f" [πŸ“₯] Received from Stream: {kind}", flush=True) + self._process_server_message(msg) + except Exception as e: + print(f"[!] 
Task Stream Failure: {e}", flush=True) + + def _process_server_message(self, msg): + kind = msg.WhichOneof('payload') + print(f"[*] Inbound: {kind}", flush=True) + + if kind == 'task_request': + self._handle_task(msg.task_request) + + elif kind == 'task_cancel': + if self.skills.cancel(msg.task_cancel.task_id): + self._send_response(msg.task_cancel.task_id, None, agent_pb2.TaskResponse.CANCELLED) + + elif kind == 'work_pool_update': + # Claim logical idle tasks from global pool + if len(self.skills.get_active_ids()) < MAX_SKILL_WORKERS: + for tid in msg.work_pool_update.available_task_ids: + self.task_queue.put(agent_pb2.ClientTaskMessage( + task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id) + )) + + def _handle_task(self, task): + print(f"[*] Task Launch: {task.task_id}", flush=True) + # 1. Cryptographic Signature Verification + if not verify_task_signature(task): + print(f"[!] Signature Validation Failed for {task.task_id}", flush=True) + return + + print(f"[βœ…] Validated task {task.task_id}", flush=True) + + # 2. Skill Manager Submission + success, reason = self.skills.submit(task, self.sandbox, self._on_finish, self._on_event) + if not success: + print(f"[!] 
Execution Rejected: {reason}", flush=True) + + def _on_event(self, event): + """Live Event Tunneler: Routes browser/skill events into the main stream.""" + self.task_queue.put(agent_pb2.ClientTaskMessage(browser_event=event)) + + def _on_finish(self, tid, res, trace): + """Final Completion Callback: Routes task results back to server.""" + print(f"[*] Completion: {tid}", flush=True) + status = agent_pb2.TaskResponse.SUCCESS if res['status'] == 1 else agent_pb2.TaskResponse.ERROR + + tr = agent_pb2.TaskResponse( + task_id=tid, status=status, + stdout=res.get('stdout',''), + stderr=res.get('stderr',''), + trace_id=trace, + browser_result=res.get("browser_result") + ) + self._send_response(tid, tr) + + def _send_response(self, tid, tr=None, status=None): + """Utility for placing response messages into the gRPC outbound queue.""" + if tr: + self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr)) + else: + self.task_queue.put(agent_pb2.ClientTaskMessage( + task_response=agent_pb2.TaskResponse(task_id=tid, status=status) + )) diff --git a/poc-grpc-agent/agent_node/skills/__init__.py b/poc-grpc-agent/agent_node/skills/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/__init__.py diff --git a/poc-grpc-agent/agent_node/skills/base.py b/poc-grpc-agent/agent_node/skills/base.py new file mode 100644 index 0000000..7df6e34 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/base.py @@ -0,0 +1,9 @@ +class BaseSkill: + """Abstract interface for all Node capabilities (Shell, Browser, etc.).""" + def execute(self, task, sandbox, on_complete, on_event=None): + """Processes the given task and notifies results via callbacks.""" + raise NotImplementedError + + def cancel(self, task_id: str) -> bool: + """Attempts to cancel the task and returns success status.""" + return False diff --git a/poc-grpc-agent/agent_node/skills/browser.py b/poc-grpc-agent/agent_node/skills/browser.py new file mode 100644 index 
0000000..ea957d3 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/browser.py @@ -0,0 +1,107 @@ +import threading +import queue +import time +import json +from playwright.sync_api import sync_playwright +from agent_node.skills.base import BaseSkill +from protos import agent_pb2 + +class BrowserSkill(BaseSkill): + """The 'Antigravity Bridge': Persistent Browser Skill using a dedicated Actor thread.""" + def __init__(self): + self.task_queue = queue.Queue() + self.sessions = {} # session_id -> { "context": Context, "page": Page } + self.lock = threading.Lock() + threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor").start() + + def _setup_listeners(self, sid, page, on_event): + """Tunnels browser internal events back to the Orchestrator.""" + if not on_event: return + + # Live Console Redirector + page.on("console", lambda msg: on_event(agent_pb2.BrowserEvent( + session_id=sid, console_msg=agent_pb2.ConsoleMessage( + level=msg.type, text=msg.text, timestamp_ms=int(time.time()*1000) + ) + ))) + + # Live Network Redirector + page.on("requestfinished", lambda req: on_event(agent_pb2.BrowserEvent( + session_id=sid, network_req=agent_pb2.NetworkRequest( + method=req.method, url=req.url, status=req.response().status if req.response() else 0, + resource_type=req.resource_type, latency_ms=0 + ) + ))) + + def _browser_actor(self): + """Serializes all Playwright operations on a single dedicated thread.""" + print("[🌐] Browser Actor Starting...", flush=True) + try: + pw = sync_playwright().start() + # 12-Factor/Container Optimization: Standard non-sandbox arguments + browser = pw.chromium.launch(headless=True, args=[ + '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu' + ]) + print("[🌐] Browser Engine Online.", flush=True) + except Exception as e: + print(f"[!] 
Browser Actor Startup Fail: {e}", flush=True) + return + + while True: + try: + task, sandbox, on_complete, on_event = self.task_queue.get() + action = task.browser_action + sid = action.session_id or "default" + + with self.lock: + if sid not in self.sessions: + ctx = browser.new_context() + pg = ctx.new_page() + self._setup_listeners(sid, pg, on_event) + self.sessions[sid] = {"context": ctx, "page": pg} + + page = self.sessions[sid]["page"] + print(f" [🌐] Browser Actor Processing: {agent_pb2.BrowserAction.ActionType.Name(action.action)} | Session: {sid}", flush=True) + + res_data = {} + # State-Machine Logic for Actions + if action.action == agent_pb2.BrowserAction.NAVIGATE: + page.goto(action.url, wait_until="commit") + elif action.action == agent_pb2.BrowserAction.CLICK: + page.click(action.selector) + elif action.action == agent_pb2.BrowserAction.TYPE: + page.fill(action.selector, action.text) + elif action.action == agent_pb2.BrowserAction.SCREENSHOT: + res_data["snapshot"] = page.screenshot() + elif action.action == agent_pb2.BrowserAction.GET_DOM: + res_data["dom_content"] = page.content() + elif action.action == agent_pb2.BrowserAction.HOVER: + page.hover(action.selector) + elif action.action == agent_pb2.BrowserAction.SCROLL: + page.mouse.wheel(x=0, y=action.y) + elif action.action == agent_pb2.BrowserAction.EVAL: + res_data["eval_result"] = str(page.evaluate(action.text)) + elif action.action == agent_pb2.BrowserAction.GET_A11Y: + res_data["a11y_tree"] = json.dumps(page.accessibility.snapshot()) + elif action.action == agent_pb2.BrowserAction.CLOSE: + with self.lock: + sess = self.sessions.pop(sid, None) + if sess: sess["context"].close() + + # Results Construction + br_res = agent_pb2.BrowserResponse( + url=page.url, title=page.title(), + snapshot=res_data.get("snapshot", b""), + dom_content=res_data.get("dom_content", ""), + a11y_tree=res_data.get("a11y_tree", ""), + eval_result=res_data.get("eval_result", "") + ) + on_complete(task.task_id, {"status": 
1, "browser_result": br_res}, task.trace_id) + except Exception as e: + print(f" [!] Browser Actor Error: {e}", flush=True) + on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) + + def execute(self, task, sandbox, on_complete, on_event=None): + self.task_queue.put((task, sandbox, on_complete, on_event)) + + def cancel(self, task_id): return False diff --git a/poc-grpc-agent/agent_node/skills/manager.py b/poc-grpc-agent/agent_node/skills/manager.py new file mode 100644 index 0000000..a1a93e7 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/manager.py @@ -0,0 +1,53 @@ +import threading +from concurrent import futures +from agent_node.skills.shell import ShellSkill +from agent_node.skills.browser import BrowserSkill +from agent_node.config import MAX_SKILL_WORKERS + +class SkillManager: + """Orchestrates multiple modular skills and manages the task worker pool.""" + def __init__(self, max_workers=MAX_SKILL_WORKERS): + self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker") + self.active_tasks = {} # task_id -> future + self.skills = { + "shell": ShellSkill(), + "browser": BrowserSkill() + } + self.max_workers = max_workers + self.lock = threading.Lock() + + def submit(self, task, sandbox, on_complete, on_event=None): + """Routes a task to the appropriate skill and submits it to the thread pool.""" + with self.lock: + if len(self.active_tasks) >= self.max_workers: + return False, "Node Capacity Reached" + + # 1. Routing Engine + if task.HasField("browser_action"): + skill = self.skills["browser"] + else: + skill = self.skills["shell"] + + # 2. 
class ShellSkill(BaseSkill):
    """Default Skill: Executing shell commands with sandbox safety."""

    def __init__(self):
        self.processes = {}  # task_id -> subprocess.Popen, for cancellation
        self.lock = threading.Lock()

    def execute(self, task, sandbox, on_complete, on_event=None):
        """Verify *task* against the sandbox, run it, and report the result.

        Calls on_complete(task_id, result_dict, trace_id) exactly once with
        status 1 on success and 2 on any failure (violation, timeout, error).
        """
        try:
            cmd = task.payload_json

            # 1. Sandbox verification before anything touches the OS.
            allowed, status_msg = sandbox.verify(cmd)
            if not allowed:
                err_msg = f"SANDBOX_VIOLATION: {status_msg}"
                return on_complete(task.task_id, {"stderr": err_msg, "status": 2}, task.trace_id)

            # 2. Launch and register the process so cancel() can find it.
            print(f" [🐚] Executing Shell: {cmd}", flush=True)
            p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE, text=True)
            with self.lock:
                self.processes[task.task_id] = p

            # 3. Wait with optional timeout (timeout_ms == 0 means no limit).
            timeout = task.timeout_ms / 1000.0 if task.timeout_ms > 0 else None
            stdout, stderr = p.communicate(timeout=timeout)

            print(f" [🐚] Shell Done: {cmd} | Stdout Size: {len(stdout)}", flush=True)
            on_complete(task.task_id, {
                "stdout": stdout, "stderr": stderr,
                "status": 1 if p.returncode == 0 else 2
            }, task.trace_id)

        except subprocess.TimeoutExpired:
            # Fix: kill AND reap the child; the original killed it but never
            # waited, leaving a zombie process behind.
            self.cancel(task.task_id)
            try:
                p.communicate()
            except Exception:
                pass  # best-effort reap; process is already dead
            on_complete(task.task_id, {"stderr": "TASK_TIMEOUT", "status": 2}, task.trace_id)
        except Exception as e:
            on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)
        finally:
            with self.lock:
                self.processes.pop(task.task_id, None)

    def cancel(self, task_id: str):
        """Kill the process backing *task_id*; True if one was running."""
        with self.lock:
            p = self.processes.get(task_id)
            if p:
                print(f"[πŸ›‘] Killing Shell Task: {task_id}")
                p.kill()
                return True
            return False
def create_auth_token(node_id: str) -> str:
    """Creates a short-lived (10 minute) HS256 JWT for node authentication."""
    # Fix: datetime.utcnow() is deprecated since Python 3.12 and was called
    # twice (iat/exp could straddle a clock tick); use one aware timestamp.
    now = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        "sub": node_id,
        "iat": now,
        "exp": now + datetime.timedelta(minutes=10),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")


def verify_task_signature(task, secret=SECRET_KEY) -> bool:
    """Verifies HMAC signature for shell or browser tasks.

    Must stay aligned with the orchestrator's signing logic: browser tasks
    sign "<ACTION_NAME>:<url>:<session_id>" (using the enum's string name),
    shell tasks sign the raw payload_json string.
    """
    if task.HasField("browser_action"):
        a = task.browser_action
        kind = agent_pb2.BrowserAction.ActionType.Name(a.action)
        sign_base = f"{kind}:{a.url}:{a.session_id}"
    else:
        sign_base = task.payload_json

    expected_sig = hmac.new(secret.encode(), sign_base.encode(), hashlib.sha256).hexdigest()
    # compare_digest avoids leaking signature bytes via timing differences.
    return hmac.compare_digest(task.signature, expected_sig)
def get_secure_stub():
    """Build an mTLS-secured channel to the orchestrator and return its stub."""
    # Load the client identity pair and the CA that signed the server cert.
    with open(CERT_CLIENT_KEY, 'rb') as key_file:
        private_key = key_file.read()
    with open(CERT_CLIENT_CRT, 'rb') as crt_file:
        certificate = crt_file.read()
    with open(CERT_CA, 'rb') as ca_file:
        ca_bundle = ca_file.read()

    credentials = grpc.ssl_channel_credentials(ca_bundle, private_key, certificate)
    channel = grpc.secure_channel(f'{SERVER_HOST}:{SERVER_PORT}', credentials)
    return agent_pb2_grpc.AgentOrchestratorStub(channel)
-# source: agent.proto -# Protobuf Python Version: 4.25.1 -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xa1\x01\n\x0bNodeMessage\x12\x32\n\x0cregistration\x18\x01 \x01(\x0b\x32\x1a.agent.RegistrationRequestH\x00\x12%\n\theartbeat\x18\x02 \x01(\x0b\x32\x10.agent.HeartbeatH\x00\x12,\n\rtask_response\x18\x03 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x42\t\n\x07payload\"\x7f\n\rServerMessage\x12\x37\n\x10registration_ack\x18\x01 \x01(\x0b\x32\x1b.agent.RegistrationResponseH\x00\x12*\n\x0ctask_request\x18\x02 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x42\t\n\x07payload\"\xd6\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x10\n\x08platform\x18\x03 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x04 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x12\x12\n\nauth_token\x18\x05 \x01(\t\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x08:\x02\x38\x01\"r\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12\x1e\n\x16next_token_rotation_ms\x18\x04 \x01(\t\"p\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x19\n\x11\x61\x63tive_task_count\x18\x04 \x01(\x05\"\xcb\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x14\n\x0cpayload_json\x18\x03 \x01(\t\x12\x12\n\ntimeout_ms\x18\x04 
\x01(\x05\x12\x13\n\x0b\x63\x61ncellable\x18\x05 \x01(\x08\x12\x1b\n\x13\x63\x61pability_required\x18\x06 \x01(\t\x12\x17\n\x0fidempotency_key\x18\x07 \x01(\t\x12\x10\n\x08trace_id\x18\x08 \x01(\t\x12\x11\n\tsignature\x18\t \x01(\t\"\xfd\x01\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x1e\n\x16structured_output_json\x18\x05 \x01(\t\x12\x13\n\x0b\x64uration_ms\x18\x06 \x01(\x05\x12\x10\n\x08trace_id\x18\x07 \x01(\t\"I\n\x06Status\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\t\n\x05\x45RROR\x10\x02\x12\r\n\tCANCELLED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x32L\n\x11\x41gentOrchestrator\x12\x37\n\x07\x43onnect\x12\x12.agent.NodeMessage\x1a\x14.agent.ServerMessage(\x01\x30\x01\x62\x06proto3') - -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' - _globals['_NODEMESSAGE']._serialized_start=23 - _globals['_NODEMESSAGE']._serialized_end=184 - _globals['_SERVERMESSAGE']._serialized_start=186 - _globals['_SERVERMESSAGE']._serialized_end=313 - _globals['_REGISTRATIONREQUEST']._serialized_start=316 - _globals['_REGISTRATIONREQUEST']._serialized_end=530 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=479 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=530 - _globals['_REGISTRATIONRESPONSE']._serialized_start=532 - _globals['_REGISTRATIONRESPONSE']._serialized_end=646 - _globals['_HEARTBEAT']._serialized_start=648 - _globals['_HEARTBEAT']._serialized_end=760 - _globals['_TASKREQUEST']._serialized_start=763 - 
_globals['_TASKREQUEST']._serialized_end=966 - _globals['_TASKRESPONSE']._serialized_start=969 - _globals['_TASKRESPONSE']._serialized_end=1222 - _globals['_TASKRESPONSE_STATUS']._serialized_start=1149 - _globals['_TASKRESPONSE_STATUS']._serialized_end=1222 - _globals['_AGENTORCHESTRATOR']._serialized_start=1224 - _globals['_AGENTORCHESTRATOR']._serialized_end=1300 -# @@protoc_insertion_point(module_scope) diff --git a/poc-grpc-agent/agent_pb2_grpc.py b/poc-grpc-agent/agent_pb2_grpc.py deleted file mode 100644 index 22cb3e7..0000000 --- a/poc-grpc-agent/agent_pb2_grpc.py +++ /dev/null @@ -1,70 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -"""Client and server classes corresponding to protobuf-defined services.""" -import grpc - -import agent_pb2 as agent__pb2 - - -class AgentOrchestratorStub(object): - """The Cortex Server exposes this service - """ - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Connect = channel.stream_stream( - '/agent.AgentOrchestrator/Connect', - request_serializer=agent__pb2.NodeMessage.SerializeToString, - response_deserializer=agent__pb2.ServerMessage.FromString, - ) - - -class AgentOrchestratorServicer(object): - """The Cortex Server exposes this service - """ - - def Connect(self, request_iterator, context): - """Bi-directional stream for persistent connection (Phone Home Pattern) - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_AgentOrchestratorServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Connect': grpc.stream_stream_rpc_method_handler( - servicer.Connect, - request_deserializer=agent__pb2.NodeMessage.FromString, - response_serializer=agent__pb2.ServerMessage.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'agent.AgentOrchestrator', rpc_method_handlers) - 
server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class AgentOrchestrator(object): - """The Cortex Server exposes this service - """ - - @staticmethod - def Connect(request_iterator, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/Connect', - agent__pb2.NodeMessage.SerializeToString, - agent__pb2.ServerMessage.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/poc-grpc-agent/client.py b/poc-grpc-agent/client.py deleted file mode 100644 index b4c568d..0000000 --- a/poc-grpc-agent/client.py +++ /dev/null @@ -1,201 +0,0 @@ -import grpc -import time -import os -import agent_pb2 -import agent_pb2_grpc -import threading -import subprocess -import json -import platform -import jwt -import datetime -import hmac -import hashlib - -SECRET_KEY = "cortex-secret-shared-key" - -# --- Sandbox Policy Configuration --- -SANDBOX_POLICY = { - "MODE": "PERMISSIVE", # Toggle between "STRICT" and "PERMISSIVE" - "ALLOWED_COMMANDS": ["ls", "grep", "cat", "pwd", "git", "echo", "python", "whoami", "uname"], - "SENSITIVE_COMMANDS": ["rm", "cp", "mv", "chmod", "chown", "pkill"], - "DENIED_COMMANDS": ["sudo", "mkfs", "dd", "sh", "bash", "zsh"], - "WORKING_DIR": os.getcwd() -} - -class AgentNode: - def verify_sandbox_policy(self, command_str): - """Verifies if a command is allowed under the current sandbox policy.""" - parts = command_str.strip().split() - if not parts: - return False, "Empty command" - - base_cmd = parts[0] - - # 1. ALWAYS Block Denied List (Strictly forbidden in all modes) - if base_cmd in SANDBOX_POLICY["DENIED_COMMANDS"]: - return False, f"Command '{base_cmd}' is strictly FORBIDDEN." - - # 2. 
Path Guard (Simple string check for escaping ..) - if ".." in command_str: - return False, "Path traversal attempt detected (.. disallowed)." - - # 3. Mode-specific Logic - if SANDBOX_POLICY["MODE"] == "STRICT": - # In STRICT mode, we check against the whitelist - if base_cmd not in SANDBOX_POLICY["ALLOWED_COMMANDS"] and base_cmd not in SANDBOX_POLICY["SENSITIVE_COMMANDS"]: - return False, f"STRICT MODE: Command '{base_cmd}' is NOT whitelisted." - - # 4. Sensitive / Consent Check (Applied to both modes) - if base_cmd in SANDBOX_POLICY["SENSITIVE_COMMANDS"]: - return True, "SENSITIVE_CONSENT_REQUIRED" - - return True, "OK" - - def create_registration_token(self): - payload = { - "sub": "agent-node-007", - "workspace_id": "ws-production-001", - "iat": datetime.datetime.utcnow(), - "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10) - } - return jwt.encode(payload, SECRET_KEY, algorithm="HS256") - - def __init__(self, node_id="agent-node-007"): - self.node_id = node_id - - # Load certificates for mTLS - print("[πŸ”] Loading mTLS certificates...") - try: - with open('certs/client.key', 'rb') as f: - private_key = f.read() - with open('certs/client.crt', 'rb') as f: - certificate_chain = f.read() - with open('certs/ca.crt', 'rb') as f: - root_certificates = f.read() - - # Create secure channel credentials - credentials = grpc.ssl_channel_credentials( - root_certificates=root_certificates, - private_key=private_key, - certificate_chain=certificate_chain - ) - - # Connect to localhost:50051 using secure channel - self.channel = grpc.secure_channel('localhost:50051', credentials) - self.stub = agent_pb2_grpc.AgentOrchestratorStub(self.channel) - print(f"[*] Agent Node {self.node_id} initialized with secure mTLS channel.") - except FileNotFoundError as e: - print(f"[!] Error: Certificates not found. Ensure generate_certs.sh was run. | {e}") - sys.exit(1) - - # ... 
(rest of methods) - -if __name__ == '__main__': - # We'll use a queue-based generator for better concurrency support - import queue - import sys - msg_queue = queue.Queue() - - node = AgentNode() - - # 1. Registration (Pre-handshake credentials with JWT) - token = node.create_registration_token() - reg = agent_pb2.NodeMessage( - registration=agent_pb2.RegistrationRequest( - node_id=node.node_id, - version="1.2.0", - platform=platform.system() + "-" + platform.machine(), - capabilities={"shell": True, "browser": False, "secure": True}, - auth_token=token - ) - ) - msg_queue.put(reg) - - def heartbeat_thread(): - while True: - time.sleep(10) - hb = agent_pb2.NodeMessage( - heartbeat=agent_pb2.Heartbeat( - node_id=node.node_id, - cpu_usage_percent=1.2, - active_task_count=0 - ) - ) - msg_queue.put(hb) - - threading.Thread(target=heartbeat_thread, daemon=True).start() - - def generator(): - while True: - msg = msg_queue.get() - yield msg - - try: - responses = node.stub.Connect(generator()) - - for response in responses: - payload_type = response.WhichOneof('payload') - if payload_type == 'registration_ack': - ack = response.registration_ack - if ack.success: - print(f"[*] Registered successfully. Session: {ack.session_id}") - else: - print(f"[!] Registration REJECTED: {ack.error_message}") - sys.exit(1) - elif payload_type == 'task_request': - task = response.task_request - print(f"[*] Task Received: {task.task_id}. Verifying signature...") - - # Verify payload signature - expected_sig = hmac.new(SECRET_KEY.encode(), task.payload_json.encode(), hashlib.sha256).hexdigest() - if hmac.compare_digest(task.signature, expected_sig): - print(f" [OK] Signature verified. 
Checking sandbox policy...") - - payload = json.loads(task.payload_json) - cmd = payload.get("command") - - # --- Sandbox Enforcement --- - allowed, status_msg = node.verify_sandbox_policy(cmd) - - if not allowed: - print(f" [β›”] Sandbox Violation: {status_msg}") - tr = agent_pb2.NodeMessage( - task_response=agent_pb2.TaskResponse( - task_id=task.task_id, - status=agent_pb2.TaskResponse.ERROR, - stderr=f"SANDBOX_VIOLATION: {status_msg}", - trace_id=task.trace_id - ) - ) - msg_queue.put(tr) - continue - - if status_msg == "SENSITIVE_CONSENT_REQUIRED": - # In production: Wait for UI prompt. In POC: Log and proceed with a warning tag. - print(f" [⚠️] Sensitive Command Encountered: {cmd}. Automated approval assumed in POC.") - # ------------------------------- - - print(f" [OK] Execution starts: {cmd}") - res = subprocess.run(cmd, shell=True, capture_output=True, text=True) - - # Send result back - tr = agent_pb2.NodeMessage( - task_response=agent_pb2.TaskResponse( - task_id=task.task_id, - status=agent_pb2.TaskResponse.SUCCESS, - stdout=res.stdout, - stderr=res.stderr, - duration_ms=0, - trace_id=task.trace_id - ) - ) - msg_queue.put(tr) - else: - print(f" [FAIL] Invalid signature for Task {task.task_id}! REJECTING.") - except grpc.RpcError as e: - print(f"[!] RPC Error: {e.code()} | {e.details()}") - if e.code() == grpc.StatusCode.UNAVAILABLE: - print(" Is the server running and reachable?") - elif e.code() == grpc.StatusCode.UNAUTHENTICATED: - print(" Authentication failed. 
def serve():
    """Bootstrap the TLS-secured gRPC orchestrator and run the demo flow."""
    print(f"[πŸ›‘οΈ] Boss Plane Orchestrator Starting on {GRPC_HOST}:{GRPC_PORT}...")

    # 1. mTLS material: server identity plus the CA used to verify clients.
    with open(CERT_SERVER_KEY, 'rb') as key_file:
        server_key = key_file.read()
    with open(CERT_SERVER_CRT, 'rb') as crt_file:
        server_cert = crt_file.read()
    with open(CERT_CA, 'rb') as ca_file:
        client_ca = ca_file.read()
    creds = grpc.ssl_server_credentials([(server_key, server_cert)], client_ca, True)

    # 2. Wire the servicer into a thread-pool-backed gRPC server.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=MAX_WORKERS))
    orch = AgentOrchestrator()
    agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(orch, server)
    server.add_secure_port(f'{GRPC_HOST}:{GRPC_PORT}', creds)

    # 3. Start serving.
    server.start()
    print("[πŸ›‘οΈ] Boss Plane Refactored & Online.", flush=True)

    # 4. Simulation launcher — in production this would be an API
    # interface or webhook handler instead of a scripted demo.
    _run_simulation(orch)

    server.wait_for_termination()
def _run_simulation(orch):
    """Scripted AI-driven demo that exercises the TaskAssistant service."""
    time.sleep(SIMULATION_DELAY_SEC)
    print("\n[🧠] AI Simulation Start...", flush=True)

    target_node = "agent-node-007"

    # Phase 1: plain shell dispatch.
    shell_result = orch.assistant.dispatch_single(target_node, 'uname -a')
    print(f" Uname Output: {shell_result}", flush=True)

    # Phase 4: browser bridge navigation.
    print("\n[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)...")
    navigate = agent_pb2.BrowserAction(
        action=agent_pb2.BrowserAction.NAVIGATE,
        url="https://example.com",
        session_id="antigravity-session-1"
    )
    nav_result = orch.assistant.dispatch_browser(target_node, navigate)
    print(f" Nav Result: {nav_result}")

    # Phase 4 Pro: perception (accessibility tree) + JS evaluation.
    print("\n[🧠] AI Phase 4 Pro: Perception & Advanced Logic...")
    perceive = agent_pb2.BrowserAction(
        action=agent_pb2.BrowserAction.GET_A11Y,
        session_id="antigravity-session-1"
    )
    a11y_result = orch.assistant.dispatch_browser(target_node, perceive)
    print(f" A11y Result: {a11y_result.get('browser', {}).get('a11y')}")

    evaluate = agent_pb2.BrowserAction(
        action=agent_pb2.BrowserAction.EVAL,
        text="window.performance.now()",
        session_id="antigravity-session-1"
    )
    eval_result = orch.assistant.dispatch_browser(target_node, evaluate)
    print(f" Eval Result: {eval_result.get('browser', {}).get('eval')}")

    # Real-time events: console + network traffic tunneled back live.
    print("\n[🧠] AI Phase 4 Pro: Triggering Real-time Events (Tunneling)...")
    trigger = agent_pb2.BrowserAction(
        action=agent_pb2.BrowserAction.EVAL,
        text="console.log('Refactored Hello!'); fetch('https://example.com/api/ping');",
        session_id="antigravity-session-1"
    )
    orch.assistant.dispatch_browser(target_node, trigger)
class TaskJournal:
    """Tracks asynchronous task lifecycles: register -> fulfill -> collect."""

    def __init__(self):
        self.lock = threading.Lock()
        # task_id -> {"event": threading.Event, "result": any, "node_id": str}
        self.tasks = {}

    def register(self, task_id, node_id=None):
        """Create tracking state for *task_id*; returns its completion Event."""
        done = threading.Event()
        entry = {"event": done, "result": None, "node_id": node_id}
        with self.lock:
            self.tasks[task_id] = entry
        return done

    def fulfill(self, task_id, result):
        """Store *result* and wake the waiter. False when the task is unknown."""
        with self.lock:
            entry = self.tasks.get(task_id)
            if entry is None:
                return False
            entry["result"] = result
            entry["event"].set()
            return True

    def get_result(self, task_id):
        """Fetch the stored result, or None when the task is unknown."""
        with self.lock:
            entry = self.tasks.get(task_id)
        return entry["result"] if entry else None

    def pop(self, task_id):
        """Drop and return the task's tracking state (None if absent)."""
        with self.lock:
            return self.tasks.pop(task_id, None)
class GlobalWorkPool:
    """Thread-safe registry of unassigned tasks that any node may claim."""

    def __init__(self):
        self.lock = threading.Lock()
        # PoC seed tasks; task_id -> shell payload.
        self.available = {"shared-001": "uname -a", "shared-002": "uptime"}

    def claim(self, task_id, node_id):
        """Atomically hand *task_id* to *node_id*; (False, None) if taken."""
        with self.lock:
            if task_id not in self.available:
                return False, None
            print(f" [πŸ“¦] Task {task_id} Claimed by {node_id}")
            return True, self.available.pop(task_id)

    def list_available(self):
        """Snapshot of the IDs of all currently unclaimed tasks."""
        with self.lock:
            return list(self.available.keys())
class AbstractNodeRegistry:
    """Interface for finding and tracking Agent Nodes."""

    def register(self, node_id: str, q: queue.Queue, metadata: dict):
        raise NotImplementedError

    def update_stats(self, node_id: str, stats: dict):
        raise NotImplementedError

    def get_best(self) -> str:
        raise NotImplementedError

    def get_node(self, node_id: str) -> dict:
        raise NotImplementedError


class MemoryNodeRegistry(AbstractNodeRegistry):
    """In-memory implementation of the Node Registry."""

    def __init__(self):
        self.lock = threading.Lock()
        self.nodes = {}  # node_id -> {"stats": {}, "queue": Queue, "metadata": {}}

    def register(self, node_id, q, metadata):
        """Add (or replace) a node with its dispatch queue and metadata."""
        with self.lock:
            self.nodes[node_id] = {"stats": {}, "queue": q, "metadata": metadata}
        print(f"[πŸ“‹] Registered Agent Node: {node_id}")

    def update_stats(self, node_id, stats):
        """Merge heartbeat stats into a node's record; unknown IDs ignored."""
        with self.lock:
            if node_id in self.nodes:
                self.nodes[node_id]["stats"].update(stats)

    def get_best(self):
        """Picks the agent with the lowest active worker count, or None."""
        with self.lock:
            if not self.nodes:
                return None
            # Fix: min() is O(n) vs. sorting's O(n log n) and reads clearer;
            # ties resolve to the first-registered node either way. Nodes
            # that never reported stats sort last via the 999 default.
            return min(
                self.nodes.items(),
                key=lambda item: item[1]["stats"].get("active_worker_count", 999)
            )[0]

    def get_node(self, node_id):
        """Return the node's record dict, or None when not registered."""
        with self.lock:
            return self.nodes.get(node_id)
class TaskAssistant:
    """The 'Brain' of the Orchestrator: high-level API for dispatching tasks.

    Refactor: the register/enqueue/wait/cleanup lifecycle was duplicated in
    both dispatch methods; it now lives in _send_and_wait, with the journal
    cleanup moved into a finally block so entries are never leaked.
    """

    def __init__(self, registry, journal, pool):
        self.registry = registry
        self.journal = journal
        self.pool = pool

    def dispatch_single(self, node_id, cmd, timeout=30):
        """Dispatches a shell command to a specific node and awaits its result."""
        node = self.registry.get_node(node_id)
        if not node:
            return {"error": f"Node {node_id} Offline"}

        tid = f"task-{int(time.time()*1000)}"
        sig = sign_payload(cmd)
        req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest(
            task_id=tid, payload_json=cmd, signature=sig))

        print(f"[πŸ“€] Dispatching shell {tid} to {node_id}")
        return self._send_and_wait(node, node_id, tid, req, timeout)

    def dispatch_browser(self, node_id, action, timeout=60):
        """Dispatches a browser action to a directed session node and awaits."""
        node = self.registry.get_node(node_id)
        if not node:
            return {"error": f"Node {node_id} Offline"}

        tid = f"br-{int(time.time()*1000)}"
        # Sign the action's identity fields, mirroring the node's verifier.
        sig = sign_browser_action(
            agent_pb2.BrowserAction.ActionType.Name(action.action),
            action.url,
            action.session_id
        )
        req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest(
            task_id=tid, browser_action=action, signature=sig))

        print(f"[πŸŒπŸ“€] Dispatching browser {tid} to {node_id}")
        return self._send_and_wait(node, node_id, tid, req, timeout)

    def _send_and_wait(self, node, node_id, tid, req, timeout):
        """Register *tid*, queue *req* on the node, and await the result."""
        event = self.journal.register(tid, node_id)
        node["queue"].put(req)
        try:
            if event.wait(timeout):
                return self.journal.get_result(tid)
            return {"error": "Timeout"}
        finally:
            # Always remove the journal entry, even on timeout or error.
            self.journal.pop(tid)
class AgentOrchestrator(agent_pb2_grpc.AgentOrchestratorServicer):
    """Refactored gRPC Servicer for Agent Orchestration."""

    def __init__(self):
        self.registry = MemoryNodeRegistry()
        self.journal = TaskJournal()
        self.pool = GlobalWorkPool()
        self.assistant = TaskAssistant(self.registry, self.journal, self.pool)

    def SyncConfiguration(self, request, context):
        """Standard Handshake: Authenticate and Send Policy."""
        # Pre-register the node so TaskStream can find its dispatch queue.
        self.registry.register(request.node_id, queue.Queue(), {
            "desc": request.node_description,
            "caps": dict(request.capabilities)
        })

        # Every node receives the same STRICT sandbox policy in this PoC.
        return agent_pb2.RegistrationResponse(
            success=True,
            policy=agent_pb2.SandboxPolicy(
                mode=agent_pb2.SandboxPolicy.STRICT,
                allowed_commands=["ls", "uname", "echo", "sleep"]
            )
        )

    def TaskStream(self, request_iterator, context):
        """Persistent Bi-directional Stream for Command & Control."""
        # Fix: pre-bind node_id so the final except clause cannot hit a
        # NameError when next() fails before the announce is processed.
        node_id = "unknown"
        try:
            # 1. The first message must announce the node's identity.
            first_msg = next(request_iterator)
            if first_msg.WhichOneof('payload') != 'announce':
                print("[!] Stream rejected: No NodeAnnounce")
                return

            node_id = first_msg.announce.node_id
            node = self.registry.get_node(node_id)
            if not node:
                print(f"[!] Stream rejected: Node {node_id} not registered")
                return

            print(f"[πŸ“Ά] Stream Online for {node_id}")

            # 2. Drain node->server messages on a dedicated reader thread.
            def _read_results():
                for msg in request_iterator:
                    self._handle_client_message(msg, node_id, node)

            threading.Thread(target=_read_results, daemon=True, name=f"Results-{node_id}").start()

            # 3. Main loop: stream queued work down to the node.
            while context.is_active():
                try:
                    # Short timeout so context.is_active() is re-checked.
                    msg = node["queue"].get(timeout=1.0)
                    yield msg
                    print(f" [πŸš€] Streamed message to {node_id}")
                except queue.Empty:
                    # Fix: use the pool's locked API instead of reading its
                    # internal dict directly (encapsulation + thread safety).
                    pending = self.pool.list_available()
                    if pending:
                        # Minimal background traffic keeps the link alive; a
                        # real system would rate-limit this broadcast.
                        yield agent_pb2.ServerTaskMessage(
                            work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=pending)
                        )
                    continue

        except StopIteration:
            pass
        except Exception as e:
            print(f"[!] TaskStream Error for {node_id}: {e}")

    def _handle_client_message(self, msg, node_id, node):
        """Process one node->server message (claims, results, live events)."""
        kind = msg.WhichOneof('payload')
        if kind == 'task_claim':
            success, payload = self.pool.claim(msg.task_claim.task_id, node_id)
            if success:
                sig = sign_payload(payload)
                node["queue"].put(agent_pb2.ServerTaskMessage(
                    task_request=agent_pb2.TaskRequest(task_id=msg.task_claim.task_id, payload_json=payload, signature=sig)))

        elif kind == 'task_response':
            res_obj = {"stdout": msg.task_response.stdout, "status": msg.task_response.status}
            if msg.task_response.HasField("browser_result"):
                br = msg.task_response.browser_result
                res_obj["browser"] = {
                    "url": br.url, "title": br.title, "has_snapshot": len(br.snapshot) > 0,
                    "a11y": br.a11y_tree[:100] + "..." if br.a11y_tree else None,
                    "eval": br.eval_result
                }
            self.journal.fulfill(msg.task_response.task_id, res_obj)

        elif kind == 'browser_event':
            e = msg.browser_event
            prefix = "[πŸ–₯️] Live Console" if e.HasField("console_msg") else "[🌐] Net Inspect"
            content = e.console_msg.text if e.HasField("console_msg") else f"{e.network_req.method} {e.network_req.url}"
            print(f" {prefix}: {content}", flush=True)

    def ReportHealth(self, request_iterator, context):
        """Collect Health Metrics and Feed Policy Updates."""
        for hb in request_iterator:
            self.registry.update_stats(hb.node_id, {
                "active_worker_count": hb.active_worker_count,
                "running": list(hb.running_task_ids)
            })
            yield agent_pb2.HealthCheckResponse(server_time_ms=int(time.time()*1000))
if br.a11y_tree else None, + "eval": br.eval_result + } + self.journal.fulfill(msg.task_response.task_id, res_obj) + + elif kind == 'browser_event': + e = msg.browser_event + prefix = "[πŸ–₯️] Live Console" if e.HasField("console_msg") else "[🌐] Net Inspect" + content = e.console_msg.text if e.HasField("console_msg") else f"{e.network_req.method} {e.network_req.url}" + print(f" {prefix}: {content}", flush=True) + + def ReportHealth(self, request_iterator, context): + """Collect Health Metrics and Feed Policy Updates.""" + for hb in request_iterator: + self.registry.update_stats(hb.node_id, { + "active_worker_count": hb.active_worker_count, + "running": list(hb.running_task_ids) + }) + yield agent_pb2.HealthCheckResponse(server_time_ms=int(time.time()*1000)) diff --git a/poc-grpc-agent/orchestrator/utils/__init__.py b/poc-grpc-agent/orchestrator/utils/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/orchestrator/utils/__init__.py diff --git a/poc-grpc-agent/orchestrator/utils/crypto.py b/poc-grpc-agent/orchestrator/utils/crypto.py new file mode 100644 index 0000000..c34a495 --- /dev/null +++ b/poc-grpc-agent/orchestrator/utils/crypto.py @@ -0,0 +1,17 @@ +import hmac +import hashlib +from orchestrator.config import SECRET_KEY + +def sign_payload(payload: str) -> str: + """Signs a string payload using HMAC-SHA256.""" + return hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).hexdigest() + +def sign_browser_action(action_type: str, url: str, session_id: str) -> str: + """Signs a browser action based on its key identify fields.""" + sign_base = f"{action_type}:{url}:{session_id}" + return sign_payload(sign_base) + +def verify_signature(payload: str, signature: str) -> bool: + """Verifies a signature against a payload using HMAC-SHA256.""" + expected = sign_payload(payload) + return hmac.compare_digest(signature, expected) diff --git a/poc-grpc-agent/protos/__init__.py b/poc-grpc-agent/protos/__init__.py new file 
mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/protos/__init__.py diff --git a/poc-grpc-agent/protos/agent.proto b/poc-grpc-agent/protos/agent.proto index 542a79c..35226bd 100644 --- a/poc-grpc-agent/protos/agent.proto +++ b/poc-grpc-agent/protos/agent.proto @@ -4,74 +4,184 @@ // The Cortex Server exposes this service service AgentOrchestrator { - // Bi-directional stream for persistent connection (Phone Home Pattern) - rpc Connect(stream NodeMessage) returns (stream ServerMessage); + // 1. Control Channel: Sync policies and settings (Unary) + rpc SyncConfiguration(RegistrationRequest) returns (RegistrationResponse); + + // 2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + rpc TaskStream(stream ClientTaskMessage) returns (stream ServerTaskMessage); + + // 3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + rpc ReportHealth(stream Heartbeat) returns (stream HealthCheckResponse); } -// Sent from the Agent Node Client to the Cortex Server -message NodeMessage { - oneof payload { - RegistrationRequest registration = 1; - Heartbeat heartbeat = 2; - TaskResponse task_response = 3; - } -} - -// Sent from the Cortex Server to the Agent Node Client -message ServerMessage { - oneof payload { - RegistrationResponse registration_ack = 1; - TaskRequest task_request = 2; - } -} - +// --- Channel 1: Registration & Policy --- message RegistrationRequest { string node_id = 1; string version = 2; - string platform = 3; - map capabilities = 4; - string auth_token = 5; // Short-lived JWT identifying the User/Workspace + string auth_token = 3; + string node_description = 4; // AI-readable description of this node's role + map capabilities = 5; // e.g. 
"gpu": "nvidia-3080", "os": "ubuntu-22.04" +} + +message SandboxPolicy { + enum Mode { + STRICT = 0; + PERMISSIVE = 1; + } + Mode mode = 1; + repeated string allowed_commands = 2; + repeated string denied_commands = 3; + repeated string sensitive_commands = 4; + string working_dir_jail = 5; } message RegistrationResponse { bool success = 1; string error_message = 2; string session_id = 3; - string next_token_rotation_ms = 4; + SandboxPolicy policy = 4; } -message Heartbeat { - string node_id = 1; - float cpu_usage_percent = 2; - float memory_usage_percent = 3; - int32 active_task_count = 4; +// --- Channel 2: Tasks & Collaboration --- +message ClientTaskMessage { + oneof payload { + TaskResponse task_response = 1; + TaskClaimRequest task_claim = 2; + BrowserEvent browser_event = 3; + NodeAnnounce announce = 4; // NEW: Identification on stream connect + } +} + +message NodeAnnounce { + string node_id = 1; +} + +message BrowserEvent { + string session_id = 1; + oneof event { + ConsoleMessage console_msg = 2; + NetworkRequest network_req = 3; + } +} + +message ServerTaskMessage { + oneof payload { + TaskRequest task_request = 1; + WorkPoolUpdate work_pool_update = 2; + TaskClaimResponse claim_status = 3; + TaskCancelRequest task_cancel = 4; + } +} + +message TaskCancelRequest { + string task_id = 1; } message TaskRequest { string task_id = 1; - string task_type = 2; // "shell", "browser_cdp" - string payload_json = 3; + string task_type = 2; + oneof payload { + string payload_json = 3; // For legacy shell/fallback + BrowserAction browser_action = 7; // NEW: Structured Browser Skill + } int32 timeout_ms = 4; - bool cancellable = 5; - string capability_required = 6; - string idempotency_key = 7; - string trace_id = 8; // For OpenTelemetry observability - string signature = 9; // Cryptographic signature of payload_json + string trace_id = 5; + string signature = 6; +} + +message BrowserAction { + enum ActionType { + NAVIGATE = 0; + CLICK = 1; + TYPE = 2; + SCREENSHOT = 
3; + GET_DOM = 4; + HOVER = 5; + SCROLL = 6; + CLOSE = 7; + EVAL = 8; + GET_A11Y = 9; + } + ActionType action = 1; + string url = 2; + string selector = 3; + string text = 4; + string session_id = 5; + int32 x = 6; + int32 y = 7; } message TaskResponse { string task_id = 1; enum Status { - UNKNOWN = 0; - SUCCESS = 1; - ERROR = 2; + SUCCESS = 0; + ERROR = 1; + TIMEOUT = 2; CANCELLED = 3; - TIMEOUT = 4; } Status status = 2; string stdout = 3; string stderr = 4; - string structured_output_json = 5; - int32 duration_ms = 6; - string trace_id = 7; + string trace_id = 5; + map artifacts = 6; + + // NEW: Structured Skill Results + oneof result { + BrowserResponse browser_result = 7; + } +} + +message BrowserResponse { + string url = 1; + string title = 2; + bytes snapshot = 3; + string dom_content = 4; + string a11y_tree = 5; + string eval_result = 6; + repeated ConsoleMessage console_history = 7; + repeated NetworkRequest network_history = 8; +} + +message ConsoleMessage { + string level = 1; + string text = 2; + int64 timestamp_ms = 3; +} + +message NetworkRequest { + string method = 1; + string url = 2; + int32 status = 3; + string resource_type = 4; + int64 latency_ms = 5; +} + +message WorkPoolUpdate { + repeated string available_task_ids = 1; +} + +message TaskClaimRequest { + string task_id = 1; + string node_id = 2; +} + +message TaskClaimResponse { + string task_id = 1; + bool granted = 2; + string reason = 3; +} + +// --- Channel 3: Health & Observation --- +message Heartbeat { + string node_id = 1; + float cpu_usage_percent = 2; + float memory_usage_percent = 3; + int32 active_worker_count = 4; + int32 max_worker_capacity = 5; + string status_message = 6; + repeated string running_task_ids = 7; +} + +message HealthCheckResponse { + int64 server_time_ms = 1; } diff --git a/poc-grpc-agent/protos/agent_pb2.py b/poc-grpc-agent/protos/agent_pb2.py new file mode 100644 index 0000000..f51d6f2 --- /dev/null +++ b/poc-grpc-agent/protos/agent_pb2.py @@ -0,0 +1,78 @@ +# 
-*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: agent.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xd2\x01\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x42\t\n\x07payload\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 
\x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xe0\x01\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xbd\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 
\x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xc1\x01\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 
\x01(\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' + _globals['_REGISTRATIONREQUEST']._serialized_start=23 + _globals['_REGISTRATIONREQUEST']._serialized_end=245 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=194 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=245 + _globals['_SANDBOXPOLICY']._serialized_start=248 + _globals['_SANDBOXPOLICY']._serialized_end=445 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=411 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=445 + _globals['_REGISTRATIONRESPONSE']._serialized_start=447 + _globals['_REGISTRATIONRESPONSE']._serialized_end=567 + _globals['_CLIENTTASKMESSAGE']._serialized_start=570 + _globals['_CLIENTTASKMESSAGE']._serialized_end=780 + _globals['_NODEANNOUNCE']._serialized_start=782 + _globals['_NODEANNOUNCE']._serialized_end=813 + _globals['_BROWSEREVENT']._serialized_start=816 + _globals['_BROWSEREVENT']._serialized_end=951 + _globals['_SERVERTASKMESSAGE']._serialized_start=954 + _globals['_SERVERTASKMESSAGE']._serialized_end=1178 + _globals['_TASKCANCELREQUEST']._serialized_start=1180 + _globals['_TASKCANCELREQUEST']._serialized_end=1216 + 
_globals['_TASKREQUEST']._serialized_start=1219 + _globals['_TASKREQUEST']._serialized_end=1408 + _globals['_BROWSERACTION']._serialized_start=1411 + _globals['_BROWSERACTION']._serialized_end=1699 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1565 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=1699 + _globals['_TASKRESPONSE']._serialized_start=1702 + _globals['_TASKRESPONSE']._serialized_end=2054 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=1934 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=1982 + _globals['_TASKRESPONSE_STATUS']._serialized_start=1984 + _globals['_TASKRESPONSE_STATUS']._serialized_end=2044 + _globals['_BROWSERRESPONSE']._serialized_start=2057 + _globals['_BROWSERRESPONSE']._serialized_end=2277 + _globals['_CONSOLEMESSAGE']._serialized_start=2279 + _globals['_CONSOLEMESSAGE']._serialized_end=2346 + _globals['_NETWORKREQUEST']._serialized_start=2348 + _globals['_NETWORKREQUEST']._serialized_end=2452 + _globals['_WORKPOOLUPDATE']._serialized_start=2454 + _globals['_WORKPOOLUPDATE']._serialized_end=2498 + _globals['_TASKCLAIMREQUEST']._serialized_start=2500 + _globals['_TASKCLAIMREQUEST']._serialized_end=2552 + _globals['_TASKCLAIMRESPONSE']._serialized_start=2554 + _globals['_TASKCLAIMRESPONSE']._serialized_end=2623 + _globals['_HEARTBEAT']._serialized_start=2626 + _globals['_HEARTBEAT']._serialized_end=2819 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=2821 + _globals['_HEALTHCHECKRESPONSE']._serialized_end=2866 + _globals['_AGENTORCHESTRATOR']._serialized_start=2869 + _globals['_AGENTORCHESTRATOR']._serialized_end=3102 +# @@protoc_insertion_point(module_scope) diff --git a/poc-grpc-agent/protos/agent_pb2_grpc.py b/poc-grpc-agent/protos/agent_pb2_grpc.py new file mode 100644 index 0000000..b91c8a0 --- /dev/null +++ b/poc-grpc-agent/protos/agent_pb2_grpc.py @@ -0,0 +1,138 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! 
+"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from . import agent_pb2 as agent__pb2 + + +class AgentOrchestratorStub(object): + """The Cortex Server exposes this service + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SyncConfiguration = channel.unary_unary( + '/agent.AgentOrchestrator/SyncConfiguration', + request_serializer=agent__pb2.RegistrationRequest.SerializeToString, + response_deserializer=agent__pb2.RegistrationResponse.FromString, + ) + self.TaskStream = channel.stream_stream( + '/agent.AgentOrchestrator/TaskStream', + request_serializer=agent__pb2.ClientTaskMessage.SerializeToString, + response_deserializer=agent__pb2.ServerTaskMessage.FromString, + ) + self.ReportHealth = channel.stream_stream( + '/agent.AgentOrchestrator/ReportHealth', + request_serializer=agent__pb2.Heartbeat.SerializeToString, + response_deserializer=agent__pb2.HealthCheckResponse.FromString, + ) + + +class AgentOrchestratorServicer(object): + """The Cortex Server exposes this service + """ + + def SyncConfiguration(self, request, context): + """1. Control Channel: Sync policies and settings (Unary) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TaskStream(self, request_iterator, context): + """2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReportHealth(self, request_iterator, context): + """3. 
Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_AgentOrchestratorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( + servicer.SyncConfiguration, + request_deserializer=agent__pb2.RegistrationRequest.FromString, + response_serializer=agent__pb2.RegistrationResponse.SerializeToString, + ), + 'TaskStream': grpc.stream_stream_rpc_method_handler( + servicer.TaskStream, + request_deserializer=agent__pb2.ClientTaskMessage.FromString, + response_serializer=agent__pb2.ServerTaskMessage.SerializeToString, + ), + 'ReportHealth': grpc.stream_stream_rpc_method_handler( + servicer.ReportHealth, + request_deserializer=agent__pb2.Heartbeat.FromString, + response_serializer=agent__pb2.HealthCheckResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'agent.AgentOrchestrator', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. 
+class AgentOrchestrator(object): + """The Cortex Server exposes this service + """ + + @staticmethod + def SyncConfiguration(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration', + agent__pb2.RegistrationRequest.SerializeToString, + agent__pb2.RegistrationResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TaskStream(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream', + agent__pb2.ClientTaskMessage.SerializeToString, + agent__pb2.ServerTaskMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ReportHealth(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth', + agent__pb2.Heartbeat.SerializeToString, + agent__pb2.HealthCheckResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/poc-grpc-agent/requirements.txt b/poc-grpc-agent/requirements.txt index 2366f18..6e33b6d 100644 --- a/poc-grpc-agent/requirements.txt +++ b/poc-grpc-agent/requirements.txt @@ -1,2 +1,4 @@ grpcio==1.62.1 grpcio-tools==1.62.1 +PyJWT==2.8.0 +playwright==1.42.0 diff --git a/poc-grpc-agent/server.py 
b/poc-grpc-agent/server.py deleted file mode 100644 index dc4a37c..0000000 --- a/poc-grpc-agent/server.py +++ /dev/null @@ -1,191 +0,0 @@ -import grpc -from concurrent import futures -import time -import agent_pb2 -import agent_pb2_grpc -import queue -import threading -import jwt -import hmac -import hashlib - -# In production, these would be in .env -SECRET_KEY = "cortex-secret-shared-key" - -class AgentOrchestratorServicer(agent_pb2_grpc.AgentOrchestratorServicer): - def __init__(self): - self.nodes = {} # node_id -> queue for messages to node - - def Connect(self, request_iterator, context): - node_id = None - - # We need a way to send messages to the client from another thread/input - # In a real app, this would be triggered by an AI task planner - send_queue = queue.Queue() - - def stream_messages(): - while context.is_active(): - try: - msg = send_queue.get(timeout=1.0) - yield msg - except queue.Empty: - continue - - # Start a thread to handle incoming messages from the client - incoming_thread = threading.Thread(target=self._handle_incoming, args=(request_iterator, context, send_queue)) - incoming_thread.start() - - # Yield messages from the queue to the client - for msg in stream_messages(): - yield msg - - def _handle_incoming(self, request_iterator, context, send_queue): - try: - for message in request_iterator: - payload_type = message.WhichOneof('payload') - if payload_type == 'registration': - reg = message.registration - print(f"[*] Node {reg.node_id} registration request. 
Verifying token...") - - try: - # Verify JWT - # In real app, we check if node_id matches token subject - decoded = jwt.decode(reg.auth_token, SECRET_KEY, algorithms=["HS256"]) - print(f" [OK] Token verified for workspace: {decoded.get('workspace_id')}") - - # Send ACK - ack = agent_pb2.ServerMessage( - registration_ack=agent_pb2.RegistrationResponse( - success=True, - session_id="session-secure-123" - ) - ) - send_queue.put(ack) - - # Test 1: Allowed Command - t1_payload = '{"command": "whoami"}' - t1_sig = hmac.new(SECRET_KEY.encode(), t1_payload.encode(), hashlib.sha256).hexdigest() - send_queue.put(agent_pb2.ServerMessage( - task_request=agent_pb2.TaskRequest( - task_id="task-001-ALLOWED", - task_type="shell", - payload_json=t1_payload, - trace_id="trace-001", - signature=t1_sig - ) - )) - - # Test 2: Sensitive Command (Consent Required) - t2_payload = '{"command": "rm -rf /tmp/node-test"}' - t2_sig = hmac.new(SECRET_KEY.encode(), t2_payload.encode(), hashlib.sha256).hexdigest() - send_queue.put(agent_pb2.ServerMessage( - task_request=agent_pb2.TaskRequest( - task_id="task-002-SENSITIVE", - task_type="shell", - payload_json=t2_payload, - trace_id="trace-002", - signature=t2_sig - ) - )) - - # Test 3: Path Traversal Attempt (JAILBREAK) - t3_payload = '{"command": "cat ../.env.gitbucket"}' - t3_sig = hmac.new(SECRET_KEY.encode(), t3_payload.encode(), hashlib.sha256).hexdigest() - send_queue.put(agent_pb2.ServerMessage( - task_request=agent_pb2.TaskRequest( - task_id="task-003-TRAVERSAL", - task_type="shell", - payload_json=t3_payload, - trace_id="trace-003", - signature=t3_sig - ) - )) - - # Test 4: STRICTLY Forbidden - t4_payload = '{"command": "sudo apt update"}' - t4_sig = hmac.new(SECRET_KEY.encode(), t4_payload.encode(), hashlib.sha256).hexdigest() - send_queue.put(agent_pb2.ServerMessage( - task_request=agent_pb2.TaskRequest( - task_id="task-004-FORBIDDEN", - task_type="shell", - payload_json=t4_payload, - trace_id="trace-004", - signature=t4_sig - ) - )) - 
- # Test 5: Non-whitelisted but Allowed in PERMISSIVE - t5_payload = '{"command": "df -h"}' - t5_sig = hmac.new(SECRET_KEY.encode(), t5_payload.encode(), hashlib.sha256).hexdigest() - send_queue.put(agent_pb2.ServerMessage( - task_request=agent_pb2.TaskRequest( - task_id="task-005-PERMISSIVE", - task_type="shell", - payload_json=t5_payload, - trace_id="trace-005", - signature=t5_sig - ) - )) - - print("[*] Sequence of 5 test tasks dispatched to verify Sandbox Policy (PERMISSIVE mode).") - - except jwt.ExpiredSignatureError: - print(f" [FAIL] Token for {reg.node_id} expired.") - ack = agent_pb2.ServerMessage( - registration_ack=agent_pb2.RegistrationResponse( - success=False, - error_message="Authentication token expired." - ) - ) - send_queue.put(ack) - except jwt.InvalidTokenError as e: - print(f" [FAIL] Invalid token for {reg.node_id}: {e}") - ack = agent_pb2.ServerMessage( - registration_ack=agent_pb2.RegistrationResponse( - success=False, - error_message=f"Invalid authentication token: {e}" - ) - ) - send_queue.put(ack) - - elif payload_type == 'heartbeat': - hb = message.heartbeat - # print(f"[+] Heartbeat from {hb.node_id}: CPU {hb.cpu_usage_percent}%") - pass - - elif payload_type == 'task_response': - res = message.task_response - print(f"[!] Task Finished: {res.task_id} | Status: {res.status}") - print(f" Stdout: {res.stdout.strip()}") - except Exception as e: - print(f"[!] 
Error handling incoming stream: {e}") - -def serve(): - # Load certificates for mTLS - print("[πŸ”] Loading mTLS certificates...") - with open('certs/server.key', 'rb') as f: - private_key = f.read() - with open('certs/server.crt', 'rb') as f: - certificate_chain = f.read() - with open('certs/ca.crt', 'rb') as f: - root_certificates = f.read() - - # Create server credentials - # require_client_auth=True enforces bidirectional verification (mTLS) - server_credentials = grpc.ssl_server_credentials( - [(private_key, certificate_chain)], - root_certificates=root_certificates, - require_client_auth=True - ) - - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(AgentOrchestratorServicer(), server) - - # Use secure_port instead of insecure_port - server.add_secure_port('[::]:50051', server_credentials) - print("[*] Cortex Secure Server POC listening on port 50051 (mTLS enabled)...") - server.start() - server.wait_for_termination() - -if __name__ == '__main__': - serve()