diff --git a/poc-grpc-agent/.nfs00000000006c16c80000003a b/poc-grpc-agent/.nfs00000000006c16c80000003a new file mode 100644 index 0000000..be501d1 --- /dev/null +++ b/poc-grpc-agent/.nfs00000000006c16c80000003a @@ -0,0 +1,33 @@ +[🛡️] Boss Plane Orchestrator Starting on [::]:50051... +[🛡️] Boss Plane Refactored & Online. +[📋] Registered Agent Node: agent-node-007 +[📶] Stream Online for agent-node-007 + [📦] Task shared-001 Claimed by agent-node-007 + [📦] Task shared-002 Claimed by agent-node-007 + [🚀] Streamed message to agent-node-007 + [🚀] Streamed message to agent-node-007 + +[🧠] AI Simulation Start... +[📤] Dispatching shell task-1772514336015 to agent-node-007 + [🚀] Streamed message to agent-node-007 + Uname Output: {'stdout': 'Linux d1ceb63b86a7 6.10.11-linuxkit #1 SMP Thu Oct 3 10:17:28 UTC 2024 aarch64 GNU/Linux\n', 'status': 0} + +[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)... +[🌐📤] Dispatching browser br-1772514336028 to agent-node-007 + [🚀] Streamed message to agent-node-007 + [🌐] Net Inspect: GET https://example.com/ + Nav Result: {'stdout': '', 'status': 0, 'browser': {'url': 'https://example.com/', 'title': 'Example Domain', 'has_snapshot': False, 'a11y': None, 'eval': ''}} + +[🧠] AI Phase 4 Pro: Perception & Advanced Logic... +[🌐📤] Dispatching browser br-1772514336293 to agent-node-007 + [🚀] Streamed message to agent-node-007 + A11y Result: {"role": "WebArea", "name": "Example Domain", "children": [{"role": "heading", "name": "Example Doma... +[🌐📤] Dispatching browser br-1772514336300 to agent-node-007 + [🚀] Streamed message to agent-node-007 + Eval Result: 115.89999997615814 + +[🧠] AI Phase 4 Pro: Triggering Real-time Events (Tunneling)... +[🌐📤] Dispatching browser br-1772514336305 to agent-node-007 + [🚀] Streamed message to agent-node-007 + [🖥️] Live Console: Refactored Hello! + [🖥️] Live Console: Failed to load resource: the server responded with a status of 404 () diff --git a/poc-grpc-agent/.nfs00000000006c16c900000039 b/poc-grpc-agent/.nfs00000000006c16c900000039 new file mode 100644 index 0000000..9d3246b --- /dev/null +++ b/poc-grpc-agent/.nfs00000000006c16c900000039 @@ -0,0 +1,51 @@ +[*] Starting Antigravity Agent Node: agent-node-007... +[🌐] Browser Actor Starting... +[*] Handshake with Orchestrator: agent-node-007 +[OK] Sandbox Policy Synced. +[*] Task Stream Online: agent-node-007 +[🌐] Browser Engine Online. + [📥] Received from Stream: work_pool_update +[*] Inbound: work_pool_update + [📥] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: shared-001 +[✅] Validated task shared-001 + [🐚] Executing Shell: uname -a + [📥] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: shared-002 +[✅] Validated task shared-002 +[*] Completion: shared-002 + [🐚] Shell Done: uname -a | Stdout Size: 90 +[*] Completion: shared-001 + [📥] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: task-1772514336015 +[✅] Validated task task-1772514336015 + [🐚] Executing Shell: uname -a + [🐚] Shell Done: uname -a | Stdout Size: 90 +[*] Completion: task-1772514336015 + [📥] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: br-1772514336028 +[✅] Validated task br-1772514336028 + [🌐] Browser Actor Processing: NAVIGATE | Session: antigravity-session-1 +[*] Completion: br-1772514336028 + [📥] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: br-1772514336293 +[✅] Validated task br-1772514336293 + [🌐] Browser Actor Processing: GET_A11Y | Session: antigravity-session-1 +[*] Completion: br-1772514336293 + [📥] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: br-1772514336300 +[✅] Validated task br-1772514336300 + [🌐] Browser Actor Processing: EVAL | Session: antigravity-session-1 +[*] Completion: br-1772514336300 + [📥] Received from Stream: task_request +[*] Inbound: task_request +[*] Task Launch: br-1772514336305 +[✅] Validated task br-1772514336305 + [🌐] Browser Actor Processing: EVAL | Session: antigravity-session-1 +[*] Completion: br-1772514336305 diff --git a/poc-grpc-agent/agent_node/__init__.py b/poc-grpc-agent/agent_node/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/agent_node/__init__.py diff --git a/poc-grpc-agent/agent_node/config.py b/poc-grpc-agent/agent_node/config.py new file mode 100644 index 0000000..d0c5f62 --- /dev/null +++ b/poc-grpc-agent/agent_node/config.py @@ -0,0 +1,20 @@ +import os +import platform + +# 12-Factor Config: Environment variables with defaults +SECRET_KEY = os.getenv("AGENT_SECRET_KEY", "cortex-secret-shared-key") +NODE_ID = os.getenv("AGENT_NODE_ID", "agent-node-007") +NODE_DESC = os.getenv("AGENT_NODE_DESC", "Modular Stateful Node") + +# Orchestrator Connection +SERVER_HOST = os.getenv("SERVER_HOST", "localhost") +SERVER_PORT = os.getenv("SERVER_PORT", "50051") + +# Certificate Paths +CERT_CA = os.getenv("CERT_CA", "certs/ca.crt") +CERT_CLIENT_CRT = os.getenv("CERT_CLIENT_CRT", "certs/client.crt") +CERT_CLIENT_KEY = os.getenv("CERT_CLIENT_KEY", "certs/client.key") + +# Resource Limits +MAX_SKILL_WORKERS = int(os.getenv("MAX_SKILL_WORKERS", "5")) +HEALTH_REPORT_INTERVAL = int(os.getenv("HEALTH_REPORT_INTERVAL", "10")) diff --git a/poc-grpc-agent/agent_node/core/__init__.py b/poc-grpc-agent/agent_node/core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/agent_node/core/__init__.py diff --git a/poc-grpc-agent/agent_node/core/sandbox.py b/poc-grpc-agent/agent_node/core/sandbox.py new file mode 100644 index 0000000..9f9390c --- /dev/null +++ b/poc-grpc-agent/agent_node/core/sandbox.py @@ -0,0 +1,31 @@ +from protos import agent_pb2 + +class SandboxEngine: + """Core Security Engine for Local Command Verification.""" + def __init__(self): + self.policy = None + + def sync(self, p): + """Syncs the latest policy from the Orchestrator.""" + self.policy = { + "MODE": "STRICT" if p.mode == agent_pb2.SandboxPolicy.STRICT else "PERMISSIVE", + "ALLOWED": list(p.allowed_commands), + "DENIED": list(p.denied_commands), + "SENSITIVE": list(p.sensitive_commands) + } + + def verify(self, command_str): + """Verifies if a command string is allowed under the current policy.""" + if not self.policy: return False, "No Policy" + + parts = (command_str or "").strip().split() + if not parts: return False, "Empty" + + base_cmd = parts[0] + if base_cmd in self.policy["DENIED"]: + return False, f"Forbidden command: {base_cmd}" + + if self.policy["MODE"] == "STRICT" and base_cmd not in self.policy["ALLOWED"]: + return False, f"Command '{base_cmd}' not whitelisted" + + return True, "OK" diff --git a/poc-grpc-agent/agent_node/main.py b/poc-grpc-agent/agent_node/main.py new file mode 100644 index 0000000..a39c687 --- /dev/null +++ b/poc-grpc-agent/agent_node/main.py @@ -0,0 +1,30 @@ +import sys +import os + +# Add root to path to find protos and other packages +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from agent_node.node import AgentNode +from agent_node.config import NODE_ID + +def main(): + print(f"[*] Starting Antigravity Agent Node: {NODE_ID}...") + + # 1. Initialization + node = AgentNode() + + # 2. Handshake: Sync configuration and Sandbox Policy + node.sync_configuration() + + # 3. Background: Start health reporting (Heartbeats) + node.start_health_reporting() + + # 4. Foreground: Run Persistent Task Stream + node.run_task_stream() + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print("\n[🛑] Agent Node Shutdown.") + sys.exit(0) diff --git a/poc-grpc-agent/agent_node/node.py b/poc-grpc-agent/agent_node/node.py new file mode 100644 index 0000000..59a1e2c --- /dev/null +++ b/poc-grpc-agent/agent_node/node.py @@ -0,0 +1,141 @@ +import threading +import queue +import time +import sys +from protos import agent_pb2, agent_pb2_grpc +from agent_node.skills.manager import SkillManager +from agent_node.core.sandbox import SandboxEngine +from agent_node.utils.auth import create_auth_token, verify_task_signature +from agent_node.utils.network import get_secure_stub +from agent_node.config import NODE_ID, NODE_DESC, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS + +class AgentNode: + """The 'Agent Core': Orchestrates Local Skills and Maintains gRPC Connection.""" + def __init__(self, node_id=NODE_ID): + self.node_id = node_id + self.skills = SkillManager(max_workers=MAX_SKILL_WORKERS) + self.sandbox = SandboxEngine() + self.task_queue = queue.Queue() + self.stub = get_secure_stub() + + def sync_configuration(self): + """Initial handshake to retrieve policy and metadata.""" + print(f"[*] Handshake with Orchestrator: {self.node_id}") + reg_req = agent_pb2.RegistrationRequest( + node_id=self.node_id, + auth_token=create_auth_token(self.node_id), + node_description=NODE_DESC, + capabilities={"shell": "v1", "browser": "playwright-sync-bridge"} + ) + + try: + res = self.stub.SyncConfiguration(reg_req) + if res.success: + self.sandbox.sync(res.policy) + print("[OK] Sandbox Policy Synced.") + else: + print(f"[!] Rejection: {res.error_message}") + sys.exit(1) + except Exception as e: + print(f"[!] Connection Fail: {e}") + sys.exit(1) + + def start_health_reporting(self): + """Streaming node metrics to the orchestrator for load balancing.""" + def _gen(): + while True: + ids = self.skills.get_active_ids() + yield agent_pb2.Heartbeat( + node_id=self.node_id, cpu_usage_percent=1.0, + active_worker_count=len(ids), + max_worker_capacity=MAX_SKILL_WORKERS, + running_task_ids=ids + ) + time.sleep(HEALTH_REPORT_INTERVAL) + + # Non-blocking thread for health heartbeat + threading.Thread( + target=lambda: list(self.stub.ReportHealth(_gen())), + daemon=True, name=f"Health-{self.node_id}" + ).start() + + def run_task_stream(self): + """Main Persistent Bi-directional Stream for Task Management.""" + def _gen(): + # Initial announcement for routing identity + yield agent_pb2.ClientTaskMessage( + announce=agent_pb2.NodeAnnounce(node_id=self.node_id) + ) + while True: + yield self.task_queue.get() + + responses = self.stub.TaskStream(_gen()) + print(f"[*] Task Stream Online: {self.node_id}", flush=True) + + try: + for msg in responses: + kind = msg.WhichOneof('payload') + print(f" [📥] Received from Stream: {kind}", flush=True) + self._process_server_message(msg) + except Exception as e: + print(f"[!] Task Stream Failure: {e}", flush=True) + + def _process_server_message(self, msg): + kind = msg.WhichOneof('payload') + print(f"[*] Inbound: {kind}", flush=True) + + if kind == 'task_request': + self._handle_task(msg.task_request) + + elif kind == 'task_cancel': + if self.skills.cancel(msg.task_cancel.task_id): + self._send_response(msg.task_cancel.task_id, None, agent_pb2.TaskResponse.CANCELLED) + + elif kind == 'work_pool_update': + # Claim logical idle tasks from global pool + if len(self.skills.get_active_ids()) < MAX_SKILL_WORKERS: + for tid in msg.work_pool_update.available_task_ids: + self.task_queue.put(agent_pb2.ClientTaskMessage( + task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id) + )) + + def _handle_task(self, task): + print(f"[*] Task Launch: {task.task_id}", flush=True) + # 1. Cryptographic Signature Verification + if not verify_task_signature(task): + print(f"[!] Signature Validation Failed for {task.task_id}", flush=True) + return + + print(f"[✅] Validated task {task.task_id}", flush=True) + + # 2. Skill Manager Submission + success, reason = self.skills.submit(task, self.sandbox, self._on_finish, self._on_event) + if not success: + print(f"[!] Execution Rejected: {reason}", flush=True) + + def _on_event(self, event): + """Live Event Tunneler: Routes browser/skill events into the main stream.""" + self.task_queue.put(agent_pb2.ClientTaskMessage(browser_event=event)) + + def _on_finish(self, tid, res, trace): + """Final Completion Callback: Routes task results back to server.""" + print(f"[*] Completion: {tid}", flush=True) + status = agent_pb2.TaskResponse.SUCCESS if res['status'] == 1 else agent_pb2.TaskResponse.ERROR + + tr = agent_pb2.TaskResponse( + task_id=tid, status=status, + stdout=res.get('stdout',''), + stderr=res.get('stderr',''), + trace_id=trace, + browser_result=res.get("browser_result") + ) + self._send_response(tid, tr) + + def _send_response(self, tid, tr=None, status=None): + """Utility for placing response messages into the gRPC outbound queue.""" + if tr: + self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr)) + else: + self.task_queue.put(agent_pb2.ClientTaskMessage( + task_response=agent_pb2.TaskResponse(task_id=tid, status=status) + )) diff --git a/poc-grpc-agent/agent_node/skills/__init__.py b/poc-grpc-agent/agent_node/skills/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/__init__.py diff --git a/poc-grpc-agent/agent_node/skills/base.py b/poc-grpc-agent/agent_node/skills/base.py new file mode 100644 index 0000000..7df6e34 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/base.py @@ -0,0 +1,9 @@ +class BaseSkill: + """Abstract interface for all Node capabilities (Shell, Browser, etc.).""" + def execute(self, task, sandbox, on_complete, on_event=None): + """Processes the given task and notifies results via callbacks.""" + raise NotImplementedError + + def cancel(self, task_id: str) -> bool: + """Attempts to cancel the task and returns success status.""" + return False diff --git a/poc-grpc-agent/agent_node/skills/browser.py b/poc-grpc-agent/agent_node/skills/browser.py new file mode 100644 index 0000000..ea957d3 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/browser.py @@ -0,0 +1,107 @@ +import threading +import queue +import time +import json +from playwright.sync_api import sync_playwright +from agent_node.skills.base import BaseSkill +from protos import agent_pb2 + +class BrowserSkill(BaseSkill): + """The 'Antigravity Bridge': Persistent Browser Skill using a dedicated Actor thread.""" + def __init__(self): + self.task_queue = queue.Queue() + self.sessions = {} # session_id -> { "context": Context, "page": Page } + self.lock = threading.Lock() + threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor").start() + + def _setup_listeners(self, sid, page, on_event): + """Tunnels browser internal events back to the Orchestrator.""" + if not on_event: return + + # Live Console Redirector + page.on("console", lambda msg: on_event(agent_pb2.BrowserEvent( + session_id=sid, console_msg=agent_pb2.ConsoleMessage( + level=msg.type, text=msg.text, timestamp_ms=int(time.time()*1000) + ) + ))) + + # Live Network Redirector + page.on("requestfinished", lambda req: on_event(agent_pb2.BrowserEvent( + session_id=sid, network_req=agent_pb2.NetworkRequest( + method=req.method, url=req.url, status=req.response().status if req.response() else 0, + resource_type=req.resource_type, latency_ms=0 + ) + ))) + + def _browser_actor(self): + """Serializes all Playwright operations on a single dedicated thread.""" + print("[🌐] Browser Actor Starting...", flush=True) + try: + pw = sync_playwright().start() + # 12-Factor/Container Optimization: Standard non-sandbox arguments + browser = pw.chromium.launch(headless=True, args=[ + '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu' + ]) + print("[🌐] Browser Engine Online.", flush=True) + except Exception as e: + print(f"[!] Browser Actor Startup Fail: {e}", flush=True) + return + + while True: + try: + task, sandbox, on_complete, on_event = self.task_queue.get() + action = task.browser_action + sid = action.session_id or "default" + + with self.lock: + if sid not in self.sessions: + ctx = browser.new_context() + pg = ctx.new_page() + self._setup_listeners(sid, pg, on_event) + self.sessions[sid] = {"context": ctx, "page": pg} + + page = self.sessions[sid]["page"] + print(f" [🌐] Browser Actor Processing: {agent_pb2.BrowserAction.ActionType.Name(action.action)} | Session: {sid}", flush=True) + + res_data = {} + # State-Machine Logic for Actions + if action.action == agent_pb2.BrowserAction.NAVIGATE: + page.goto(action.url, wait_until="commit") + elif action.action == agent_pb2.BrowserAction.CLICK: + page.click(action.selector) + elif action.action == agent_pb2.BrowserAction.TYPE: + page.fill(action.selector, action.text) + elif action.action == agent_pb2.BrowserAction.SCREENSHOT: + res_data["snapshot"] = page.screenshot() + elif action.action == agent_pb2.BrowserAction.GET_DOM: + res_data["dom_content"] = page.content() + elif action.action == agent_pb2.BrowserAction.HOVER: + page.hover(action.selector) + elif action.action == agent_pb2.BrowserAction.SCROLL: + page.mouse.wheel(x=0, y=action.y) + elif action.action == agent_pb2.BrowserAction.EVAL: + res_data["eval_result"] = str(page.evaluate(action.text)) + elif action.action == agent_pb2.BrowserAction.GET_A11Y: + res_data["a11y_tree"] = json.dumps(page.accessibility.snapshot()) + elif action.action == agent_pb2.BrowserAction.CLOSE: + with self.lock: + sess = self.sessions.pop(sid, None) + if sess: sess["context"].close() + + # Results Construction + br_res = agent_pb2.BrowserResponse( + url=page.url, title=page.title(), + snapshot=res_data.get("snapshot", b""), + dom_content=res_data.get("dom_content", ""), + a11y_tree=res_data.get("a11y_tree", ""), + eval_result=res_data.get("eval_result", "") + ) + on_complete(task.task_id, {"status": 1, "browser_result": br_res}, task.trace_id) + except Exception as e: + print(f" [!] Browser Actor Error: {e}", flush=True) + on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) + + def execute(self, task, sandbox, on_complete, on_event=None): + self.task_queue.put((task, sandbox, on_complete, on_event)) + + def cancel(self, task_id): return False diff --git a/poc-grpc-agent/agent_node/skills/manager.py b/poc-grpc-agent/agent_node/skills/manager.py new file mode 100644 index 0000000..a1a93e7 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/manager.py @@ -0,0 +1,53 @@ +import threading +from concurrent import futures +from agent_node.skills.shell import ShellSkill +from agent_node.skills.browser import BrowserSkill +from agent_node.config import MAX_SKILL_WORKERS + +class SkillManager: + """Orchestrates multiple modular skills and manages the task worker pool.""" + def __init__(self, max_workers=MAX_SKILL_WORKERS): + self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker") + self.active_tasks = {} # task_id -> future + self.skills = { + "shell": ShellSkill(), + "browser": BrowserSkill() + } + self.max_workers = max_workers + self.lock = threading.Lock() + + def submit(self, task, sandbox, on_complete, on_event=None): + """Routes a task to the appropriate skill and submits it to the thread pool.""" + with self.lock: + if len(self.active_tasks) >= self.max_workers: + return False, "Node Capacity Reached" + + # 1. Routing Engine + if task.HasField("browser_action"): + skill = self.skills["browser"] + else: + skill = self.skills["shell"] + + # 2. Execution submission + future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event) + self.active_tasks[task.task_id] = future + + # Cleanup hook + future.add_done_callback(lambda f: self._cleanup(task.task_id)) + return True, "Accepted" + + def cancel(self, task_id): + """Attempts to cancel an active task through all registered skills.""" + with self.lock: + cancelled = any(s.cancel(task_id) for s in self.skills.values()) + return cancelled + + def get_active_ids(self): + """Returns the list of currently running task IDs.""" + with self.lock: + return list(self.active_tasks.keys()) + + def _cleanup(self, task_id): + """Internal callback to release capacity when a task finishes.""" + with self.lock: + self.active_tasks.pop(task_id, None) diff --git a/poc-grpc-agent/agent_node/skills/shell.py b/poc-grpc-agent/agent_node/skills/shell.py new file mode 100644 index 0000000..ae681a1 --- /dev/null +++ b/poc-grpc-agent/agent_node/skills/shell.py @@ -0,0 +1,56 @@ +import subprocess +import threading +from .base import BaseSkill + +class ShellSkill(BaseSkill): + """Default Skill: Executing shell commands with sandbox safety.""" + def __init__(self): + self.processes = {} # task_id -> Popen + self.lock = threading.Lock() + + def execute(self, task, sandbox, on_complete, on_event=None): + """Processes shell-based commands for the Node.""" + try: + cmd = task.payload_json + + # 1. Verification Logic + allowed, status_msg = sandbox.verify(cmd) + if not allowed: + err_msg = f"SANDBOX_VIOLATION: {status_msg}" + return on_complete(task.task_id, {"stderr": err_msg, "status": 2}, task.trace_id) + + # 2. Sequential Execution + print(f" [🐚] Executing Shell: {cmd}", flush=True) + p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + with self.lock: + self.processes[task.task_id] = p + + # 3. Timeout Handling + timeout = task.timeout_ms / 1000.0 if task.timeout_ms > 0 else None + stdout, stderr = p.communicate(timeout=timeout) + + print(f" [🐚] Shell Done: {cmd} | Stdout Size: {len(stdout)}", flush=True) + on_complete(task.task_id, { + "stdout": stdout, "stderr": stderr, + "status": 1 if p.returncode == 0 else 2 + }, task.trace_id) + + except subprocess.TimeoutExpired: + self.cancel(task.task_id) + on_complete(task.task_id, {"stderr": "TASK_TIMEOUT", "status": 2}, task.trace_id) + except Exception as e: + on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) + finally: + with self.lock: + self.processes.pop(task.task_id, None) + + def cancel(self, task_id: str): + """Standard process termination for shell tasks.""" + with self.lock: + p = self.processes.get(task_id) + if p: + print(f"[🛑] Killing Shell Task: {task_id}") + p.kill() + return True + return False diff --git a/poc-grpc-agent/agent_node/utils/__init__.py b/poc-grpc-agent/agent_node/utils/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/agent_node/utils/__init__.py diff --git a/poc-grpc-agent/agent_node/utils/auth.py b/poc-grpc-agent/agent_node/utils/auth.py new file mode 100644 index 0000000..202fd4c --- /dev/null +++ b/poc-grpc-agent/agent_node/utils/auth.py @@ -0,0 +1,28 @@ +import jwt +import datetime +import hmac +import hashlib +from protos import agent_pb2 +from agent_node.config import SECRET_KEY + +def create_auth_token(node_id: str) -> str: + """Creates a JWT for node authentication.""" + payload = { + "sub": node_id, + "iat": datetime.datetime.utcnow(), + "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10) + } + return jwt.encode(payload, SECRET_KEY, algorithm="HS256") + +def verify_task_signature(task, secret=SECRET_KEY) -> bool: + """Verifies HMAC signature for shell or browser tasks.""" + if task.HasField("browser_action"): + a = task.browser_action + # Aligned with orchestrator's sign_browser_action using the string Name + kind = agent_pb2.BrowserAction.ActionType.Name(a.action) + sign_base = f"{kind}:{a.url}:{a.session_id}" + else: + sign_base = task.payload_json + + expected_sig = hmac.new(secret.encode(), sign_base.encode(), hashlib.sha256).hexdigest() + return hmac.compare_digest(task.signature, expected_sig) diff --git a/poc-grpc-agent/agent_node/utils/network.py b/poc-grpc-agent/agent_node/utils/network.py new file mode 100644 index 0000000..6ba7cc2 --- /dev/null +++ b/poc-grpc-agent/agent_node/utils/network.py @@ -0,0 +1,13 @@ +import grpc +from protos import agent_pb2_grpc +from agent_node.config import SERVER_HOST, SERVER_PORT, CERT_CA, CERT_CLIENT_CRT, CERT_CLIENT_KEY + +def get_secure_stub(): + """Initializes a gRPC secure channel and returns the orchestrator stub.""" + with open(CERT_CLIENT_KEY, 'rb') as f: pkey = f.read() + with open(CERT_CLIENT_CRT, 'rb') as f: cert = f.read() + with open(CERT_CA, 'rb') as f: ca = f.read() + + creds = grpc.ssl_channel_credentials(ca, pkey, cert) + channel = grpc.secure_channel(f'{SERVER_HOST}:{SERVER_PORT}', creds) + return agent_pb2_grpc.AgentOrchestratorStub(channel) diff --git a/poc-grpc-agent/agent_pb2.py b/poc-grpc-agent/agent_pb2.py deleted file mode 100644 index f51d6f2..0000000 --- a/poc-grpc-agent/agent_pb2.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: agent.proto -# Protobuf Python Version: 4.25.1 -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xd2\x01\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x42\t\n\x07payload\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xe0\x01\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xbd\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xc1\x01\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') - -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' - _globals['_REGISTRATIONREQUEST']._serialized_start=23 - _globals['_REGISTRATIONREQUEST']._serialized_end=245 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=194 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=245 - _globals['_SANDBOXPOLICY']._serialized_start=248 - _globals['_SANDBOXPOLICY']._serialized_end=445 - _globals['_SANDBOXPOLICY_MODE']._serialized_start=411 - _globals['_SANDBOXPOLICY_MODE']._serialized_end=445 - _globals['_REGISTRATIONRESPONSE']._serialized_start=447 - _globals['_REGISTRATIONRESPONSE']._serialized_end=567 - _globals['_CLIENTTASKMESSAGE']._serialized_start=570 - _globals['_CLIENTTASKMESSAGE']._serialized_end=780 - _globals['_NODEANNOUNCE']._serialized_start=782 - _globals['_NODEANNOUNCE']._serialized_end=813 - _globals['_BROWSEREVENT']._serialized_start=816 - _globals['_BROWSEREVENT']._serialized_end=951 - _globals['_SERVERTASKMESSAGE']._serialized_start=954 - _globals['_SERVERTASKMESSAGE']._serialized_end=1178 - _globals['_TASKCANCELREQUEST']._serialized_start=1180 - _globals['_TASKCANCELREQUEST']._serialized_end=1216 - _globals['_TASKREQUEST']._serialized_start=1219 - _globals['_TASKREQUEST']._serialized_end=1408 - _globals['_BROWSERACTION']._serialized_start=1411 - _globals['_BROWSERACTION']._serialized_end=1699 - _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1565 - _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=1699 - _globals['_TASKRESPONSE']._serialized_start=1702 - _globals['_TASKRESPONSE']._serialized_end=2054 - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=1934 - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=1982 - _globals['_TASKRESPONSE_STATUS']._serialized_start=1984 - _globals['_TASKRESPONSE_STATUS']._serialized_end=2044 - _globals['_BROWSERRESPONSE']._serialized_start=2057 - _globals['_BROWSERRESPONSE']._serialized_end=2277 - _globals['_CONSOLEMESSAGE']._serialized_start=2279 - _globals['_CONSOLEMESSAGE']._serialized_end=2346 - _globals['_NETWORKREQUEST']._serialized_start=2348 - _globals['_NETWORKREQUEST']._serialized_end=2452 - _globals['_WORKPOOLUPDATE']._serialized_start=2454 - _globals['_WORKPOOLUPDATE']._serialized_end=2498 - _globals['_TASKCLAIMREQUEST']._serialized_start=2500 - _globals['_TASKCLAIMREQUEST']._serialized_end=2552 - _globals['_TASKCLAIMRESPONSE']._serialized_start=2554 - _globals['_TASKCLAIMRESPONSE']._serialized_end=2623 - _globals['_HEARTBEAT']._serialized_start=2626 - _globals['_HEARTBEAT']._serialized_end=2819 - _globals['_HEALTHCHECKRESPONSE']._serialized_start=2821 - _globals['_HEALTHCHECKRESPONSE']._serialized_end=2866 - _globals['_AGENTORCHESTRATOR']._serialized_start=2869 - _globals['_AGENTORCHESTRATOR']._serialized_end=3102 -# @@protoc_insertion_point(module_scope) diff --git a/poc-grpc-agent/agent_pb2_grpc.py b/poc-grpc-agent/agent_pb2_grpc.py deleted file mode 100644 index 932d45e..0000000 --- a/poc-grpc-agent/agent_pb2_grpc.py +++ /dev/null @@ -1,138 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -"""Client and server classes corresponding to protobuf-defined services.""" -import grpc - -import agent_pb2 as agent__pb2 - - -class AgentOrchestratorStub(object): - """The Cortex Server exposes this service - """ - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.SyncConfiguration = channel.unary_unary( - '/agent.AgentOrchestrator/SyncConfiguration', - request_serializer=agent__pb2.RegistrationRequest.SerializeToString, - response_deserializer=agent__pb2.RegistrationResponse.FromString, - ) - self.TaskStream = channel.stream_stream( - '/agent.AgentOrchestrator/TaskStream', - request_serializer=agent__pb2.ClientTaskMessage.SerializeToString, - response_deserializer=agent__pb2.ServerTaskMessage.FromString, - ) - self.ReportHealth = channel.stream_stream( - '/agent.AgentOrchestrator/ReportHealth', - request_serializer=agent__pb2.Heartbeat.SerializeToString, - response_deserializer=agent__pb2.HealthCheckResponse.FromString, - ) - - -class AgentOrchestratorServicer(object): - """The Cortex Server exposes this service - """ - - def SyncConfiguration(self, request, context): - """1. Control Channel: Sync policies and settings (Unary) - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def TaskStream(self, request_iterator, context): - """2. Task Channel: Bidirectional work dispatch and reporting (Persistent) - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ReportHealth(self, request_iterator, context): - """3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_AgentOrchestratorServicer_to_server(servicer, server): - rpc_method_handlers = { - 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( - servicer.SyncConfiguration, - request_deserializer=agent__pb2.RegistrationRequest.FromString, - response_serializer=agent__pb2.RegistrationResponse.SerializeToString, - ), - 'TaskStream': grpc.stream_stream_rpc_method_handler( - servicer.TaskStream, - request_deserializer=agent__pb2.ClientTaskMessage.FromString, - response_serializer=agent__pb2.ServerTaskMessage.SerializeToString, - ), - 'ReportHealth': grpc.stream_stream_rpc_method_handler( - servicer.ReportHealth, - request_deserializer=agent__pb2.Heartbeat.FromString, - response_serializer=agent__pb2.HealthCheckResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'agent.AgentOrchestrator', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class AgentOrchestrator(object): - """The Cortex Server exposes this service - """ - - @staticmethod - def SyncConfiguration(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration', - agent__pb2.RegistrationRequest.SerializeToString, - agent__pb2.RegistrationResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def TaskStream(request_iterator, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream', - agent__pb2.ClientTaskMessage.SerializeToString, - agent__pb2.ServerTaskMessage.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def ReportHealth(request_iterator, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth', - agent__pb2.Heartbeat.SerializeToString, - agent__pb2.HealthCheckResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/poc-grpc-agent/client.py b/poc-grpc-agent/client.py deleted file mode 100644 index 33c782f..0000000 --- a/poc-grpc-agent/client.py +++ /dev/null @@ -1,319 +0,0 @@ -import grpc -import time -import os -import agent_pb2 -import agent_pb2_grpc -import threading -import subprocess -import json -import jwt -import datetime -import hmac -import hashlib -import queue -import sys -import platform -from concurrent import futures -from playwright.sync_api import sync_playwright - -SECRET_KEY = "cortex-secret-shared-key" - -class BaseSkill: - """Interface for pluggable node capabilities.""" - def execute(self, task, sandbox, on_complete, on_event=None): - raise NotImplementedError - - def cancel(self, task_id): - return False - -class ShellSkill(BaseSkill): - """Default Skill: Executing shell commands.""" - def __init__(self): - self.processes = {} # task_id -> Popen - self.lock = threading.Lock() - - def execute(self, task, sandbox, on_complete, on_event=None): - try: - cmd = task.payload_json - - allowed, status_msg = sandbox.verify(cmd) - if not allowed: - return on_complete(task.task_id, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id) - - print(f" [🐚] Executing Shell: {cmd}", flush=True) - p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - - with self.lock: self.processes[task.task_id] = p - - timeout = task.timeout_ms / 1000.0 if task.timeout_ms > 0 else None - stdout, stderr = p.communicate(timeout=timeout) - print(f" [🐚] Shell Done: {cmd} | Stdout Size: {len(stdout)}", flush=True) - - on_complete(task.task_id, {"stdout": stdout, "stderr": stderr, "status": 1 if p.returncode == 0 else 2}, task.trace_id) - except subprocess.TimeoutExpired: - self.cancel(task.task_id) - on_complete(task.task_id, {"stderr": "TIMEOUT", "status": 2}, task.trace_id) - except Exception as e: - on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) - finally: - with self.lock: self.processes.pop(task.task_id, None) - - def cancel(self, task_id): - with self.lock: - p = self.processes.get(task_id) - if p: - print(f"[🛑] Killing Shell Process: {task_id}") - p.kill() - return True - return False - -class BrowserSkill(BaseSkill): - """The 'Antigravity Bridge': Persistent Browser Skill using a dedicated Actor thread.""" - def __init__(self): - self.task_queue = queue.Queue() - self.sessions = {} # session_id -> { "context": Context, "page": Page } - self.lock = threading.Lock() - threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor").start() - - def _setup_listeners(self, sid, page, on_event): - if not on_event: return - page.on("console", lambda msg: on_event(agent_pb2.BrowserEvent( - session_id=sid, console_msg=agent_pb2.ConsoleMessage( - level=msg.type, text=msg.text, timestamp_ms=int(time.time()*1000) - ) - ))) - page.on("requestfinished", lambda req: on_event(agent_pb2.BrowserEvent( - session_id=sid, network_req=agent_pb2.NetworkRequest( - method=req.method, url=req.url, status=req.response().status if req.response() else 0, - resource_type=req.resource_type, latency_ms=0 - ) - ))) - - def _browser_actor(self): - print("[🌐] Browser Actor Starting...", flush=True) - try: - pw = sync_playwright().start() - browser = pw.chromium.launch(headless=True, args=[ - '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu' - ]) - print("[🌐] Browser Engine Online.", flush=True) - except Exception as e: - print(f"[!] Browser Actor Startup Fail: {e}", flush=True) - return - - while True: - try: - task, sandbox, on_complete, on_event = self.task_queue.get() - action = task.browser_action - sid = action.session_id or "default" - - with self.lock: - if sid not in self.sessions: - context = browser.new_context() - page = context.new_page() - self._setup_listeners(sid, page, on_event) - self.sessions[sid] = {"context": context, "page": page} - page = self.sessions[sid]["page"] - - print(f" [🌐] Browser Actor Processing: {agent_pb2.BrowserAction.ActionType.Name(action.action)} | Session: {sid}", flush=True) - - res_data = {} - if action.action == agent_pb2.BrowserAction.NAVIGATE: - page.goto(action.url, wait_until="commit") - elif action.action == agent_pb2.BrowserAction.CLICK: - page.click(action.selector) - elif action.action == agent_pb2.BrowserAction.TYPE: - page.fill(action.selector, action.text) - elif action.action == agent_pb2.BrowserAction.SCREENSHOT: - res_data["snapshot"] = page.screenshot() - elif action.action == agent_pb2.BrowserAction.GET_DOM: - res_data["dom_content"] = page.content() - elif action.action == agent_pb2.BrowserAction.HOVER: - page.hover(action.selector) - elif action.action == agent_pb2.BrowserAction.SCROLL: - page.mouse.wheel(x=0, y=action.y) - elif action.action == agent_pb2.BrowserAction.EVAL: - res_data["eval_result"] = str(page.evaluate(action.text)) - elif action.action == agent_pb2.BrowserAction.GET_A11Y: - res_data["a11y_tree"] = json.dumps(page.accessibility.snapshot()) - elif action.action == agent_pb2.BrowserAction.CLOSE: - with self.lock: - sess = self.sessions.pop(sid, None) - if sess: sess["context"].close() - - # Refresh metadata After - br_res = agent_pb2.BrowserResponse( - url=page.url, title=page.title(), - snapshot=res_data.get("snapshot", b""), - dom_content=res_data.get("dom_content", ""), - a11y_tree=res_data.get("a11y_tree", ""), - eval_result=res_data.get("eval_result", "") - ) - on_complete(task.task_id, {"status": 1, "browser_result": br_res}, task.trace_id) - except Exception as e: - print(f" [!] Browser Actor Error: {e}", flush=True) - on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) - - def execute(self, task, sandbox, on_complete, on_event=None): - self.task_queue.put((task, sandbox, on_complete, on_event)) - - def cancel(self, task_id): return False - -class SkillManager: - """Orchestrates multiple skills and manages the worker thread pool.""" - def __init__(self, max_workers=5): - self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker") - self.active_tasks = {} # task_id -> future - self.skills = { - "shell": ShellSkill(), - "browser": BrowserSkill() - } - self.max_workers = max_workers - self.lock = threading.Lock() - - def submit(self, task, sandbox, on_complete, on_event=None): - with self.lock: - if len(self.active_tasks) >= self.max_workers: - return False, "Node Capacity Reached" - - # Decide Skill - if task.HasField("browser_action") or task.task_type == "browser": - skill = self.skills["browser"] - else: - skill = self.skills["shell"] - - future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event) - self.active_tasks[task.task_id] = future - - # Cleanup hook - future.add_done_callback(lambda f: self._cleanup(task.task_id)) - return True, "Accepted" - - def cancel(self, task_id): - with self.lock: - # Tell all skills to try and cancel this ID - cancelled = any(s.cancel(task_id) for s in self.skills.values()) - return cancelled - - def get_active_ids(self): - with self.lock: - return list(self.active_tasks.keys()) - - def _cleanup(self, task_id): - with self.lock: self.active_tasks.pop(task_id, None) - -class SandboxEngine: - def __init__(self): - self.policy = None - - def sync(self, p): - self.policy = {"MODE": "STRICT" if p.mode == agent_pb2.SandboxPolicy.STRICT else "PERMISSIVE", - "ALLOWED": list(p.allowed_commands), "DENIED": list(p.denied_commands), "SENSITIVE": list(p.sensitive_commands)} - - def verify(self, command_str): - if not self.policy: return False, "No Policy" - parts = (command_str or "").strip().split() - if not parts: return False, "Empty" - base_cmd = parts[0] - if base_cmd in self.policy["DENIED"]: return False, "Forbidden" - if self.policy["MODE"] == "STRICT" and base_cmd not in self.policy["ALLOWED"]: - return False, "Not Whitelisted" - return True, "OK" - -class AgentNode: - def __init__(self, node_id="agent-node-007"): - self.node_id = node_id - self.skills = SkillManager() - self.sandbox = SandboxEngine() - self.task_queue = queue.Queue() - - # gRPC Setup - with open('certs/client.key', 'rb') as f: pkey = f.read() - with open('certs/client.crt', 'rb') as f: cert = f.read() - with open('certs/ca.crt', 'rb') as f: ca = f.read() - creds = grpc.ssl_channel_credentials(ca, pkey, cert) - self.channel = grpc.secure_channel('localhost:50051', creds) - self.stub = agent_pb2_grpc.AgentOrchestratorStub(self.channel) - - def _create_token(self): - return jwt.encode({"sub": self.node_id, "iat": datetime.datetime.utcnow(), - "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10)}, SECRET_KEY, algorithm="HS256") - - def sync_configuration(self): - print(f"[*] Handshake: {self.node_id}") - reg = agent_pb2.RegistrationRequest(node_id=self.node_id, auth_token=self._create_token(), - node_description="Refactored Stateful Node with Browser Skill", - capabilities={"shell": "v1", "browser": "playwright-1.42"}) - res = self.stub.SyncConfiguration(reg) - if res.success: self.sandbox.sync(res.policy); print("[OK] Policy Synced.") - else: print(f"[!] Rejected: {res.error_message}"); sys.exit(1) - - def start_health_reporting(self): - def _gen(): - while True: - ids = self.skills.get_active_ids() - yield agent_pb2.Heartbeat(node_id=self.node_id, cpu_usage_percent=1.0, - active_worker_count=len(ids), max_worker_capacity=5, running_task_ids=ids) - time.sleep(10) - threading.Thread(target=lambda: list(self.stub.ReportHealth(_gen())), daemon=True).start() - - def run_task_stream(self): - def _gen(): - yield agent_pb2.ClientTaskMessage(announce=agent_pb2.NodeAnnounce(node_id=self.node_id)) - while True: yield self.task_queue.get() - responses = self.stub.TaskStream(_gen()) - print(f"[*] Stream processing started: {self.node_id}", flush=True) - try: - for msg in responses: - kind = msg.WhichOneof('payload') - print(f"[*] Received message from server: {kind}", flush=True) - if kind == 'task_request': - self._handle_task(msg.task_request) - elif kind == 'task_cancel': - if self.skills.cancel(msg.task_cancel.task_id): - self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=agent_pb2.TaskResponse( - task_id=msg.task_cancel.task_id, status=agent_pb2.TaskResponse.CANCELLED))) - elif kind == 'work_pool_update': - for tid in msg.work_pool_update.available_task_ids: - if len(self.skills.get_active_ids()) < self.skills.max_workers: - self.task_queue.put(agent_pb2.ClientTaskMessage(task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id))) - except Exception as e: - print(f"[!] Stream Error: {e}", flush=True) - - def _handle_task(self, task): - print(f"[*] Handling Task: {task.task_id}", flush=True) - # Sig Verify logic based on payload type - if task.HasField("browser_action"): - a = task.browser_action - sign_base = f"{a.action}:{a.url}:{a.session_id}".encode() - else: - sign_base = task.payload_json.encode() - - expected_sig = hmac.new(SECRET_KEY.encode(), sign_base, hashlib.sha256).hexdigest() - if not hmac.compare_digest(task.signature, expected_sig): - return print(f"[!] Sig Fail for {task.task_id} | Raw: {sign_base}", flush=True) - - print(f"[✅] Signature Verified for {task.task_id}", flush=True) - self.skills.submit(task, self.sandbox, self._on_finish, self._on_event) - - def _on_event(self, event): - self.task_queue.put(agent_pb2.ClientTaskMessage(browser_event=event)) - - def _on_finish(self, tid, res, trace): - print(f"[*] Task {tid} finished.", flush=True) - status = agent_pb2.TaskResponse.SUCCESS if res['status'] == 1 else agent_pb2.TaskResponse.ERROR - - tr = agent_pb2.TaskResponse( - task_id=tid, status=status, - stdout=res.get('stdout',''), - stderr=res.get('stderr',''), - trace_id=trace, - browser_result=res.get("browser_result") - ) - self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr)) - -if __name__ == '__main__': - node = AgentNode() - node.sync_configuration() - node.start_health_reporting() - node.run_task_stream() diff --git a/poc-grpc-agent/orchestrator/__init__.py b/poc-grpc-agent/orchestrator/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/orchestrator/__init__.py diff --git a/poc-grpc-agent/orchestrator/app.py b/poc-grpc-agent/orchestrator/app.py new file mode 100644 index 0000000..7507f55 --- /dev/null +++ b/poc-grpc-agent/orchestrator/app.py @@ -0,0 +1,91 @@ +import grpc +import time +import os +import sys +from concurrent import futures + +# Add root to path to find protos +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from protos import agent_pb2, agent_pb2_grpc +from orchestrator.config import ( + CERT_CA, CERT_SERVER_CRT, CERT_SERVER_KEY, + GRPC_HOST, GRPC_PORT, SIMULATION_DELAY_SEC, MAX_WORKERS +) +from orchestrator.services.grpc_server import AgentOrchestrator + +def serve(): + print(f"[🛡️] Boss Plane Orchestrator Starting on {GRPC_HOST}:{GRPC_PORT}...") + + # 1. SSL/TLS Setup + with open(CERT_SERVER_KEY, 'rb') as f: pkey = f.read() + with open(CERT_SERVER_CRT, 'rb') as f: cert = f.read() + with open(CERT_CA, 'rb') as f: ca = f.read() + creds = grpc.ssl_server_credentials([(pkey, cert)], ca, True) + + # 2. Server Initialization + server = grpc.server(futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)) + orch = AgentOrchestrator() + agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(orch, server) + + server.add_secure_port(f'{GRPC_HOST}:{GRPC_PORT}', creds) + + # 3. Start + server.start() + print("[🛡️] Boss Plane Refactored & Online.", flush=True) + + # 4. Simulation Launcher + # (In Production, this would be an API interface or Webhook handler) + _run_simulation(orch) + + server.wait_for_termination() + +def _run_simulation(orch): + """Refactored AI Simulation logic using the TaskAssistant service.""" + time.sleep(SIMULATION_DELAY_SEC) + print("\n[🧠] AI Simulation Start...", flush=True) + + target_node = "agent-node-007" + + # Phase 1: Shell + res_single = orch.assistant.dispatch_single(target_node, 'uname -a') + print(f" Uname Output: {res_single}", flush=True) + + # Phase 4: Browser Bridge + print("\n[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)...") + nav_action = agent_pb2.BrowserAction( + action=agent_pb2.BrowserAction.NAVIGATE, + url="https://example.com", + session_id="antigravity-session-1" + ) + res_nav = orch.assistant.dispatch_browser(target_node, nav_action) + print(f" Nav Result: {res_nav}") + + # Phase 4 Pro: Perception & Evaluation + print("\n[🧠] AI Phase 4 Pro: Perception & Advanced Logic...") + a11y_action = agent_pb2.BrowserAction( + action=agent_pb2.BrowserAction.GET_A11Y, + session_id="antigravity-session-1" + ) + res_a11y = orch.assistant.dispatch_browser(target_node, a11y_action) + print(f" A11y Result: {res_a11y.get('browser', {}).get('a11y')}") + + eval_action = agent_pb2.BrowserAction( + action=agent_pb2.BrowserAction.EVAL, + text="window.performance.now()", + session_id="antigravity-session-1" + ) + res_eval = orch.assistant.dispatch_browser(target_node, eval_action) + print(f" Eval Result: {res_eval.get('browser', {}).get('eval')}") + + # Real-time Events + print("\n[🧠] AI Phase 4 Pro: Triggering Real-time Events (Tunneling)...") + trigger_action = agent_pb2.BrowserAction( + action=agent_pb2.BrowserAction.EVAL, + text="console.log('Refactored Hello!'); fetch('https://example.com/api/ping');", + session_id="antigravity-session-1" + ) + orch.assistant.dispatch_browser(target_node, trigger_action) + +if __name__ == '__main__': + serve() diff --git a/poc-grpc-agent/orchestrator/config.py b/poc-grpc-agent/orchestrator/config.py new file mode 100644 index 0000000..2493988 --- /dev/null +++ b/poc-grpc-agent/orchestrator/config.py @@ -0,0 +1,17 @@ +import os + +# 12-Factor Config: Load from environment variables with defaults +SECRET_KEY = os.getenv("ORCHESTRATOR_SECRET_KEY", "cortex-secret-shared-key") + +# Network Settings +GRPC_HOST = os.getenv("GRPC_HOST", "[::]") +GRPC_PORT = os.getenv("GRPC_PORT", "50051") + +# Certificate Paths +CERT_CA = os.getenv("CERT_CA", "certs/ca.crt") +CERT_SERVER_CRT = os.getenv("CERT_SERVER_CRT", "certs/server.crt") +CERT_SERVER_KEY = os.getenv("CERT_SERVER_KEY", "certs/server.key") + +# Operational Settings +SIMULATION_DELAY_SEC = int(os.getenv("SIMULATION_DELAY_SEC", "10")) +MAX_WORKERS = int(os.getenv("MAX_WORKERS", "10")) diff --git a/poc-grpc-agent/orchestrator/core/__init__.py b/poc-grpc-agent/orchestrator/core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/orchestrator/core/__init__.py diff --git a/poc-grpc-agent/orchestrator/core/journal.py b/poc-grpc-agent/orchestrator/core/journal.py new file mode 100644 index 0000000..b223f2f --- /dev/null +++ b/poc-grpc-agent/orchestrator/core/journal.py @@ -0,0 +1,34 @@ +import threading + +class TaskJournal: + """State machine for tracking tasks through their asynchronous lifecycle.""" + def __init__(self): + self.lock = threading.Lock() + self.tasks = {} # task_id -> { "event": Event, "result": None, "node_id": str } + + def register(self, task_id, node_id=None): + """Initializes state for a new task and returns its notification event.""" + event = threading.Event() + with self.lock: + self.tasks[task_id] = {"event": event, "result": None, "node_id": node_id} + return event + + def fulfill(self, task_id, result): + """Processes a result from a node and triggers the waiting thread.""" + with self.lock: + if task_id in self.tasks: + self.tasks[task_id]["result"] = result + self.tasks[task_id]["event"].set() + return True + return False + + def get_result(self, task_id): + """Returns the result associated with the given task ID.""" + with self.lock: + data = self.tasks.get(task_id) + return data["result"] if data else None + + def pop(self, task_id): + """Removes the task's state from the journal.""" + with self.lock: + return self.tasks.pop(task_id, None) diff --git a/poc-grpc-agent/orchestrator/core/pool.py b/poc-grpc-agent/orchestrator/core/pool.py new file mode 100644 index 0000000..222a28b --- /dev/null +++ b/poc-grpc-agent/orchestrator/core/pool.py @@ -0,0 +1,20 @@ +import threading + +class GlobalWorkPool: + """Thread-safe pool of unassigned tasks that can be claimed by any node.""" + def __init__(self): + self.lock = threading.Lock() + self.available = {"shared-001": "uname -a", "shared-002": "uptime"} + + def claim(self, task_id, node_id): + """Allows a node to pull a specific task from the pool.""" + with self.lock: + if task_id in self.available: + print(f" [📦] Task {task_id} Claimed by {node_id}") + return True, self.available.pop(task_id) + return False, None + + def list_available(self): + """Returns IDs of all currently available unclaimed tasks.""" + with self.lock: + return list(self.available.keys()) diff --git a/poc-grpc-agent/orchestrator/core/registry.py b/poc-grpc-agent/orchestrator/core/registry.py new file mode 100644 index 0000000..b40e6e4 --- /dev/null +++ b/poc-grpc-agent/orchestrator/core/registry.py @@ -0,0 +1,36 @@ +import threading +import queue + +class AbstractNodeRegistry: + """Interface for finding and tracking Agent Nodes.""" + def register(self, node_id: str, q: queue.Queue, metadata: dict): raise NotImplementedError + def update_stats(self, node_id: str, stats: dict): raise NotImplementedError + def get_best(self) -> str: raise NotImplementedError + def get_node(self, node_id: str) -> dict: raise NotImplementedError + +class MemoryNodeRegistry(AbstractNodeRegistry): + """In-memory implementation of the Node Registry.""" + def __init__(self): + self.lock = threading.Lock() + self.nodes = {} # node_id -> { stats: {}, queue: queue, metadata: {} } + + def register(self, node_id, q, metadata): + with self.lock: + self.nodes[node_id] = {"stats": {}, "queue": q, "metadata": metadata} + print(f"[📋] Registered Agent Node: {node_id}") + + def update_stats(self, node_id, stats): + with self.lock: + if node_id in self.nodes: + self.nodes[node_id]["stats"].update(stats) + + def get_best(self): + """Picks the agent with the lowest active worker count.""" + with self.lock: + if not self.nodes: return None + # Simple heuristic: sort by active worker count + return sorted(self.nodes.items(), key=lambda x: x[1]["stats"].get("active_worker_count", 999))[0][0] + + def get_node(self, node_id): + with self.lock: + return self.nodes.get(node_id) diff --git a/poc-grpc-agent/orchestrator/services/__init__.py b/poc-grpc-agent/orchestrator/services/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/orchestrator/services/__init__.py diff --git a/poc-grpc-agent/orchestrator/services/assistant.py b/poc-grpc-agent/orchestrator/services/assistant.py new file mode 100644 index 0000000..67266ba --- /dev/null +++ b/poc-grpc-agent/orchestrator/services/assistant.py @@ -0,0 +1,62 @@ +import time +import json +from orchestrator.utils.crypto import sign_payload, sign_browser_action +from protos import agent_pb2 + +class TaskAssistant: + """The 'Brain' of the Orchestrator: High-Level AI API for Dispatching Tasks.""" + def __init__(self, registry, journal, pool): + self.registry = registry + self.journal = journal + self.pool = pool + + def dispatch_single(self, node_id, cmd, timeout=30): + """Dispatches a shell command to a specific node.""" + node = self.registry.get_node(node_id) + if not node: return {"error": f"Node {node_id} Offline"} + + tid = f"task-{int(time.time()*1000)}" + event = self.journal.register(tid, node_id) + + # 12-Factor Signing Logic + sig = sign_payload(cmd) + req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( + task_id=tid, payload_json=cmd, signature=sig)) + + print(f"[📤] Dispatching shell {tid} to {node_id}") + node["queue"].put(req) + + if event.wait(timeout): + res = self.journal.get_result(tid) + self.journal.pop(tid) + return res + self.journal.pop(tid) + return {"error": "Timeout"} + + def dispatch_browser(self, node_id, action, timeout=60): + """Dispatches a browser action to a directed session node.""" + node = self.registry.get_node(node_id) + if not node: return {"error": f"Node {node_id} Offline"} + + tid = f"br-{int(time.time()*1000)}" + event = self.journal.register(tid, node_id) + + # Secure Browser Signing + sig = sign_browser_action( + agent_pb2.BrowserAction.ActionType.Name(action.action), + action.url, + action.session_id + ) + + req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( + task_id=tid, browser_action=action, signature=sig)) + + print(f"[🌐📤] Dispatching browser {tid} to {node_id}") + node["queue"].put(req) + + if event.wait(timeout): + res = self.journal.get_result(tid) + self.journal.pop(tid) + return res + self.journal.pop(tid) + return {"error": "Timeout"} diff --git a/poc-grpc-agent/orchestrator/services/grpc_server.py b/poc-grpc-agent/orchestrator/services/grpc_server.py new file mode 100644 index 0000000..7112d89 --- /dev/null +++ b/poc-grpc-agent/orchestrator/services/grpc_server.py @@ -0,0 +1,113 @@ +import threading +import queue +import time +from protos import agent_pb2, agent_pb2_grpc +from orchestrator.core.registry import MemoryNodeRegistry +from orchestrator.core.journal import TaskJournal +from orchestrator.core.pool import GlobalWorkPool +from orchestrator.services.assistant import TaskAssistant +from orchestrator.utils.crypto import sign_payload + +class AgentOrchestrator(agent_pb2_grpc.AgentOrchestratorServicer): + """Refactored gRPC Servicer for Agent Orchestration.""" + def __init__(self): + self.registry = MemoryNodeRegistry() + self.journal = TaskJournal() + self.pool = GlobalWorkPool() + self.assistant = TaskAssistant(self.registry, self.journal, self.pool) + + def SyncConfiguration(self, request, context): + """Standard Handshake: Authenticate and Send Policy.""" + # Pre-registration for metadata search + self.registry.register(request.node_id, queue.Queue(), { + "desc": request.node_description, + "caps": dict(request.capabilities) + }) + + # 12-Factor Sandbox Policy (Standardized Mode) + return agent_pb2.RegistrationResponse( + success=True, + policy=agent_pb2.SandboxPolicy( + mode=agent_pb2.SandboxPolicy.STRICT, + allowed_commands=["ls", "uname", "echo", "sleep"] + ) + ) + + def TaskStream(self, request_iterator, context): + """Persistent Bi-directional Stream for Command & Control.""" + try: + # 1. Blocking wait for Node Identity + first_msg = next(request_iterator) + if first_msg.WhichOneof('payload') != 'announce': + print("[!] Stream rejected: No NodeAnnounce") + return + + node_id = first_msg.announce.node_id + node = self.registry.get_node(node_id) + if not node: + print(f"[!] Stream rejected: Node {node_id} not registered") + return + + print(f"[📶] Stream Online for {node_id}") + + # 2. Results Listener (Read Thread) + def _read_results(): + for msg in request_iterator: + self._handle_client_message(msg, node_id, node) + + threading.Thread(target=_read_results, daemon=True, name=f"Results-{node_id}").start() + + # 3. Work Dispatcher (Main Stream) + while context.is_active(): + try: + # Non-blocking wait to check context periodically + msg = node["queue"].get(timeout=1.0) + yield msg + print(f" [🚀] Streamed message to {node_id}") + except queue.Empty: + # Minimal background traffic keeps connection alive + if self.pool.available: + # Only broadcast every 5s or if state changes in a real system + yield agent_pb2.ServerTaskMessage( + work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) + ) + continue + + except StopIteration: pass + except Exception as e: + print(f"[!] TaskStream Error for {node_id}: {e}") + + def _handle_client_message(self, msg, node_id, node): + kind = msg.WhichOneof('payload') + if kind == 'task_claim': + success, payload = self.pool.claim(msg.task_claim.task_id, node_id) + if success: + sig = sign_payload(payload) + node["queue"].put(agent_pb2.ServerTaskMessage( + task_request=agent_pb2.TaskRequest(task_id=msg.task_claim.task_id, payload_json=payload, signature=sig))) + + elif kind == 'task_response': + res_obj = {"stdout": msg.task_response.stdout, "status": msg.task_response.status} + if msg.task_response.HasField("browser_result"): + br = msg.task_response.browser_result + res_obj["browser"] = { + "url": br.url, "title": br.title, "has_snapshot": len(br.snapshot) > 0, + "a11y": br.a11y_tree[:100] + "..." if br.a11y_tree else None, + "eval": br.eval_result + } + self.journal.fulfill(msg.task_response.task_id, res_obj) + + elif kind == 'browser_event': + e = msg.browser_event + prefix = "[🖥️] Live Console" if e.HasField("console_msg") else "[🌐] Net Inspect" + content = e.console_msg.text if e.HasField("console_msg") else f"{e.network_req.method} {e.network_req.url}" + print(f" {prefix}: {content}", flush=True) + + def ReportHealth(self, request_iterator, context): + """Collect Health Metrics and Feed Policy Updates.""" + for hb in request_iterator: + self.registry.update_stats(hb.node_id, { + "active_worker_count": hb.active_worker_count, + "running": list(hb.running_task_ids) + }) + yield agent_pb2.HealthCheckResponse(server_time_ms=int(time.time()*1000)) diff --git a/poc-grpc-agent/orchestrator/utils/__init__.py b/poc-grpc-agent/orchestrator/utils/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/orchestrator/utils/__init__.py diff --git a/poc-grpc-agent/orchestrator/utils/crypto.py b/poc-grpc-agent/orchestrator/utils/crypto.py new file mode 100644 index 0000000..c34a495 --- /dev/null +++ b/poc-grpc-agent/orchestrator/utils/crypto.py @@ -0,0 +1,17 @@ +import hmac +import hashlib +from orchestrator.config import SECRET_KEY + +def sign_payload(payload: str) -> str: + """Signs a string payload using HMAC-SHA256.""" + return hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).hexdigest() + +def sign_browser_action(action_type: str, url: str, session_id: str) -> str: + """Signs a browser action based on its key identify fields.""" + sign_base = f"{action_type}:{url}:{session_id}" + return sign_payload(sign_base) + +def verify_signature(payload: str, signature: str) -> bool: + """Verifies a signature against a payload using HMAC-SHA256.""" + expected = sign_payload(payload) + return hmac.compare_digest(signature, expected) diff --git a/poc-grpc-agent/protos/__init__.py b/poc-grpc-agent/protos/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/poc-grpc-agent/protos/__init__.py diff --git a/poc-grpc-agent/protos/agent_pb2.py b/poc-grpc-agent/protos/agent_pb2.py new file mode 100644 index 0000000..f51d6f2 --- /dev/null +++ b/poc-grpc-agent/protos/agent_pb2.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: agent.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xd2\x01\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x42\t\n\x07payload\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xe0\x01\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xbd\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xc1\x01\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' + _globals['_REGISTRATIONREQUEST']._serialized_start=23 + _globals['_REGISTRATIONREQUEST']._serialized_end=245 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=194 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=245 + _globals['_SANDBOXPOLICY']._serialized_start=248 + _globals['_SANDBOXPOLICY']._serialized_end=445 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=411 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=445 + _globals['_REGISTRATIONRESPONSE']._serialized_start=447 + _globals['_REGISTRATIONRESPONSE']._serialized_end=567 + _globals['_CLIENTTASKMESSAGE']._serialized_start=570 + _globals['_CLIENTTASKMESSAGE']._serialized_end=780 + _globals['_NODEANNOUNCE']._serialized_start=782 + _globals['_NODEANNOUNCE']._serialized_end=813 + _globals['_BROWSEREVENT']._serialized_start=816 + _globals['_BROWSEREVENT']._serialized_end=951 + _globals['_SERVERTASKMESSAGE']._serialized_start=954 + _globals['_SERVERTASKMESSAGE']._serialized_end=1178 + _globals['_TASKCANCELREQUEST']._serialized_start=1180 + _globals['_TASKCANCELREQUEST']._serialized_end=1216 + _globals['_TASKREQUEST']._serialized_start=1219 + _globals['_TASKREQUEST']._serialized_end=1408 + _globals['_BROWSERACTION']._serialized_start=1411 + _globals['_BROWSERACTION']._serialized_end=1699 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1565 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=1699 + _globals['_TASKRESPONSE']._serialized_start=1702 + _globals['_TASKRESPONSE']._serialized_end=2054 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=1934 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=1982 + _globals['_TASKRESPONSE_STATUS']._serialized_start=1984 + _globals['_TASKRESPONSE_STATUS']._serialized_end=2044 + _globals['_BROWSERRESPONSE']._serialized_start=2057 + _globals['_BROWSERRESPONSE']._serialized_end=2277 + _globals['_CONSOLEMESSAGE']._serialized_start=2279 + _globals['_CONSOLEMESSAGE']._serialized_end=2346 + _globals['_NETWORKREQUEST']._serialized_start=2348 + _globals['_NETWORKREQUEST']._serialized_end=2452 + _globals['_WORKPOOLUPDATE']._serialized_start=2454 + _globals['_WORKPOOLUPDATE']._serialized_end=2498 + _globals['_TASKCLAIMREQUEST']._serialized_start=2500 + _globals['_TASKCLAIMREQUEST']._serialized_end=2552 + _globals['_TASKCLAIMRESPONSE']._serialized_start=2554 + _globals['_TASKCLAIMRESPONSE']._serialized_end=2623 + _globals['_HEARTBEAT']._serialized_start=2626 + _globals['_HEARTBEAT']._serialized_end=2819 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=2821 + _globals['_HEALTHCHECKRESPONSE']._serialized_end=2866 + _globals['_AGENTORCHESTRATOR']._serialized_start=2869 + _globals['_AGENTORCHESTRATOR']._serialized_end=3102 +# @@protoc_insertion_point(module_scope) diff --git a/poc-grpc-agent/protos/agent_pb2_grpc.py b/poc-grpc-agent/protos/agent_pb2_grpc.py new file mode 100644 index 0000000..b91c8a0 --- /dev/null +++ b/poc-grpc-agent/protos/agent_pb2_grpc.py @@ -0,0 +1,138 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from . import agent_pb2 as agent__pb2 + + +class AgentOrchestratorStub(object): + """The Cortex Server exposes this service + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SyncConfiguration = channel.unary_unary( + '/agent.AgentOrchestrator/SyncConfiguration', + request_serializer=agent__pb2.RegistrationRequest.SerializeToString, + response_deserializer=agent__pb2.RegistrationResponse.FromString, + ) + self.TaskStream = channel.stream_stream( + '/agent.AgentOrchestrator/TaskStream', + request_serializer=agent__pb2.ClientTaskMessage.SerializeToString, + response_deserializer=agent__pb2.ServerTaskMessage.FromString, + ) + self.ReportHealth = channel.stream_stream( + '/agent.AgentOrchestrator/ReportHealth', + request_serializer=agent__pb2.Heartbeat.SerializeToString, + response_deserializer=agent__pb2.HealthCheckResponse.FromString, + ) + + +class AgentOrchestratorServicer(object): + """The Cortex Server exposes this service + """ + + def SyncConfiguration(self, request, context): + """1. Control Channel: Sync policies and settings (Unary) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TaskStream(self, request_iterator, context): + """2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReportHealth(self, request_iterator, context): + """3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_AgentOrchestratorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( + servicer.SyncConfiguration, + request_deserializer=agent__pb2.RegistrationRequest.FromString, + response_serializer=agent__pb2.RegistrationResponse.SerializeToString, + ), + 'TaskStream': grpc.stream_stream_rpc_method_handler( + servicer.TaskStream, + request_deserializer=agent__pb2.ClientTaskMessage.FromString, + response_serializer=agent__pb2.ServerTaskMessage.SerializeToString, + ), + 'ReportHealth': grpc.stream_stream_rpc_method_handler( + servicer.ReportHealth, + request_deserializer=agent__pb2.Heartbeat.FromString, + response_serializer=agent__pb2.HealthCheckResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'agent.AgentOrchestrator', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class AgentOrchestrator(object): + """The Cortex Server exposes this service + """ + + @staticmethod + def SyncConfiguration(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration', + agent__pb2.RegistrationRequest.SerializeToString, + agent__pb2.RegistrationResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TaskStream(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream', + agent__pb2.ClientTaskMessage.SerializeToString, + agent__pb2.ServerTaskMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ReportHealth(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth', + agent__pb2.Heartbeat.SerializeToString, + agent__pb2.HealthCheckResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/poc-grpc-agent/server.py b/poc-grpc-agent/server.py deleted file mode 100644 index c82fb75..0000000 --- a/poc-grpc-agent/server.py +++ /dev/null @@ -1,294 +0,0 @@ -import grpc -import os -from concurrent import futures -import time -import agent_pb2 -import agent_pb2_grpc -import threading -import queue -import jwt -import hmac -import hashlib -import json - -SECRET_KEY = "cortex-secret-shared-key" - -class TaskJournal: - """State machine for tracking tasks through their lifecycle.""" - def __init__(self): - self.lock = threading.Lock() - self.tasks = {} # task_id -> { "event": Event, "result": None, "node_id": str } - - def register(self, task_id, node_id=None): - event = threading.Event() - with self.lock: - self.tasks[task_id] = {"event": event, "result": None, "node_id": node_id} - return event - - def fulfill(self, task_id, result): - with self.lock: - if task_id in self.tasks: - self.tasks[task_id]["result"] = result - self.tasks[task_id]["event"].set() - return True - return False - - def get_result(self, task_id): - with self.lock: - data = self.tasks.get(task_id) - return data["result"] if data else None - - def pop(self, task_id): - with self.lock: - return self.tasks.pop(task_id, None) - -class AbstractNodeRegistry: - """Interface for finding and tracking Managers.""" - def register(self, node_id, data): raise NotImplementedError - def update_stats(self, node_id, stats): raise NotImplementedError - def get_best(self): raise NotImplementedError - def get_node(self, node_id): raise NotImplementedError - -class MemoryNodeRegistry(AbstractNodeRegistry): - def __init__(self): - self.lock = threading.Lock() - self.nodes = {} # node_id -> { stats: {}, queue: queue, metadata: {} } - - def register(self, node_id, q, metadata): - with self.lock: - self.nodes[node_id] = {"stats": {}, "queue": q, "metadata": metadata} - print(f"[📋] Registered: {node_id}") - - def update_stats(self, node_id, stats): - with self.lock: - if node_id in self.nodes: self.nodes[node_id]["stats"].update(stats) - - def get_best(self): - with self.lock: - if not self.nodes: return None - # Pick based on active worker count - return sorted(self.nodes.items(), key=lambda x: x[1]["stats"].get("active_worker_count", 999))[0][0] - - def get_node(self, node_id): - with self.lock: return self.nodes.get(node_id) - -class GlobalWorkPool: - def __init__(self): - self.lock = threading.Lock() - self.available = {"shared-001": "uname -a", "shared-002": "uptime"} - - def claim(self, task_id, node_id): - with self.lock: - if task_id in self.available: - return True, self.available.pop(task_id) - return False, None - -class TaskAssistant: - """The High-Level AI API.""" - def __init__(self, registry, journal, pool): - self.registry = registry - self.journal = journal - self.pool = pool - - def dispatch_single(self, node_id, cmd, timeout=30): - # Implementation of retry logic and signing - node = self.registry.get_node(node_id) - if not node: return {"error": "Offline"} - - tid = f"task-{int(time.time()*1000)}" - event = self.journal.register(tid, node_id) - - # Don't wrap in JSON, use raw cmd for shells - msg_json = json.dumps({"command": cmd}) # Simulation legacy check - sig = hmac.new(SECRET_KEY.encode(), cmd.encode(), hashlib.sha256).hexdigest() - req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( - task_id=tid, payload_json=cmd, signature=sig)) - - print(f"[📤] Dispatching {tid} to {node_id}") - node["queue"].put(req) - - if event.wait(timeout): - res = self.journal.get_result(tid) - self.journal.pop(tid) - return res - self.journal.pop(tid) - return {"error": "Timeout"} - - def dispatch_browser(self, node_id, action, timeout=60): - node = self.registry.get_node(node_id) - if not node: return {"error": "Offline"} - - tid = f"br-{int(time.time()*1000)}" - event = self.journal.register(tid, node_id) - - # Basic signature for POC: Sign the action enum name + URL - sign_base = f"{action.action}:{action.url}:{action.session_id}" - sig = hmac.new(SECRET_KEY.encode(), sign_base.encode(), hashlib.sha256).hexdigest() - - req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( - task_id=tid, browser_action=action, signature=sig)) - - node["queue"].put(req) - if event.wait(timeout): - res = self.journal.get_result(tid) - self.journal.pop(tid) - return res - self.journal.pop(tid) - return {"error": "Timeout"} - -class AgentOrchestrator(agent_pb2_grpc.AgentOrchestratorServicer): - def __init__(self): - self.registry = MemoryNodeRegistry() - self.journal = TaskJournal() - self.pool = GlobalWorkPool() - self.assistant = TaskAssistant(self.registry, self.journal, self.pool) - - def SyncConfiguration(self, request, context): - # Pre-registration for metadata search - self.registry.register(request.node_id, queue.Queue(), {"desc": request.node_description, "caps": dict(request.capabilities)}) - return agent_pb2.RegistrationResponse(success=True, - policy=agent_pb2.SandboxPolicy(mode=agent_pb2.SandboxPolicy.STRICT, - allowed_commands=["ls", "uname", "echo", "sleep"])) - - def TaskStream(self, request_iterator, context): - try: - # 1. Blocking wait for identity - first_msg = next(request_iterator) - if first_msg.WhichOneof('payload') != 'announce': - print("[!] Stream rejected: No NodeAnnounce") - return - - node_id = first_msg.announce.node_id - node = self.registry.get_node(node_id) - if not node: - print(f"[!] Stream rejected: Node {node_id} not registered via SyncConfiguration") - return - - print(f"[📶] Stream established for {node_id}") - - # 2. Results Listener - def _read_results(): - for msg in request_iterator: - kind = msg.WhichOneof('payload') - if kind == 'task_claim': - success, payload = self.pool.claim(msg.task_claim.task_id, node_id) - if success: - sig = hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).hexdigest() - node["queue"].put(agent_pb2.ServerTaskMessage( - task_request=agent_pb2.TaskRequest(task_id=msg.task_claim.task_id, payload_json=payload, signature=sig))) - elif kind == 'task_response': - res_obj = {"stdout": msg.task_response.stdout, "status": msg.task_response.status} - if msg.task_response.HasField("browser_result"): - br = msg.task_response.browser_result - res_obj["browser"] = { - "url": br.url, "title": br.title, "has_snapshot": len(br.snapshot) > 0, - "a11y": br.a11y_tree[:100] + "..." if br.a11y_tree else None, - "eval": br.eval_result - } - self.journal.fulfill(msg.task_response.task_id, res_obj) - elif kind == 'browser_event': - e = msg.browser_event - if e.HasField("console_msg"): - print(f" [🖥️] Live Browser Console: {e.console_msg.text}", flush=True) - elif e.HasField("network_req"): - print(f" [🌐] Live Network Request: {e.network_req.method} {e.network_req.url}", flush=True) - - threading.Thread(target=_read_results, daemon=True).start() - - # 3. Work Dispatcher (Main Stream) - while context.is_active(): - try: - msg = node["queue"].get(timeout=1.0) - yield msg - print(f"[🚀] Pushed message from queue to stream: {node_id}") - except queue.Empty: - continue - - except StopIteration: - pass - except Exception as e: - print(f"[!] TaskStream Error: {e}") - # Broadcast pool - if self.pool.available: - yield agent_pb2.ServerTaskMessage(work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=list(self.pool.available.keys()))) - - # Send direct tasks - if node_id and self.registry.get_node(node_id): - try: - msg = self.registry.get_node(node_id)["queue"].get(timeout=2) - yield msg - except queue.Empty: pass - else: time.sleep(1) - - def ReportHealth(self, request_iterator, context): - for hb in request_iterator: - self.registry.update_stats(hb.node_id, {"active_worker_count": hb.active_worker_count, "running": list(hb.running_task_ids)}) - yield agent_pb2.HealthCheckResponse(server_time_ms=int(time.time()*1000)) - -def serve(): - with open('certs/server.key', 'rb') as f: pkey = f.read() - with open('certs/server.crt', 'rb') as f: cert = f.read() - with open('certs/ca.crt', 'rb') as f: ca = f.read() - creds = grpc.ssl_server_credentials([(pkey, cert)], ca, True) - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - orch = AgentOrchestrator() - agent_pb2_grpc.add_AgentOrchestratorServicer_to_server(orch, server) - server.add_secure_port('[::]:50051', creds) - - print("[🛡️] Boss Plane Refactored & Online.") - server.start() - - # Simple AI Simulation loop - time.sleep(10) - print("\n[🧠] AI Simulation Start...", flush=True) - res_single = orch.assistant.dispatch_single('agent-node-007', 'uname -a') - print(f" Uname: {res_single}", flush=True) - - # NEW: Browser Phase - print("\n[🧠] AI Phase 4: Navigating Browser (Antigravity Bridge)...") - nav_action = agent_pb2.BrowserAction( - action=agent_pb2.BrowserAction.NAVIGATE, - url="https://example.com", - session_id="antigravity-session-1" - ) - res_nav = orch.assistant.dispatch_browser("agent-node-007", nav_action) - print(f" Nav Result: {res_nav}") - - print("\n[🧠] AI Phase 5: Multi-Action Persistence (Screenshot)...") - snap_action = agent_pb2.BrowserAction( - action=agent_pb2.BrowserAction.SCREENSHOT, - session_id="antigravity-session-1" - ) - res_snap = orch.assistant.dispatch_browser("agent-node-007", snap_action) - print(f" Snap Result: {res_snap.get('browser', {}).get('title')} | Snapshot captured: {res_snap.get('browser', {}).get('has_snapshot')}") - - # NEW: Phase 4 Pro Features - print("\n[🧠] AI Phase 4 Pro: Perception & Advanced Logic...") - a11y_action = agent_pb2.BrowserAction( - action=agent_pb2.BrowserAction.GET_A11Y, - session_id="antigravity-session-1" - ) - res_a11y = orch.assistant.dispatch_browser("agent-node-007", a11y_action) - print(f" A11y Result: {res_a11y.get('browser', {}).get('a11y')}") - - eval_action = agent_pb2.BrowserAction( - action=agent_pb2.BrowserAction.EVAL, - text="window.performance.now()", - session_id="antigravity-session-1" - ) - res_eval = orch.assistant.dispatch_browser("agent-node-007", eval_action) - print(f" Eval Result (Timestamp): {res_eval.get('browser', {}).get('eval')}") - - # NEW: Phase 4 Pro Features - Real-time Events - print("\n[🧠] AI Phase 4 Pro: Triggering Real-time Events (Tunneling)...") - trigger_action = agent_pb2.BrowserAction( - action=agent_pb2.BrowserAction.EVAL, - text="console.log('Hello from Antigravity Bridge!'); fetch('https://example.com/api/ping');", - session_id="antigravity-session-1" - ) - orch.assistant.dispatch_browser("agent-node-007", trigger_action) - - server.wait_for_termination() - -if __name__ == '__main__': - serve()