diff --git a/.agent/workflows/deployment_reference.md b/.agent/workflows/deployment_reference.md index c11ad5f..e8094da 100644 --- a/.agent/workflows/deployment_reference.md +++ b/.agent/workflows/deployment_reference.md @@ -131,6 +131,17 @@ **Symptoms**: `curl -v https://ai.jerxie.com` dumps a generic 404 or `503 Service Unavailable` with `server: envoy`. **Root Cause**: The Envoy FilterChain (Listener SNI Map) doesn't trace back to a correct, valid Docker IP:Port allocation. **Verification Check**: -* Query the Control Plane API: `curl -s http://192.168.68.90:8090/get-cluster?name=_ai_unified_server`. -* **Explicit Port Matching**: Envoy in production is configured to match both `ai.jerxie.com` and `ai.jerxie.com:443`. If a gRPC client (the Agent) includes the port in its authority header, Envoy must have it in its `virtual_hosts.domains` array or it will throw a 404. * Make sure `portValue` in the JSON Endpoint equates to the one published in `docker-compose.yml` (`8002` vs `8000`). If mismatched, you must format a JSON package and `POST` it to `/add-cluster` utilizing the EnvoryControlPlane workflow. + +### 5. OIDC Login Failures (Stale/Empty Client ID) +**Symptoms**: User attempts to login but is immediately rejected by Auth. Jerxie (Dex). Dex logs show: `level=ERROR msg="failed to parse authorization request" err="Invalid client_id (\"\")."` +**Root Cause**: API routes were incorrectly capturing OIDC settings from `settings` at the module's initial import time rather than at request time. Empty/Default environment variables were being frozen in place until a full container restart. +**Verification Check**: +* Check `ai_hub_service` logs for: `Initiating OIDC login. Client ID: 'cortex-server'`. +* Ensure that the `settings` singleton is accessed **inside** the route function (or via a helper like `get_oidc_urls`) to ensure dynamic resolution. + +### 6. 
Periodic Hub "Freezes" or Disconnections +**Symptoms**: The `ai.jerxie.com` UI becomes unresponsive (spinning loaders or time-outs) for 1-2 minutes before suddenly recovering. No backend crashes are recorded. +**Root Cause**: **SQLite on NFS**. The production deployment uses an NFS-mounted volume (`192.168.68.90`) for the relational database. SQLite's write-ahead-logging (WAL) and intense locking requirements are highly incompatible with network filesystem latencies. Any minor network spike or high NFS load causes the Hub process to block entirely while waiting for a lock. +**Resolution**: +* **High Priority**: Move `ai_hub.db` to a **Local Volume** or a **Local Host Path** on `192.168.68.113`. NFS should be reserved for static sync folders or bulky assets, never for the primary relational database. diff --git a/agent-node/bootstrap_installer.py b/agent-node/bootstrap_installer.py index 74a1845..befebc3 100644 --- a/agent-node/bootstrap_installer.py +++ b/agent-node/bootstrap_installer.py @@ -127,21 +127,42 @@ except Exception as e: _print(f"Warning: Failed to bootstrap pip: {e}. 
If dependencies fail, please install python3-pip manually.") + install_req_file = req_file + tmp_req_path = None + if skip_browsers: + try: + with open(req_file, 'r') as f: + lines = f.readlines() + # Reuse the already imported tempfile + with tempfile.NamedTemporaryFile(mode='w', suffix='_req.txt', delete=False) as tmp_req: + for line in lines: + if 'playwright' not in line.lower(): + tmp_req.write(line) + tmp_req_path = tmp_req.name + install_req_file = tmp_req_path + _print("Filtered 'playwright' from dependencies as requested.") + except Exception as e: + _print(f"Warning: Failed to filter requirements.txt: {e}") + _print("Installing Python dependencies (resilient mode) ...") try: # Using --ignore-installed to bypass "no RECORD file found" metadata errors common on Mac/Anaconda # and --user if we don't have root (though usually we do on NAS) - args = [sys.executable, "-m", "pip", "install", "-r", req_file, "--quiet", "--ignore-installed"] + args = [sys.executable, "-m", "pip", "install", "-r", install_req_file, "--quiet", "--ignore-installed"] # Try a quick check for root/write access to site-packages try: subprocess.check_call(args, cwd=install_dir) - except subprocess.CalledProcessError: - _print("Standard install failed. Trying --user install...") - args.append("--user") - subprocess.check_call(args, cwd=install_dir) + except subprocess.CalledProcessError as e: + _print(f"Standard install failed (exit {e.returncode}). 
Trying --user install...") + args_user = args + ["--user"] + subprocess.check_call(args_user, cwd=install_dir) - _print("Dependencies installed.") + _print("Dependencies installed successfully.") + + # Cleanup temp file if created + if tmp_req_path and os.path.exists(tmp_req_path): + os.remove(tmp_req_path) # New: Auto-install playwright browsers if the package is present if skip_browsers: @@ -162,6 +183,10 @@ except Exception as e: _print(f"ERROR: Failed to install dependencies: {e}") + _print("-----------------------------------------------------------------------") + _print("HINT: If you are on Raspberry Pi / ARM and 'protobuf' or 'grpcio' fails:") + _print(" Try manual install: sudo apt-get install python3-protobuf python3-psutil python3-grpcio") + _print("-----------------------------------------------------------------------") _print("The agent might fail to start if core libraries (grpcio, psutil) are missing.") @@ -316,9 +341,10 @@ skip_browsers = args.skip_browsers or existing_config.get("skip_browsers", False) + secret_key_to_save = existing_config.get("secret_key") or hub_token _install(hub_url, hub_token, install_dir) _install_deps(install_dir, skip_browsers=skip_browsers) - _write_config(install_dir, node_id, hub_url, node_token, grpc, secret_key=hub_token) + _write_config(install_dir, node_id, hub_url, node_token, grpc, secret_key=secret_key_to_save) if args.update_only: _print(f"โœ… Updated to v{remote_version}. Not launching (--update-only).") diff --git a/agent-node/install_service.py b/agent-node/install_service.py index d6c0672..181cc59 100755 --- a/agent-node/install_service.py +++ b/agent-node/install_service.py @@ -49,6 +49,8 @@ {os.path.expanduser("~")}/.cortex/agent.out.log EnvironmentVariables + GRPC_ENABLE_FORK_SUPPORT + 1 PATH /usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin @@ -111,6 +113,7 @@ echo "Starting Cortex Agent..." 
mkdir -p "$(dirname "$LOGFILE")" cd "{get_working_dir()}" + export GRPC_ENABLE_FORK_SUPPORT=1 nohup {get_python_path()} {get_agent_main_path()} >> "$LOGFILE" 2>&1 & echo $! > "$PIDFILE" echo "Agent started (PID $!)" @@ -169,6 +172,7 @@ WorkingDirectory={get_working_dir()} Restart=always RestartSec=5 +Environment=GRPC_ENABLE_FORK_SUPPORT=1 StandardOutput=append:{os.path.expanduser("~")}/.cortex/agent.out.log StandardError=append:{os.path.expanduser("~")}/.cortex/agent.err.log diff --git a/agent-node/requirements.txt b/agent-node/requirements.txt index 5647eb6..ee9cb24 100644 --- a/agent-node/requirements.txt +++ b/agent-node/requirements.txt @@ -1,7 +1,8 @@ -grpcio==1.62.1 -grpcio-tools==1.62.1 +grpcio>=1.48.0 +grpcio-tools>=1.48.0 PyJWT==2.8.0 -playwright==1.42.0 -watchdog==4.0.0 +playwright>=1.47.0 +watchdog>=3.0.0 PyYAML==6.0.1 -psutil==5.9.8 +psutil>=5.8.0 +protobuf>=4.21.6,<6.0.0 diff --git a/agent-node/src/agent_node/config.py b/agent-node/src/agent_node/config.py index cdf9e12..c355717 100644 --- a/agent-node/src/agent_node/config.py +++ b/agent-node/src/agent_node/config.py @@ -1,6 +1,9 @@ import os import platform -import yaml +try: + import yaml +except ImportError: + yaml = None # Path to the generated config file in the bundled distribution # It sits next to the 'src' directory (two levels up from src/agent_node/config.py) @@ -31,7 +34,7 @@ TLS_ENABLED = _defaults["tls"] HEALTH_REPORT_INTERVAL = _defaults["health_report_interval"] MAX_SKILL_WORKERS = _defaults["max_skill_workers"] -DEBUG_GRPC = False +DEBUG_GRPC = True SECRET_KEY = "dev-secret-key-1337" HUB_URL = _defaults["hub_url"] AUTO_UPDATE = _defaults["auto_update"] @@ -40,6 +43,7 @@ CERT_CLIENT_CRT = "certs/client.crt" CERT_CLIENT_KEY = "certs/client.key" FS_ROOT = "/" if platform.system() in ["Darwin", "Linux"] else "C:\\" +BROWSER_HEADLESS = True # Runtime-togglable: False = headed mode (requires has_display) def reload(): global NODE_ID, NODE_DESC, SERVER_HOST_PORT, AUTH_TOKEN, SYNC_DIR, 
TLS_ENABLED @@ -98,5 +102,11 @@ if not FS_ROOT: FS_ROOT = "/" if platform.system() in ["Darwin", "Linux"] else "C:\\" +def set_browser_headless(headless: bool): + """Toggle browser headless mode at runtime (no restart needed).""" + global BROWSER_HEADLESS + BROWSER_HEADLESS = headless + print(f"[๐ŸŒ] Browser mode updated: {'headless' if headless else 'headed (UI visible)'}") + # Initial load reload() diff --git a/agent-node/src/agent_node/core/sandbox.py b/agent-node/src/agent_node/core/sandbox.py index 8fcfed5..33b2cf0 100644 --- a/agent-node/src/agent_node/core/sandbox.py +++ b/agent-node/src/agent_node/core/sandbox.py @@ -12,7 +12,8 @@ "ALLOWED": list(p.allowed_commands), "DENIED": list(p.denied_commands), "SENSITIVE": list(p.sensitive_commands), - "WORKING_DIR_JAIL": p.working_dir_jail + "WORKING_DIR_JAIL": p.working_dir_jail, + "SKILL_CONFIG": p.skill_config_json } def verify(self, command_str): diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py index f130762..76f8ae8 100644 --- a/agent-node/src/agent_node/core/sync.py +++ b/agent-node/src/agent_node/core/sync.py @@ -1,5 +1,8 @@ import os import hashlib +import time +import json +import zlib from agent_node.config import SYNC_DIR from protos import agent_pb2 @@ -53,7 +56,7 @@ for name in files: abs_path = os.path.join(root, name) rel_path = os.path.relpath(abs_path, session_dir) - if rel_path in [".cortexignore", ".gitignore"]: continue + if rel_path in [".cortexignore", ".gitignore"] or ".cortex_browser" in rel_path: continue if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): try: os.remove(abs_path) @@ -64,7 +67,7 @@ for name in dirs: abs_path = os.path.join(root, name) rel_path = os.path.relpath(abs_path, session_dir) - if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): + if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path) and ".cortex_browser" not in rel_path: try: if not 
os.listdir(abs_path): os.rmdir(abs_path) @@ -73,7 +76,7 @@ needs_update = [] for file_info in manifest.files: - target_path = os.path.join(session_dir, file_info.path) + target_path = os.path.join(session_dir, file_info.path.lstrip("/")) if file_info.is_dir: os.makedirs(target_path, exist_ok=True) @@ -83,9 +86,14 @@ if not os.path.exists(target_path): needs_update.append(file_info.path) else: - # Hash comparison + # Memory-safe incremental hashing + h = hashlib.sha256() with open(target_path, "rb") as f: - actual_hash = hashlib.sha256(f.read()).hexdigest() + while True: + chunk = f.read(1024 * 1024) + if not chunk: break + h.update(chunk) + actual_hash = h.hexdigest() if actual_hash != file_info.hash: print(f" [โš ๏ธ] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})") needs_update.append(file_info.path) @@ -93,26 +101,93 @@ return needs_update def write_chunk(self, session_id: str, payload: agent_pb2.FilePayload) -> bool: - """Writes a file chunk to the local session directory.""" + """Writes a file chunk to a shadow file and swaps to target on completion.""" session_dir = self.get_session_dir(session_id, create=True) - target_path = os.path.normpath(os.path.join(session_dir, payload.path)) + target_path = os.path.normpath(os.path.join(session_dir, payload.path.lstrip("/"))) if not target_path.startswith(session_dir): return False # Path traversal guard os.makedirs(os.path.dirname(target_path), exist_ok=True) - mode = "ab" if payload.chunk_index > 0 else "wb" - with open(target_path, mode) as f: - f.write(payload.chunk) + # We always write to a temporary "shadow" file during the sync + tmp_path = target_path + ".cortex_tmp" + lock_path = target_path + ".cortex_lock" + + if payload.chunk_index == 0: + # 1. 
Handle Locks + if os.path.exists(lock_path): + try: + with open(lock_path, "r") as lf: + lock_data = json.loads(lf.read()) + if time.time() - lock_data.get("ts", 0) < 30: + print(f" [๐Ÿ“๐Ÿ”’] Lock active for {payload.path}. Proceeding with shadow write...") + except: pass + + try: + with open(lock_path, "w") as lf: + lf.write(json.dumps({"ts": time.time(), "owner": "node", "path": payload.path})) + except: pass + + # 2. Initialize Shadow File (Truncate) + data = payload.chunk + if payload.compressed: + try: data = zlib.decompress(data) + except: pass + + with open(tmp_path, "wb") as f: + f.write(data) + else: + # Random access write to shadow file + if not os.path.exists(tmp_path): + with open(tmp_path, "ab") as f: pass - if payload.is_final and payload.hash: - return self._verify(target_path, payload.hash) + data = payload.chunk + if payload.compressed: + try: data = zlib.decompress(data) + except: pass + + with open(tmp_path, "r+b") as f: + f.seek(payload.offset if payload.HasField("offset") else 0) + f.write(data) + + if payload.is_final: + # 3. Finalization: Verify and Swap + success = True + if payload.hash: + success = self._verify(tmp_path, payload.hash) + + if success: + try: + # Atomic swap: The destination only changes once we are 100% sure the file is right. + import shutil + os.replace(tmp_path, target_path) + except Exception as e: + print(f" [๐Ÿ“โŒ] Atomic swap failed for {payload.path}: {e}") + success = False + + # 4. Cleanup + if os.path.exists(lock_path): + try: os.remove(lock_path) + except: pass + if os.path.exists(tmp_path) and not success: + # If it failed verification or swap, we might want to keep it or delete it. + # Let's delete it to allow a clean retry. 
+ try: os.remove(tmp_path) + except: pass + + return success return True def _verify(self, path, expected_hash): + # Memory-safe incremental hashing for verification + h = hashlib.sha256() with open(path, "rb") as f: - actual = hashlib.sha256(f.read()).hexdigest() + while True: + chunk = f.read(1024 * 1024) + if not chunk: break + h.update(chunk) + actual = h.hexdigest() if actual != expected_hash: print(f"[โš ๏ธ] Sync Hash Mismatch for {path}") return False diff --git a/agent-node/src/agent_node/main.py b/agent-node/src/agent_node/main.py index 1085314..2f894e1 100644 --- a/agent-node/src/agent_node/main.py +++ b/agent-node/src/agent_node/main.py @@ -1,14 +1,128 @@ import sys import os +# gRPC/Mac Stability Tuning +os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "0" +os.environ["GRPC_FORK_SUPPORT_ONLY_FOR_SIGCHLD"] = "1" +os.environ["GRPC_POLL_STRATEGY"] = "poll" -# Add root to path to find protos and other packages -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) +# RELIABILITY: Ensure sys.argv[0] is an absolute path. +# This is critical for os.execv(...) restarts during auto-updates, +# especially when the agent is started with a relative path (e.g., 'python3 src/main.py'). 
+if sys.argv and sys.argv[0] and not os.path.isabs(sys.argv[0]): + sys.argv[0] = os.path.abspath(sys.argv[0]) + +# Add root and protos to path with HIGHEST priority to avoid collision with installed packages +_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +sys.path.insert(0, _root) +sys.path.insert(0, os.path.join(_root, "protos")) import signal import time from agent_node.config import NODE_ID, HUB_URL, AUTH_TOKEN, SECRET_KEY, AUTO_UPDATE, UPDATE_CHECK_INTERVAL from agent_node.core import updater +# Pre-flight check for core dependencies +try: + import grpc + import google.protobuf + import watchdog +except ImportError as e: + err_str = str(e).lower() + if "grpc" in err_str: + missing, pkg = "grpcio", "python3-grpcio" + elif "google" in err_str or "protobuf" in err_str: + missing, pkg = "protobuf", "python3-protobuf" + else: + missing, pkg = "watchdog", "python3-watchdog" + + print("\n" + "!"*71) + print(f" CRITICAL ERROR: '{missing}' library is not installed.") + print(f" If you are on Raspberry Pi / ARM, run:") + print(f" sudo apt-get install {pkg}") + print("!"*71 + "\n") +except ImportError: + pass + +def enforce_singleton(): + """ + Ensures that only one instance of the agent is running from this directory. + If siblings are found, they are terminated to prevent resource/port collisions. + This version is robust across Linux and Darwin and avoids unnecessary OS bails. 
+ """ + import psutil + import os + + current_pid = os.getpid() + try: + # Use realpath to resolve any symlinks for accurate comparison + my_path = os.path.realpath(__file__) + except: + my_path = os.path.abspath(__file__) + + cleaned = 0 + try: + # iterate over all processes once + for proc in psutil.process_iter(['pid', 'cmdline']): + try: + pid = proc.info['pid'] + if pid == current_pid: + continue + + cmd = proc.info['cmdline'] + if not cmd or not isinstance(cmd, list): + continue + + # We identify a sibling if it's running 'main.py' and resolves to our same directory. + is_sibling = False + for arg in cmd: + if 'main.py' in arg: + try: + # 1. Try absolute path resolution + if os.path.isabs(arg): + check_path = os.path.realpath(arg) + else: + # 2. Try relative resolution based on the sibling's current working directory + try: + cwd = proc.cwd() + check_path = os.path.realpath(os.path.join(cwd, arg)) + except (psutil.AccessDenied, psutil.NoSuchProcess): + # If we can't get the CWD of the other process, we rely on a direct name match + # but stay conservative to avoid killing unrelated processes. + check_path = None + + if check_path and check_path == my_path: + is_sibling = True + break + except: + continue + + if is_sibling: + print(f"[*] Cleaning up orphaned agent instance (PID {pid})...") + try: + p = psutil.Process(pid) + p.terminate() + try: + p.wait(timeout=2) + except psutil.TimeoutExpired: + p.kill() + cleaned += 1 + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + except (psutil.NoSuchProcess, psutil.AccessDenied, KeyError): + continue + except Exception as e: + # Non-fatal: if we can't iterate processes at all, just log and continue. + # This prevents the agent from being a 'brick' in restricted environments. + print(f"[!] 
Singleton check warning: {e}") + + if cleaned > 0: + print(f"[*] Successfully reaped {cleaned} orphaned instances.") + + if cleaned > 0: + print(f"[*] Cleaned up {cleaned} orphaned instances.") + else: + print("[*] No conflicting agent instances detected.") + def main(): import logging logging.basicConfig( @@ -16,6 +130,16 @@ format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[logging.StreamHandler(sys.stdout)] ) + + # 0. Singleton Enforcement: Murder siblings before booting + try: + import psutil + enforce_singleton() + except ImportError: + print("[!] psutil not installed โ€” skipping singleton enforcement. Beware of orphaned processes!") + except Exception as e: + print(f"[!] Singleton check failed: {e}") + print(f"[*] Starting Agent Node: {NODE_ID}...") # 0. Auto-Update Check (before anything else โ€” if we're behind, restart now) @@ -62,6 +186,11 @@ except Exception as e: print(f"[!] Main Agent process crashed: {e}. Retrying boot in 10s...", flush=True) + try: + if 'node' in locals(): + node.stop() + except: + pass import traceback traceback.print_exc() time.sleep(10) diff --git a/agent-node/src/agent_node/skills/base.py b/agent-node/src/agent_node/skills/base.py index 33c88ec..eafe500 100644 --- a/agent-node/src/agent_node/skills/base.py +++ b/agent-node/src/agent_node/skills/base.py @@ -8,6 +8,10 @@ """Attempts to cancel the task and returns success status.""" return False + def is_available(self) -> bool: + """Returns True if the skill's dependencies and hardware are available.""" + return True + def shutdown(self): """Cleanup resources on node exit.""" pass diff --git a/agent-node/src/agent_node/skills/browser_bridge.py b/agent-node/src/agent_node/skills/browser_bridge.py new file mode 100644 index 0000000..babddd5 --- /dev/null +++ b/agent-node/src/agent_node/skills/browser_bridge.py @@ -0,0 +1,548 @@ +import threading +import queue +import time +import json +import re +try: + from playwright.sync_api import sync_playwright +except 
ImportError: + sync_playwright = None +from agent_node.skills.base import BaseSkill +from protos import agent_pb2 +try: + from agent_node import config as node_config +except ImportError: + node_config = None + +# ============================================================ +# Role-Ref Registry +# Inspired by Openclaw's pw-role-snapshot.ts +# Maps `ref=eN` shorthand -> (role, name, nth) for every +# interactive / content element on the last snapshotted page. +# ============================================================ + +INTERACTIVE_ROLES = { + "button", "link", "textbox", "checkbox", "radio", "combobox", + "listbox", "menuitem", "menuitemcheckbox", "menuitemradio", + "option", "searchbox", "slider", "spinbutton", "switch", "tab", "treeitem", +} +CONTENT_ROLES = { + "heading", "cell", "gridcell", "columnheader", "rowheader", + "listitem", "article", "region", "main", "navigation", +} +STRUCTURAL_ROLES = { + "generic", "group", "list", "table", "row", "rowgroup", "grid", + "treegrid", "menu", "menubar", "toolbar", "tablist", "tree", + "directory", "document", "application", "presentation", "none", +} + + +def _build_aria_snapshot(aria_text: str) -> tuple[str, dict]: + """ + Parse Playwright's ariaSnapshot() output and annotate interactive/content + elements with stable [ref=eN] labels that the AI can refer back to. + Returns (annotated_snapshot, ref_map). 
+ """ + lines = aria_text.split("\n") + refs = {} + counter = [0] + role_counts = {} # (role, name) -> count (for nth disambiguation) + output_lines = [] + + def next_ref(): + counter[0] += 1 + return f"e{counter[0]}" + + for line in lines: + m = re.match(r'^(\s*-\s*)(\w+)(?:\s+"([^"]*)")?(.*)$', line) + if not m: + output_lines.append(line) + continue + + prefix, role_raw, name, suffix = m.group(1), m.group(2), m.group(3), m.group(4) + role = role_raw.lower() + + is_interactive = role in INTERACTIVE_ROLES + is_content_with_name = role in CONTENT_ROLES and name + + if not (is_interactive or is_content_with_name): + output_lines.append(line) + continue + + # assign ref + ref = next_ref() + key = (role, name) + nth = role_counts.get(key, 0) + role_counts[key] = nth + 1 + + refs[ref] = {"role": role, "name": name, "nth": nth if nth > 0 else None} + + enhanced = f"{prefix}{role_raw}" + if name: + enhanced += f' "{name}"' + enhanced += f" [ref={ref}]" + if nth > 0: + enhanced += f" [nth={nth}]" + if suffix: + enhanced += suffix + output_lines.append(enhanced) + + return "\n".join(output_lines), refs + + +def _resolve_ref(page, ref: str, role_refs: dict): + """Resolve a [ref=eN] string to a Playwright Locator.""" + info = role_refs.get(ref) + if not info: + raise ValueError(f"Unknown ref '{ref}'. Run aria_snapshot first and use a ref from that output.") + role = info["role"] + name = info.get("name") + nth = info.get("nth") or 0 + if name: + loc = page.get_by_role(role, name=name, exact=True) + else: + loc = page.get_by_role(role) + if nth: + loc = loc.nth(nth) + return loc + + +class BrowserSkill(BaseSkill): + """ + Persistent Browser Skill โ€” OpenClaw-inspired role-snapshot architecture. + + Key innovation over the prior version: + - `aria_snapshot` action returns a compact semantic role tree with [ref=eN] labels. 
+ - All `click`, `type`, `hover` actions accept either a CSS/XPath selector OR a + ref string like 'e3', enabling the AI to address elements without fragile selectors. + - Page errors and console output are tracked per-session and included in results. + - Supports headed (UI visible) mode for OIDC/OAuth login flows, toggled live from the hub. + """ + def __init__(self, sync_mgr=None): + self.task_queue = queue.Queue() + # session_id -> { "context", "page", "role_refs", "console", "errors", "download_dir" } + self.sessions = {} + self.sync_mgr = sync_mgr + self.lock = threading.Lock() + self._headless = self._read_headless_config() + self._actor_thread = threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor") + self._actor_thread.start() + + def is_available(self) -> bool: + return sync_playwright is not None + + def _read_headless_config(self) -> bool: + """Read headless preference from node config (default: True).""" + if node_config: + return getattr(node_config, 'BROWSER_HEADLESS', True) + return True + + def apply_config(self, skill_config: dict): + """ + Called by the node when the Hub pushes a skill config update. + skill_config example: {"browser": {"headless": false}} + If headless mode changed, gracefully restarts the browser engine. 
+ """ + browser_cfg = skill_config.get("browser", {}) + if sync_playwright is None: + return + + # Also respect the global node_config toggle + new_headless = browser_cfg.get("headless", self._read_headless_config()) + if new_headless == self._headless: + # Only log if specifically requested to headed mode or if we are indeed headed + if not self._headless: + print(f" [๐ŸŒ] Browser mode remains: headed") + return # No change + + mode_str = "headless -> headed" if not new_headless else "headed -> headless" + print(f" [๐ŸŒ] Browser mode changing: {mode_str}") + self._headless = new_headless + # Signal the actor to restart with the new mode + self.task_queue.put("__restart__") + + # ------------------------------------------------------------------ + # Session Management + # ------------------------------------------------------------------ + + def _get_or_create_session(self, browser, sid, task, on_event): + """Return existing session dict or create a new one.""" + with self.lock: + if sid in self.sessions: + return self.sessions[sid] + + download_dir = None + if self.sync_mgr and task.session_id: + download_dir = self.sync_mgr.get_session_dir(task.session_id) + print(f" [๐ŸŒ๐Ÿ“] Mapping Browser Context to: {download_dir}") + + ctx = browser.new_context(accept_downloads=True) + page = ctx.new_page() + + sess = { + "context": ctx, + "page": page, + "role_refs": {}, # ref -> {role, name, nth} + "console": [], + "errors": [], + "download_dir": download_dir, + } + self.sessions[sid] = sess + + # Listeners + self._attach_listeners(sid, page, on_event, sess) + return sess + + def _attach_listeners(self, sid, page, on_event, sess): + # Console log capture + def _on_console(msg): + entry = {"level": msg.type, "text": msg.text, "ts": int(time.time() * 1000)} + sess["console"].append(entry) + if len(sess["console"]) > 200: + sess["console"].pop(0) + if on_event: + on_event(agent_pb2.BrowserEvent( + session_id=sid, + console_msg=agent_pb2.ConsoleMessage( + level=msg.type, 
text=msg.text, timestamp_ms=entry["ts"] + ) + )) + + def _on_page_error(err): + sess["errors"].append({"message": str(err), "ts": int(time.time() * 1000)}) + if len(sess["errors"]) > 100: + sess["errors"].pop(0) + + def _on_network(req): + resp = req.response() + if on_event: + on_event(agent_pb2.BrowserEvent( + session_id=sid, + network_req=agent_pb2.NetworkRequest( + method=req.method, url=req.url, + status=resp.status if resp else 0, + resource_type=req.resource_type, latency_ms=0 + ) + )) + + def _on_download(dl): + import os + with self.lock: + s = self.sessions.get(sid) + if s and s.get("download_dir"): + os.makedirs(s["download_dir"], exist_ok=True) + target = os.path.join(s["download_dir"], dl.suggested_filename) + print(f" [๐ŸŒ๐Ÿ“ฅ] Download: {dl.suggested_filename} -> {target}") + dl.save_as(target) + + page.on("console", _on_console) + page.on("pageerror", _on_page_error) + page.on("requestfinished", _on_network) + page.on("download", _on_download) + + # ------------------------------------------------------------------ + # Browser Actor Loop + # ------------------------------------------------------------------ + + def _browser_actor(self): + """ + Immortal worker thread for browser operations. + Processes the task queue and manages the Playwright lifecycle. 
+ """ + if sync_playwright is None: + return + + print("[๐ŸŒ] Browser Actor Starting...", flush=True) + pw = browser = None + + def _cleanup_internal(): + nonlocal pw, browser + print("[๐ŸŒ] Cleaning up Browser Engine...", flush=True) + with self.lock: + for s in self.sessions.values(): + try: s["context"].close() + except: pass + self.sessions.clear() + if browser: + try: browser.close() + except: pass + if pw: + try: pw.stop() + except: pass + browser = pw = None + + while True: + item = self.task_queue.get() + if item is None: + _cleanup_internal() + print("[๐ŸŒ] Browser Actor Shutting Down.", flush=True) + break + + # Handle Restart Signal + if item == "__restart__": + print("[๐ŸŒ] Browser Actor Restarting (Mode Change)...", flush=True) + _cleanup_internal() + continue + + task, sandbox, on_complete, on_event = item + + # --- Lazy Initialization --- + if not pw or not browser: + try: + pw = sync_playwright().start() + headless = self._headless + launch_mode = "headless" if headless else "headed" + print(f"[๐ŸŒ] Launching Chromium in {launch_mode} mode...", flush=True) + + args = ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage'] + if headless: args.append('--disable-gpu') + + browser = pw.chromium.launch(headless=headless, args=args) + print("[๐ŸŒ] Browser Engine Online.", flush=True) + except Exception as e: + print(f"[!] 
Browser Setup Failed: {e}", flush=True) + # Report back to AI immediately so it doesn't hang + on_complete(task.task_id, {"stderr": f"Playwright/Chromium Error: {e}", "status": 1}, task.trace_id) + _cleanup_internal() + continue + + # --- Task Execution --- + try: + action = task.browser_action + sid = action.session_id or "default" + action_name = agent_pb2.BrowserAction.ActionType.Name(action.action) + print(f" [๐ŸŒ] {action_name} | Session: {sid}", flush=True) + + sess = self._get_or_create_session(browser, sid, task, on_event) + page = sess.get("page") + res_data = {} + + try: + self._dispatch_action(action, page, sess, res_data) + # Support for offloading large results to files + self._maybe_offload(sess, res_data, on_event) + except Exception as e: + on_complete(task.task_id, {"stderr": str(e), "status": 1}, task.trace_id) + continue + + try: + # Build BrowserResponse + br_res = agent_pb2.BrowserResponse( + url=page.url if page else "", + title=page.title() if page else "", + snapshot=res_data.get("snapshot", b""), + dom_content=res_data.get("dom_content", ""), + a11y_tree=res_data.get("a11y_tree", ""), + eval_result=res_data.get("eval_result", ""), + offloaded=res_data.get("offloaded", False), + ) + on_complete(task.task_id, {"status": 0, "browser_result": br_res}, task.trace_id) + except Exception as ex: + print(f" [!] Error building response: {ex}", flush=True) + on_complete(task.task_id, {"stderr": f"Result parsing error: {ex}", "status": 1}, task.trace_id) + + except Exception as e: + print(f" [!] 
Browser Actor Loop Exception: {e}", flush=True) + try: + on_complete(task.task_id, {"stderr": f"Actor internal error: {e}", "status": 1}, task.trace_id) + except: + pass + + def _maybe_offload(self, sess, res_data, on_event): + """Offload large strings/bytes to files in the sync directory.""" + download_dir = sess.get("download_dir") + if not download_dir: + return + + import os + import hashlib + offload_dir = os.path.join(download_dir, ".cortex_browser") + os.makedirs(offload_dir, exist_ok=True) + + OFFLOAD_THRESHOLD = 128 * 1024 # 128KB + offloaded = False + + files_to_push = [] + + # 1. Aria Snapshot + a11y = res_data.get("a11y_tree", "") + if a11y and len(a11y) > OFFLOAD_THRESHOLD: + rel_path = ".cortex_browser/last_a11y.txt" + abs_path = os.path.join(offload_dir, "last_a11y.txt") + with open(abs_path, "w") as f: + f.write(a11y) + res_data["a11y_tree"] = f"[OFFLOADED to {rel_path}]" + files_to_push.append((rel_path, a11y.encode('utf-8'))) + offloaded = True + + # 2. DOM Content + dom = res_data.get("dom_content", "") + if dom and len(dom) > OFFLOAD_THRESHOLD: + rel_path = ".cortex_browser/last_dom.html" + abs_path = os.path.join(offload_dir, "last_dom.html") + with open(abs_path, "w") as f: + f.write(dom) + res_data["dom_content"] = f"[OFFLOADED to {rel_path}]" + files_to_push.append((rel_path, dom.encode('utf-8'))) + offloaded = True + + # 3. 
Screenshot + snap = res_data.get("snapshot", b"") + if snap and len(snap) > OFFLOAD_THRESHOLD: + rel_path = ".cortex_browser/last_screenshot.png" + abs_path = os.path.join(offload_dir, "last_screenshot.png") + with open(abs_path, "wb") as f: + f.write(snap) + res_data["snapshot"] = f"[OFFLOADED to {rel_path}]".encode('utf-8') + files_to_push.append((rel_path, snap)) + offloaded = True + + if offloaded: + res_data["offloaded"] = True + # Proactively push files via event bus so they reach Hub BEFORE TaskResponse + if on_event: + for rel_p, data in files_to_push: + self._push_sync_event(sess, rel_p, data, on_event) + print(f" [๐ŸŒ๐Ÿ“] Browser Result Offloaded and Synced to Hub: {offload_dir}") + + def _push_sync_event(self, sess, rel_path, data, on_event): + """Manually chunk data into FileSyncMessage events to ensure Hub-side availability.""" + import hashlib + full_hash = hashlib.sha256(data).hexdigest() + chunk_size = 1024 * 256 # 256KB chunks + + # We need the session_id that the node uses for syncing + # In BrowserSkill, sess["download_dir"] is SYNC_DIR / session_id + # We can extract session_id back from download_dir or just pass it in + download_dir = sess.get("download_dir") + session_id = os.path.basename(download_dir) + + t_id = f"br-sync-{int(time.time()*1000)}" + + for i in range(0, len(data), chunk_size): + chunk = data[i:i+chunk_size] + is_final = (i + chunk_size) >= len(data) + + payload = agent_pb2.FilePayload( + path=rel_path, + chunk=chunk, + chunk_index=i // chunk_size, + is_final=is_final, + hash=full_hash if is_final else "" + ) + + msg = agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + file_data=payload, + task_id=t_id + ) + ) + on_event(msg) + + # ------------------------------------------------------------------ + # Action Dispatcher + # ------------------------------------------------------------------ + + def _get_aria_snapshot_safe(self, page): + """Safe aria_snapshot with fallback for older 
Playwright versions.""" + try: + if hasattr(page.locator(":root"), "aria_snapshot"): + # Use a locator-level timeout of 10s to prevent hanging the actor thread + return page.locator(":root").aria_snapshot(timeout=10000) + + # Fallback for Playwright < 1.44 + print(" [๐ŸŒโš ๏ธ] aria_snapshot() not available in this Playwright version. Using fallback.") + return "- application \"Browser\"\n - generic \"Accessibility Tree Unavailable (Playwright < 1.44)\"" + except Exception as e: + print(f" [๐ŸŒโŒ] Error generating aria snapshot: {e}") + return f"- error \"Failed to generate accessibility tree (Timeout or version issue): {e}\"" + + def _dispatch_action(self, action, page, sess, res_data): + A = agent_pb2.BrowserAction + role_refs = sess["role_refs"] + + def resolve(selector_or_ref: str): + """Accept either a CSS selector or a ref like 'e3'.""" + s = (selector_or_ref or "").strip() + if re.match(r'^e\d+$', s): + return _resolve_ref(page, s, role_refs) + return page.locator(s) + + if action.action == A.NAVIGATE: + # PERFORMANCE: Remove auto-snapshot on navigation for Mac Mini stability. + # Large Aria trees cause massive CPU bursts and gRPC timeouts. + page.goto(action.url, wait_until="domcontentloaded", timeout=45000) + res_data["eval_result"] = "Navigation successful." 
+ + elif action.action == A.CLICK: + target = action.selector or "" + resolve(target).click(timeout=8000) + + elif action.action == A.TYPE: + target = action.selector or "" + resolve(target).fill(action.text, timeout=8000) + + elif action.action == A.SCREENSHOT: + res_data["snapshot"] = page.screenshot(full_page=False) + + elif action.action == A.GET_DOM: + res_data["dom_content"] = page.content() + + elif action.action == A.HOVER: + target = action.selector or "" + resolve(target).hover(timeout=5000) + + elif action.action == A.SCROLL: + page.mouse.wheel(x=0, y=action.y or 400) + + elif action.action == A.EVAL: + result = page.evaluate(action.text) + res_data["eval_result"] = str(result) + + elif action.action == A.GET_A11Y: + aria_raw = self._get_aria_snapshot_safe(page) + snap, refs = _build_aria_snapshot(aria_raw) + sess["role_refs"] = refs + + # Trim large snapshots + res_data["a11y_tree"] = snap[:12000] + stats = { + "total_refs": len(refs), + "url": page.url, + "title": page.title(), + } + res_data["eval_result"] = json.dumps(stats) + + # Trim large snapshots (news pages can be huge) + MAX = 10000 + if len(snap) > MAX: + snap = snap[:MAX] + "\n\n[...snapshot truncated - use eval/scroll to see more...]" + + stats = { + "total_refs": len(refs), + "interactive": sum(1 for r in refs.values() if r["role"] in INTERACTIVE_ROLES), + "url": page.url, + "title": page.title(), + } + res_data["a11y_tree"] = snap + res_data["eval_result"] = json.dumps(stats) + + elif action.action == A.CLOSE: + with self.lock: + s = self.sessions.pop(action.session_id or "default", None) + if s: + s["context"].close() + + # ------------------------------------------------------------------ + # Public Interface + # ------------------------------------------------------------------ + + def execute(self, task, sandbox, on_complete, on_event=None): + self.task_queue.put((task, sandbox, on_complete, on_event)) + + def cancel(self, task_id): + return False + + def shutdown(self): + 
self.task_queue.put(None) diff --git a/agent-node/src/agent_node/skills/file_bridge.py b/agent-node/src/agent_node/skills/file_bridge.py new file mode 100644 index 0000000..b441d73 --- /dev/null +++ b/agent-node/src/agent_node/skills/file_bridge.py @@ -0,0 +1,77 @@ +import os +import json +import logging +from agent_node.skills.base import BaseSkill + +logger = logging.getLogger(__name__) + +class FileSkill(BaseSkill): + """Provides file system navigation and inspection capabilities.""" + + def __init__(self, sync_mgr=None): + self.sync_mgr = sync_mgr + + def execute(self, task, sandbox, on_complete, on_event=None): + """ + Executes a file-related task (list, stats). + Payload JSON: { "action": "list", "path": "...", "recursive": false } + """ + try: + payload = json.loads(task.payload_json) + action = payload.get("action", "list") + path = payload.get("path", ".") + + # 1. Sandbox Jail Check + # (In a real implementation, we'd use sandbox.check_path(path)) + # For now, we'll assume the node allows browsing its root or session dir. 
+ + if action == "list": + result = self._list_dir(path, payload.get("recursive", False)) + on_complete(task.task_id, {"status": 0, "stdout": json.dumps(result)}, task.trace_id) + else: + on_complete(task.task_id, {"status": 1, "stderr": f"Unknown action: {action}"}, task.trace_id) + + except Exception as e: + logger.error(f"[FileSkill] Task {task.task_id} failed: {e}") + on_complete(task.task_id, {"status": 1, "stderr": str(e)}, task.trace_id) + + def _list_dir(self, path, recursive=False): + """Lists directory contents with metadata.""" + if not os.path.exists(path): + return {"error": "Path not found"} + + items = [] + if recursive: + for root, dirs, files in os.walk(path): + for name in dirs + files: + abs_path = os.path.join(root, name) + rel_path = os.path.relpath(abs_path, path) + st = os.stat(abs_path) + items.append({ + "name": name, + "path": rel_path, + "is_dir": os.path.isdir(abs_path), + "size": st.st_size, + "mtime": st.st_mtime + }) + else: + for name in os.listdir(path): + abs_path = os.path.join(path, name) + st = os.stat(abs_path) + items.append({ + "name": name, + "is_dir": os.path.isdir(abs_path), + "size": st.st_size, + "mtime": st.st_mtime + }) + + return { + "root": os.path.abspath(path), + "items": sorted(items, key=lambda x: (not x["is_dir"], x["name"])) + } + + def cancel(self, task_id): + return False # Listing is usually fast, no cancellation needed + + def shutdown(self): + pass diff --git a/agent-node/src/agent_node/skills/manager.py b/agent-node/src/agent_node/skills/manager.py index f565b0e..74dbd3d 100644 --- a/agent-node/src/agent_node/skills/manager.py +++ b/agent-node/src/agent_node/skills/manager.py @@ -11,19 +11,53 @@ self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker") self.active_tasks = {} # task_id -> future self.sync_mgr = sync_mgr - self.skills = self._discover_skills(sync_mgr) self.max_workers = max_workers self.lock = threading.Lock() + + # 1. 
Start with Hardcoded Bridges (Pure Programmatic Logic) + self.skills = self._load_core_bridges(sync_mgr) + + # 2. Optionally supplement with dynamic skills from disk + dynamic_skills = self._discover_skills(sync_mgr) + self.skills.update(dynamic_skills) + + def _load_core_bridges(self, sync_mgr): + """Hard-imports the core execution bridges to ensure they are always available.""" + bridges = {} + # Shell Bridge + try: + from agent_node.skills.shell_bridge import ShellSkill + instance = ShellSkill(sync_mgr=sync_mgr) + bridges["shell"] = instance + bridges["mesh-terminal-control"] = instance + print(" [๐Ÿ”ง๐Ÿ“ฆ] Core Shell Bridge Loaded.") + except ImportError as e: + print(f" [๐Ÿ”งโš ๏ธ] Fatal: Core Shell Bridge not found: {e}") + + # File Bridge + try: + from agent_node.skills.file_bridge import FileSkill + instance = FileSkill(sync_mgr=sync_mgr) + bridges["file"] = instance + bridges["mesh-file-explorer"] = instance + print(" [๐Ÿ”ง๐Ÿ“ฆ] Core File Bridge Loaded.") + except ImportError: pass + + # Browser Bridge + try: + from agent_node.skills.browser_bridge import BrowserSkill + instance = BrowserSkill(sync_mgr=sync_mgr) + bridges["browser"] = instance + bridges["browser-automation-agent"] = instance + print(" [๐Ÿ”ง๐Ÿ“ฆ] Core Browser Bridge Loaded.") + except ImportError: pass + + return bridges def _discover_skills(self, sync_mgr): - """Scans the skills/ directory for logic.py and loads skill implementations.""" + """Scans the disk for additional logic.py plugins (supplemental).""" # Find candidate locations for skills - # 1. Monorepo root (../../../skills from this file) - # 2. Agent-node local (../../skills from this file) - # 3. 
Docker standard (/app/skills) candidates = [ - os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../skills")), - os.path.abspath(os.path.join(os.path.dirname(__file__), "../../skills")), "/app/skills", "/app/node_skills" ] @@ -31,7 +65,6 @@ skills_dir = None for cand in candidates: if os.path.exists(cand) and os.path.isdir(cand): - # Ensure it's not a broken symlink and has actual content try: if any(os.path.isdir(os.path.join(cand, d)) for d in os.listdir(cand)): skills_dir = cand @@ -41,11 +74,12 @@ discovered = {} if not skills_dir: - print(f" [๐Ÿ”งโš ๏ธ] Skills directory not found in candidate locations: {candidates}") return discovered - print(f" [๐Ÿ”ง] Using skills directory: {skills_dir}") + print(f" [๐Ÿ”ง] Scanning supplemental skills: {skills_dir}") for skill_dir in os.listdir(skills_dir): + if skill_dir in self.skills: continue # Skip if already hardcoded + item_path = os.path.join(skills_dir, skill_dir) if os.path.isdir(item_path): logic_py = os.path.join(item_path, "logic.py") @@ -54,29 +88,23 @@ try: spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py) module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) + if spec.loader: + spec.loader.exec_module(module) - # Find the first class that inherits from BaseSkill + # Robust class detection for attr_name in dir(module): attr = getattr(module, attr_name) - if isinstance(attr, type) and issubclass(attr, BaseSkill) and attr is not BaseSkill: - # We map the internal skill name (e.g. 
mesh_terminal_control) - # if we can find it in the module or assume it based on folder name - # For backward compatibility with task_type routing, we map common ones - instance = attr(sync_mgr=sync_mgr) - discovered[skill_dir] = instance - # Also map legacy names for the routing engine below - if "terminal" in skill_dir or "shell" in skill_dir: - discovered["shell"] = instance - if "browser" in skill_dir: - discovered["browser"] = instance - if "file" in skill_dir: - discovered["file"] = instance - break + if isinstance(attr, type) and any(b.__name__ == 'BaseSkill' for b in attr.__mro__) and attr.__name__ != 'BaseSkill': + try: + instance = attr(sync_mgr=sync_mgr) + discovered[skill_dir] = instance + print(f" [๐Ÿ”งโœ…] Loaded supplemental skill: {skill_dir}") + except Exception as e: + print(f" [๐Ÿ”งโŒ] Failed to instantiate skill {skill_dir}: {e}") + break except Exception as e: print(f" [๐Ÿ”งโŒ] Failed to load skill from {logic_py}: {e}") - print(f" [๐Ÿ”ง] Discovered skills: {list(discovered.keys())}") return discovered def submit(self, task, sandbox, on_complete, on_event=None): diff --git a/agent-node/src/agent_node/skills/shell_bridge.py b/agent-node/src/agent_node/skills/shell_bridge.py new file mode 100644 index 0000000..479bbb6 --- /dev/null +++ b/agent-node/src/agent_node/skills/shell_bridge.py @@ -0,0 +1,413 @@ +import os +import pty +import select +import threading +import time +import termios +import struct +import fcntl +import tempfile +from agent_node.skills.base import BaseSkill +from protos import agent_pb2 + +class ShellSkill(BaseSkill): + """Admin Console Skill: Persistent stateful Bash via PTY.""" + def __init__(self, sync_mgr=None): + self.sync_mgr = sync_mgr + self.sessions = {} # session_id -> {fd, pid, thread, last_activity, ...} + self.lock = threading.Lock() + + # Phase 3: Prompt Patterns for Edge Intelligence + self.PROMPT_PATTERNS = [ + r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$", # bash/zsh: user@host:~$ + r">>>\s*$", # python + r"\.\.\.\s*$", 
# python multi-line + r">\s*$", # node/js + ] + + # --- M7: Idle Session Reaper --- + # Automatically kills dormant bash processes to free up system resources. + self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper") + self.reaper_thread.start() + + def _session_reaper(self): + """Background thread that cleans up unused PTY sessions.""" + while True: + time.sleep(60) + with self.lock: + now = time.time() + for sid, sess in list(self.sessions.items()): + # Avoid reaping currently active tasks + if sess.get("active_task"): + continue + + # 10 minute idle timeout + if now - sess.get("last_activity", 0) > 600: + print(f" [๐Ÿš๐Ÿงน] Reaping idle shell session: {sid}") + try: + os.close(sess["fd"]) + os.kill(sess["pid"], 9) + except: pass + self.sessions.pop(sid, None) + + def _ensure_session(self, session_id, cwd, on_event): + with self.lock: + if session_id in self.sessions: + self.sessions[session_id]["last_activity"] = time.time() + return self.sessions[session_id] + + print(f" [๐Ÿš] Initializing Persistent Shell Session: {session_id}") + # Spawn bash in a pty + pid, fd = pty.fork() + if pid == 0: # Child + # Environment prep + os.environ["TERM"] = "xterm-256color" + + # Change to CWD + if cwd and os.path.exists(cwd): + os.chdir(cwd) + + # Launch shell + shell_path = "/bin/bash" + if not os.path.exists(shell_path): + shell_path = "/bin/sh" + os.execv(shell_path, [shell_path, "--login"]) + + # Parent + # Set non-blocking + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + sess = { + "fd": fd, + "pid": pid, + "last_activity": time.time(), + "buffer_file": None, + "tail_buffer": "", + "active_task": None + } + + def reader(): + while True: + try: + r, _, _ = select.select([fd], [], [], 0.1) + if fd in r: + data = os.read(fd, 4096) + if not data: break + + decoded = data.decode("utf-8", errors="replace") + + # Streaming/Sync logic (Detect completion marker) + with self.lock: + 
active_tid = sess.get("active_task") + marker = sess.get("marker") + if active_tid and marker and sess.get("buffer_file"): + # Phase 2: Persistence Offloading + # Write directly to disk instead of heap memory + sess["buffer_file"].write(decoded) + sess["buffer_file"].flush() + + # Keep a tiny 4KB tail in RAM for marker detection and prompt scanning + sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-4096:] + + if marker in sess["tail_buffer"]: + # Marker found! Extract exit code + try: + # The tail buffer has the marker + after_marker = sess["tail_buffer"].split(marker)[1].strip().split() + exit_code = int(after_marker[0]) if after_marker else 0 + + # Formulate final stdout summary from the disk file + bf = sess["buffer_file"] + bf.seek(0, 2) + file_len = bf.tell() + + HEAD, TAIL = 10_000, 30_000 + if file_len > HEAD + TAIL: + bf.seek(0) + head_str = bf.read(HEAD) + bf.seek(file_len - TAIL) + tail_str = bf.read() + omitted = file_len - HEAD - TAIL + pure_stdout = head_str + f"\n\n[... {omitted:,} bytes omitted (full output safely preserved at {bf.name}) ...]\n\n" + tail_str + else: + bf.seek(0) + pure_stdout = bf.read() + + # Slice off the marker string and anything after it from the final result + pure_stdout = pure_stdout.split(marker)[0] + + sess["result"]["stdout"] = pure_stdout + sess["result"]["status"] = 0 if exit_code == 0 else 1 + + # Close the file handle (leaves file on disk) + sess["buffer_file"].close() + sess["buffer_file"] = None + + sess["event"].set() + decoded = pure_stdout.split(marker)[0][-4096:] if marker in pure_stdout else pure_stdout + except Exception as e: + print(f" [๐Ÿšโš ๏ธ] Marker parsing failed: {e}") + sess["event"].set() + + # Stream terminal output back (with stealth filtering) + if on_event: + stealth_out = decoded + if "__CORTEX_FIN_SH_" in decoded: + import re + # We remove any line that contains our internal marker to hide plumbing from user. + # This covers both the initial command echo and the exit code output. 
+ stealth_out = re.sub(r'.*__CORTEX_FIN_SH_.*[\r\n]*', '', decoded) + + if stealth_out: + # Phase 3: Client-Side Truncation (Stream Rate Limiting) + # Limit real-time stream to 15KB/sec per session to prevent flooding the Hub over gRPC. + # The full output is still safely written to the tempfile on disk. + with self.lock: + now = time.time() + if now - sess.get("stream_window_start", 0) > 1.0: + sess["stream_window_start"] = now + sess["stream_bytes_sent"] = 0 + dropped = sess.get("stream_dropped_bytes", 0) + if dropped > 0: + drop_msg = f"\n[... {dropped:,} bytes truncated from live stream ...]\n" + event = agent_pb2.SkillEvent( + session_id=session_id, task_id=sess.get("active_task") or "", terminal_out=drop_msg + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=event)) + sess["stream_dropped_bytes"] = 0 + + if sess.get("stream_bytes_sent", 0) + len(stealth_out) > 15_000: + sess["stream_dropped_bytes"] = sess.get("stream_dropped_bytes", 0) + len(stealth_out) + else: + sess["stream_bytes_sent"] = sess.get("stream_bytes_sent", 0) + len(stealth_out) + event = agent_pb2.SkillEvent( + session_id=session_id, + task_id=sess.get("active_task") or "", + terminal_out=stealth_out + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=event)) + + # EDGE INTELLIGENCE: Proactively signal prompt detection + # We only check for prompts if we are actively running a task and haven't found the marker yet. 
+ if active_tid and not sess["event"].is_set(): + import re + tail = sess["tail_buffer"][-100:] if len(sess["tail_buffer"]) > 100 else sess["tail_buffer"] + for pattern in self.PROMPT_PATTERNS: + if re.search(pattern, tail): + # Send specific prompt signal + # Use last 20 chars as the 'prompt' hint + p_hint = tail[-20:].strip() + prompt_event = agent_pb2.SkillEvent( + session_id=session_id, + task_id=active_tid, + prompt=p_hint + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=prompt_event)) + break + except (EOFError, OSError): + break + + # Thread Cleanup + print(f" [๐Ÿš] Shell Session Terminated: {session_id}") + with self.lock: + self.sessions.pop(session_id, None) + + t = threading.Thread(target=reader, daemon=True, name=f"ShellReader-{session_id}") + t.start() + sess["thread"] = t + + self.sessions[session_id] = sess + return sess + + + def handle_transparent_tty(self, task, on_complete, on_event=None): + """Processes raw TTY/Resize events synchronously (bypasses threadpool/sandbox).""" + cmd = task.payload_json + session_id = task.session_id or "default-session" + try: + import json + if cmd.startswith('{') and cmd.endswith('}'): + raw_payload = json.loads(cmd) + + # 1. Raw Keystroke forward + if isinstance(raw_payload, dict) and "tty" in raw_payload: + raw_bytes = raw_payload["tty"] + sess = self._ensure_session(session_id, None, on_event) + os.write(sess["fd"], raw_bytes.encode("utf-8")) + on_complete(task.task_id, {"stdout": "", "status": 0}, task.trace_id) + return True + + # 2. 
Window Resize + if isinstance(raw_payload, dict) and raw_payload.get("action") == "resize": + cols = raw_payload.get("cols", 80) + rows = raw_payload.get("rows", 24) + sess = self._ensure_session(session_id, None, on_event) + import termios, struct, fcntl + s = struct.pack('HHHH', rows, cols, 0, 0) + fcntl.ioctl(sess["fd"], termios.TIOCSWINSZ, s) + print(f" [๐Ÿš] Terminal Resized to {cols}x{rows}") + on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 0}, task.trace_id) + return True + except Exception as pe: + print(f" [๐Ÿš] Transparent TTY Fail: {pe}") + return False + + def execute(self, task, sandbox, on_complete, on_event=None): + """Dispatches command string to the persistent PTY shell and WAITS for completion.""" + session_id = task.session_id or "default-session" + tid = task.task_id + try: + cmd = task.payload_json + + # --- Legacy Full-Command Execution (Sandboxed) --- + allowed, status_msg = sandbox.verify(cmd) + if not allowed: + err_msg = f"\r\n[System] Command blocked: {status_msg}\r\n" + if on_event: + event = agent_pb2.SkillEvent( + session_id=session_id, task_id=tid, + terminal_out=err_msg + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=event)) + + return on_complete(tid, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id) + + # Resolve CWD jail + cwd = None + if self.sync_mgr and task.session_id: + cwd = self.sync_mgr.get_session_dir(task.session_id) + elif sandbox.policy.get("WORKING_DIR_JAIL"): + cwd = sandbox.policy["WORKING_DIR_JAIL"] + if not os.path.exists(cwd): + try: os.makedirs(cwd, exist_ok=True) + except: pass + + # Handle Session Persistent Process + sess = self._ensure_session(session_id, cwd, on_event) + + # Check for RAW mode first (bypasses busy check for interactive control) + is_raw = cmd.startswith("!RAW:") + if is_raw: + input_str = cmd[5:] + "\n" + print(f" [๐ŸšโŒจ๏ธ] RAW Input Injection: {input_str.strip()}") + os.write(sess["fd"], input_str.encode("utf-8")) + return 
on_complete(tid, {"stdout": "INJECTED", "status": 0}, task.trace_id) + + # --- 0. Busy Check: Serialize access to the PTY for standard commands --- + with self.lock: + if sess.get("active_task"): + curr_tid = sess.get("active_task") + return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 1}, task.trace_id) + + # --- Blocking Wait Logic --- + # --- Blocking Wait Logic --- + marker_id = int(time.time()) + marker = f"__CORTEX_FIN_SH_{marker_id}__" + event = threading.Event() + result_container = {"stdout": "", "status": 0} # 0 = Success (Protobuf Convention) + + # Register waiter in session state + with self.lock: + sess["active_task"] = tid + sess["marker"] = marker + sess["event"] = event + # Create a persistent tempfile for stdout instead of RAM buffer + sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False) + sess["tail_buffer"] = "" + sess["result"] = result_container + sess["cancel_event"] = threading.Event() + + # Input injection: execute command then echo marker and exit code + try: + # 12-factor bash: ( cmd ) ; echo marker $? + # We use "" concatenation in the echo command to ensure the marker literal + # DOES NOT appear in the PTY input echo, preventing premature completion. 
+ full_input = f"({cmd}) ; echo \"__CORTEX_FIN_SH_\"\"{marker_id}__\" $?\n" + os.write(sess["fd"], full_input.encode("utf-8")) + + # Wait for completion (triggered by reader) OR cancellation + timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0 + start_time = time.time() + while time.time() - start_time < timeout: + # Check for completion (reader found marker) + if event.is_set(): + return on_complete(tid, result_container, task.trace_id) + + # Check for cancellation (HUB sent cancel) + if sess["cancel_event"].is_set(): + print(f" [๐Ÿš๐Ÿ›‘] Task {tid} cancelled on node.") + return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id) + + # Sleep slightly to avoid busy loop + time.sleep(0.1) + + # Timeout Case + print(f" [๐Ÿšโš ๏ธ] Task {tid} timed out on node.") + with self.lock: + if sess.get("buffer_file"): + try: + sess["buffer_file"].seek(0, 2) + file_len = sess["buffer_file"].tell() + HEAD, TAIL = 10_000, 30_000 + if file_len > HEAD + TAIL: + sess["buffer_file"].seek(0) + head_str = sess["buffer_file"].read(HEAD) + sess["buffer_file"].seek(file_len - TAIL) + tail_str = sess["buffer_file"].read() + omitted = file_len - HEAD - TAIL + partial_out = head_str + f"\n\n[... 
{omitted:,} bytes omitted (full timeout output saved to {sess['buffer_file'].name}) ...]\n\n" + tail_str + else: + sess["buffer_file"].seek(0) + partial_out = sess["buffer_file"].read() + except: + partial_out = "" + else: + partial_out = "" + + on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id) + + finally: + # Cleanup session task state + with self.lock: + if sess.get("active_task") == tid: + if sess.get("buffer_file"): + try: + sess["buffer_file"].close() + except: pass + sess["buffer_file"] = None + sess["active_task"] = None + sess["marker"] = None + sess["event"] = None + sess["result"] = None + sess["cancel_event"] = None + + except Exception as e: + print(f" [๐ŸšโŒ] Execute Error for {tid}: {e}") + on_complete(tid, {"stderr": str(e), "status": 2}, task.trace_id) + + def cancel(self, task_id: str): + """Cancels an active task โ€” for persistent shell, this sends a SIGINT (Ctrl+C).""" + with self.lock: + for sid, sess in self.sessions.items(): + if sess.get("active_task") == task_id: + print(f"[๐Ÿ›‘] Sending SIGINT (Ctrl+C) to shell session (Task {task_id}): {sid}") + # Write \x03 (Ctrl+C) to the master FD + os.write(sess["fd"], b"\x03") + # Break the wait loop in execute thread + if sess.get("cancel_event"): + sess["cancel_event"].set() + return True + + + def shutdown(self): + """Cleanup: Terminates all persistent shells.""" + with self.lock: + for sid, sess in list(self.sessions.items()): + print(f"[๐Ÿ›‘] Cleaning up persistent shell: {sid}") + try: os.close(sess["fd"]) + except: pass + # kill pid + try: os.kill(sess["pid"], 9) + except: pass + self.sessions.clear() diff --git a/agent-node/src/agent_node/utils/network.py b/agent-node/src/agent_node/utils/network.py index 04b97c3..58abf64 100644 --- a/agent-node/src/agent_node/utils/network.py +++ b/agent-node/src/agent_node/utils/network.py @@ -7,7 +7,7 @@ """Initializes a gRPC channel (Secure or Insecure) and returns the orchestrator stub.""" options = [ - 
('grpc.keepalive_time_ms', 30000), # Send keepalive ping every 30s + ('grpc.keepalive_time_ms', 10000), # Send keepalive ping every 10s ('grpc.keepalive_timeout_ms', 10000), # Wait 10s for pong ('grpc.keepalive_permit_without_calls', True), ('grpc.http2.max_pings_without_data', 0), # Allow infinite pings @@ -18,7 +18,7 @@ if not TLS_ENABLED: print(f"[!] TLS is disabled. Connecting via insecure channel to {SERVER_HOST_PORT}") channel = grpc.insecure_channel(SERVER_HOST_PORT, options=options) - return agent_pb2_grpc.AgentOrchestratorStub(channel) + return agent_pb2_grpc.AgentOrchestratorStub(channel), channel print(f"[*] Connecting via secure (mTLS) channel to {SERVER_HOST_PORT}") try: @@ -28,11 +28,10 @@ creds = grpc.ssl_channel_credentials(ca, pkey, cert) channel = grpc.secure_channel(SERVER_HOST_PORT, creds, options=options) - return agent_pb2_grpc.AgentOrchestratorStub(channel) + return agent_pb2_grpc.AgentOrchestratorStub(channel), channel except FileNotFoundError as e: print(f"[!] mTLS Certificate files not found: {e}. Falling back to standard TLS (Server Verify)...") # Fallback to standard TLS (uses system CA roots by default) creds = grpc.ssl_channel_credentials() channel = grpc.secure_channel(SERVER_HOST_PORT, creds, options=options) - return agent_pb2_grpc.AgentOrchestratorStub(channel) - + return agent_pb2_grpc.AgentOrchestratorStub(channel), channel diff --git a/agent-node/src/agent_pb2.py b/agent-node/src/agent_pb2.py new file mode 100644 index 0000000..7d12fb3 --- /dev/null +++ b/agent-node/src/agent_pb2.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: agent.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe0\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\x12\x19\n\x11skill_config_json\x18\x06 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 
\x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 
\x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xef\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\x12\x11\n\toffloaded\x18\t \x01(\x08\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 
\x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 
\x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"\xad\x01\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\x12\x0e\n\x06offset\x18\x06 \x01(\x03\x12\x12\n\ncompressed\x18\x07 \x01(\x08\x12\x14\n\x0ctotal_chunks\x18\x08 \x01(\x05\x12\x12\n\ntotal_size\x18\t \x01(\x03\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' + _globals['_REGISTRATIONREQUEST']._serialized_start=23 + _globals['_REGISTRATIONREQUEST']._serialized_end=245 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=194 + 
_globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=245 + _globals['_SANDBOXPOLICY']._serialized_start=248 + _globals['_SANDBOXPOLICY']._serialized_end=472 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=438 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=472 + _globals['_REGISTRATIONRESPONSE']._serialized_start=474 + _globals['_REGISTRATIONRESPONSE']._serialized_end=594 + _globals['_CLIENTTASKMESSAGE']._serialized_start=597 + _globals['_CLIENTTASKMESSAGE']._serialized_end=894 + _globals['_SKILLEVENT']._serialized_start=896 + _globals['_SKILLEVENT']._serialized_end=1017 + _globals['_NODEANNOUNCE']._serialized_start=1019 + _globals['_NODEANNOUNCE']._serialized_end=1050 + _globals['_BROWSEREVENT']._serialized_start=1053 + _globals['_BROWSEREVENT']._serialized_end=1188 + _globals['_SERVERTASKMESSAGE']._serialized_start=1191 + _globals['_SERVERTASKMESSAGE']._serialized_end=1507 + _globals['_TASKCANCELREQUEST']._serialized_start=1509 + _globals['_TASKCANCELREQUEST']._serialized_end=1545 + _globals['_TASKREQUEST']._serialized_start=1548 + _globals['_TASKREQUEST']._serialized_end=1757 + _globals['_BROWSERACTION']._serialized_start=1760 + _globals['_BROWSERACTION']._serialized_end=2048 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1914 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2048 + _globals['_TASKRESPONSE']._serialized_start=2051 + _globals['_TASKRESPONSE']._serialized_end=2403 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2283 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2331 + _globals['_TASKRESPONSE_STATUS']._serialized_start=2333 + _globals['_TASKRESPONSE_STATUS']._serialized_end=2393 + _globals['_BROWSERRESPONSE']._serialized_start=2406 + _globals['_BROWSERRESPONSE']._serialized_end=2645 + _globals['_CONSOLEMESSAGE']._serialized_start=2647 + _globals['_CONSOLEMESSAGE']._serialized_end=2714 + _globals['_NETWORKREQUEST']._serialized_start=2716 + _globals['_NETWORKREQUEST']._serialized_end=2820 
+ _globals['_WORKPOOLUPDATE']._serialized_start=2822 + _globals['_WORKPOOLUPDATE']._serialized_end=2866 + _globals['_TASKCLAIMREQUEST']._serialized_start=2868 + _globals['_TASKCLAIMREQUEST']._serialized_end=2920 + _globals['_TASKCLAIMRESPONSE']._serialized_start=2922 + _globals['_TASKCLAIMRESPONSE']._serialized_end=2991 + _globals['_HEARTBEAT']._serialized_start=2994 + _globals['_HEARTBEAT']._serialized_end=3352 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=3354 + _globals['_HEALTHCHECKRESPONSE']._serialized_end=3399 + _globals['_FILESYNCMESSAGE']._serialized_start=3402 + _globals['_FILESYNCMESSAGE']._serialized_end=3630 + _globals['_SYNCCONTROL']._serialized_start=3633 + _globals['_SYNCCONTROL']._serialized_end=3932 + _globals['_SYNCCONTROL_ACTION']._serialized_start=3762 + _globals['_SYNCCONTROL_ACTION']._serialized_end=3932 + _globals['_DIRECTORYMANIFEST']._serialized_start=3934 + _globals['_DIRECTORYMANIFEST']._serialized_end=4004 + _globals['_FILEINFO']._serialized_start=4006 + _globals['_FILEINFO']._serialized_end=4074 + _globals['_FILEPAYLOAD']._serialized_start=4077 + _globals['_FILEPAYLOAD']._serialized_end=4250 + _globals['_SYNCSTATUS']._serialized_start=4253 + _globals['_SYNCSTATUS']._serialized_end=4413 + _globals['_SYNCSTATUS_CODE']._serialized_start=4347 + _globals['_SYNCSTATUS_CODE']._serialized_end=4413 + _globals['_AGENTORCHESTRATOR']._serialized_start=4416 + _globals['_AGENTORCHESTRATOR']._serialized_end=4649 +# @@protoc_insertion_point(module_scope) diff --git a/agent-node/src/agent_pb2_grpc.py b/agent-node/src/agent_pb2_grpc.py new file mode 100644 index 0000000..932d45e --- /dev/null +++ b/agent-node/src/agent_pb2_grpc.py @@ -0,0 +1,138 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! 
+"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import agent_pb2 as agent__pb2 + + +class AgentOrchestratorStub(object): + """The Cortex Server exposes this service + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SyncConfiguration = channel.unary_unary( + '/agent.AgentOrchestrator/SyncConfiguration', + request_serializer=agent__pb2.RegistrationRequest.SerializeToString, + response_deserializer=agent__pb2.RegistrationResponse.FromString, + ) + self.TaskStream = channel.stream_stream( + '/agent.AgentOrchestrator/TaskStream', + request_serializer=agent__pb2.ClientTaskMessage.SerializeToString, + response_deserializer=agent__pb2.ServerTaskMessage.FromString, + ) + self.ReportHealth = channel.stream_stream( + '/agent.AgentOrchestrator/ReportHealth', + request_serializer=agent__pb2.Heartbeat.SerializeToString, + response_deserializer=agent__pb2.HealthCheckResponse.FromString, + ) + + +class AgentOrchestratorServicer(object): + """The Cortex Server exposes this service + """ + + def SyncConfiguration(self, request, context): + """1. Control Channel: Sync policies and settings (Unary) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TaskStream(self, request_iterator, context): + """2. Task Channel: Bidirectional work dispatch and reporting (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReportHealth(self, request_iterator, context): + """3. 
Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_AgentOrchestratorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( + servicer.SyncConfiguration, + request_deserializer=agent__pb2.RegistrationRequest.FromString, + response_serializer=agent__pb2.RegistrationResponse.SerializeToString, + ), + 'TaskStream': grpc.stream_stream_rpc_method_handler( + servicer.TaskStream, + request_deserializer=agent__pb2.ClientTaskMessage.FromString, + response_serializer=agent__pb2.ServerTaskMessage.SerializeToString, + ), + 'ReportHealth': grpc.stream_stream_rpc_method_handler( + servicer.ReportHealth, + request_deserializer=agent__pb2.Heartbeat.FromString, + response_serializer=agent__pb2.HealthCheckResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'agent.AgentOrchestrator', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. 
+class AgentOrchestrator(object): + """The Cortex Server exposes this service + """ + + @staticmethod + def SyncConfiguration(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration', + agent__pb2.RegistrationRequest.SerializeToString, + agent__pb2.RegistrationResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TaskStream(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream', + agent__pb2.ClientTaskMessage.SerializeToString, + agent__pb2.ServerTaskMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ReportHealth(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth', + agent__pb2.Heartbeat.SerializeToString, + agent__pb2.HealthCheckResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/agent-node/src/protos/agent.proto b/agent-node/src/protos/agent.proto index 46b74fa..3076b89 100644 --- a/agent-node/src/protos/agent.proto +++ b/agent-node/src/protos/agent.proto @@ -33,6 +33,7 @@ repeated string denied_commands = 3; repeated string sensitive_commands = 4; string working_dir_jail = 5; 
+ string skill_config_json = 6; // NEW: Map of skill settings (e.g. {"browser": {"headless": false}}) } message RegistrationResponse { @@ -155,6 +156,7 @@ string eval_result = 6; repeated ConsoleMessage console_history = 7; repeated NetworkRequest network_history = 8; + bool offloaded = 9; // NEW: Indicates content is stored in the sync mirror (.cortex_browser/) } message ConsoleMessage { @@ -265,6 +267,10 @@ int32 chunk_index = 3; bool is_final = 4; string hash = 5; // Full file hash for verification on final chunk + int64 offset = 6; // NEW: Byte offset for random-access parallel writes + bool compressed = 7; // NEW: Whether the chunk is compressed (zlib) + int32 total_chunks = 8; // NEW: Total number of chunks expected + int64 total_size = 9; // NEW: Total file size in bytes } message SyncStatus { diff --git a/agent-node/src/protos/agent_pb2.py b/agent-node/src/protos/agent_pb2.py index 3075f56..bea769c 100644 --- a/agent-node/src/protos/agent_pb2.py +++ b/agent-node/src/protos/agent_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 
\x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 
\x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 
\x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 
\x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"_\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 
\x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe0\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\x12\x19\n\x11skill_config_json\x18\x06 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 
\x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 
\x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xef\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\x12\x11\n\toffloaded\x18\t \x01(\x08\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e 
\x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"\x83\x01\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\x12\x0e\n\x06offset\x18\x06 \x01(\x03\x12\x12\n\ncompressed\x18\x07 \x01(\x08\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 
\x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -30,67 +30,67 @@ _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=201 _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=252 _globals['_SANDBOXPOLICY']._serialized_start=255 - _globals['_SANDBOXPOLICY']._serialized_end=452 - _globals['_SANDBOXPOLICY_MODE']._serialized_start=418 - _globals['_SANDBOXPOLICY_MODE']._serialized_end=452 - _globals['_REGISTRATIONRESPONSE']._serialized_start=454 - _globals['_REGISTRATIONRESPONSE']._serialized_end=574 - _globals['_CLIENTTASKMESSAGE']._serialized_start=577 - _globals['_CLIENTTASKMESSAGE']._serialized_end=874 - _globals['_SKILLEVENT']._serialized_start=876 - _globals['_SKILLEVENT']._serialized_end=997 - _globals['_NODEANNOUNCE']._serialized_start=999 - _globals['_NODEANNOUNCE']._serialized_end=1030 - _globals['_BROWSEREVENT']._serialized_start=1033 - _globals['_BROWSEREVENT']._serialized_end=1168 - _globals['_SERVERTASKMESSAGE']._serialized_start=1171 - _globals['_SERVERTASKMESSAGE']._serialized_end=1487 - _globals['_TASKCANCELREQUEST']._serialized_start=1489 - _globals['_TASKCANCELREQUEST']._serialized_end=1525 - _globals['_TASKREQUEST']._serialized_start=1528 - _globals['_TASKREQUEST']._serialized_end=1737 - _globals['_BROWSERACTION']._serialized_start=1740 - _globals['_BROWSERACTION']._serialized_end=2028 - _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1894 - 
_globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2028 - _globals['_TASKRESPONSE']._serialized_start=2031 - _globals['_TASKRESPONSE']._serialized_end=2383 - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2263 - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2311 - _globals['_TASKRESPONSE_STATUS']._serialized_start=2313 - _globals['_TASKRESPONSE_STATUS']._serialized_end=2373 - _globals['_BROWSERRESPONSE']._serialized_start=2386 - _globals['_BROWSERRESPONSE']._serialized_end=2606 - _globals['_CONSOLEMESSAGE']._serialized_start=2608 - _globals['_CONSOLEMESSAGE']._serialized_end=2675 - _globals['_NETWORKREQUEST']._serialized_start=2677 - _globals['_NETWORKREQUEST']._serialized_end=2781 - _globals['_WORKPOOLUPDATE']._serialized_start=2783 - _globals['_WORKPOOLUPDATE']._serialized_end=2827 - _globals['_TASKCLAIMREQUEST']._serialized_start=2829 - _globals['_TASKCLAIMREQUEST']._serialized_end=2881 - _globals['_TASKCLAIMRESPONSE']._serialized_start=2883 - _globals['_TASKCLAIMRESPONSE']._serialized_end=2952 - _globals['_HEARTBEAT']._serialized_start=2955 - _globals['_HEARTBEAT']._serialized_end=3313 - _globals['_HEALTHCHECKRESPONSE']._serialized_start=3315 - _globals['_HEALTHCHECKRESPONSE']._serialized_end=3360 - _globals['_FILESYNCMESSAGE']._serialized_start=3363 - _globals['_FILESYNCMESSAGE']._serialized_end=3591 - _globals['_SYNCCONTROL']._serialized_start=3594 - _globals['_SYNCCONTROL']._serialized_end=3893 - _globals['_SYNCCONTROL_ACTION']._serialized_start=3723 - _globals['_SYNCCONTROL_ACTION']._serialized_end=3893 - _globals['_DIRECTORYMANIFEST']._serialized_start=3895 - _globals['_DIRECTORYMANIFEST']._serialized_end=3965 - _globals['_FILEINFO']._serialized_start=3967 - _globals['_FILEINFO']._serialized_end=4035 - _globals['_FILEPAYLOAD']._serialized_start=4037 - _globals['_FILEPAYLOAD']._serialized_end=4132 - _globals['_SYNCSTATUS']._serialized_start=4135 - _globals['_SYNCSTATUS']._serialized_end=4295 - 
_globals['_SYNCSTATUS_CODE']._serialized_start=4229 - _globals['_SYNCSTATUS_CODE']._serialized_end=4295 - _globals['_AGENTORCHESTRATOR']._serialized_start=4298 - _globals['_AGENTORCHESTRATOR']._serialized_end=4531 + _globals['_SANDBOXPOLICY']._serialized_end=479 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=445 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=479 + _globals['_REGISTRATIONRESPONSE']._serialized_start=481 + _globals['_REGISTRATIONRESPONSE']._serialized_end=601 + _globals['_CLIENTTASKMESSAGE']._serialized_start=604 + _globals['_CLIENTTASKMESSAGE']._serialized_end=901 + _globals['_SKILLEVENT']._serialized_start=903 + _globals['_SKILLEVENT']._serialized_end=1024 + _globals['_NODEANNOUNCE']._serialized_start=1026 + _globals['_NODEANNOUNCE']._serialized_end=1057 + _globals['_BROWSEREVENT']._serialized_start=1060 + _globals['_BROWSEREVENT']._serialized_end=1195 + _globals['_SERVERTASKMESSAGE']._serialized_start=1198 + _globals['_SERVERTASKMESSAGE']._serialized_end=1514 + _globals['_TASKCANCELREQUEST']._serialized_start=1516 + _globals['_TASKCANCELREQUEST']._serialized_end=1552 + _globals['_TASKREQUEST']._serialized_start=1555 + _globals['_TASKREQUEST']._serialized_end=1764 + _globals['_BROWSERACTION']._serialized_start=1767 + _globals['_BROWSERACTION']._serialized_end=2055 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1921 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2055 + _globals['_TASKRESPONSE']._serialized_start=2058 + _globals['_TASKRESPONSE']._serialized_end=2410 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2290 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2338 + _globals['_TASKRESPONSE_STATUS']._serialized_start=2340 + _globals['_TASKRESPONSE_STATUS']._serialized_end=2400 + _globals['_BROWSERRESPONSE']._serialized_start=2413 + _globals['_BROWSERRESPONSE']._serialized_end=2652 + _globals['_CONSOLEMESSAGE']._serialized_start=2654 + _globals['_CONSOLEMESSAGE']._serialized_end=2721 + 
_globals['_NETWORKREQUEST']._serialized_start=2723 + _globals['_NETWORKREQUEST']._serialized_end=2827 + _globals['_WORKPOOLUPDATE']._serialized_start=2829 + _globals['_WORKPOOLUPDATE']._serialized_end=2873 + _globals['_TASKCLAIMREQUEST']._serialized_start=2875 + _globals['_TASKCLAIMREQUEST']._serialized_end=2927 + _globals['_TASKCLAIMRESPONSE']._serialized_start=2929 + _globals['_TASKCLAIMRESPONSE']._serialized_end=2998 + _globals['_HEARTBEAT']._serialized_start=3001 + _globals['_HEARTBEAT']._serialized_end=3359 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=3361 + _globals['_HEALTHCHECKRESPONSE']._serialized_end=3406 + _globals['_FILESYNCMESSAGE']._serialized_start=3409 + _globals['_FILESYNCMESSAGE']._serialized_end=3637 + _globals['_SYNCCONTROL']._serialized_start=3640 + _globals['_SYNCCONTROL']._serialized_end=3939 + _globals['_SYNCCONTROL_ACTION']._serialized_start=3769 + _globals['_SYNCCONTROL_ACTION']._serialized_end=3939 + _globals['_DIRECTORYMANIFEST']._serialized_start=3941 + _globals['_DIRECTORYMANIFEST']._serialized_end=4011 + _globals['_FILEINFO']._serialized_start=4013 + _globals['_FILEINFO']._serialized_end=4081 + _globals['_FILEPAYLOAD']._serialized_start=4084 + _globals['_FILEPAYLOAD']._serialized_end=4215 + _globals['_SYNCSTATUS']._serialized_start=4218 + _globals['_SYNCSTATUS']._serialized_end=4378 + _globals['_SYNCSTATUS_CODE']._serialized_start=4312 + _globals['_SYNCSTATUS_CODE']._serialized_end=4378 + _globals['_AGENTORCHESTRATOR']._serialized_start=4381 + _globals['_AGENTORCHESTRATOR']._serialized_end=4614 # @@protoc_insertion_point(module_scope) diff --git a/ai-hub/app/api/routes/agent_update.py b/ai-hub/app/api/routes/agent_update.py index 2b0923d..05a6a31 100644 --- a/ai-hub/app/api/routes/agent_update.py +++ b/ai-hub/app/api/routes/agent_update.py @@ -56,9 +56,9 @@ return True # M4 Fallback: Also allow any valid invite_token currently in the DB - from app.api.deps import get_db - from app.models.agent_node import AgentNode - db = 
next(get_db()) + from app.db.session import SessionLocal + from app.db.models import AgentNode + db = SessionLocal() try: exists = db.query(AgentNode).filter(AgentNode.invite_token == token, AgentNode.is_active == True).first() return exists is not None diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index e96791a..4feab51 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -367,8 +367,8 @@ timeout_ms=request.timeout_ms, session_id=request.session_id or "", ) - # Push directly to the node's live gRPC outbound queue - live.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req)) + # Push directly to the node's live gRPC outbound queue (Priority 1 for interactive) + live.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1) registry.emit(node_id, "task_start", {"command": request.command}, task_id=task_id) except Exception as e: logger.error(f"[nodes/dispatch] Failed to put task onto queue for {node_id}: {e}") @@ -395,7 +395,7 @@ from app.protos import agent_pb2 cancel_req = agent_pb2.TaskCancelRequest(task_id=task_id) - live.queue.put(agent_pb2.ServerTaskMessage(task_cancel=cancel_req)) + live.send_message(agent_pb2.ServerTaskMessage(task_cancel=cancel_req), priority=0) registry.emit(node_id, "task_cancel", {"task_id": task_id}) return {"status": "cancel_sent", "task_id": task_id} @@ -612,7 +612,13 @@ # 4. 
Run installer with --daemon (or --non-interactive) print("[*] Bootstrapping agent...") -cmd = [sys.executable, "bootstrap_installer.py", "--daemon"] +cmd = [ + sys.executable, "bootstrap_installer.py", + "--daemon", + "--hub", "{base_url}", + "--token", "{node.invite_token}", + "--node-id", "{node_id}" +] if {skip_browsers}: cmd.append("--skip-browsers") subprocess.run(cmd) @@ -774,7 +780,7 @@ "data": live.to_dict() if live else {"status": "offline"}, }) - q: queue.Queue = queue.Queue() + q: queue.Queue = queue.Queue(maxsize=500) # Capped to prevent memory leak registry.subscribe_node(node_id, q) async def send_events(): @@ -836,7 +842,7 @@ timeout_ms=0, session_id=session_id ) - live_node.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req)) + live_node.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1) registry.emit(node_id, "task_start", {"command": cmd}, task_id=task_id) except Exception as e: logger.error(f"[ws/dispatch] Error: {e}") @@ -921,7 +927,7 @@ await websocket.close(code=1011) return - q: queue.Queue = queue.Queue() + q: queue.Queue = queue.Queue(maxsize=500) # Subscribe to each accessible node individually for nid in accessible_ids: registry.subscribe_node(nid, q) @@ -1250,7 +1256,7 @@ lines += [ "", "# Workspace sync root โ€” override if needed", - "sync_root: \"/tmp/cortex-workspace\"", + "sync_root: \"/tmp/cortex-sync\"", "", "# FS Explorer root โ€” defaults to user home if not specified here", "# fs_root: \"/User/username/Documents\"", diff --git a/ai-hub/app/api/routes/sessions.py b/ai-hub/app/api/routes/sessions.py index f931ac8..ca5b372 100644 --- a/ai-hub/app/api/routes/sessions.py +++ b/ai-hub/app/api/routes/sessions.py @@ -36,9 +36,20 @@ node_prefs = (user.preferences or {}).get("nodes", {}) default_nodes = node_prefs.get("default_node_ids", []) node_config = node_prefs.get("data_source", {"source": "empty"}) - - if default_nodes: + + # M3/M6: Generate stable workspace ID for all Swarm Control sessions or 
if defaults exist + if request.feature_name == "swarm_control" or default_nodes: new_session.sync_workspace_id = f"session-{new_session.id}-{_uuid.uuid4().hex[:8]}" + + # Ensure server-side ghost mirror directory is created immediately + try: + from app.main import app + if hasattr(app.state, "orchestrator") and app.state.orchestrator.mirror: + app.state.orchestrator.mirror.get_workspace_path(new_session.sync_workspace_id) + except Exception as mirror_err: + logger.error(f"[create_session] Failed to pre-initialize server mirror: {mirror_err}") + + if default_nodes: new_session.attached_node_ids = list(default_nodes) new_session.node_sync_status = { nid: {"status": "pending", "last_sync": None} diff --git a/ai-hub/app/api/routes/user.py b/ai-hub/app/api/routes/user.py index 1dea9c6..af5f6b6 100644 --- a/ai-hub/app/api/routes/user.py +++ b/ai-hub/app/api/routes/user.py @@ -1,6 +1,7 @@ from fastapi import APIRouter, HTTPException, Depends, Header, Query, Request, UploadFile, File from fastapi.responses import RedirectResponse as redirect from sqlalchemy.orm import Session +from app.config import settings from app.db import models from typing import Optional, Annotated import logging @@ -18,17 +19,14 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# Minimum OIDC configuration from settings -from app.config import settings -OIDC_CLIENT_ID = settings.OIDC_CLIENT_ID -OIDC_CLIENT_SECRET = settings.OIDC_CLIENT_SECRET -OIDC_SERVER_URL = settings.OIDC_SERVER_URL -OIDC_REDIRECT_URI = settings.OIDC_REDIRECT_URI - -# --- Derived OIDC Configuration --- -OIDC_AUTHORIZATION_URL = f"{OIDC_SERVER_URL}/auth" -OIDC_TOKEN_URL = f"{OIDC_SERVER_URL}/token" -OIDC_USERINFO_URL = f"{OIDC_SERVER_URL}/userinfo" +# --- Derived OIDC Configuration Helpers --- +def get_oidc_urls(): + server_url = settings.OIDC_SERVER_URL.rstrip("/") + return { + "auth": f"{server_url}/auth", + "token": f"{server_url}/token", + "userinfo": f"{server_url}/userinfo" + } # A dependency 
to simulate getting the current user ID from a request header def get_current_user_id(x_user_id: Annotated[Optional[str], Header()] = None) -> Optional[str]: @@ -58,17 +56,18 @@ # For simplicity, we will pass it as a query parameter in the callback. # A more robust solution would use a state parameter. - # Use urllib.parse.urlencode to properly encode parameters + oidc_urls = get_oidc_urls() params = { "response_type": "code", "scope": "openid profile email", - "client_id": OIDC_CLIENT_ID, - "redirect_uri": OIDC_REDIRECT_URI, + "client_id": settings.OIDC_CLIENT_ID, + "redirect_uri": settings.OIDC_REDIRECT_URI, "state": frontend_callback_uri or "" } - auth_url = f"{OIDC_AUTHORIZATION_URL}?{urllib.parse.urlencode(params)}" - logger.info(f"Redirecting to OIDC authorization URL: {auth_url}") + auth_url = f"{oidc_urls['auth']}?{urllib.parse.urlencode(params)}" + logger.info(f"Initiating OIDC login. Client ID: '{settings.OIDC_CLIENT_ID}', Redirect URI: '{settings.OIDC_REDIRECT_URI}', State: '{params['state']}'") + logger.debug(f"Full redirect URL: {auth_url}") return redirect(url=auth_url) @router.get("/login/callback", summary="Handle OIDC Login Callback") @@ -85,20 +84,21 @@ """ logger.info(f"Received callback with authorization code: {code[:10]}... 
and state: {state}") + oidc_urls = get_oidc_urls() try: - logger.info(f"Exchanging code for tokens at: {OIDC_TOKEN_URL}") + logger.info(f"Exchanging code for tokens at: {oidc_urls['token']}") # Step 1: Exchange the authorization code for an access token and an ID token token_data = { "grant_type": "authorization_code", "code": code, - "redirect_uri": OIDC_REDIRECT_URI, - "client_id": OIDC_CLIENT_ID, - "client_secret": OIDC_CLIENT_SECRET, + "redirect_uri": settings.OIDC_REDIRECT_URI, + "client_id": settings.OIDC_CLIENT_ID, + "client_secret": settings.OIDC_CLIENT_SECRET, } async with httpx.AsyncClient() as client: - logger.debug(f"Sending POST to {OIDC_TOKEN_URL} with data keys: {list(token_data.keys())}") - token_response = await client.post(OIDC_TOKEN_URL, data=token_data, timeout=30.0) + logger.debug(f"Sending POST to {oidc_urls['token']} with data keys: {list(token_data.keys())}") + token_response = await client.post(oidc_urls['token'], data=token_data, timeout=30.0) token_response.raise_for_status() response_json = token_response.json() diff --git a/ai-hub/app/api/schemas.py b/ai-hub/app/api/schemas.py index 1307489..ee725e0 100644 --- a/ai-hub/app/api/schemas.py +++ b/ai-hub/app/api/schemas.py @@ -301,6 +301,7 @@ enabled: bool = True cwd_jail: Optional[str] = None # shell only: restrict working directory max_file_size_mb: Optional[int] = None # sync only: file size cap + headless: Optional[bool] = None # browser only: toggle UI visibility sandbox: Optional[SandboxConfig] = None # NEW: shell sandbox config class NodeSkillConfig(BaseModel): diff --git a/ai-hub/app/core/grpc/core/journal.py b/ai-hub/app/core/grpc/core/journal.py index dba559b..4de45cf 100644 --- a/ai-hub/app/core/grpc/core/journal.py +++ b/ai-hub/app/core/grpc/core/journal.py @@ -207,14 +207,14 @@ with self.lock: return self.tasks.pop(task_id, None) - def cleanup(self, max_age_s: int = 3600): + def cleanup(self, max_age_s: int = 900): """Purges stale tasks to prevent slow memory accumulation.""" 
now = time.time() with self.lock: to_remove = [ tid for tid, t in self.tasks.items() - if (t["completed_at"] and (now - t["completed_at"]) > 300) # finished: keep 5m - or (now - t["created_at"]) > max_age_s # pending: keep 1h + if (t["completed_at"] and (now - t["completed_at"]) > 120) # finished: keep 2m + or (now - t["created_at"]) > 900 # pending: keep 15m ] for tid in to_remove: del self.tasks[tid] diff --git a/ai-hub/app/core/grpc/core/pool.py b/ai-hub/app/core/grpc/core/pool.py index f53a1db..2c22207 100644 --- a/ai-hub/app/core/grpc/core/pool.py +++ b/ai-hub/app/core/grpc/core/pool.py @@ -1,16 +1,29 @@ import threading +import time class GlobalWorkPool: """Thread-safe pool of unassigned tasks that can be claimed by any node.""" def __init__(self): self.lock = threading.Lock() - self.available = {} # task_id -> payload + self.available = {} # task_id -> {"payload": p, "ts": time.time()} self.on_new_work = None # Callback to notify nodes + threading.Thread(target=self._cleanup_loop, daemon=True, name="WorkPoolCleanup").start() + def _cleanup_loop(self): + import time + while True: + time.sleep(300) + now = time.time() + with self.lock: + to_remove = [tid for tid, d in self.available.items() if (now - d["ts"]) > 3600] + for tid in to_remove: + del self.available[tid] + def push_work(self, task_id, payload): """Adds new task to global discovery pool.""" + import time with self.lock: - self.available[task_id] = payload + self.available[task_id] = {"payload": payload, "ts": time.time()} print(f" [๐Ÿ“ฆ] New Shared Task: {task_id}") if self.on_new_work: self.on_new_work(task_id) @@ -20,7 +33,7 @@ with self.lock: if task_id in self.available: print(f" [๐Ÿ“ฆ] Task {task_id} Claimed by {node_id}") - return True, self.available.pop(task_id) + return True, self.available.pop(task_id)["payload"] return False, None def list_available(self): diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 86782fe..024d5b3 100644 --- 
a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -2,10 +2,14 @@ import json import os import hashlib +import zlib import logging import shutil +import threading from app.core.grpc.utils.crypto import sign_payload, sign_browser_action from app.protos import agent_pb2 +from app.db.session import get_db_session +from app.db.models import Session logger = logging.getLogger(__name__) @@ -17,34 +21,42 @@ self.pool = pool self.mirror = mirror self.memberships = {} # session_id -> list(node_id) + self.membership_lock = threading.Lock() def push_workspace(self, node_id, session_id): """Initial unidirectional push from server ghost mirror to a node.""" - node = self.registry.get_node(node_id) - if not node or not self.mirror: return + if not self.mirror: return + # 1. Ensure Server Mirror exists immediately + manifest = self.mirror.generate_manifest(session_id) + + # 2. Track relationship for recovery/reconciliation + with self.membership_lock: + if session_id not in self.memberships: + self.memberships[session_id] = [] + if node_id not in self.memberships[session_id]: + self.memberships[session_id].append(node_id) + + # 3. If node is online, push actual data + node = self.registry.get_node(node_id) + if not node: + logger.info(f"[๐Ÿ“๐Ÿ“ค] Workspace {session_id} prepared on server for offline node {node_id}") + return + print(f"[๐Ÿ“๐Ÿ“ค] Initiating Workspace Push for Session {session_id} to {node_id}") - # Track for recovery - if session_id not in self.memberships: - self.memberships[session_id] = [] - if node_id not in self.memberships[session_id]: - self.memberships[session_id].append(node_id) - - manifest = self.mirror.generate_manifest(session_id) - - # 1. Send Manifest - node.queue.put(agent_pb2.ServerTaskMessage( + # Send Manifest to Node. The node will compare this with its local state + # and send back RECONCILE_REQUIRED for any files it is missing. 
+ # This prevents the "Double Push" race where the server blasts data + # while the node is still trying to decide what it needs. + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, manifest=manifest ) - )) + ), priority=1) - # 2. Send File Data - for file_info in manifest.files: - if not file_info.is_dir: - self.push_file(node_id, session_id, file_info.path) + # NOTE: Proactive parallel push removed. Manifest-driven reactive sync is cleaner. def push_file(self, node_id, session_id, rel_path): """Pushes a specific file to a node (used for drift recovery).""" @@ -58,34 +70,44 @@ print(f" [๐Ÿ“โ“] Requested file {rel_path} not found in mirror") return - with open(abs_path, "rb") as f: - full_data = f.read() - full_hash = hashlib.sha256(full_data).hexdigest() - f.seek(0) - - index = 0 - while True: - import time - chunk = f.read(1024 * 512) # 512KB chunks - is_final = len(chunk) < 1024 * 512 - - node.queue.put(agent_pb2.ServerTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - file_data=agent_pb2.FilePayload( - path=rel_path, - chunk=chunk, - chunk_index=index, - is_final=is_final, - hash=full_hash if is_final else "" + # Line-rate Optimization: 4MB chunks + No Software Throttling + hasher = hashlib.sha256() + file_size = os.path.getsize(abs_path) + + try: + with open(abs_path, "rb") as f: + index = 0 + while True: + chunk = f.read(4 * 1024 * 1024) # 4MB chunks (optimal for gRPC) + if not chunk: break + + hasher.update(chunk) + offset = f.tell() - len(chunk) + is_final = f.tell() >= file_size + + # Compress Chunk for transit + compressed_chunk = zlib.compress(chunk) + + # Put into priority dispatcher (priority 2 for sync data) + node.send_message(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + file_data=agent_pb2.FilePayload( + path=rel_path, + chunk=compressed_chunk, + chunk_index=index, + is_final=is_final, + hash=hasher.hexdigest() if 
is_final else "", + offset=offset, + compressed=True + ) ) - ) - )) - time.sleep(0.02) # ~25MB/s throttle to allow interleaving with other Node messages - - if is_final or not chunk: - break - index += 1 + ), priority=2) + + if is_final: break + index += 1 + except Exception as e: + logger.error(f"[๐Ÿ“๐Ÿ“ค] Line-rate push error for {rel_path}: {e}") def clear_workspace(self, node_id, session_id): """Sends a SyncControl command to purge the local sync directory on a node, and removes from active mesh.""" @@ -95,26 +117,53 @@ node = self.registry.get_node(node_id) if not node: return - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.PURGE, path=".") ) - )) + ), priority=1) def reconcile_node(self, node_id): """Forces a re-sync check for all sessions this node belongs to and purges dead sessions.""" print(f" [๐Ÿ“๐Ÿ”„] Triggering Resync Check for {node_id}...") active_sessions = [] - for sid, nodes in self.memberships.items(): - if node_id in nodes: - active_sessions.append(sid) + try: + with get_db_session() as db: + sessions = db.query(Session).filter( + Session.is_archived == False, + Session.sync_workspace_id.isnot(None) + ).all() + + with self.membership_lock: + for s in sessions: + attached = s.attached_node_ids or [] + if node_id in attached: + active_sessions.append(s.sync_workspace_id) + if s.sync_workspace_id not in self.memberships: + self.memberships[s.sync_workspace_id] = [] + if node_id not in self.memberships[s.sync_workspace_id]: + self.memberships[s.sync_workspace_id].append(node_id) + + # Aggressive memory cleanup: Purge orphaned session memberships + current_active_workspace_ids = {s.sync_workspace_id for s in sessions} + with self.membership_lock: + to_purge = [sid for sid in self.memberships.keys() if sid not in current_active_workspace_ids] + for sid in to_purge: + del 
self.memberships[sid] + except Exception as e: + print(f" [๐Ÿ“โš ๏ธ] Failed to fetch active sessions for node reconciliation: {e}") + # Fallback to in-memory if DB fails + with self.membership_lock: + for sid, nodes in self.memberships.items(): + if node_id in nodes: + active_sessions.append(sid) # Send proactive cleanup payload with the active sessions whitelist node = self.registry.get_node(node_id) if node: - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id="global", control=agent_pb2.SyncControl( @@ -122,32 +171,41 @@ request_paths=active_sessions ) ) - )) + ), priority=0) for sid in active_sessions: # Re-push manifest to trigger node-side drift check self.push_workspace(node_id, sid) + # Add a small delay to prevent saturating the gRPC stream for multiple sessions + time.sleep(0.5) def broadcast_file_chunk(self, session_id: str, sender_node_id: str, file_payload): """Broadcasts a file chunk received from one node to all other nodes in the mesh.""" - session_members = self.memberships.get(session_id, []) - destinations = [n for n in session_members if n != sender_node_id] + with self.membership_lock: + session_members = self.memberships.get(session_id, []) + destinations = [n for n in session_members if n != sender_node_id] if destinations: print(f" [๐Ÿ“๐Ÿ“ข] Broadcasting {file_payload.path} from {sender_node_id} to: {', '.join(destinations)}") - for node_id in destinations: - node = self.registry.get_node(node_id) - if not node: - continue - - # Forward the exact same FileSyncMessage - node.queue.put(agent_pb2.ServerTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - file_data=file_payload - ) - )) + def _send_to_node(nid): + node = self.registry.get_node(nid) + if node: + # Forward the exact same FileSyncMessage (Priority 2 for Sync Data) + node.send_message(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + 
session_id=session_id, + file_data=file_payload + ) + ), priority=2) + + # M6: Use registry executor if available for parallel mesh broadcast + if self.registry.executor: + for nid in destinations: + self.registry.executor.submit(_send_to_node, nid) + else: + for nid in destinations: + _send_to_node(nid) def lock_workspace(self, node_id, session_id): """Disables user-side synchronization from a node during AI refactors.""" @@ -161,12 +219,12 @@ """Requests a full directory manifest from a node for drift checking.""" node = self.registry.get_node(node_id) if not node: return - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.REFRESH_MANIFEST, path=path) ) - )) + ), priority=1) def control_sync(self, node_id, session_id, action="START", path="."): """Sends a SyncControl command to a node (e.g. START_WATCHING, LOCK).""" @@ -188,12 +246,12 @@ if node_id not in self.memberships[session_id]: self.memberships[session_id].append(node_id) - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, control=agent_pb2.SyncControl(action=proto_action, path=path) ) - )) + ), priority=1) # ================================================================== # Modular FS Explorer / Mesh Navigation @@ -227,13 +285,13 @@ tid = f"fs-ls-{int(time.time()*1000)}" event = self.journal.register(tid, node_id) - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, task_id=tid, control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.LIST, path=path) ) - )) + ), priority=1) if event.wait(timeout): res = self.journal.get_result(tid) @@ -249,11 +307,12 @@ def _proactive_explorer_sync(self, node_id, files, session_id): """Starts background 
tasks to mirror files to Hub so dots turn green.""" - import threading for f in files: if f.get("is_dir"): continue if not f.get("is_synced") and f.get("size", 0) < 1024 * 512: # Skip large files - threading.Thread(target=self.cat, args=(node_id, f["path"], 15, session_id), daemon=True).start() + # M6: Use shared registry executor instead of spawning loose threads + if self.registry.executor: + self.registry.executor.submit(self.cat, node_id, f["path"], 15, session_id) def cat(self, node_id: str, path: str, timeout=15, session_id="__fs_explorer__", force_remote: bool = False): """Requests file content from a node (waits for result).""" @@ -278,13 +337,13 @@ tid = f"fs-cat-{int(time.time()*1000)}" event = self.journal.register(tid, node_id) - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, task_id=tid, control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.READ, path=path) ) - )) + ), priority=1) if event.wait(timeout): res = self.journal.get_result(tid) @@ -311,21 +370,34 @@ with open(dest, "wb") as f: f.write(content) - # Fire and forget synchronization to the edge node - tid = f"fs-write-{int(time.time()*1000)}" - node.queue.put(agent_pb2.ServerTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=tid, - control=agent_pb2.SyncControl( - action=agent_pb2.SyncControl.WRITE, - path=path, - content=content, - is_dir=is_dir + # Multi-node broadcast for sessions + targets = [] + if session_id != "__fs_explorer__": + targets = self.memberships.get(session_id, [node_id]) + else: + targets = [node_id] + + print(f"[๐Ÿ“โœ๏ธ] AI Write: {path} (Session: {session_id}) -> Dispatching to {len(targets)} nodes") + + for target_nid in targets: + target_node = self.registry.get_node(target_nid) + if not target_node: continue + + tid = f"fs-write-{int(time.time()*1000)}" + target_node.send_message(agent_pb2.ServerTaskMessage( + 
file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=tid, + control=agent_pb2.SyncControl( + action=agent_pb2.SyncControl.WRITE, + path=path, + content=content, + is_dir=is_dir + ) ) - ) - )) - return {"success": True, "message": "Synchronized to local mirror and dispatched to node"} + ), priority=2) + + return {"success": True, "message": f"Synchronized to local mirror and dispatched to {len(targets)} nodes"} except Exception as e: logger.error(f"[๐Ÿ“โœ๏ธ] Local mirror write error: {e}") return {"error": str(e)} @@ -334,7 +406,7 @@ tid = f"fs-write-{int(time.time()*1000)}" event = self.journal.register(tid, node_id) - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, task_id=tid, @@ -345,7 +417,7 @@ is_dir=is_dir ) ) - )) + ), priority=2) if event.wait(timeout): res = self.journal.get_result(tid) @@ -409,16 +481,28 @@ elif os.path.exists(dest): os.remove(dest) - # Fire and forget to edge node - tid = f"fs-rm-{int(time.time()*1000)}" - node.queue.put(agent_pb2.ServerTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=tid, - control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=path) - ) - )) - return {"success": True, "message": "Removed from local mirror and dispatched delete to node"} + # Multi-node broadcast for sessions + targets = [] + if session_id != "__fs_explorer__": + targets = self.memberships.get(session_id, [node_id]) + else: + targets = [node_id] + + print(f"[๐Ÿ“๐Ÿ—‘๏ธ] AI Remove: {path} (Session: {session_id}) -> Dispatching to {len(targets)} nodes") + + for target_nid in targets: + target_node = self.registry.get_node(target_nid) + if not target_node: continue + + tid = f"fs-rm-{int(time.time()*1000)}" + target_node.send_message(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=tid, + 
control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=path) + ) + ), priority=2) + return {"success": True, "message": f"Removed from local mirror and dispatched delete to {len(targets)} nodes"} except Exception as e: logger.error(f"[๐Ÿ“๐Ÿ—‘๏ธ] Local mirror rm error: {e}") return {"error": str(e)} @@ -427,13 +511,13 @@ tid = f"fs-rm-{int(time.time()*1000)}" event = self.journal.register(tid, node_id) - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, task_id=tid, control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=path) ) - )) + ), priority=2) if event.wait(timeout): res = self.journal.get_result(tid) @@ -442,7 +526,7 @@ self.journal.pop(tid) return {"error": "Timeout"} - def dispatch_swarm(self, node_ids, cmd, timeout=30, session_id=None, no_abort=False): + def dispatch_swarm(self, node_ids, cmd, timeout=120, session_id=None, no_abort=False): """Dispatches a command to multiple nodes in parallel and waits for all results.""" from concurrent.futures import ThreadPoolExecutor, as_completed @@ -462,7 +546,7 @@ return results - def dispatch_single(self, node_id, cmd, timeout=30, session_id=None, no_abort=False): + def dispatch_single(self, node_id, cmd, timeout=120, session_id=None, no_abort=False): """Dispatches a shell command to a specific node.""" import uuid node = self.registry.get_node(node_id) @@ -480,7 +564,7 @@ logger.info(f"[๐Ÿ“ค] Dispatching shell {tid} to {node_id}") self.registry.emit(node_id, "task_assigned", {"command": cmd, "session_id": session_id}, task_id=tid) - node.queue.put(req) + node.send_message(req, priority=1) self.registry.emit(node_id, "task_start", {"command": cmd}, task_id=tid) # Immediate peek if timeout is 0 @@ -504,7 +588,7 @@ logger.warning(f"[โš ๏ธ] Shell task {tid} TIMEOUT after {timeout}s on {node_id}. 
Sending ABORT.") try: - node.queue.put(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=tid))) + node.send_message(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=tid)), priority=0) except: pass # Return partial result captured in buffer before popping @@ -532,7 +616,7 @@ logger.info(f"[๐ŸŒ๐Ÿ“ค] Dispatching browser {tid} to {node_id}") self.registry.emit(node_id, "task_assigned", {"browser_action": action.action, "url": action.url}, task_id=tid) - node.queue.put(req) + node.send_message(req, priority=1) self.registry.emit(node_id, "task_start", {"browser_action": action.action}, task_id=tid) if event.wait(timeout): @@ -591,7 +675,7 @@ logger.warning(f"[โš ๏ธ] Wait for task {task_id} TIMEOUT again. Sending ABORT.") node = self.registry.get_node(node_id) if node: - try: node.queue.put(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=task_id))) + try: node.send_message(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=task_id)), priority=0) except: pass res = self.journal.get_result(task_id) diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index 0054d64..971fb02 100644 --- a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -11,11 +11,14 @@ """ import threading import queue +import time import logging import re import uuid +import asyncio from datetime import datetime from typing import Dict, Optional, List, Any +from concurrent.futures import ThreadPoolExecutor logger = logging.getLogger(__name__) @@ -49,7 +52,8 @@ self.node_id = node_id self.user_id = user_id # Owner โ€” maps node to a Hub user self.metadata = metadata # desc, caps (capabilities dict) - self.queue: queue.Queue = queue.Queue() # gRPC outbound message queue + # Increased queue size to 1000 to handle high-concurrency file sync without dropping interactive tasks + self.queue: queue.PriorityQueue = 
queue.PriorityQueue(maxsize=1000) self.stats: dict = { "active_worker_count": 0, "cpu_usage_percent": 0.0, @@ -58,9 +62,40 @@ } self.connected_at: datetime = datetime.utcnow() self.last_heartbeat_at: datetime = datetime.utcnow() - import uuid self.session_id: str = str(uuid.uuid4()) self.terminal_history: List[str] = [] # Recent PTY lines for AI reading + self._registry_executor = None # Set by registry + + def send_message(self, msg: Any, priority: int = 2): + """ + Thread-safe and Async-safe message dispatcher. + priority: 0 (Admin/Control), 1 (Terminal/Interactive), 2 (File Sync) + """ + item = (priority, time.time(), msg) + + def _blocking_put(): + try: + self.queue.put(item, block=True, timeout=2.0) + except queue.Full: + logger.warning(f"[๐Ÿ“‹โš ๏ธ] Message dropped for {self.node_id}: outbound queue FULL. Node may be unresponsive.") + except Exception as e: + logger.error(f"[๐Ÿ“‹โŒ] Sync error sending to {self.node_id}: {e}") + + try: + # Check if we are in an async loop (FastAPI context) + loop = asyncio.get_running_loop() + if loop.is_running(): + if self._registry_executor: + self._registry_executor.submit(_blocking_put) + else: + # Fallback to fire-and-forget thread if executor not yet ready + threading.Thread(target=_blocking_put, daemon=True).start() + return + except RuntimeError: + pass # Not in async loop + + # Standard sync put (from gRPC thread) + _blocking_put() def update_stats(self, stats: dict): self.stats.update(stats) @@ -105,16 +140,17 @@ """ def __init__(self): - self._lock = threading.Lock() self._nodes: Dict[str, LiveNodeRecord] = {} + self._lock = threading.Lock() + self._connection_history: Dict[str, List[datetime]] = {} # Per-node WS subscribers: node_id -> [queue, ...] self._node_listeners: Dict[str, List[queue.Queue]] = {} # Per-user WS subscribers: user_id -> [queue, ...] (ALL nodes for that user) self._user_listeners: Dict[str, List[queue.Queue]] = {} - # Connection history for flapping detection: node_id -> [timestamp, ...] 
- self._connection_history: Dict[str, List[datetime]] = {} - self._FLAP_THRESHOLD = 3 # Max connections - self._FLAP_WINDOW_S = 30 # In 30 seconds + self._FLAP_WINDOW_S = 60 + self._FLAP_THRESHOLD = 5 + # Shared Hub-wide work executor to prevent thread-spawning leaks + self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix="RegistryWorker") # ------------------------------------------------------------------ # # DB Helpers # @@ -186,6 +222,33 @@ self._connection_history.clear() logger.info(f"[NodeRegistry] EMERGENCY: Cleared {count} nodes from memory cache.") return count + + def validate_invite_token(self, node_id: str, token: str) -> dict: + """ + Directly validates an invite token against the DB. + Used by the gRPC server to avoid HTTP self-call deadlocks during startup. + """ + from app.db.models import AgentNode + from app.db.session import get_db_session + try: + with get_db_session() as db: + node = db.query(AgentNode).filter( + AgentNode.node_id == node_id, + AgentNode.invite_token == token, + AgentNode.is_active == True, + ).first() + if not node: + return {"valid": False, "reason": "Invalid token or unknown node."} + return { + "valid": True, + "node_id": node.node_id, + "display_name": node.display_name, + "user_id": node.registered_by, + "skill_config": node.skill_config or {}, + } + except Exception as e: + logger.error(f"[NodeRegistry] Token validation exception: {e}") + return {"valid": False, "reason": str(e)} # ------------------------------------------------------------------ # @@ -213,10 +276,11 @@ # 2. 
Register the live connection record = LiveNodeRecord(node_id=node_id, user_id=user_id, metadata=metadata) + record._registry_executor = self.executor # Inject shared executor self._nodes[node_id] = record - # Persist to DB (background-safe โ€” session is scoped) - self._db_upsert_node(node_id, user_id, metadata) + # Persist to DB asynchronously to avoid blocking gRPC stream setup during NFS lag + self.executor.submit(self._db_upsert_node, node_id, user_id, metadata) logger.info(f"[๐Ÿ“‹] NodeRegistry: Registered {node_id} (owner: {user_id}) | Stats enabled") self.emit(node_id, "node_online", record.to_dict()) @@ -236,7 +300,7 @@ node = self._nodes.pop(node_id, None) user_id = node.user_id if node else None - self._db_mark_offline(node_id) + self.executor.submit(self._db_mark_offline, node_id) self.emit(node_id, "node_offline", {"node_id": node_id, "user_id": user_id}) logger.info(f"[๐Ÿ“‹] NodeRegistry: Deregistered {node_id}") @@ -274,8 +338,8 @@ node.update_stats(stats) if stats.get("cpu_usage_percent", 0) > 0 or stats.get("memory_usage_percent", 0) > 0: logger.debug(f"[๐Ÿ’“] Heartbeat {node_id}: CPU {stats.get('cpu_usage_percent')}% | MEM {stats.get('memory_usage_percent')}%") - # Persist heartbeat timestamp to DB (throttle: already ~10s cadence from node) - self._db_update_heartbeat(node_id) + # Persist heartbeat timestamp to DB asynchronously + self.executor.submit(self._db_update_heartbeat, node_id) # Emit heartbeat event to live UI self.emit(node_id, "heartbeat", stats) @@ -322,16 +386,20 @@ if not is_tty_char: node.terminal_history.append(f"$ {cmd}\n") elif event_type == "task_stdout" and isinstance(data, str): - # NEW: Strip ANSI codes for AI readability + # NEW: Strip ANSI codes and CAP size to 100KB per chunk to prevent memory bloat ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') clean_output = ansi_escape.sub('', data) + if len(clean_output) > 100_000: + clean_output = clean_output[:100_000] + "\n[... 
Output Truncated ...]\n" node.terminal_history.append(clean_output) elif event_type == "skill_event" and isinstance(data, dict): if data.get("type") == "output": output_data = data.get("data", "") - # Strip ANSI codes for AI readability + # Strip ANSI codes and CAP size ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') clean_output = ansi_escape.sub('', output_data) + if len(clean_output) > 100_000: + clean_output = clean_output[:100_000] + "\n[... Output Truncated ...]\n" node.terminal_history.append(clean_output) # Keep a rolling buffer of 150 terminal interaction chunks diff --git a/ai-hub/app/db/session.py b/ai-hub/app/db/session.py index 6f46c4d..fb50db0 100644 --- a/ai-hub/app/db/session.py +++ b/ai-hub/app/db/session.py @@ -6,13 +6,21 @@ # Determine engine arguments based on the database mode from the central config engine_args = {} if settings.DB_MODE == "sqlite": - # This argument is required for SQLite to allow it to be used by multiple threads, - # which is the case in a web application like FastAPI. - engine_args["connect_args"] = {"check_same_thread": False} + # This argument is required for SQLite to allow it to be used by multiple threads. + # We also increase the timeout for NFS environments to 30 seconds. + engine_args["connect_args"] = {"check_same_thread": False, "timeout": 60} + # M6: Scale pool size for high-concurrency background ops + engine_args["pool_size"] = 20 + engine_args["max_overflow"] = 30 else: # 'pool_pre_ping' checks if a database connection is still alive before using it. # This prevents errors from connections that have been timed out by the DB server. 
engine_args["pool_pre_ping"] = True + engine_args["pool_size"] = 20 + engine_args["max_overflow"] = 30 + +# Always pre-check connections +engine_args["pool_pre_ping"] = True import os @@ -24,6 +32,16 @@ # Create the SQLAlchemy engine using the centralized URL and determined arguments engine = create_engine(settings.DATABASE_URL, **engine_args) +# M6: Speed up SQLite on NFS by using Write-Ahead Logging (WAL) and reducing sync level +if settings.DB_MODE == "sqlite": + from sqlalchemy import event + @event.listens_for(engine, "connect") + def set_sqlite_pragma(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA journal_mode=WAL") + cursor.execute("PRAGMA synchronous=NORMAL") + cursor.close() + # SessionLocal is a factory for creating new database session objects. # It's the standard way to interact with the database in SQLAlchemy. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) diff --git a/ai-hub/app/protos/agent.proto b/ai-hub/app/protos/agent.proto index 46b74fa..3076b89 100644 --- a/ai-hub/app/protos/agent.proto +++ b/ai-hub/app/protos/agent.proto @@ -33,6 +33,7 @@ repeated string denied_commands = 3; repeated string sensitive_commands = 4; string working_dir_jail = 5; + string skill_config_json = 6; // NEW: Map of skill settings (e.g. 
{"browser": {"headless": false}}) } message RegistrationResponse { @@ -155,6 +156,7 @@ string eval_result = 6; repeated ConsoleMessage console_history = 7; repeated NetworkRequest network_history = 8; + bool offloaded = 9; // NEW: Indicates content is stored in the sync mirror (.cortex_browser/) } message ConsoleMessage { @@ -265,6 +267,10 @@ int32 chunk_index = 3; bool is_final = 4; string hash = 5; // Full file hash for verification on final chunk + int64 offset = 6; // NEW: Byte offset for random-access parallel writes + bool compressed = 7; // NEW: Whether the chunk is compressed (zlib) + int32 total_chunks = 8; // NEW: Total number of chunks expected + int64 total_size = 9; // NEW: Total file size in bytes } message SyncStatus { diff --git a/ai-hub/app/protos/agent_pb2.py b/ai-hub/app/protos/agent_pb2.py index da2f584..9c98de4 100644 --- a/ai-hub/app/protos/agent_pb2.py +++ b/ai-hub/app/protos/agent_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16\x61pp/protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 
\x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 
\x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 
\x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 
\x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"_\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16\x61pp/protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 
\x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe0\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\x12\x19\n\x11skill_config_json\x18\x06 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 
\x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 
\x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xef\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\x12\x11\n\toffloaded\x18\t \x01(\x08\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e 
\x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"\xad\x01\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\x12\x0e\n\x06offset\x18\x06 \x01(\x03\x12\x12\n\ncompressed\x18\x07 \x01(\x08\x12\x14\n\x0ctotal_chunks\x18\x08 \x01(\x05\x12\x12\n\ntotal_size\x18\t \x01(\x03\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 
\x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -30,67 +30,67 @@ _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=205 _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=256 _globals['_SANDBOXPOLICY']._serialized_start=259 - _globals['_SANDBOXPOLICY']._serialized_end=456 - _globals['_SANDBOXPOLICY_MODE']._serialized_start=422 - _globals['_SANDBOXPOLICY_MODE']._serialized_end=456 - _globals['_REGISTRATIONRESPONSE']._serialized_start=458 - _globals['_REGISTRATIONRESPONSE']._serialized_end=578 - _globals['_CLIENTTASKMESSAGE']._serialized_start=581 - _globals['_CLIENTTASKMESSAGE']._serialized_end=878 - _globals['_SKILLEVENT']._serialized_start=880 - _globals['_SKILLEVENT']._serialized_end=1001 - _globals['_NODEANNOUNCE']._serialized_start=1003 - _globals['_NODEANNOUNCE']._serialized_end=1034 - _globals['_BROWSEREVENT']._serialized_start=1037 - _globals['_BROWSEREVENT']._serialized_end=1172 - _globals['_SERVERTASKMESSAGE']._serialized_start=1175 - _globals['_SERVERTASKMESSAGE']._serialized_end=1491 - _globals['_TASKCANCELREQUEST']._serialized_start=1493 - _globals['_TASKCANCELREQUEST']._serialized_end=1529 - _globals['_TASKREQUEST']._serialized_start=1532 - _globals['_TASKREQUEST']._serialized_end=1741 - _globals['_BROWSERACTION']._serialized_start=1744 - _globals['_BROWSERACTION']._serialized_end=2032 - _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1898 - 
_globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2032 - _globals['_TASKRESPONSE']._serialized_start=2035 - _globals['_TASKRESPONSE']._serialized_end=2387 - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2267 - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2315 - _globals['_TASKRESPONSE_STATUS']._serialized_start=2317 - _globals['_TASKRESPONSE_STATUS']._serialized_end=2377 - _globals['_BROWSERRESPONSE']._serialized_start=2390 - _globals['_BROWSERRESPONSE']._serialized_end=2610 - _globals['_CONSOLEMESSAGE']._serialized_start=2612 - _globals['_CONSOLEMESSAGE']._serialized_end=2679 - _globals['_NETWORKREQUEST']._serialized_start=2681 - _globals['_NETWORKREQUEST']._serialized_end=2785 - _globals['_WORKPOOLUPDATE']._serialized_start=2787 - _globals['_WORKPOOLUPDATE']._serialized_end=2831 - _globals['_TASKCLAIMREQUEST']._serialized_start=2833 - _globals['_TASKCLAIMREQUEST']._serialized_end=2885 - _globals['_TASKCLAIMRESPONSE']._serialized_start=2887 - _globals['_TASKCLAIMRESPONSE']._serialized_end=2956 - _globals['_HEARTBEAT']._serialized_start=2959 - _globals['_HEARTBEAT']._serialized_end=3317 - _globals['_HEALTHCHECKRESPONSE']._serialized_start=3319 - _globals['_HEALTHCHECKRESPONSE']._serialized_end=3364 - _globals['_FILESYNCMESSAGE']._serialized_start=3367 - _globals['_FILESYNCMESSAGE']._serialized_end=3595 - _globals['_SYNCCONTROL']._serialized_start=3598 - _globals['_SYNCCONTROL']._serialized_end=3897 - _globals['_SYNCCONTROL_ACTION']._serialized_start=3727 - _globals['_SYNCCONTROL_ACTION']._serialized_end=3897 - _globals['_DIRECTORYMANIFEST']._serialized_start=3899 - _globals['_DIRECTORYMANIFEST']._serialized_end=3969 - _globals['_FILEINFO']._serialized_start=3971 - _globals['_FILEINFO']._serialized_end=4039 - _globals['_FILEPAYLOAD']._serialized_start=4041 - _globals['_FILEPAYLOAD']._serialized_end=4136 - _globals['_SYNCSTATUS']._serialized_start=4139 - _globals['_SYNCSTATUS']._serialized_end=4299 - 
_globals['_SYNCSTATUS_CODE']._serialized_start=4233 - _globals['_SYNCSTATUS_CODE']._serialized_end=4299 - _globals['_AGENTORCHESTRATOR']._serialized_start=4302 - _globals['_AGENTORCHESTRATOR']._serialized_end=4535 + _globals['_SANDBOXPOLICY']._serialized_end=483 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=449 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=483 + _globals['_REGISTRATIONRESPONSE']._serialized_start=485 + _globals['_REGISTRATIONRESPONSE']._serialized_end=605 + _globals['_CLIENTTASKMESSAGE']._serialized_start=608 + _globals['_CLIENTTASKMESSAGE']._serialized_end=905 + _globals['_SKILLEVENT']._serialized_start=907 + _globals['_SKILLEVENT']._serialized_end=1028 + _globals['_NODEANNOUNCE']._serialized_start=1030 + _globals['_NODEANNOUNCE']._serialized_end=1061 + _globals['_BROWSEREVENT']._serialized_start=1064 + _globals['_BROWSEREVENT']._serialized_end=1199 + _globals['_SERVERTASKMESSAGE']._serialized_start=1202 + _globals['_SERVERTASKMESSAGE']._serialized_end=1518 + _globals['_TASKCANCELREQUEST']._serialized_start=1520 + _globals['_TASKCANCELREQUEST']._serialized_end=1556 + _globals['_TASKREQUEST']._serialized_start=1559 + _globals['_TASKREQUEST']._serialized_end=1768 + _globals['_BROWSERACTION']._serialized_start=1771 + _globals['_BROWSERACTION']._serialized_end=2059 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1925 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2059 + _globals['_TASKRESPONSE']._serialized_start=2062 + _globals['_TASKRESPONSE']._serialized_end=2414 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2294 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2342 + _globals['_TASKRESPONSE_STATUS']._serialized_start=2344 + _globals['_TASKRESPONSE_STATUS']._serialized_end=2404 + _globals['_BROWSERRESPONSE']._serialized_start=2417 + _globals['_BROWSERRESPONSE']._serialized_end=2656 + _globals['_CONSOLEMESSAGE']._serialized_start=2658 + _globals['_CONSOLEMESSAGE']._serialized_end=2725 + 
_globals['_NETWORKREQUEST']._serialized_start=2727 + _globals['_NETWORKREQUEST']._serialized_end=2831 + _globals['_WORKPOOLUPDATE']._serialized_start=2833 + _globals['_WORKPOOLUPDATE']._serialized_end=2877 + _globals['_TASKCLAIMREQUEST']._serialized_start=2879 + _globals['_TASKCLAIMREQUEST']._serialized_end=2931 + _globals['_TASKCLAIMRESPONSE']._serialized_start=2933 + _globals['_TASKCLAIMRESPONSE']._serialized_end=3002 + _globals['_HEARTBEAT']._serialized_start=3005 + _globals['_HEARTBEAT']._serialized_end=3363 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=3365 + _globals['_HEALTHCHECKRESPONSE']._serialized_end=3410 + _globals['_FILESYNCMESSAGE']._serialized_start=3413 + _globals['_FILESYNCMESSAGE']._serialized_end=3641 + _globals['_SYNCCONTROL']._serialized_start=3644 + _globals['_SYNCCONTROL']._serialized_end=3943 + _globals['_SYNCCONTROL_ACTION']._serialized_start=3773 + _globals['_SYNCCONTROL_ACTION']._serialized_end=3943 + _globals['_DIRECTORYMANIFEST']._serialized_start=3945 + _globals['_DIRECTORYMANIFEST']._serialized_end=4015 + _globals['_FILEINFO']._serialized_start=4017 + _globals['_FILEINFO']._serialized_end=4085 + _globals['_FILEPAYLOAD']._serialized_start=4088 + _globals['_FILEPAYLOAD']._serialized_end=4261 + _globals['_SYNCSTATUS']._serialized_start=4264 + _globals['_SYNCSTATUS']._serialized_end=4424 + _globals['_SYNCSTATUS_CODE']._serialized_start=4358 + _globals['_SYNCSTATUS_CODE']._serialized_end=4424 + _globals['_AGENTORCHESTRATOR']._serialized_start=4427 + _globals['_AGENTORCHESTRATOR']._serialized_end=4660 # @@protoc_insertion_point(module_scope) diff --git a/deployment/jerxie-prod/docker-compose.production.yml b/deployment/jerxie-prod/docker-compose.production.yml index b5f137c..64d1040 100644 --- a/deployment/jerxie-prod/docker-compose.production.yml +++ b/deployment/jerxie-prod/docker-compose.production.yml @@ -15,11 +15,7 @@ - SUPER_ADMINS=axieyangb@gmail.com,jerxie.app@gmail.com - SECRET_KEY=aYc2j1lYUUZXkBFFUndnleZI -# Redirect the 
persistent data to the NFS volume +# Use local storage for database to avoid NFS locking issues volumes: ai_hub_data: driver: local - driver_opts: - type: "nfs" - o: "addr=192.168.68.90,rw" - device: ":/volume1/docker/ai-hub/data" diff --git a/deployment/test-nodes/docker-compose.test-nodes.yml b/deployment/test-nodes/docker-compose.test-nodes.yml index 58d20ee..a0d8f98 100644 --- a/deployment/test-nodes/docker-compose.test-nodes.yml +++ b/deployment/test-nodes/docker-compose.test-nodes.yml @@ -21,6 +21,10 @@ privileged: true volumes: - ./skills:/app/node_skills:ro + deploy: + resources: + limits: + memory: 512M test-node-2: build: @@ -41,3 +45,7 @@ privileged: true volumes: - ./skills:/app/node_skills:ro + deploy: + resources: + limits: + memory: 512M diff --git a/frontend/src/pages/NodesPage.js b/frontend/src/pages/NodesPage.js index e414ff9..9639d9e 100644 --- a/frontend/src/pages/NodesPage.js +++ b/frontend/src/pages/NodesPage.js @@ -613,6 +613,56 @@ )} +
+ Browser Skill + {editingNodeId === node.node_id ? ( +
+
+ setEditForm({ + ...editForm, + skill_config: { + ...editForm.skill_config, + browser: { ...editForm.skill_config.browser, enabled: e.target.checked } + } + })} + /> + Enable +
+ {editForm.skill_config?.browser?.enabled && ( +
+ setEditForm({ + ...editForm, + skill_config: { + ...editForm.skill_config, + browser: { ...editForm.skill_config.browser, headless: !e.target.checked } + } + })} + /> + Show UI (Headed) +
+ )} +
+ ) : ( +
+ + {node.skill_config?.browser?.enabled ? 'Active' : 'Disabled'} + + {node.skill_config?.browser?.enabled && ( + + {node.skill_config?.browser?.headless === false ? 'Headed' : 'Headless'} + + )} +
+ )} +
{/* SANDBOX POLICY CONFIGURATION โ€” New M6 Feature */} {editingNodeId === node.node_id && editForm.skill_config?.shell?.enabled ? ( diff --git a/scripts/deploy_test_nodes.sh b/scripts/deploy_test_nodes.sh index ad5d8f8..06bf74c 100755 --- a/scripts/deploy_test_nodes.sh +++ b/scripts/deploy_test_nodes.sh @@ -39,7 +39,7 @@ # 2. Cleanup any previous test nodes echo "Cleaning up old test nodes..." - echo '$PASS' | sudo -S docker ps -a --filter "name=cortex-test-node-" -q | xargs -r sudo docker rm -f + echo '$PASS' | sudo -S docker ps -a --filter "name=cortex-test-node-" -q | xargs -r -I % sh -c "echo '$PASS' | sudo -S docker rm -f %" # 3. Spawn N nodes for i in \$(seq 1 $COUNT); do @@ -48,13 +48,22 @@ echo "[+] Starting \$CONTAINER_NAME..." + # Determine the correct token based on node number (matches docker-compose.test-nodes.yml) + if [ "\$i" -eq 1 ]; then + NODE_TOKEN="cortex-secret-shared-key" + else + NODE_TOKEN="ysHjZIRXeWo-YYK6EWtBsIgJ4uNBihSnZMtt0BQW3eI" + fi + echo '$PASS' | sudo -S docker run -d \\ --name "\$CONTAINER_NAME" \\ --network cortex-hub_default \\ -e AGENT_NODE_ID="\$NODE_ID" \\ -e AGENT_NODE_DESC="Scalable Test Node #\$i" \\ -e GRPC_ENDPOINT="ai_hub_service:50051" \\ - -e AGENT_SECRET_KEY="cortex-secret-shared-key" \\ + -e AGENT_HUB_URL="http://ai_hub_service:8000" \\ + -e AGENT_AUTH_TOKEN="\$NODE_TOKEN" \\ + -e AGENT_SECRET_KEY="aYc2j1lYUUZXkBFFUndnleZI" \\ -e AGENT_TLS_ENABLED="false" \\ agent-node-base done diff --git a/scripts/register_test_nodes.py b/scripts/register_test_nodes.py index 867819a..ae30fb0 100644 --- a/scripts/register_test_nodes.py +++ b/scripts/register_test_nodes.py @@ -4,7 +4,7 @@ # Ensure we can import from app sys.path.append("/app") -from app.db.database import SessionLocal +from app.db.session import SessionLocal from app.db.models import AgentNode, NodeGroupAccess db = SessionLocal() @@ -29,7 +29,7 @@ "display_name": "Test Node 2", "description": "Scaled Pod 2", "registered_by": "9a333ccd-9c3f-432f-a030-7b1e1284a436", 
- "invite_token": "cortex-secret-shared-key", + "invite_token": "ysHjZIRXeWo-YYK6EWtBsIgJ4uNBihSnZMtt0BQW3eI", "is_active": True, "skill_config": { "shell": {"enabled": True}, diff --git a/scripts/remote_deploy.sh b/scripts/remote_deploy.sh index 3963e52..737cfa9 100755 --- a/scripts/remote_deploy.sh +++ b/scripts/remote_deploy.sh @@ -61,7 +61,7 @@ # 1. Sync local codebase to temporary directory on remote server echo "Syncing local files to production [USER: $USER, HOST: $HOST]..." -sshpass -p "$PASS" rsync -avz \ +sshpass -p "$PASS" rsync -avz --delete \ --exclude '.git' \ --exclude 'node_modules' \ --exclude 'frontend/node_modules' \ @@ -75,11 +75,10 @@ exit 1 fi -# 2. Copy the synced files into the actual project directory replacing the old ones +# 2. Sync from the temporary directory to the actual project directory echo "Overwriting production project files..." sshpass -p "$PASS" ssh -o StrictHostKeyChecking=no "$USER@$HOST" << EOF - echo '$PASS' | sudo -S rm -rf $REMOTE_PROJ/nginx.conf - echo '$PASS' | sudo -S cp -r ${REMOTE_TMP}* $REMOTE_PROJ/ + echo '$PASS' | sudo -S rsync -av --delete ${REMOTE_TMP}/ $REMOTE_PROJ/ echo '$PASS' | sudo -S chown -R $USER:$USER $REMOTE_PROJ EOF diff --git a/skills/browser-automation-agent/logic.py b/skills/browser-automation-agent/logic.py deleted file mode 100644 index c7d7ea7..0000000 --- a/skills/browser-automation-agent/logic.py +++ /dev/null @@ -1,369 +0,0 @@ -import threading -import queue -import time -import json -import re -from playwright.sync_api import sync_playwright -from agent_node.skills.base import BaseSkill -from protos import agent_pb2 - -# ============================================================ -# Role-Ref Registry -# Inspired by Openclaw's pw-role-snapshot.ts -# Maps `ref=eN` shorthand -> (role, name, nth) for every -# interactive / content element on the last snapshotted page. 
-# ============================================================ - -INTERACTIVE_ROLES = { - "button", "link", "textbox", "checkbox", "radio", "combobox", - "listbox", "menuitem", "menuitemcheckbox", "menuitemradio", - "option", "searchbox", "slider", "spinbutton", "switch", "tab", "treeitem", -} -CONTENT_ROLES = { - "heading", "cell", "gridcell", "columnheader", "rowheader", - "listitem", "article", "region", "main", "navigation", -} -STRUCTURAL_ROLES = { - "generic", "group", "list", "table", "row", "rowgroup", "grid", - "treegrid", "menu", "menubar", "toolbar", "tablist", "tree", - "directory", "document", "application", "presentation", "none", -} - - -def _build_aria_snapshot(aria_text: str) -> tuple[str, dict]: - """ - Parse Playwright's ariaSnapshot() output and annotate interactive/content - elements with stable [ref=eN] labels that the AI can refer back to. - Returns (annotated_snapshot, ref_map). - """ - lines = aria_text.split("\n") - refs = {} - counter = [0] - role_counts = {} # (role, name) -> count (for nth disambiguation) - output_lines = [] - - def next_ref(): - counter[0] += 1 - return f"e{counter[0]}" - - for line in lines: - m = re.match(r'^(\s*-\s*)(\w+)(?:\s+"([^"]*)")?(.*)$', line) - if not m: - output_lines.append(line) - continue - - prefix, role_raw, name, suffix = m.group(1), m.group(2), m.group(3), m.group(4) - role = role_raw.lower() - - is_interactive = role in INTERACTIVE_ROLES - is_content_with_name = role in CONTENT_ROLES and name - - if not (is_interactive or is_content_with_name): - output_lines.append(line) - continue - - # assign ref - ref = next_ref() - key = (role, name) - nth = role_counts.get(key, 0) - role_counts[key] = nth + 1 - - refs[ref] = {"role": role, "name": name, "nth": nth if nth > 0 else None} - - enhanced = f"{prefix}{role_raw}" - if name: - enhanced += f' "{name}"' - enhanced += f" [ref={ref}]" - if nth > 0: - enhanced += f" [nth={nth}]" - if suffix: - enhanced += suffix - output_lines.append(enhanced) - - return 
"\n".join(output_lines), refs - - -def _resolve_ref(page, ref: str, role_refs: dict): - """Resolve a [ref=eN] string to a Playwright Locator.""" - info = role_refs.get(ref) - if not info: - raise ValueError(f"Unknown ref '{ref}'. Run aria_snapshot first and use a ref from that output.") - role = info["role"] - name = info.get("name") - nth = info.get("nth") or 0 - if name: - loc = page.get_by_role(role, name=name, exact=True) - else: - loc = page.get_by_role(role) - if nth: - loc = loc.nth(nth) - return loc - - -class BrowserSkill(BaseSkill): - """ - Persistent Browser Skill โ€” OpenClaw-inspired role-snapshot architecture. - - Key innovation over the prior version: - - `aria_snapshot` action returns a compact semantic role tree with [ref=eN] labels. - - All `click`, `type`, `hover` actions accept either a CSS/XPath selector OR a - ref string like 'e3', enabling the AI to address elements without fragile selectors. - - Page errors and console output are tracked per-session and included in results. 
- """ - def __init__(self, sync_mgr=None): - self.task_queue = queue.Queue() - # session_id -> { "context", "page", "role_refs", "console", "errors", "download_dir" } - self.sessions = {} - self.sync_mgr = sync_mgr - self.lock = threading.Lock() - threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor").start() - - # ------------------------------------------------------------------ - # Session Management - # ------------------------------------------------------------------ - - def _get_or_create_session(self, browser, sid, task, on_event): - """Return existing session dict or create a new one.""" - with self.lock: - if sid in self.sessions: - return self.sessions[sid] - - download_dir = None - if self.sync_mgr and task.session_id: - download_dir = self.sync_mgr.get_session_dir(task.session_id) - print(f" [๐ŸŒ๐Ÿ“] Mapping Browser Context to: {download_dir}") - - ctx = browser.new_context(accept_downloads=True) - page = ctx.new_page() - - sess = { - "context": ctx, - "page": page, - "role_refs": {}, # ref -> {role, name, nth} - "console": [], - "errors": [], - "download_dir": download_dir, - } - self.sessions[sid] = sess - - # Listeners - self._attach_listeners(sid, page, on_event, sess) - return sess - - def _attach_listeners(self, sid, page, on_event, sess): - # Console log capture - def _on_console(msg): - entry = {"level": msg.type, "text": msg.text, "ts": int(time.time() * 1000)} - sess["console"].append(entry) - if len(sess["console"]) > 200: - sess["console"].pop(0) - if on_event: - on_event(agent_pb2.BrowserEvent( - session_id=sid, - console_msg=agent_pb2.ConsoleMessage( - level=msg.type, text=msg.text, timestamp_ms=entry["ts"] - ) - )) - - def _on_page_error(err): - sess["errors"].append({"message": str(err), "ts": int(time.time() * 1000)}) - if len(sess["errors"]) > 100: - sess["errors"].pop(0) - - def _on_network(req): - resp = req.response() - if on_event: - on_event(agent_pb2.BrowserEvent( - session_id=sid, - 
network_req=agent_pb2.NetworkRequest( - method=req.method, url=req.url, - status=resp.status if resp else 0, - resource_type=req.resource_type, latency_ms=0 - ) - )) - - def _on_download(dl): - import os - with self.lock: - s = self.sessions.get(sid) - if s and s.get("download_dir"): - os.makedirs(s["download_dir"], exist_ok=True) - target = os.path.join(s["download_dir"], dl.suggested_filename) - print(f" [๐ŸŒ๐Ÿ“ฅ] Download: {dl.suggested_filename} -> {target}") - dl.save_as(target) - - page.on("console", _on_console) - page.on("pageerror", _on_page_error) - page.on("requestfinished", _on_network) - page.on("download", _on_download) - - # ------------------------------------------------------------------ - # Browser Actor Loop - # ------------------------------------------------------------------ - - def _browser_actor(self): - print("[๐ŸŒ] Browser Actor Starting...", flush=True) - pw = browser = None - try: - try: - pw = sync_playwright().start() - except Exception as pe: - print(f"[!] Playwright failed to start: {pe}", flush=True) - return - - try: - browser = pw.chromium.launch(headless=True, args=[ - '--no-sandbox', '--disable-setuid-sandbox', - '--disable-dev-shm-usage', '--disable-gpu' - ]) - print("[๐ŸŒ] Browser Engine Online.", flush=True) - except Exception as be: - print(f"[!] Chromium launch failed: {be}", flush=True) - if pw: pw.stop() - return - except Exception as e: - print(f"[!] 
Browser Actor critical failure: {e}", flush=True) - if pw: pw.stop() - return - - while True: - try: - item = self.task_queue.get() - if item is None: - print("[๐ŸŒ] Browser Actor Shutting Down...", flush=True) - break - - task, sandbox, on_complete, on_event = item - action = task.browser_action - sid = action.session_id or "default" - action_name = agent_pb2.BrowserAction.ActionType.Name(action.action) - print(f" [๐ŸŒ] {action_name} | Session: {sid}", flush=True) - - sess = self._get_or_create_session(browser, sid, task, on_event) - page = sess["page"] - - res_data = {} - try: - self._dispatch_action(action, page, sess, res_data) - except Exception as e: - on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) - continue - - # Build BrowserResponse โ€” include aria_snapshot result in eval_result - br_res = agent_pb2.BrowserResponse( - url=page.url, - title=page.title(), - snapshot=res_data.get("snapshot", b""), - dom_content=res_data.get("dom_content", ""), - a11y_tree=res_data.get("a11y_tree", ""), - eval_result=res_data.get("eval_result", ""), - ) - on_complete(task.task_id, {"status": 1, "browser_result": br_res}, task.trace_id) - - except Exception as e: - print(f" [!] 
Browser Actor Error: {e}", flush=True) - try: - on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) - except Exception: - pass - - # Cleanup - print("[๐ŸŒ] Cleaning up Browser Engine...", flush=True) - with self.lock: - for s in self.sessions.values(): - try: s["context"].close() - except: pass - self.sessions.clear() - if browser: browser.close() - if pw: pw.stop() - - # ------------------------------------------------------------------ - # Action Dispatcher - # ------------------------------------------------------------------ - - def _dispatch_action(self, action, page, sess, res_data): - A = agent_pb2.BrowserAction - role_refs = sess["role_refs"] - - def resolve(selector_or_ref: str): - """Accept either a CSS selector or a ref like 'e3'.""" - s = (selector_or_ref or "").strip() - if re.match(r'^e\d+$', s): - return _resolve_ref(page, s, role_refs) - return page.locator(s) - - if action.action == A.NAVIGATE: - page.goto(action.url, wait_until="domcontentloaded", timeout=25000) - # Auto-snapshot after every navigation: give AI page context immediately - aria_raw = page.locator(":root").aria_snapshot() - snap, refs = _build_aria_snapshot(aria_raw) - sess["role_refs"] = refs - # Trim to 8000 chars to avoid bloating the grpc response - trimmed = snap[:8000] + ("\n\n[...snapshot truncated...]" if len(snap) > 8000 else "") - stats = f"refs={len(refs)}" - res_data["a11y_tree"] = trimmed - res_data["eval_result"] = stats - - elif action.action == A.CLICK: - target = action.selector or "" - resolve(target).click(timeout=8000) - - elif action.action == A.TYPE: - target = action.selector or "" - resolve(target).fill(action.text, timeout=8000) - - elif action.action == A.SCREENSHOT: - res_data["snapshot"] = page.screenshot(full_page=False) - - elif action.action == A.GET_DOM: - res_data["dom_content"] = page.content() - - elif action.action == A.HOVER: - target = action.selector or "" - resolve(target).hover(timeout=5000) - - elif action.action == 
A.SCROLL: - page.mouse.wheel(x=0, y=action.y or 400) - - elif action.action == A.EVAL: - result = page.evaluate(action.text) - res_data["eval_result"] = str(result) - - elif action.action == A.GET_A11Y: - # OpenClaw-style role snapshot with ref labels โ€” the key feature! - aria_raw = page.locator(":root").aria_snapshot() - snap, refs = _build_aria_snapshot(aria_raw) - sess["role_refs"] = refs # remember refs for subsequent click/type calls - - # Trim large snapshots (news pages can be huge) - MAX = 10000 - if len(snap) > MAX: - snap = snap[:MAX] + "\n\n[...snapshot truncated - use eval/scroll to see more...]" - - stats = { - "total_refs": len(refs), - "interactive": sum(1 for r in refs.values() if r["role"] in INTERACTIVE_ROLES), - "url": page.url, - "title": page.title(), - } - res_data["a11y_tree"] = snap - res_data["eval_result"] = json.dumps(stats) - - elif action.action == A.CLOSE: - with self.lock: - s = self.sessions.pop(action.session_id or "default", None) - if s: - s["context"].close() - - # ------------------------------------------------------------------ - # Public Interface - # ------------------------------------------------------------------ - - def execute(self, task, sandbox, on_complete, on_event=None): - self.task_queue.put((task, sandbox, on_complete, on_event)) - - def cancel(self, task_id): - return False - - def shutdown(self): - self.task_queue.put(None) diff --git a/skills/mesh-file-explorer/SKILL.md b/skills/mesh-file-explorer/SKILL.md index 1c7f064..625fd8f 100644 --- a/skills/mesh-file-explorer/SKILL.md +++ b/skills/mesh-file-explorer/SKILL.md @@ -47,9 +47,22 @@ # Mesh File Explorer -You are a file management assistant. Use this tool for high-performance file operations: -1. **`list`**: Explore directories. If a 'session_id' is provided, it uses the zero-latency Hub mirror. -2. **`read`**: Fetch file content. Uses local Hub mirror fast-path if available. -3. **`write`**: Synchronously update Hub mirror and background push to node. -4. 
**`delete`**: Remove from Hub and dispatch remote delete. -Always include 'session_id' for improved performance unless you need to bypass the ghost mirror. +You are a decentralized file management specialist. Use this tool based on the context: + +### 1. ๐Ÿ”„ Standard Workspace Sync (Ghost Mirror) +- **WHEN**: You are working on project files intended to sync across all nodes. +- **PATH**: Use a **RELATIVE** path (e.g., `src/main.py`). NEVER use absolute paths starting with `/tmp/cortex-sync/`. +- **SESSION**: You MUST provide the `session_id` (usually your current Ghost Mirror ID). +- **BENEFIT**: Zero-latency write to the Hub mirror + background push to nodes. + +### 2. ๐Ÿ–ฅ๏ธ Physical Node Maintenance +- **WHEN**: You need to interact with system files OUTSIDE the project workspace (e.g., `/etc/hosts` or personal home dirs). +- **PATH**: Use an **ABSOLUTE** path. +- **SESSION**: Set `session_id` to `__fs_explorer__`. +- **BEHAVIOR**: Direct gRPC call to the physical node. Slower, but bypasses the mirror. + +### Actions +- **`list`**: Explore the filesystem. +- **`read`**: Retrieve content. +- **`write`**: Create/Update files. +- **`delete`**: Remove files. diff --git a/skills/mesh-file-explorer/logic.py b/skills/mesh-file-explorer/logic.py deleted file mode 100644 index a8bd080..0000000 --- a/skills/mesh-file-explorer/logic.py +++ /dev/null @@ -1,77 +0,0 @@ -import os -import json -import logging -from agent_node.skills.base import BaseSkill - -logger = logging.getLogger(__name__) - -class FileSkill(BaseSkill): - """Provides file system navigation and inspection capabilities.""" - - def __init__(self, sync_mgr=None): - self.sync_mgr = sync_mgr - - def execute(self, task, sandbox, on_complete, on_event=None): - """ - Executes a file-related task (list, stats). 
- Payload JSON: { "action": "list", "path": "...", "recursive": false } - """ - try: - payload = json.loads(task.payload_json) - action = payload.get("action", "list") - path = payload.get("path", ".") - - # 1. Sandbox Jail Check - # (In a real implementation, we'd use sandbox.check_path(path)) - # For now, we'll assume the node allows browsing its root or session dir. - - if action == "list": - result = self._list_dir(path, payload.get("recursive", False)) - on_complete(task.task_id, {"status": 1, "stdout": json.dumps(result)}, task.trace_id) - else: - on_complete(task.task_id, {"status": 0, "stderr": f"Unknown action: {action}"}, task.trace_id) - - except Exception as e: - logger.error(f"[FileSkill] Task {task.task_id} failed: {e}") - on_complete(task.task_id, {"status": 0, "stderr": str(e)}, task.trace_id) - - def _list_dir(self, path, recursive=False): - """Lists directory contents with metadata.""" - if not os.path.exists(path): - return {"error": "Path not found"} - - items = [] - if recursive: - for root, dirs, files in os.walk(path): - for name in dirs + files: - abs_path = os.path.join(root, name) - rel_path = os.path.relpath(abs_path, path) - st = os.stat(abs_path) - items.append({ - "name": name, - "path": rel_path, - "is_dir": os.path.isdir(abs_path), - "size": st.st_size, - "mtime": st.st_mtime - }) - else: - for name in os.listdir(path): - abs_path = os.path.join(path, name) - st = os.stat(abs_path) - items.append({ - "name": name, - "is_dir": os.path.isdir(abs_path), - "size": st.st_size, - "mtime": st.st_mtime - }) - - return { - "root": os.path.abspath(path), - "items": sorted(items, key=lambda x: (not x["is_dir"], x["name"])) - } - - def cancel(self, task_id): - return False # Listing is usually fast, no cancellation needed - - def shutdown(self): - pass diff --git a/skills/mesh-terminal-control/logic.py b/skills/mesh-terminal-control/logic.py deleted file mode 100644 index 5a138b7..0000000 --- a/skills/mesh-terminal-control/logic.py +++ /dev/null 
@@ -1,413 +0,0 @@ -import os -import pty -import select -import threading -import time -import termios -import struct -import fcntl -import tempfile -from agent_node.skills.base import BaseSkill -from protos import agent_pb2 - -class ShellSkill(BaseSkill): - """Admin Console Skill: Persistent stateful Bash via PTY.""" - def __init__(self, sync_mgr=None): - self.sync_mgr = sync_mgr - self.sessions = {} # session_id -> {fd, pid, thread, last_activity, ...} - self.lock = threading.Lock() - - # Phase 3: Prompt Patterns for Edge Intelligence - self.PROMPT_PATTERNS = [ - r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$", # bash/zsh: user@host:~$ - r">>>\s*$", # python - r"\.\.\.\s*$", # python multi-line - r">\s*$", # node/js - ] - - # --- M7: Idle Session Reaper --- - # Automatically kills dormant bash processes to free up system resources. - self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper") - self.reaper_thread.start() - - def _session_reaper(self): - """Background thread that cleans up unused PTY sessions.""" - while True: - time.sleep(60) - with self.lock: - now = time.time() - for sid, sess in list(self.sessions.items()): - # Avoid reaping currently active tasks - if sess.get("active_task"): - continue - - # 10 minute idle timeout - if now - sess.get("last_activity", 0) > 600: - print(f" [๐Ÿš๐Ÿงน] Reaping idle shell session: {sid}") - try: - os.close(sess["fd"]) - os.kill(sess["pid"], 9) - except: pass - self.sessions.pop(sid, None) - - def _ensure_session(self, session_id, cwd, on_event): - with self.lock: - if session_id in self.sessions: - self.sessions[session_id]["last_activity"] = time.time() - return self.sessions[session_id] - - print(f" [๐Ÿš] Initializing Persistent Shell Session: {session_id}") - # Spawn bash in a pty - pid, fd = pty.fork() - if pid == 0: # Child - # Environment prep - os.environ["TERM"] = "xterm-256color" - - # Change to CWD - if cwd and os.path.exists(cwd): - os.chdir(cwd) - - # Launch shell - 
shell_path = "/bin/bash" - if not os.path.exists(shell_path): - shell_path = "/bin/sh" - os.execv(shell_path, [shell_path, "--login"]) - - # Parent - # Set non-blocking - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - sess = { - "fd": fd, - "pid": pid, - "last_activity": time.time(), - "buffer_file": None, - "tail_buffer": "", - "active_task": None - } - - def reader(): - while True: - try: - r, _, _ = select.select([fd], [], [], 0.1) - if fd in r: - data = os.read(fd, 4096) - if not data: break - - decoded = data.decode("utf-8", errors="replace") - - # Streaming/Sync logic (Detect completion marker) - with self.lock: - active_tid = sess.get("active_task") - marker = sess.get("marker") - if active_tid and marker and sess.get("buffer_file"): - # Phase 2: Persistence Offloading - # Write directly to disk instead of heap memory - sess["buffer_file"].write(decoded) - sess["buffer_file"].flush() - - # Keep a tiny 4KB tail in RAM for marker detection and prompt scanning - sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-4096:] - - if marker in sess["tail_buffer"]: - # Marker found! Extract exit code - try: - # The tail buffer has the marker - after_marker = sess["tail_buffer"].split(marker)[1].strip().split() - exit_code = int(after_marker[0]) if after_marker else 0 - - # Formulate final stdout summary from the disk file - bf = sess["buffer_file"] - bf.seek(0, 2) - file_len = bf.tell() - - HEAD, TAIL = 10_000, 30_000 - if file_len > HEAD + TAIL: - bf.seek(0) - head_str = bf.read(HEAD) - bf.seek(file_len - TAIL) - tail_str = bf.read() - omitted = file_len - HEAD - TAIL - pure_stdout = head_str + f"\n\n[... 
{omitted:,} bytes omitted (full output safely preserved at {bf.name}) ...]\n\n" + tail_str - else: - bf.seek(0) - pure_stdout = bf.read() - - # Slice off the marker string and anything after it from the final result - pure_stdout = pure_stdout.split(marker)[0] - - sess["result"]["stdout"] = pure_stdout - sess["result"]["status"] = 0 if exit_code == 0 else 1 - - # Close the file handle (leaves file on disk) - sess["buffer_file"].close() - sess["buffer_file"] = None - - sess["event"].set() - decoded = pure_stdout.split(marker)[0][-4096:] if marker in pure_stdout else pure_stdout - except Exception as e: - print(f" [๐Ÿšโš ๏ธ] Marker parsing failed: {e}") - sess["event"].set() - - # Stream terminal output back (with stealth filtering) - if on_event: - stealth_out = decoded - if "__CORTEX_FIN_SH_" in decoded: - import re - # We remove any line that contains our internal marker to hide plumbing from user. - # This covers both the initial command echo and the exit code output. - stealth_out = re.sub(r'.*__CORTEX_FIN_SH_.*[\r\n]*', '', decoded) - - if stealth_out: - # Phase 3: Client-Side Truncation (Stream Rate Limiting) - # Limit real-time stream to 15KB/sec per session to prevent flooding the Hub over gRPC. - # The full output is still safely written to the tempfile on disk. - with self.lock: - now = time.time() - if now - sess.get("stream_window_start", 0) > 1.0: - sess["stream_window_start"] = now - sess["stream_bytes_sent"] = 0 - dropped = sess.get("stream_dropped_bytes", 0) - if dropped > 0: - drop_msg = f"\n[... 
def handle_transparent_tty(self, task, on_complete, on_event=None):
    """Processes raw TTY/Resize events synchronously (bypasses threadpool/sandbox).

    Recognized JSON payload shapes in ``task.payload_json``:
      * ``{"tty": "<keystrokes>"}`` — forward the raw keystrokes to the PTY master fd.
      * ``{"action": "resize", "cols": C, "rows": R}`` — apply TIOCSWINSZ to the PTY.

    Args:
        task: gRPC task carrying payload_json, session_id, task_id and trace_id.
        on_complete: callback(task_id, result_dict, trace_id) acknowledging the event.
        on_event: optional stream callback forwarded to session creation.

    Returns:
        True if the payload was consumed here; False if the caller should fall
        through to normal command execution.
    """
    cmd = task.payload_json
    session_id = task.session_id or "default-session"
    try:
        import json
        # Cheap structural pre-check to avoid json.loads on plain commands.
        if cmd.startswith('{') and cmd.endswith('}'):
            raw_payload = json.loads(cmd)

            # 1. Raw Keystroke forward
            if isinstance(raw_payload, dict) and "tty" in raw_payload:
                raw_bytes = raw_payload["tty"].encode("utf-8")
                sess = self._ensure_session(session_id, None, on_event)
                # FIX: os.write can do a partial write on a busy PTY master,
                # silently dropping keystrokes — loop until every byte is flushed.
                view = memoryview(raw_bytes)
                while view:
                    written = os.write(sess["fd"], view)
                    view = view[written:]
                on_complete(task.task_id, {"stdout": "", "status": 0}, task.trace_id)
                return True

            # 2. Window Resize
            if isinstance(raw_payload, dict) and raw_payload.get("action") == "resize":
                cols = raw_payload.get("cols", 80)
                rows = raw_payload.get("rows", 24)
                sess = self._ensure_session(session_id, None, on_event)
                import termios, struct, fcntl
                # struct winsize: rows, cols, xpixel, ypixel (pixels unused).
                s = struct.pack('HHHH', rows, cols, 0, 0)
                fcntl.ioctl(sess["fd"], termios.TIOCSWINSZ, s)
                print(f" [🐚] Terminal Resized to {cols}x{rows}")
                on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 0}, task.trace_id)
                return True
    except Exception as pe:
        # Best-effort: a malformed payload must not crash the agent loop.
        print(f" [🐚] Transparent TTY Fail: {pe}")
    # FIX: previously fell off the end returning None for unhandled payloads;
    # return an explicit False so both truthiness and `is False` checks work.
    return False
def execute(self, task, sandbox, on_complete, on_event=None):
    """Dispatch a command string to the persistent PTY shell and BLOCK until done.

    Flow:
      1. Sandbox-verify the command; reject with SANDBOX_VIOLATION on failure.
      2. Resolve a working-directory jail (sync-manager session dir, or the
         policy's WORKING_DIR_JAIL, created on demand).
      3. Obtain/create the persistent PTY session for this session_id.
      4. "!RAW:"-prefixed payloads are injected straight into the PTY
         (interactive input) and bypass the busy check.
      5. Otherwise write ``(cmd) ; echo <marker> $?`` to the PTY and poll until
         the reader thread spots the marker, the Hub cancels, or timeout.

    Args:
        task: gRPC task carrying payload_json (the command), session_id,
            task_id, trace_id and timeout_ms (0 means the 60 s default).
        sandbox: policy object exposing verify(cmd) and a .policy dict.
        on_complete: callback(task_id, result_dict, trace_id) invoked exactly
            once with the final status (0/1 success, 2 error — node.py convention).
        on_event: optional callback for streaming SkillEvent terminal output.
    """
    session_id = task.session_id or "default-session"
    tid = task.task_id
    try:
        cmd = task.payload_json

        # --- Legacy Full-Command Execution (Sandboxed) ---
        allowed, status_msg = sandbox.verify(cmd)
        if not allowed:
            err_msg = f"\r\n[System] Command blocked: {status_msg}\r\n"
            if on_event:
                # Surface the block reason on the live terminal stream too.
                event = agent_pb2.SkillEvent(
                    session_id=session_id, task_id=tid,
                    terminal_out=err_msg
                )
                on_event(agent_pb2.ClientTaskMessage(skill_event=event))

            return on_complete(tid, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id)

        # Resolve CWD jail: sync-manager session dir wins over the policy jail.
        cwd = None
        if self.sync_mgr and task.session_id:
            cwd = self.sync_mgr.get_session_dir(task.session_id)
        elif sandbox.policy.get("WORKING_DIR_JAIL"):
            cwd = sandbox.policy["WORKING_DIR_JAIL"]
            if not os.path.exists(cwd):
                # Best-effort creation; a failure falls through to the shell's default cwd.
                try: os.makedirs(cwd, exist_ok=True)
                except: pass

        # Handle Session Persistent Process
        sess = self._ensure_session(session_id, cwd, on_event)

        # Check for RAW mode first (bypasses busy check for interactive control)
        is_raw = cmd.startswith("!RAW:")
        if is_raw:
            input_str = cmd[5:] + "\n"
            print(f" [🐚⌨️] RAW Input Injection: {input_str.strip()}")
            os.write(sess["fd"], input_str.encode("utf-8"))
            return on_complete(tid, {"stdout": "INJECTED", "status": 1}, task.trace_id)

        # --- 0. Busy Check: Serialize access to the PTY for standard commands ---
        with self.lock:
            if sess.get("active_task"):
                curr_tid = sess.get("active_task")
                return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 2}, task.trace_id)

        # --- Blocking Wait Logic ---
        # The marker is time-derived so concurrent/successive tasks never collide.
        marker_id = int(time.time())
        marker = f"__CORTEX_FIN_SH_{marker_id}__"
        event = threading.Event()
        result_container = {"stdout": "", "status": 1}  # 1 = Success by default (node.py convention)

        # Register waiter in session state (reader thread fills result_container
        # and sets `event` when it sees the marker in the PTY output).
        with self.lock:
            sess["active_task"] = tid
            sess["marker"] = marker
            sess["event"] = event
            # Create a persistent tempfile for stdout instead of RAM buffer
            sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False)
            sess["tail_buffer"] = ""
            sess["result"] = result_container
            sess["cancel_event"] = threading.Event()

        # Input injection: execute command then echo marker and exit code
        try:
            # 12-factor bash: ( cmd ) ; echo marker $?
            # We use "" concatenation in the echo command to ensure the marker literal
            # DOES NOT appear in the PTY input echo, preventing premature completion.
            full_input = f"({cmd}) ; echo \"__CORTEX_FIN_SH_\"\"{marker_id}__\" $?\n"
            os.write(sess["fd"], full_input.encode("utf-8"))

            # Wait for completion (triggered by reader) OR cancellation
            timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0
            start_time = time.time()
            while time.time() - start_time < timeout:
                # Check for completion (reader found marker)
                if event.is_set():
                    return on_complete(tid, result_container, task.trace_id)

                # Check for cancellation (HUB sent cancel)
                if sess["cancel_event"].is_set():
                    print(f" [🐚🛑] Task {tid} cancelled on node.")
                    return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id)

                # Sleep slightly to avoid busy loop
                time.sleep(0.1)

            # Timeout Case: return head+tail of whatever output was captured so
            # far; the full output stays on disk in the named tempfile.
            print(f" [🐚⚠️] Task {tid} timed out on node.")
            with self.lock:
                if sess.get("buffer_file"):
                    try:
                        sess["buffer_file"].seek(0, 2)
                        file_len = sess["buffer_file"].tell()
                        HEAD, TAIL = 10_000, 30_000
                        if file_len > HEAD + TAIL:
                            sess["buffer_file"].seek(0)
                            head_str = sess["buffer_file"].read(HEAD)
                            sess["buffer_file"].seek(file_len - TAIL)
                            tail_str = sess["buffer_file"].read()
                            omitted = file_len - HEAD - TAIL
                            partial_out = head_str + f"\n\n[... {omitted:,} bytes omitted (full timeout output saved to {sess['buffer_file'].name}) ...]\n\n" + tail_str
                        else:
                            sess["buffer_file"].seek(0)
                            partial_out = sess["buffer_file"].read()
                    except:
                        partial_out = ""
                else:
                    partial_out = ""

            on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id)

        finally:
            # Cleanup session task state so the next command passes the busy
            # check; only clear if WE still own the slot (guards races with
            # the reader thread finishing concurrently).
            with self.lock:
                if sess.get("active_task") == tid:
                    if sess.get("buffer_file"):
                        try:
                            sess["buffer_file"].close()
                        except: pass
                        sess["buffer_file"] = None
                    sess["active_task"] = None
                    sess["marker"] = None
                    sess["event"] = None
                    sess["result"] = None
                    sess["cancel_event"] = None

    except Exception as e:
        print(f" [🐚❌] Execute Error for {tid}: {e}")
        on_complete(tid, {"stderr": str(e), "status": 2}, task.trace_id)
def cancel(self, task_id: str):
    """Cancels an active task — for persistent shell, this sends a SIGINT (Ctrl+C).

    Args:
        task_id: identifier of the task to interrupt.

    Returns:
        True if a session running ``task_id`` was found and signalled,
        False otherwise.
    """
    with self.lock:
        for sid, sess in self.sessions.items():
            if sess.get("active_task") == task_id:
                print(f"[🛑] Sending SIGINT (Ctrl+C) to shell session (Task {task_id}): {sid}")
                # Write \x03 (Ctrl+C) to the master FD
                os.write(sess["fd"], b"\x03")
                # Break the wait loop in execute thread
                if sess.get("cancel_event"):
                    sess["cancel_event"].set()
                return True
    # FIX: previously fell off the end returning None; make the miss explicit.
    return False


def shutdown(self):
    """Cleanup: Terminates all persistent shells (closes PTY fds, kills children)."""
    import signal  # local import, matching the file's function-scope import style
    with self.lock:
        for sid, sess in list(self.sessions.items()):
            print(f"[🛑] Cleaning up persistent shell: {sid}")
            # FIX: the bare `except:` clauses here also swallowed
            # KeyboardInterrupt/SystemExit; only OS-level failures are expected.
            try:
                os.close(sess["fd"])
            except OSError:
                pass
            # kill pid — FIX: named signal.SIGKILL instead of the magic number 9.
            try:
                os.kill(sess["pid"], signal.SIGKILL)
            except OSError:
                pass
        self.sessions.clear()