diff --git a/.agent/workflows/deployment_reference.md b/.agent/workflows/deployment_reference.md
index c11ad5f..e8094da 100644
--- a/.agent/workflows/deployment_reference.md
+++ b/.agent/workflows/deployment_reference.md
@@ -131,6 +131,17 @@
**Symptoms**: `curl -v https://ai.jerxie.com` dumps a generic 404 or `503 Service Unavailable` with `server: envoy`.
**Root Cause**: The Envoy FilterChain (Listener SNI Map) doesn't trace back to a correct, valid Docker IP:Port allocation.
**Verification Check**:
-* Query the Control Plane API: `curl -s http://192.168.68.90:8090/get-cluster?name=_ai_unified_server`.
-* **Explicit Port Matching**: Envoy in production is configured to match both `ai.jerxie.com` and `ai.jerxie.com:443`. If a gRPC client (the Agent) includes the port in its authority header, Envoy must have it in its `virtual_hosts.domains` array or it will throw a 404.
* Make sure `portValue` in the JSON Endpoint equates to the one published in `docker-compose.yml` (`8002` vs `8000`). If mismatched, you must format a JSON package and `POST` it to `/add-cluster` utilizing the EnvoryControlPlane workflow.
+
+### 5. OIDC Login Failures (Stale/Empty Client ID)
+**Symptoms**: User attempts to log in but is immediately rejected by auth.jerxie.com (Dex). Dex logs show: `level=ERROR msg="failed to parse authorization request" err="Invalid client_id (\"\")."`
+**Root Cause**: API routes were incorrectly capturing OIDC settings from `settings` at the module's initial import time rather than at request time. Empty/default environment variables were therefore frozen in place until a full container restart.
+**Verification Check**:
+* Check `ai_hub_service` logs for: `Initiating OIDC login. Client ID: 'cortex-server'`.
+* Ensure that the `settings` singleton is accessed **inside** the route function (or via a helper like `get_oidc_urls`) to ensure dynamic resolution.
+
+### 6. Periodic Hub "Freezes" or Disconnections
+**Symptoms**: The `ai.jerxie.com` UI becomes unresponsive (spinning loaders or time-outs) for 1-2 minutes before suddenly recovering. No backend crashes are recorded.
+**Root Cause**: **SQLite on NFS**. The production deployment uses an NFS-mounted volume (`192.168.68.90`) for the relational database. SQLite's write-ahead-logging (WAL) and intense locking requirements are highly incompatible with network filesystem latencies. Any minor network spike or high NFS load causes the Hub process to block entirely while waiting for a lock.
+**Resolution**:
+* **High Priority**: Move `ai_hub.db` to a **Local Volume** or a **Local Host Path** on `192.168.68.113`. NFS should be reserved for static sync folders or bulky assets, never for the primary relational database.
diff --git a/agent-node/bootstrap_installer.py b/agent-node/bootstrap_installer.py
index 74a1845..befebc3 100644
--- a/agent-node/bootstrap_installer.py
+++ b/agent-node/bootstrap_installer.py
@@ -127,21 +127,42 @@
except Exception as e:
_print(f"Warning: Failed to bootstrap pip: {e}. If dependencies fail, please install python3-pip manually.")
+ install_req_file = req_file
+ tmp_req_path = None
+ if skip_browsers:
+ try:
+ with open(req_file, 'r') as f:
+ lines = f.readlines()
+ # Reuse the already imported tempfile
+ with tempfile.NamedTemporaryFile(mode='w', suffix='_req.txt', delete=False) as tmp_req:
+ for line in lines:
+ if 'playwright' not in line.lower():
+ tmp_req.write(line)
+ tmp_req_path = tmp_req.name
+ install_req_file = tmp_req_path
+ _print("Filtered 'playwright' from dependencies as requested.")
+ except Exception as e:
+ _print(f"Warning: Failed to filter requirements.txt: {e}")
+
_print("Installing Python dependencies (resilient mode) ...")
try:
# Using --ignore-installed to bypass "no RECORD file found" metadata errors common on Mac/Anaconda
# and --user if we don't have root (though usually we do on NAS)
- args = [sys.executable, "-m", "pip", "install", "-r", req_file, "--quiet", "--ignore-installed"]
+ args = [sys.executable, "-m", "pip", "install", "-r", install_req_file, "--quiet", "--ignore-installed"]
# Try a quick check for root/write access to site-packages
try:
subprocess.check_call(args, cwd=install_dir)
- except subprocess.CalledProcessError:
- _print("Standard install failed. Trying --user install...")
- args.append("--user")
- subprocess.check_call(args, cwd=install_dir)
+ except subprocess.CalledProcessError as e:
+ _print(f"Standard install failed (exit {e.returncode}). Trying --user install...")
+ args_user = args + ["--user"]
+ subprocess.check_call(args_user, cwd=install_dir)
- _print("Dependencies installed.")
+ _print("Dependencies installed successfully.")
+
+ # Cleanup temp file if created
+ if tmp_req_path and os.path.exists(tmp_req_path):
+ os.remove(tmp_req_path)
# New: Auto-install playwright browsers if the package is present
if skip_browsers:
@@ -162,6 +183,10 @@
except Exception as e:
_print(f"ERROR: Failed to install dependencies: {e}")
+ _print("-----------------------------------------------------------------------")
+ _print("HINT: If you are on Raspberry Pi / ARM and 'protobuf' or 'grpcio' fails:")
+ _print(" Try manual install: sudo apt-get install python3-protobuf python3-psutil python3-grpcio")
+ _print("-----------------------------------------------------------------------")
_print("The agent might fail to start if core libraries (grpcio, psutil) are missing.")
@@ -316,9 +341,10 @@
skip_browsers = args.skip_browsers or existing_config.get("skip_browsers", False)
+ secret_key_to_save = existing_config.get("secret_key") or hub_token
_install(hub_url, hub_token, install_dir)
_install_deps(install_dir, skip_browsers=skip_browsers)
- _write_config(install_dir, node_id, hub_url, node_token, grpc, secret_key=hub_token)
+ _write_config(install_dir, node_id, hub_url, node_token, grpc, secret_key=secret_key_to_save)
if args.update_only:
+        _print(f"✅ Updated to v{remote_version}. Not launching (--update-only).")
diff --git a/agent-node/install_service.py b/agent-node/install_service.py
index d6c0672..181cc59 100755
--- a/agent-node/install_service.py
+++ b/agent-node/install_service.py
@@ -49,6 +49,8 @@
{os.path.expanduser("~")}/.cortex/agent.out.log
EnvironmentVariables
+ GRPC_ENABLE_FORK_SUPPORT
+ 1
PATH
/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin
@@ -111,6 +113,7 @@
echo "Starting Cortex Agent..."
mkdir -p "$(dirname "$LOGFILE")"
cd "{get_working_dir()}"
+ export GRPC_ENABLE_FORK_SUPPORT=1
nohup {get_python_path()} {get_agent_main_path()} >> "$LOGFILE" 2>&1 &
echo $! > "$PIDFILE"
echo "Agent started (PID $!)"
@@ -169,6 +172,7 @@
WorkingDirectory={get_working_dir()}
Restart=always
RestartSec=5
+Environment=GRPC_ENABLE_FORK_SUPPORT=1
StandardOutput=append:{os.path.expanduser("~")}/.cortex/agent.out.log
StandardError=append:{os.path.expanduser("~")}/.cortex/agent.err.log
diff --git a/agent-node/requirements.txt b/agent-node/requirements.txt
index 5647eb6..ee9cb24 100644
--- a/agent-node/requirements.txt
+++ b/agent-node/requirements.txt
@@ -1,7 +1,8 @@
-grpcio==1.62.1
-grpcio-tools==1.62.1
+grpcio>=1.48.0
+grpcio-tools>=1.48.0
PyJWT==2.8.0
-playwright==1.42.0
-watchdog==4.0.0
+playwright>=1.47.0
+watchdog>=3.0.0
PyYAML==6.0.1
-psutil==5.9.8
+psutil>=5.8.0
+protobuf>=4.21.6,<6.0.0
diff --git a/agent-node/src/agent_node/config.py b/agent-node/src/agent_node/config.py
index cdf9e12..c355717 100644
--- a/agent-node/src/agent_node/config.py
+++ b/agent-node/src/agent_node/config.py
@@ -1,6 +1,9 @@
import os
import platform
-import yaml
+try:
+ import yaml
+except ImportError:
+ yaml = None
# Path to the generated config file in the bundled distribution
# It sits next to the 'src' directory (two levels up from src/agent_node/config.py)
@@ -31,7 +34,7 @@
TLS_ENABLED = _defaults["tls"]
HEALTH_REPORT_INTERVAL = _defaults["health_report_interval"]
MAX_SKILL_WORKERS = _defaults["max_skill_workers"]
-DEBUG_GRPC = False
+DEBUG_GRPC = True
SECRET_KEY = "dev-secret-key-1337"
HUB_URL = _defaults["hub_url"]
AUTO_UPDATE = _defaults["auto_update"]
@@ -40,6 +43,7 @@
CERT_CLIENT_CRT = "certs/client.crt"
CERT_CLIENT_KEY = "certs/client.key"
FS_ROOT = "/" if platform.system() in ["Darwin", "Linux"] else "C:\\"
+BROWSER_HEADLESS = True # Runtime-togglable: False = headed mode (requires has_display)
def reload():
global NODE_ID, NODE_DESC, SERVER_HOST_PORT, AUTH_TOKEN, SYNC_DIR, TLS_ENABLED
@@ -98,5 +102,11 @@
if not FS_ROOT:
FS_ROOT = "/" if platform.system() in ["Darwin", "Linux"] else "C:\\"
+def set_browser_headless(headless: bool):
+ """Toggle browser headless mode at runtime (no restart needed)."""
+ global BROWSER_HEADLESS
+ BROWSER_HEADLESS = headless
+ print(f"[๐] Browser mode updated: {'headless' if headless else 'headed (UI visible)'}")
+
# Initial load
reload()
diff --git a/agent-node/src/agent_node/core/sandbox.py b/agent-node/src/agent_node/core/sandbox.py
index 8fcfed5..33b2cf0 100644
--- a/agent-node/src/agent_node/core/sandbox.py
+++ b/agent-node/src/agent_node/core/sandbox.py
@@ -12,7 +12,8 @@
"ALLOWED": list(p.allowed_commands),
"DENIED": list(p.denied_commands),
"SENSITIVE": list(p.sensitive_commands),
- "WORKING_DIR_JAIL": p.working_dir_jail
+ "WORKING_DIR_JAIL": p.working_dir_jail,
+ "SKILL_CONFIG": p.skill_config_json
}
def verify(self, command_str):
diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py
index f130762..76f8ae8 100644
--- a/agent-node/src/agent_node/core/sync.py
+++ b/agent-node/src/agent_node/core/sync.py
@@ -1,5 +1,8 @@
import os
import hashlib
+import time
+import json
+import zlib
from agent_node.config import SYNC_DIR
from protos import agent_pb2
@@ -53,7 +56,7 @@
for name in files:
abs_path = os.path.join(root, name)
rel_path = os.path.relpath(abs_path, session_dir)
- if rel_path in [".cortexignore", ".gitignore"]: continue
+ if rel_path in [".cortexignore", ".gitignore"] or ".cortex_browser" in rel_path: continue
if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path):
try:
os.remove(abs_path)
@@ -64,7 +67,7 @@
for name in dirs:
abs_path = os.path.join(root, name)
rel_path = os.path.relpath(abs_path, session_dir)
- if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path):
+ if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path) and ".cortex_browser" not in rel_path:
try:
if not os.listdir(abs_path):
os.rmdir(abs_path)
@@ -73,7 +76,7 @@
needs_update = []
for file_info in manifest.files:
- target_path = os.path.join(session_dir, file_info.path)
+ target_path = os.path.join(session_dir, file_info.path.lstrip("/"))
if file_info.is_dir:
os.makedirs(target_path, exist_ok=True)
@@ -83,9 +86,14 @@
if not os.path.exists(target_path):
needs_update.append(file_info.path)
else:
- # Hash comparison
+ # Memory-safe incremental hashing
+ h = hashlib.sha256()
with open(target_path, "rb") as f:
- actual_hash = hashlib.sha256(f.read()).hexdigest()
+ while True:
+ chunk = f.read(1024 * 1024)
+ if not chunk: break
+ h.update(chunk)
+ actual_hash = h.hexdigest()
if actual_hash != file_info.hash:
print(f" [โ ๏ธ] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})")
needs_update.append(file_info.path)
@@ -93,26 +101,93 @@
return needs_update
def write_chunk(self, session_id: str, payload: agent_pb2.FilePayload) -> bool:
- """Writes a file chunk to the local session directory."""
+ """Writes a file chunk to a shadow file and swaps to target on completion."""
session_dir = self.get_session_dir(session_id, create=True)
- target_path = os.path.normpath(os.path.join(session_dir, payload.path))
+ target_path = os.path.normpath(os.path.join(session_dir, payload.path.lstrip("/")))
if not target_path.startswith(session_dir):
return False # Path traversal guard
os.makedirs(os.path.dirname(target_path), exist_ok=True)
- mode = "ab" if payload.chunk_index > 0 else "wb"
- with open(target_path, mode) as f:
- f.write(payload.chunk)
+ # We always write to a temporary "shadow" file during the sync
+ tmp_path = target_path + ".cortex_tmp"
+ lock_path = target_path + ".cortex_lock"
+
+ if payload.chunk_index == 0:
+ # 1. Handle Locks
+ if os.path.exists(lock_path):
+ try:
+ with open(lock_path, "r") as lf:
+ lock_data = json.loads(lf.read())
+ if time.time() - lock_data.get("ts", 0) < 30:
+ print(f" [๐๐] Lock active for {payload.path}. Proceeding with shadow write...")
+ except: pass
+
+ try:
+ with open(lock_path, "w") as lf:
+ lf.write(json.dumps({"ts": time.time(), "owner": "node", "path": payload.path}))
+ except: pass
+
+ # 2. Initialize Shadow File (Truncate)
+ data = payload.chunk
+ if payload.compressed:
+ try: data = zlib.decompress(data)
+ except: pass
+
+ with open(tmp_path, "wb") as f:
+ f.write(data)
+ else:
+ # Random access write to shadow file
+ if not os.path.exists(tmp_path):
+ with open(tmp_path, "ab") as f: pass
- if payload.is_final and payload.hash:
- return self._verify(target_path, payload.hash)
+ data = payload.chunk
+ if payload.compressed:
+ try: data = zlib.decompress(data)
+ except: pass
+
+ with open(tmp_path, "r+b") as f:
+ f.seek(payload.offset if payload.HasField("offset") else 0)
+ f.write(data)
+
+ if payload.is_final:
+ # 3. Finalization: Verify and Swap
+ success = True
+ if payload.hash:
+ success = self._verify(tmp_path, payload.hash)
+
+ if success:
+ try:
+ # Atomic swap: The destination only changes once we are 100% sure the file is right.
+ import shutil
+ os.replace(tmp_path, target_path)
+ except Exception as e:
+ print(f" [๐โ] Atomic swap failed for {payload.path}: {e}")
+ success = False
+
+ # 4. Cleanup
+ if os.path.exists(lock_path):
+ try: os.remove(lock_path)
+ except: pass
+ if os.path.exists(tmp_path) and not success:
+ # If it failed verification or swap, we might want to keep it or delete it.
+ # Let's delete it to allow a clean retry.
+ try: os.remove(tmp_path)
+ except: pass
+
+ return success
return True
def _verify(self, path, expected_hash):
+ # Memory-safe incremental hashing for verification
+ h = hashlib.sha256()
with open(path, "rb") as f:
- actual = hashlib.sha256(f.read()).hexdigest()
+ while True:
+ chunk = f.read(1024 * 1024)
+ if not chunk: break
+ h.update(chunk)
+ actual = h.hexdigest()
if actual != expected_hash:
print(f"[โ ๏ธ] Sync Hash Mismatch for {path}")
return False
diff --git a/agent-node/src/agent_node/main.py b/agent-node/src/agent_node/main.py
index 1085314..2f894e1 100644
--- a/agent-node/src/agent_node/main.py
+++ b/agent-node/src/agent_node/main.py
@@ -1,14 +1,128 @@
import sys
import os
+# gRPC/Mac Stability Tuning
+os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "0"
+os.environ["GRPC_FORK_SUPPORT_ONLY_FOR_SIGCHLD"] = "1"
+os.environ["GRPC_POLL_STRATEGY"] = "poll"
-# Add root to path to find protos and other packages
-sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+# RELIABILITY: Ensure sys.argv[0] is an absolute path.
+# This is critical for os.execv(...) restarts during auto-updates,
+# especially when the agent is started with a relative path (e.g., 'python3 src/main.py').
+if sys.argv and sys.argv[0] and not os.path.isabs(sys.argv[0]):
+ sys.argv[0] = os.path.abspath(sys.argv[0])
+
+# Add root and protos to path with HIGHEST priority to avoid collision with installed packages
+_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
+sys.path.insert(0, _root)
+sys.path.insert(0, os.path.join(_root, "protos"))
import signal
import time
from agent_node.config import NODE_ID, HUB_URL, AUTH_TOKEN, SECRET_KEY, AUTO_UPDATE, UPDATE_CHECK_INTERVAL
from agent_node.core import updater
+# Pre-flight check for core dependencies
+try:
+ import grpc
+ import google.protobuf
+ import watchdog
+except ImportError as e:
+ err_str = str(e).lower()
+ if "grpc" in err_str:
+ missing, pkg = "grpcio", "python3-grpcio"
+ elif "google" in err_str or "protobuf" in err_str:
+ missing, pkg = "protobuf", "python3-protobuf"
+ else:
+ missing, pkg = "watchdog", "python3-watchdog"
+
+ print("\n" + "!"*71)
+ print(f" CRITICAL ERROR: '{missing}' library is not installed.")
+ print(f" If you are on Raspberry Pi / ARM, run:")
+ print(f" sudo apt-get install {pkg}")
+ print("!"*71 + "\n")
+except ImportError:
+ pass
+
+def enforce_singleton():
+ """
+ Ensures that only one instance of the agent is running from this directory.
+ If siblings are found, they are terminated to prevent resource/port collisions.
+ This version is robust across Linux and Darwin and avoids unnecessary OS bails.
+ """
+ import psutil
+ import os
+
+ current_pid = os.getpid()
+ try:
+ # Use realpath to resolve any symlinks for accurate comparison
+ my_path = os.path.realpath(__file__)
+ except:
+ my_path = os.path.abspath(__file__)
+
+ cleaned = 0
+ try:
+ # iterate over all processes once
+ for proc in psutil.process_iter(['pid', 'cmdline']):
+ try:
+ pid = proc.info['pid']
+ if pid == current_pid:
+ continue
+
+ cmd = proc.info['cmdline']
+ if not cmd or not isinstance(cmd, list):
+ continue
+
+ # We identify a sibling if it's running 'main.py' and resolves to our same directory.
+ is_sibling = False
+ for arg in cmd:
+ if 'main.py' in arg:
+ try:
+ # 1. Try absolute path resolution
+ if os.path.isabs(arg):
+ check_path = os.path.realpath(arg)
+ else:
+ # 2. Try relative resolution based on the sibling's current working directory
+ try:
+ cwd = proc.cwd()
+ check_path = os.path.realpath(os.path.join(cwd, arg))
+ except (psutil.AccessDenied, psutil.NoSuchProcess):
+ # If we can't get the CWD of the other process, we rely on a direct name match
+ # but stay conservative to avoid killing unrelated processes.
+ check_path = None
+
+ if check_path and check_path == my_path:
+ is_sibling = True
+ break
+ except:
+ continue
+
+ if is_sibling:
+ print(f"[*] Cleaning up orphaned agent instance (PID {pid})...")
+ try:
+ p = psutil.Process(pid)
+ p.terminate()
+ try:
+ p.wait(timeout=2)
+ except psutil.TimeoutExpired:
+ p.kill()
+ cleaned += 1
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
+ pass
+ except (psutil.NoSuchProcess, psutil.AccessDenied, KeyError):
+ continue
+ except Exception as e:
+ # Non-fatal: if we can't iterate processes at all, just log and continue.
+ # This prevents the agent from being a 'brick' in restricted environments.
+ print(f"[!] Singleton check warning: {e}")
+
+ if cleaned > 0:
+ print(f"[*] Successfully reaped {cleaned} orphaned instances.")
+
+ if cleaned > 0:
+ print(f"[*] Cleaned up {cleaned} orphaned instances.")
+ else:
+ print("[*] No conflicting agent instances detected.")
+
def main():
import logging
logging.basicConfig(
@@ -16,6 +130,16 @@
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)]
)
+
+ # 0. Singleton Enforcement: Murder siblings before booting
+ try:
+ import psutil
+ enforce_singleton()
+ except ImportError:
+ print("[!] psutil not installed โ skipping singleton enforcement. Beware of orphaned processes!")
+ except Exception as e:
+ print(f"[!] Singleton check failed: {e}")
+
print(f"[*] Starting Agent Node: {NODE_ID}...")
# 0. Auto-Update Check (before anything else โ if we're behind, restart now)
@@ -62,6 +186,11 @@
except Exception as e:
print(f"[!] Main Agent process crashed: {e}. Retrying boot in 10s...", flush=True)
+ try:
+ if 'node' in locals():
+ node.stop()
+ except:
+ pass
import traceback
traceback.print_exc()
time.sleep(10)
diff --git a/agent-node/src/agent_node/skills/base.py b/agent-node/src/agent_node/skills/base.py
index 33c88ec..eafe500 100644
--- a/agent-node/src/agent_node/skills/base.py
+++ b/agent-node/src/agent_node/skills/base.py
@@ -8,6 +8,10 @@
"""Attempts to cancel the task and returns success status."""
return False
+ def is_available(self) -> bool:
+ """Returns True if the skill's dependencies and hardware are available."""
+ return True
+
def shutdown(self):
"""Cleanup resources on node exit."""
pass
diff --git a/agent-node/src/agent_node/skills/browser_bridge.py b/agent-node/src/agent_node/skills/browser_bridge.py
new file mode 100644
index 0000000..babddd5
--- /dev/null
+++ b/agent-node/src/agent_node/skills/browser_bridge.py
@@ -0,0 +1,548 @@
+import threading
+import queue
+import time
+import json
+import re
+try:
+ from playwright.sync_api import sync_playwright
+except ImportError:
+ sync_playwright = None
+from agent_node.skills.base import BaseSkill
+from protos import agent_pb2
+try:
+ from agent_node import config as node_config
+except ImportError:
+ node_config = None
+
+# ============================================================
+# Role-Ref Registry
+# Inspired by Openclaw's pw-role-snapshot.ts
+# Maps `ref=eN` shorthand -> (role, name, nth) for every
+# interactive / content element on the last snapshotted page.
+# ============================================================
+
+INTERACTIVE_ROLES = {
+ "button", "link", "textbox", "checkbox", "radio", "combobox",
+ "listbox", "menuitem", "menuitemcheckbox", "menuitemradio",
+ "option", "searchbox", "slider", "spinbutton", "switch", "tab", "treeitem",
+}
+CONTENT_ROLES = {
+ "heading", "cell", "gridcell", "columnheader", "rowheader",
+ "listitem", "article", "region", "main", "navigation",
+}
+STRUCTURAL_ROLES = {
+ "generic", "group", "list", "table", "row", "rowgroup", "grid",
+ "treegrid", "menu", "menubar", "toolbar", "tablist", "tree",
+ "directory", "document", "application", "presentation", "none",
+}
+
+
+def _build_aria_snapshot(aria_text: str) -> tuple[str, dict]:
+ """
+ Parse Playwright's ariaSnapshot() output and annotate interactive/content
+ elements with stable [ref=eN] labels that the AI can refer back to.
+ Returns (annotated_snapshot, ref_map).
+ """
+ lines = aria_text.split("\n")
+ refs = {}
+ counter = [0]
+ role_counts = {} # (role, name) -> count (for nth disambiguation)
+ output_lines = []
+
+ def next_ref():
+ counter[0] += 1
+ return f"e{counter[0]}"
+
+ for line in lines:
+ m = re.match(r'^(\s*-\s*)(\w+)(?:\s+"([^"]*)")?(.*)$', line)
+ if not m:
+ output_lines.append(line)
+ continue
+
+ prefix, role_raw, name, suffix = m.group(1), m.group(2), m.group(3), m.group(4)
+ role = role_raw.lower()
+
+ is_interactive = role in INTERACTIVE_ROLES
+ is_content_with_name = role in CONTENT_ROLES and name
+
+ if not (is_interactive or is_content_with_name):
+ output_lines.append(line)
+ continue
+
+ # assign ref
+ ref = next_ref()
+ key = (role, name)
+ nth = role_counts.get(key, 0)
+ role_counts[key] = nth + 1
+
+ refs[ref] = {"role": role, "name": name, "nth": nth if nth > 0 else None}
+
+ enhanced = f"{prefix}{role_raw}"
+ if name:
+ enhanced += f' "{name}"'
+ enhanced += f" [ref={ref}]"
+ if nth > 0:
+ enhanced += f" [nth={nth}]"
+ if suffix:
+ enhanced += suffix
+ output_lines.append(enhanced)
+
+ return "\n".join(output_lines), refs
+
+
+def _resolve_ref(page, ref: str, role_refs: dict):
+ """Resolve a [ref=eN] string to a Playwright Locator."""
+ info = role_refs.get(ref)
+ if not info:
+ raise ValueError(f"Unknown ref '{ref}'. Run aria_snapshot first and use a ref from that output.")
+ role = info["role"]
+ name = info.get("name")
+ nth = info.get("nth") or 0
+ if name:
+ loc = page.get_by_role(role, name=name, exact=True)
+ else:
+ loc = page.get_by_role(role)
+ if nth:
+ loc = loc.nth(nth)
+ return loc
+
+
+class BrowserSkill(BaseSkill):
+ """
+ Persistent Browser Skill โ OpenClaw-inspired role-snapshot architecture.
+
+ Key innovation over the prior version:
+ - `aria_snapshot` action returns a compact semantic role tree with [ref=eN] labels.
+ - All `click`, `type`, `hover` actions accept either a CSS/XPath selector OR a
+ ref string like 'e3', enabling the AI to address elements without fragile selectors.
+ - Page errors and console output are tracked per-session and included in results.
+ - Supports headed (UI visible) mode for OIDC/OAuth login flows, toggled live from the hub.
+ """
+ def __init__(self, sync_mgr=None):
+ self.task_queue = queue.Queue()
+ # session_id -> { "context", "page", "role_refs", "console", "errors", "download_dir" }
+ self.sessions = {}
+ self.sync_mgr = sync_mgr
+ self.lock = threading.Lock()
+ self._headless = self._read_headless_config()
+ self._actor_thread = threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor")
+ self._actor_thread.start()
+
+ def is_available(self) -> bool:
+ return sync_playwright is not None
+
+ def _read_headless_config(self) -> bool:
+ """Read headless preference from node config (default: True)."""
+ if node_config:
+ return getattr(node_config, 'BROWSER_HEADLESS', True)
+ return True
+
+ def apply_config(self, skill_config: dict):
+ """
+ Called by the node when the Hub pushes a skill config update.
+ skill_config example: {"browser": {"headless": false}}
+ If headless mode changed, gracefully restarts the browser engine.
+ """
+ browser_cfg = skill_config.get("browser", {})
+ if sync_playwright is None:
+ return
+
+ # Also respect the global node_config toggle
+ new_headless = browser_cfg.get("headless", self._read_headless_config())
+ if new_headless == self._headless:
+ # Only log if specifically requested to headed mode or if we are indeed headed
+ if not self._headless:
+ print(f" [๐] Browser mode remains: headed")
+ return # No change
+
+ mode_str = "headless -> headed" if not new_headless else "headed -> headless"
+ print(f" [๐] Browser mode changing: {mode_str}")
+ self._headless = new_headless
+ # Signal the actor to restart with the new mode
+ self.task_queue.put("__restart__")
+
+ # ------------------------------------------------------------------
+ # Session Management
+ # ------------------------------------------------------------------
+
+ def _get_or_create_session(self, browser, sid, task, on_event):
+ """Return existing session dict or create a new one."""
+ with self.lock:
+ if sid in self.sessions:
+ return self.sessions[sid]
+
+ download_dir = None
+ if self.sync_mgr and task.session_id:
+ download_dir = self.sync_mgr.get_session_dir(task.session_id)
+ print(f" [๐๐] Mapping Browser Context to: {download_dir}")
+
+ ctx = browser.new_context(accept_downloads=True)
+ page = ctx.new_page()
+
+ sess = {
+ "context": ctx,
+ "page": page,
+ "role_refs": {}, # ref -> {role, name, nth}
+ "console": [],
+ "errors": [],
+ "download_dir": download_dir,
+ }
+ self.sessions[sid] = sess
+
+ # Listeners
+ self._attach_listeners(sid, page, on_event, sess)
+ return sess
+
+ def _attach_listeners(self, sid, page, on_event, sess):
+ # Console log capture
+ def _on_console(msg):
+ entry = {"level": msg.type, "text": msg.text, "ts": int(time.time() * 1000)}
+ sess["console"].append(entry)
+ if len(sess["console"]) > 200:
+ sess["console"].pop(0)
+ if on_event:
+ on_event(agent_pb2.BrowserEvent(
+ session_id=sid,
+ console_msg=agent_pb2.ConsoleMessage(
+ level=msg.type, text=msg.text, timestamp_ms=entry["ts"]
+ )
+ ))
+
+ def _on_page_error(err):
+ sess["errors"].append({"message": str(err), "ts": int(time.time() * 1000)})
+ if len(sess["errors"]) > 100:
+ sess["errors"].pop(0)
+
+ def _on_network(req):
+ resp = req.response()
+ if on_event:
+ on_event(agent_pb2.BrowserEvent(
+ session_id=sid,
+ network_req=agent_pb2.NetworkRequest(
+ method=req.method, url=req.url,
+ status=resp.status if resp else 0,
+ resource_type=req.resource_type, latency_ms=0
+ )
+ ))
+
+ def _on_download(dl):
+ import os
+ with self.lock:
+ s = self.sessions.get(sid)
+ if s and s.get("download_dir"):
+ os.makedirs(s["download_dir"], exist_ok=True)
+ target = os.path.join(s["download_dir"], dl.suggested_filename)
+ print(f" [๐๐ฅ] Download: {dl.suggested_filename} -> {target}")
+ dl.save_as(target)
+
+ page.on("console", _on_console)
+ page.on("pageerror", _on_page_error)
+ page.on("requestfinished", _on_network)
+ page.on("download", _on_download)
+
+ # ------------------------------------------------------------------
+ # Browser Actor Loop
+ # ------------------------------------------------------------------
+
+ def _browser_actor(self):
+ """
+ Immortal worker thread for browser operations.
+ Processes the task queue and manages the Playwright lifecycle.
+ """
+ if sync_playwright is None:
+ return
+
+ print("[๐] Browser Actor Starting...", flush=True)
+ pw = browser = None
+
+ def _cleanup_internal():
+ nonlocal pw, browser
+ print("[๐] Cleaning up Browser Engine...", flush=True)
+ with self.lock:
+ for s in self.sessions.values():
+ try: s["context"].close()
+ except: pass
+ self.sessions.clear()
+ if browser:
+ try: browser.close()
+ except: pass
+ if pw:
+ try: pw.stop()
+ except: pass
+ browser = pw = None
+
+ while True:
+ item = self.task_queue.get()
+ if item is None:
+ _cleanup_internal()
+ print("[๐] Browser Actor Shutting Down.", flush=True)
+ break
+
+ # Handle Restart Signal
+ if item == "__restart__":
+ print("[๐] Browser Actor Restarting (Mode Change)...", flush=True)
+ _cleanup_internal()
+ continue
+
+ task, sandbox, on_complete, on_event = item
+
+ # --- Lazy Initialization ---
+ if not pw or not browser:
+ try:
+ pw = sync_playwright().start()
+ headless = self._headless
+ launch_mode = "headless" if headless else "headed"
+ print(f"[๐] Launching Chromium in {launch_mode} mode...", flush=True)
+
+ args = ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
+ if headless: args.append('--disable-gpu')
+
+ browser = pw.chromium.launch(headless=headless, args=args)
+ print("[๐] Browser Engine Online.", flush=True)
+ except Exception as e:
+ print(f"[!] Browser Setup Failed: {e}", flush=True)
+ # Report back to AI immediately so it doesn't hang
+ on_complete(task.task_id, {"stderr": f"Playwright/Chromium Error: {e}", "status": 1}, task.trace_id)
+ _cleanup_internal()
+ continue
+
+ # --- Task Execution ---
+ try:
+ action = task.browser_action
+ sid = action.session_id or "default"
+ action_name = agent_pb2.BrowserAction.ActionType.Name(action.action)
+ print(f" [๐] {action_name} | Session: {sid}", flush=True)
+
+ sess = self._get_or_create_session(browser, sid, task, on_event)
+ page = sess.get("page")
+ res_data = {}
+
+ try:
+ self._dispatch_action(action, page, sess, res_data)
+ # Support for offloading large results to files
+ self._maybe_offload(sess, res_data, on_event)
+ except Exception as e:
+ on_complete(task.task_id, {"stderr": str(e), "status": 1}, task.trace_id)
+ continue
+
+ try:
+ # Build BrowserResponse
+ br_res = agent_pb2.BrowserResponse(
+ url=page.url if page else "",
+ title=page.title() if page else "",
+ snapshot=res_data.get("snapshot", b""),
+ dom_content=res_data.get("dom_content", ""),
+ a11y_tree=res_data.get("a11y_tree", ""),
+ eval_result=res_data.get("eval_result", ""),
+ offloaded=res_data.get("offloaded", False),
+ )
+ on_complete(task.task_id, {"status": 0, "browser_result": br_res}, task.trace_id)
+ except Exception as ex:
+ print(f" [!] Error building response: {ex}", flush=True)
+ on_complete(task.task_id, {"stderr": f"Result parsing error: {ex}", "status": 1}, task.trace_id)
+
+ except Exception as e:
+ print(f" [!] Browser Actor Loop Exception: {e}", flush=True)
+ try:
+ on_complete(task.task_id, {"stderr": f"Actor internal error: {e}", "status": 1}, task.trace_id)
+ except:
+ pass
+
+ def _maybe_offload(self, sess, res_data, on_event):
+ """Offload large strings/bytes to files in the sync directory."""
+ download_dir = sess.get("download_dir")
+ if not download_dir:
+ return
+
+ import os
+ import hashlib
+ offload_dir = os.path.join(download_dir, ".cortex_browser")
+ os.makedirs(offload_dir, exist_ok=True)
+
+ OFFLOAD_THRESHOLD = 128 * 1024 # 128KB
+ offloaded = False
+
+ files_to_push = []
+
+ # 1. Aria Snapshot
+ a11y = res_data.get("a11y_tree", "")
+ if a11y and len(a11y) > OFFLOAD_THRESHOLD:
+ rel_path = ".cortex_browser/last_a11y.txt"
+ abs_path = os.path.join(offload_dir, "last_a11y.txt")
+ with open(abs_path, "w") as f:
+ f.write(a11y)
+ res_data["a11y_tree"] = f"[OFFLOADED to {rel_path}]"
+ files_to_push.append((rel_path, a11y.encode('utf-8')))
+ offloaded = True
+
+ # 2. DOM Content
+ dom = res_data.get("dom_content", "")
+ if dom and len(dom) > OFFLOAD_THRESHOLD:
+ rel_path = ".cortex_browser/last_dom.html"
+ abs_path = os.path.join(offload_dir, "last_dom.html")
+ with open(abs_path, "w") as f:
+ f.write(dom)
+ res_data["dom_content"] = f"[OFFLOADED to {rel_path}]"
+ files_to_push.append((rel_path, dom.encode('utf-8')))
+ offloaded = True
+
+ # 3. Screenshot
+ snap = res_data.get("snapshot", b"")
+ if snap and len(snap) > OFFLOAD_THRESHOLD:
+ rel_path = ".cortex_browser/last_screenshot.png"
+ abs_path = os.path.join(offload_dir, "last_screenshot.png")
+ with open(abs_path, "wb") as f:
+ f.write(snap)
+ res_data["snapshot"] = f"[OFFLOADED to {rel_path}]".encode('utf-8')
+ files_to_push.append((rel_path, snap))
+ offloaded = True
+
+ if offloaded:
+ res_data["offloaded"] = True
+ # Proactively push files via event bus so they reach Hub BEFORE TaskResponse
+ if on_event:
+ for rel_p, data in files_to_push:
+ self._push_sync_event(sess, rel_p, data, on_event)
+ print(f" [๐๐] Browser Result Offloaded and Synced to Hub: {offload_dir}")
+
+ def _push_sync_event(self, sess, rel_path, data, on_event):
+ """Manually chunk data into FileSyncMessage events to ensure Hub-side availability."""
+ import hashlib
+ full_hash = hashlib.sha256(data).hexdigest()
+ chunk_size = 1024 * 256 # 256KB chunks
+
+ # We need the session_id that the node uses for syncing
+ # In BrowserSkill, sess["download_dir"] is SYNC_DIR / session_id
+ # We can extract session_id back from download_dir or just pass it in
+ download_dir = sess.get("download_dir")
+ session_id = os.path.basename(download_dir)
+
+ t_id = f"br-sync-{int(time.time()*1000)}"
+
+ for i in range(0, len(data), chunk_size):
+ chunk = data[i:i+chunk_size]
+ is_final = (i + chunk_size) >= len(data)
+
+ payload = agent_pb2.FilePayload(
+ path=rel_path,
+ chunk=chunk,
+ chunk_index=i // chunk_size,
+ is_final=is_final,
+ hash=full_hash if is_final else ""
+ )
+
+ msg = agent_pb2.ClientTaskMessage(
+ file_sync=agent_pb2.FileSyncMessage(
+ session_id=session_id,
+ file_data=payload,
+ task_id=t_id
+ )
+ )
+ on_event(msg)
+
+ # ------------------------------------------------------------------
+ # Action Dispatcher
+ # ------------------------------------------------------------------
+
+ def _get_aria_snapshot_safe(self, page):
+ """Safe aria_snapshot with fallback for older Playwright versions."""
+ try:
+ if hasattr(page.locator(":root"), "aria_snapshot"):
+ # Use a locator-level timeout of 10s to prevent hanging the actor thread
+ return page.locator(":root").aria_snapshot(timeout=10000)
+
+ # Fallback for Playwright < 1.44
+ print(" [๐โ ๏ธ] aria_snapshot() not available in this Playwright version. Using fallback.")
+ return "- application \"Browser\"\n - generic \"Accessibility Tree Unavailable (Playwright < 1.44)\""
+ except Exception as e:
+ print(f" [๐โ] Error generating aria snapshot: {e}")
+ return f"- error \"Failed to generate accessibility tree (Timeout or version issue): {e}\""
+
+ def _dispatch_action(self, action, page, sess, res_data):
+ A = agent_pb2.BrowserAction
+ role_refs = sess["role_refs"]
+
+ def resolve(selector_or_ref: str):
+ """Accept either a CSS selector or a ref like 'e3'."""
+ s = (selector_or_ref or "").strip()
+ if re.match(r'^e\d+$', s):
+ return _resolve_ref(page, s, role_refs)
+ return page.locator(s)
+
+ if action.action == A.NAVIGATE:
+ # PERFORMANCE: Remove auto-snapshot on navigation for Mac Mini stability.
+ # Large Aria trees cause massive CPU bursts and gRPC timeouts.
+ page.goto(action.url, wait_until="domcontentloaded", timeout=45000)
+ res_data["eval_result"] = "Navigation successful."
+
+ elif action.action == A.CLICK:
+ target = action.selector or ""
+ resolve(target).click(timeout=8000)
+
+ elif action.action == A.TYPE:
+ target = action.selector or ""
+ resolve(target).fill(action.text, timeout=8000)
+
+ elif action.action == A.SCREENSHOT:
+ res_data["snapshot"] = page.screenshot(full_page=False)
+
+ elif action.action == A.GET_DOM:
+ res_data["dom_content"] = page.content()
+
+ elif action.action == A.HOVER:
+ target = action.selector or ""
+ resolve(target).hover(timeout=5000)
+
+ elif action.action == A.SCROLL:
+ page.mouse.wheel(x=0, y=action.y or 400)
+
+ elif action.action == A.EVAL:
+ result = page.evaluate(action.text)
+ res_data["eval_result"] = str(result)
+
+ elif action.action == A.GET_A11Y:
+ aria_raw = self._get_aria_snapshot_safe(page)
+ snap, refs = _build_aria_snapshot(aria_raw)
+ sess["role_refs"] = refs
+
+ # Trim large snapshots
+ res_data["a11y_tree"] = snap[:12000]
+ stats = {
+ "total_refs": len(refs),
+ "url": page.url,
+ "title": page.title(),
+ }
+ res_data["eval_result"] = json.dumps(stats)
+
+ # Trim large snapshots (news pages can be huge)
+ MAX = 10000
+ if len(snap) > MAX:
+ snap = snap[:MAX] + "\n\n[...snapshot truncated - use eval/scroll to see more...]"
+
+ stats = {
+ "total_refs": len(refs),
+ "interactive": sum(1 for r in refs.values() if r["role"] in INTERACTIVE_ROLES),
+ "url": page.url,
+ "title": page.title(),
+ }
+ res_data["a11y_tree"] = snap
+ res_data["eval_result"] = json.dumps(stats)
+
+ elif action.action == A.CLOSE:
+ with self.lock:
+ s = self.sessions.pop(action.session_id or "default", None)
+ if s:
+ s["context"].close()
+
+ # ------------------------------------------------------------------
+ # Public Interface
+ # ------------------------------------------------------------------
+
    def execute(self, task, sandbox, on_complete, on_event=None):
        """Enqueue *task* for the single browser actor thread (non-blocking)."""
        self.task_queue.put((task, sandbox, on_complete, on_event))
+
    def cancel(self, task_id):
        # Browser actions run serially on one actor thread with their own
        # timeouts; there is no safe mid-action cancellation, so always report
        # "not cancelled".
        return False
+
    def shutdown(self):
        # Sentinel: the actor loop exits when it dequeues None.
        self.task_queue.put(None)
diff --git a/agent-node/src/agent_node/skills/file_bridge.py b/agent-node/src/agent_node/skills/file_bridge.py
new file mode 100644
index 0000000..b441d73
--- /dev/null
+++ b/agent-node/src/agent_node/skills/file_bridge.py
@@ -0,0 +1,77 @@
+import os
+import json
+import logging
+from agent_node.skills.base import BaseSkill
+
+logger = logging.getLogger(__name__)
+
class FileSkill(BaseSkill):
    """Provides file system navigation and inspection capabilities."""

    def __init__(self, sync_mgr=None):
        # Kept for interface parity with the other core bridges; listing
        # itself does not currently use it.
        self.sync_mgr = sync_mgr

    def execute(self, task, sandbox, on_complete, on_event=None):
        """
        Executes a file-related task (list, stats).
        Payload JSON: { "action": "list", "path": "...", "recursive": false }
        """
        try:
            payload = json.loads(task.payload_json)
            action = payload.get("action", "list")
            path = payload.get("path", ".")

            # 1. Sandbox Jail Check
            # (In a real implementation, we'd use sandbox.check_path(path))
            # For now, we'll assume the node allows browsing its root or session dir.

            if action == "list":
                result = self._list_dir(path, payload.get("recursive", False))
                on_complete(task.task_id, {"status": 0, "stdout": json.dumps(result)}, task.trace_id)
            else:
                on_complete(task.task_id, {"status": 1, "stderr": f"Unknown action: {action}"}, task.trace_id)

        except Exception as e:
            logger.error(f"[FileSkill] Task {task.task_id} failed: {e}")
            on_complete(task.task_id, {"status": 1, "stderr": str(e)}, task.trace_id)

    def _list_dir(self, path, recursive=False):
        """Lists directory contents with metadata.

        Entries that cannot be stat'ed (broken symlinks, permission errors)
        are skipped instead of aborting the whole listing — previously a
        single unreadable entry failed the entire task.
        """
        if not os.path.exists(path):
            return {"error": "Path not found"}

        items = []
        if recursive:
            for root, dirs, files in os.walk(path):
                for name in dirs + files:
                    abs_path = os.path.join(root, name)
                    entry = self._stat_entry(name, abs_path, os.path.relpath(abs_path, path))
                    if entry is not None:
                        items.append(entry)
        else:
            for name in os.listdir(path):
                abs_path = os.path.join(path, name)
                entry = self._stat_entry(name, abs_path, name)
                if entry is not None:
                    items.append(entry)

        return {
            "root": os.path.abspath(path),
            # Directories first, then alphabetical.
            "items": sorted(items, key=lambda x: (not x["is_dir"], x["name"]))
        }

    def _stat_entry(self, name, abs_path, rel_path):
        """Builds one listing entry, or None if the path cannot be stat'ed."""
        try:
            st = os.stat(abs_path)
        except OSError:
            # Broken symlink / permission denied: skip rather than fail the task.
            return None
        # "path" is now present in both recursive and flat listings for a
        # consistent schema (in flat mode it equals "name").
        return {
            "name": name,
            "path": rel_path,
            "is_dir": os.path.isdir(abs_path),
            "size": st.st_size,
            "mtime": st.st_mtime
        }

    def cancel(self, task_id):
        return False  # Listing is usually fast, no cancellation needed

    def shutdown(self):
        pass
diff --git a/agent-node/src/agent_node/skills/manager.py b/agent-node/src/agent_node/skills/manager.py
index f565b0e..74dbd3d 100644
--- a/agent-node/src/agent_node/skills/manager.py
+++ b/agent-node/src/agent_node/skills/manager.py
@@ -11,19 +11,53 @@
self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker")
self.active_tasks = {} # task_id -> future
self.sync_mgr = sync_mgr
- self.skills = self._discover_skills(sync_mgr)
self.max_workers = max_workers
self.lock = threading.Lock()
+
+ # 1. Start with Hardcoded Bridges (Pure Programmatic Logic)
+ self.skills = self._load_core_bridges(sync_mgr)
+
+ # 2. Optionally supplement with dynamic skills from disk
+ dynamic_skills = self._discover_skills(sync_mgr)
+ self.skills.update(dynamic_skills)
+
+ def _load_core_bridges(self, sync_mgr):
+ """Hard-imports the core execution bridges to ensure they are always available."""
+ bridges = {}
+ # Shell Bridge
+ try:
+ from agent_node.skills.shell_bridge import ShellSkill
+ instance = ShellSkill(sync_mgr=sync_mgr)
+ bridges["shell"] = instance
+ bridges["mesh-terminal-control"] = instance
+ print(" [๐ง๐ฆ] Core Shell Bridge Loaded.")
+ except ImportError as e:
+ print(f" [๐งโ ๏ธ] Fatal: Core Shell Bridge not found: {e}")
+
+ # File Bridge
+ try:
+ from agent_node.skills.file_bridge import FileSkill
+ instance = FileSkill(sync_mgr=sync_mgr)
+ bridges["file"] = instance
+ bridges["mesh-file-explorer"] = instance
+ print(" [๐ง๐ฆ] Core File Bridge Loaded.")
+ except ImportError: pass
+
+ # Browser Bridge
+ try:
+ from agent_node.skills.browser_bridge import BrowserSkill
+ instance = BrowserSkill(sync_mgr=sync_mgr)
+ bridges["browser"] = instance
+ bridges["browser-automation-agent"] = instance
+ print(" [๐ง๐ฆ] Core Browser Bridge Loaded.")
+ except ImportError: pass
+
+ return bridges
def _discover_skills(self, sync_mgr):
- """Scans the skills/ directory for logic.py and loads skill implementations."""
+ """Scans the disk for additional logic.py plugins (supplemental)."""
# Find candidate locations for skills
- # 1. Monorepo root (../../../skills from this file)
- # 2. Agent-node local (../../skills from this file)
- # 3. Docker standard (/app/skills)
candidates = [
- os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../skills")),
- os.path.abspath(os.path.join(os.path.dirname(__file__), "../../skills")),
"/app/skills",
"/app/node_skills"
]
@@ -31,7 +65,6 @@
skills_dir = None
for cand in candidates:
if os.path.exists(cand) and os.path.isdir(cand):
- # Ensure it's not a broken symlink and has actual content
try:
if any(os.path.isdir(os.path.join(cand, d)) for d in os.listdir(cand)):
skills_dir = cand
@@ -41,11 +74,12 @@
discovered = {}
if not skills_dir:
- print(f" [๐งโ ๏ธ] Skills directory not found in candidate locations: {candidates}")
return discovered
- print(f" [๐ง] Using skills directory: {skills_dir}")
+ print(f" [๐ง] Scanning supplemental skills: {skills_dir}")
for skill_dir in os.listdir(skills_dir):
+ if skill_dir in self.skills: continue # Skip if already hardcoded
+
item_path = os.path.join(skills_dir, skill_dir)
if os.path.isdir(item_path):
logic_py = os.path.join(item_path, "logic.py")
@@ -54,29 +88,23 @@
try:
spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py)
module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
+ if spec.loader:
+ spec.loader.exec_module(module)
- # Find the first class that inherits from BaseSkill
+ # Robust class detection
for attr_name in dir(module):
attr = getattr(module, attr_name)
- if isinstance(attr, type) and issubclass(attr, BaseSkill) and attr is not BaseSkill:
- # We map the internal skill name (e.g. mesh_terminal_control)
- # if we can find it in the module or assume it based on folder name
- # For backward compatibility with task_type routing, we map common ones
- instance = attr(sync_mgr=sync_mgr)
- discovered[skill_dir] = instance
- # Also map legacy names for the routing engine below
- if "terminal" in skill_dir or "shell" in skill_dir:
- discovered["shell"] = instance
- if "browser" in skill_dir:
- discovered["browser"] = instance
- if "file" in skill_dir:
- discovered["file"] = instance
- break
+ if isinstance(attr, type) and any(b.__name__ == 'BaseSkill' for b in attr.__mro__) and attr.__name__ != 'BaseSkill':
+ try:
+ instance = attr(sync_mgr=sync_mgr)
+ discovered[skill_dir] = instance
+                                print(f" [๐งโ] Loaded supplemental skill: {skill_dir}")
+ except Exception as e:
+ print(f" [๐งโ] Failed to instantiate skill {skill_dir}: {e}")
+ break
except Exception as e:
print(f" [๐งโ] Failed to load skill from {logic_py}: {e}")
- print(f" [๐ง] Discovered skills: {list(discovered.keys())}")
return discovered
def submit(self, task, sandbox, on_complete, on_event=None):
diff --git a/agent-node/src/agent_node/skills/shell_bridge.py b/agent-node/src/agent_node/skills/shell_bridge.py
new file mode 100644
index 0000000..479bbb6
--- /dev/null
+++ b/agent-node/src/agent_node/skills/shell_bridge.py
@@ -0,0 +1,413 @@
+import os
+import pty
+import select
+import threading
+import time
+import termios
+import struct
+import fcntl
+import tempfile
+from agent_node.skills.base import BaseSkill
+from protos import agent_pb2
+
+class ShellSkill(BaseSkill):
+ """Admin Console Skill: Persistent stateful Bash via PTY."""
    def __init__(self, sync_mgr=None):
        # sync_mgr resolves per-session working directories (CWD jail) in execute().
        self.sync_mgr = sync_mgr
        self.sessions = {} # session_id -> {fd, pid, thread, last_activity, ...}
        self.lock = threading.Lock()

        # Phase 3: Prompt Patterns for Edge Intelligence
        # Scanned by the PTY reader thread against the output tail to signal
        # when an interactive program appears to be waiting for input.
        self.PROMPT_PATTERNS = [
            r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$", # bash/zsh: user@host:~$
            r">>>\s*$",                        # python
            r"\.\.\.\s*$",                     # python multi-line
            r">\s*$",                          # node/js
        ]

        # --- M7: Idle Session Reaper ---
        # Automatically kills dormant bash processes to free up system resources.
        self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper")
        self.reaper_thread.start()
+
+ def _session_reaper(self):
+ """Background thread that cleans up unused PTY sessions."""
+ while True:
+ time.sleep(60)
+ with self.lock:
+ now = time.time()
+ for sid, sess in list(self.sessions.items()):
+ # Avoid reaping currently active tasks
+ if sess.get("active_task"):
+ continue
+
+ # 10 minute idle timeout
+ if now - sess.get("last_activity", 0) > 600:
+ print(f" [๐๐งน] Reaping idle shell session: {sid}")
+ try:
+ os.close(sess["fd"])
+ os.kill(sess["pid"], 9)
+ except: pass
+ self.sessions.pop(sid, None)
+
    def _ensure_session(self, session_id, cwd, on_event):
        """Return the PTY session for *session_id*, spawning bash + a reader thread on first use.

        Holds self.lock for the whole spawn; the reader thread's own lock
        acquisitions simply block until this method returns.
        """
        with self.lock:
            if session_id in self.sessions:
                self.sessions[session_id]["last_activity"] = time.time()
                return self.sessions[session_id]

            print(f" [๐] Initializing Persistent Shell Session: {session_id}")
            # Spawn bash in a pty
            pid, fd = pty.fork()
            if pid == 0: # Child
                # Environment prep
                os.environ["TERM"] = "xterm-256color"

                # Change to CWD
                if cwd and os.path.exists(cwd):
                    os.chdir(cwd)

                # Launch shell
                shell_path = "/bin/bash"
                if not os.path.exists(shell_path):
                    shell_path = "/bin/sh"
                os.execv(shell_path, [shell_path, "--login"])

            # Parent
            # Set non-blocking
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

            sess = {
                "fd": fd,
                "pid": pid,
                "last_activity": time.time(),
                "buffer_file": None,   # task stdout spool (tempfile), set per task
                "tail_buffer": "",     # 4KB RAM tail for marker/prompt scanning
                "active_task": None
            }

            def reader():
                # Pumps PTY output: spools it to the task's disk buffer, detects
                # the completion marker + exit code, rate-limits the live stream
                # to the Hub, and emits prompt-detection hints.
                while True:
                    try:
                        r, _, _ = select.select([fd], [], [], 0.1)
                        if fd in r:
                            data = os.read(fd, 4096)
                            if not data: break

                            decoded = data.decode("utf-8", errors="replace")

                            # Streaming/Sync logic (Detect completion marker)
                            with self.lock:
                                active_tid = sess.get("active_task")
                                marker = sess.get("marker")
                                if active_tid and marker and sess.get("buffer_file"):
                                    # Phase 2: Persistence Offloading
                                    # Write directly to disk instead of heap memory
                                    sess["buffer_file"].write(decoded)
                                    sess["buffer_file"].flush()

                                    # Keep a tiny 4KB tail in RAM for marker detection and prompt scanning
                                    sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-4096:]

                                    if marker in sess["tail_buffer"]:
                                        # Marker found! Extract exit code
                                        try:
                                            # The tail buffer has the marker
                                            after_marker = sess["tail_buffer"].split(marker)[1].strip().split()
                                            exit_code = int(after_marker[0]) if after_marker else 0

                                            # Formulate final stdout summary from the disk file
                                            bf = sess["buffer_file"]
                                            bf.seek(0, 2)
                                            file_len = bf.tell()

                                            # Head+tail summary keeps the TaskResponse bounded;
                                            # the full spool file stays on disk.
                                            HEAD, TAIL = 10_000, 30_000
                                            if file_len > HEAD + TAIL:
                                                bf.seek(0)
                                                head_str = bf.read(HEAD)
                                                bf.seek(file_len - TAIL)
                                                tail_str = bf.read()
                                                omitted = file_len - HEAD - TAIL
                                                pure_stdout = head_str + f"\n\n[... {omitted:,} bytes omitted (full output safely preserved at {bf.name}) ...]\n\n" + tail_str
                                            else:
                                                bf.seek(0)
                                                pure_stdout = bf.read()

                                            # Slice off the marker string and anything after it from the final result
                                            pure_stdout = pure_stdout.split(marker)[0]

                                            sess["result"]["stdout"] = pure_stdout
                                            sess["result"]["status"] = 0 if exit_code == 0 else 1

                                            # Close the file handle (leaves file on disk)
                                            sess["buffer_file"].close()
                                            sess["buffer_file"] = None

                                            # Wake the waiting execute() thread.
                                            sess["event"].set()
                                            decoded = pure_stdout.split(marker)[0][-4096:] if marker in pure_stdout else pure_stdout
                                        except Exception as e:
                                            print(f" [๐โ ๏ธ] Marker parsing failed: {e}")
                                            sess["event"].set()

                            # Stream terminal output back (with stealth filtering)
                            if on_event:
                                stealth_out = decoded
                                if "__CORTEX_FIN_SH_" in decoded:
                                    import re
                                    # We remove any line that contains our internal marker to hide plumbing from user.
                                    # This covers both the initial command echo and the exit code output.
                                    stealth_out = re.sub(r'.*__CORTEX_FIN_SH_.*[\r\n]*', '', decoded)

                                if stealth_out:
                                    # Phase 3: Client-Side Truncation (Stream Rate Limiting)
                                    # Limit real-time stream to 15KB/sec per session to prevent flooding the Hub over gRPC.
                                    # The full output is still safely written to the tempfile on disk.
                                    with self.lock:
                                        now = time.time()
                                        if now - sess.get("stream_window_start", 0) > 1.0:
                                            sess["stream_window_start"] = now
                                            sess["stream_bytes_sent"] = 0
                                            dropped = sess.get("stream_dropped_bytes", 0)
                                            if dropped > 0:
                                                drop_msg = f"\n[... {dropped:,} bytes truncated from live stream ...]\n"
                                                event = agent_pb2.SkillEvent(
                                                    session_id=session_id, task_id=sess.get("active_task") or "", terminal_out=drop_msg
                                                )
                                                on_event(agent_pb2.ClientTaskMessage(skill_event=event))
                                                sess["stream_dropped_bytes"] = 0

                                        if sess.get("stream_bytes_sent", 0) + len(stealth_out) > 15_000:
                                            sess["stream_dropped_bytes"] = sess.get("stream_dropped_bytes", 0) + len(stealth_out)
                                        else:
                                            sess["stream_bytes_sent"] = sess.get("stream_bytes_sent", 0) + len(stealth_out)
                                            event = agent_pb2.SkillEvent(
                                                session_id=session_id,
                                                task_id=sess.get("active_task") or "",
                                                terminal_out=stealth_out
                                            )
                                            on_event(agent_pb2.ClientTaskMessage(skill_event=event))

                            # EDGE INTELLIGENCE: Proactively signal prompt detection
                            # We only check for prompts if we are actively running a task and haven't found the marker yet.
                            # NOTE(review): this branch calls on_event unconditionally — it appears
                            # to assume on_event is not None whenever a task is active; confirm.
                            if active_tid and not sess["event"].is_set():
                                import re
                                tail = sess["tail_buffer"][-100:] if len(sess["tail_buffer"]) > 100 else sess["tail_buffer"]
                                for pattern in self.PROMPT_PATTERNS:
                                    if re.search(pattern, tail):
                                        # Send specific prompt signal
                                        # Use last 20 chars as the 'prompt' hint
                                        p_hint = tail[-20:].strip()
                                        prompt_event = agent_pb2.SkillEvent(
                                            session_id=session_id,
                                            task_id=active_tid,
                                            prompt=p_hint
                                        )
                                        on_event(agent_pb2.ClientTaskMessage(skill_event=prompt_event))
                                        break
                    except (EOFError, OSError):
                        break

                # Thread Cleanup
                print(f" [๐] Shell Session Terminated: {session_id}")
                with self.lock:
                    self.sessions.pop(session_id, None)

            t = threading.Thread(target=reader, daemon=True, name=f"ShellReader-{session_id}")
            t.start()
            sess["thread"] = t

            self.sessions[session_id] = sess
            return sess
+
+
+ def handle_transparent_tty(self, task, on_complete, on_event=None):
+ """Processes raw TTY/Resize events synchronously (bypasses threadpool/sandbox)."""
+ cmd = task.payload_json
+ session_id = task.session_id or "default-session"
+ try:
+ import json
+ if cmd.startswith('{') and cmd.endswith('}'):
+ raw_payload = json.loads(cmd)
+
+ # 1. Raw Keystroke forward
+ if isinstance(raw_payload, dict) and "tty" in raw_payload:
+ raw_bytes = raw_payload["tty"]
+ sess = self._ensure_session(session_id, None, on_event)
+ os.write(sess["fd"], raw_bytes.encode("utf-8"))
+ on_complete(task.task_id, {"stdout": "", "status": 0}, task.trace_id)
+ return True
+
+ # 2. Window Resize
+ if isinstance(raw_payload, dict) and raw_payload.get("action") == "resize":
+ cols = raw_payload.get("cols", 80)
+ rows = raw_payload.get("rows", 24)
+ sess = self._ensure_session(session_id, None, on_event)
+ import termios, struct, fcntl
+ s = struct.pack('HHHH', rows, cols, 0, 0)
+ fcntl.ioctl(sess["fd"], termios.TIOCSWINSZ, s)
+ print(f" [๐] Terminal Resized to {cols}x{rows}")
+ on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 0}, task.trace_id)
+ return True
+ except Exception as pe:
+ print(f" [๐] Transparent TTY Fail: {pe}")
+ return False
+
    def execute(self, task, sandbox, on_complete, on_event=None):
        """Dispatches command string to the persistent PTY shell and WAITS for completion.

        Flow: sandbox verify -> resolve CWD jail -> ensure PTY session ->
        inject command with a unique completion marker -> poll until the reader
        thread finds the marker (success), the Hub cancels, or the timeout hits.
        Always reports via on_complete exactly once per path.
        """
        session_id = task.session_id or "default-session"
        tid = task.task_id
        try:
            cmd = task.payload_json

            # --- Legacy Full-Command Execution (Sandboxed) ---
            allowed, status_msg = sandbox.verify(cmd)
            if not allowed:
                err_msg = f"\r\n[System] Command blocked: {status_msg}\r\n"
                if on_event:
                    event = agent_pb2.SkillEvent(
                        session_id=session_id, task_id=tid,
                        terminal_out=err_msg
                    )
                    on_event(agent_pb2.ClientTaskMessage(skill_event=event))

                return on_complete(tid, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id)

            # Resolve CWD jail (session sync dir takes precedence over policy jail)
            cwd = None
            if self.sync_mgr and task.session_id:
                cwd = self.sync_mgr.get_session_dir(task.session_id)
            elif sandbox.policy.get("WORKING_DIR_JAIL"):
                cwd = sandbox.policy["WORKING_DIR_JAIL"]
                if not os.path.exists(cwd):
                    try: os.makedirs(cwd, exist_ok=True)
                    except: pass

            # Handle Session Persistent Process
            sess = self._ensure_session(session_id, cwd, on_event)

            # Check for RAW mode first (bypasses busy check for interactive control)
            is_raw = cmd.startswith("!RAW:")
            if is_raw:
                input_str = cmd[5:] + "\n"
                print(f" [๐โจ๏ธ] RAW Input Injection: {input_str.strip()}")
                os.write(sess["fd"], input_str.encode("utf-8"))
                return on_complete(tid, {"stdout": "INJECTED", "status": 0}, task.trace_id)

            # --- 0. Busy Check: Serialize access to the PTY for standard commands ---
            with self.lock:
                if sess.get("active_task"):
                    curr_tid = sess.get("active_task")
                    return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 1}, task.trace_id)

            # --- Blocking Wait Logic ---
            # Marker is second-granular; safe because the busy-check above
            # serializes tasks per session and tail_buffer is reset per task.
            marker_id = int(time.time())
            marker = f"__CORTEX_FIN_SH_{marker_id}__"
            event = threading.Event()
            result_container = {"stdout": "", "status": 0} # 0 = Success (Protobuf Convention)

            # Register waiter in session state
            with self.lock:
                sess["active_task"] = tid
                sess["marker"] = marker
                sess["event"] = event
                # Create a persistent tempfile for stdout instead of RAM buffer
                sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False)
                sess["tail_buffer"] = ""
                sess["result"] = result_container
                sess["cancel_event"] = threading.Event()

            # Input injection: execute command then echo marker and exit code
            try:
                # 12-factor bash: ( cmd ) ; echo marker $?
                # We use "" concatenation in the echo command to ensure the marker literal
                # DOES NOT appear in the PTY input echo, preventing premature completion.
                full_input = f"({cmd}) ; echo \"__CORTEX_FIN_SH_\"\"{marker_id}__\" $?\n"
                os.write(sess["fd"], full_input.encode("utf-8"))

                # Wait for completion (triggered by reader) OR cancellation
                timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0
                start_time = time.time()
                while time.time() - start_time < timeout:
                    # Check for completion (reader found marker)
                    if event.is_set():
                        return on_complete(tid, result_container, task.trace_id)

                    # Check for cancellation (HUB sent cancel)
                    if sess["cancel_event"].is_set():
                        print(f" [๐๐] Task {tid} cancelled on node.")
                        return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id)

                    # Sleep slightly to avoid busy loop
                    time.sleep(0.1)

                # Timeout Case: return a head+tail summary of whatever was spooled.
                print(f" [๐โ ๏ธ] Task {tid} timed out on node.")
                with self.lock:
                    if sess.get("buffer_file"):
                        try:
                            sess["buffer_file"].seek(0, 2)
                            file_len = sess["buffer_file"].tell()
                            HEAD, TAIL = 10_000, 30_000
                            if file_len > HEAD + TAIL:
                                sess["buffer_file"].seek(0)
                                head_str = sess["buffer_file"].read(HEAD)
                                sess["buffer_file"].seek(file_len - TAIL)
                                tail_str = sess["buffer_file"].read()
                                omitted = file_len - HEAD - TAIL
                                partial_out = head_str + f"\n\n[... {omitted:,} bytes omitted (full timeout output saved to {sess['buffer_file'].name}) ...]\n\n" + tail_str
                            else:
                                sess["buffer_file"].seek(0)
                                partial_out = sess["buffer_file"].read()
                        except:
                            partial_out = ""
                    else:
                        partial_out = ""

                on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id)

            finally:
                # Cleanup session task state (only if this task still owns the session)
                with self.lock:
                    if sess.get("active_task") == tid:
                        if sess.get("buffer_file"):
                            try:
                                sess["buffer_file"].close()
                            except: pass
                            sess["buffer_file"] = None
                        sess["active_task"] = None
                        sess["marker"] = None
                        sess["event"] = None
                        sess["result"] = None
                        sess["cancel_event"] = None

        except Exception as e:
            print(f" [๐โ] Execute Error for {tid}: {e}")
            on_complete(tid, {"stderr": str(e), "status": 2}, task.trace_id)
+
+ def cancel(self, task_id: str):
+ """Cancels an active task โ for persistent shell, this sends a SIGINT (Ctrl+C)."""
+ with self.lock:
+ for sid, sess in self.sessions.items():
+ if sess.get("active_task") == task_id:
+ print(f"[๐] Sending SIGINT (Ctrl+C) to shell session (Task {task_id}): {sid}")
+ # Write \x03 (Ctrl+C) to the master FD
+ os.write(sess["fd"], b"\x03")
+ # Break the wait loop in execute thread
+ if sess.get("cancel_event"):
+ sess["cancel_event"].set()
+ return True
+
+
+ def shutdown(self):
+ """Cleanup: Terminates all persistent shells."""
+ with self.lock:
+ for sid, sess in list(self.sessions.items()):
+ print(f"[๐] Cleaning up persistent shell: {sid}")
+ try: os.close(sess["fd"])
+ except: pass
+ # kill pid
+ try: os.kill(sess["pid"], 9)
+ except: pass
+ self.sessions.clear()
diff --git a/agent-node/src/agent_node/utils/network.py b/agent-node/src/agent_node/utils/network.py
index 04b97c3..58abf64 100644
--- a/agent-node/src/agent_node/utils/network.py
+++ b/agent-node/src/agent_node/utils/network.py
@@ -7,7 +7,7 @@
"""Initializes a gRPC channel (Secure or Insecure) and returns the orchestrator stub."""
options = [
- ('grpc.keepalive_time_ms', 30000), # Send keepalive ping every 30s
+ ('grpc.keepalive_time_ms', 10000), # Send keepalive ping every 10s
('grpc.keepalive_timeout_ms', 10000), # Wait 10s for pong
('grpc.keepalive_permit_without_calls', True),
('grpc.http2.max_pings_without_data', 0), # Allow infinite pings
@@ -18,7 +18,7 @@
if not TLS_ENABLED:
print(f"[!] TLS is disabled. Connecting via insecure channel to {SERVER_HOST_PORT}")
channel = grpc.insecure_channel(SERVER_HOST_PORT, options=options)
- return agent_pb2_grpc.AgentOrchestratorStub(channel)
+ return agent_pb2_grpc.AgentOrchestratorStub(channel), channel
print(f"[*] Connecting via secure (mTLS) channel to {SERVER_HOST_PORT}")
try:
@@ -28,11 +28,10 @@
creds = grpc.ssl_channel_credentials(ca, pkey, cert)
channel = grpc.secure_channel(SERVER_HOST_PORT, creds, options=options)
- return agent_pb2_grpc.AgentOrchestratorStub(channel)
+ return agent_pb2_grpc.AgentOrchestratorStub(channel), channel
except FileNotFoundError as e:
print(f"[!] mTLS Certificate files not found: {e}. Falling back to standard TLS (Server Verify)...")
# Fallback to standard TLS (uses system CA roots by default)
creds = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(SERVER_HOST_PORT, creds, options=options)
- return agent_pb2_grpc.AgentOrchestratorStub(channel)
-
+ return agent_pb2_grpc.AgentOrchestratorStub(channel), channel
diff --git a/agent-node/src/agent_pb2.py b/agent-node/src/agent_pb2.py
new file mode 100644
index 0000000..7d12fb3
--- /dev/null
+++ b/agent-node/src/agent_pb2.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: agent.proto
+# Protobuf Python Version: 4.25.1
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe0\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\x12\x19\n\x11skill_config_json\x18\x06 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 
\x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 
\x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xef\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\x12\x11\n\toffloaded\x18\t \x01(\x08\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 
\x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"\xad\x01\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\x12\x0e\n\x06offset\x18\x06 \x01(\x03\x12\x12\n\ncompressed\x18\x07 
\x01(\x08\x12\x14\n\x0ctotal_chunks\x18\x08 \x01(\x05\x12\x12\n\ntotal_size\x18\t \x01(\x03\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+ DESCRIPTOR._options = None
+ _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None
+ _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001'
+ _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None
+ _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001'
+ _globals['_REGISTRATIONREQUEST']._serialized_start=23
+ _globals['_REGISTRATIONREQUEST']._serialized_end=245
+ _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=194
+ _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=245
+ _globals['_SANDBOXPOLICY']._serialized_start=248
+ _globals['_SANDBOXPOLICY']._serialized_end=472
+ _globals['_SANDBOXPOLICY_MODE']._serialized_start=438
+ _globals['_SANDBOXPOLICY_MODE']._serialized_end=472
+ _globals['_REGISTRATIONRESPONSE']._serialized_start=474
+ _globals['_REGISTRATIONRESPONSE']._serialized_end=594
+ _globals['_CLIENTTASKMESSAGE']._serialized_start=597
+ _globals['_CLIENTTASKMESSAGE']._serialized_end=894
+ _globals['_SKILLEVENT']._serialized_start=896
+ _globals['_SKILLEVENT']._serialized_end=1017
+ _globals['_NODEANNOUNCE']._serialized_start=1019
+ _globals['_NODEANNOUNCE']._serialized_end=1050
+ _globals['_BROWSEREVENT']._serialized_start=1053
+ _globals['_BROWSEREVENT']._serialized_end=1188
+ _globals['_SERVERTASKMESSAGE']._serialized_start=1191
+ _globals['_SERVERTASKMESSAGE']._serialized_end=1507
+ _globals['_TASKCANCELREQUEST']._serialized_start=1509
+ _globals['_TASKCANCELREQUEST']._serialized_end=1545
+ _globals['_TASKREQUEST']._serialized_start=1548
+ _globals['_TASKREQUEST']._serialized_end=1757
+ _globals['_BROWSERACTION']._serialized_start=1760
+ _globals['_BROWSERACTION']._serialized_end=2048
+ _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1914
+ _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2048
+ _globals['_TASKRESPONSE']._serialized_start=2051
+ _globals['_TASKRESPONSE']._serialized_end=2403
+ _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2283
+ _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2331
+ _globals['_TASKRESPONSE_STATUS']._serialized_start=2333
+ _globals['_TASKRESPONSE_STATUS']._serialized_end=2393
+ _globals['_BROWSERRESPONSE']._serialized_start=2406
+ _globals['_BROWSERRESPONSE']._serialized_end=2645
+ _globals['_CONSOLEMESSAGE']._serialized_start=2647
+ _globals['_CONSOLEMESSAGE']._serialized_end=2714
+ _globals['_NETWORKREQUEST']._serialized_start=2716
+ _globals['_NETWORKREQUEST']._serialized_end=2820
+ _globals['_WORKPOOLUPDATE']._serialized_start=2822
+ _globals['_WORKPOOLUPDATE']._serialized_end=2866
+ _globals['_TASKCLAIMREQUEST']._serialized_start=2868
+ _globals['_TASKCLAIMREQUEST']._serialized_end=2920
+ _globals['_TASKCLAIMRESPONSE']._serialized_start=2922
+ _globals['_TASKCLAIMRESPONSE']._serialized_end=2991
+ _globals['_HEARTBEAT']._serialized_start=2994
+ _globals['_HEARTBEAT']._serialized_end=3352
+ _globals['_HEALTHCHECKRESPONSE']._serialized_start=3354
+ _globals['_HEALTHCHECKRESPONSE']._serialized_end=3399
+ _globals['_FILESYNCMESSAGE']._serialized_start=3402
+ _globals['_FILESYNCMESSAGE']._serialized_end=3630
+ _globals['_SYNCCONTROL']._serialized_start=3633
+ _globals['_SYNCCONTROL']._serialized_end=3932
+ _globals['_SYNCCONTROL_ACTION']._serialized_start=3762
+ _globals['_SYNCCONTROL_ACTION']._serialized_end=3932
+ _globals['_DIRECTORYMANIFEST']._serialized_start=3934
+ _globals['_DIRECTORYMANIFEST']._serialized_end=4004
+ _globals['_FILEINFO']._serialized_start=4006
+ _globals['_FILEINFO']._serialized_end=4074
+ _globals['_FILEPAYLOAD']._serialized_start=4077
+ _globals['_FILEPAYLOAD']._serialized_end=4250
+ _globals['_SYNCSTATUS']._serialized_start=4253
+ _globals['_SYNCSTATUS']._serialized_end=4413
+ _globals['_SYNCSTATUS_CODE']._serialized_start=4347
+ _globals['_SYNCSTATUS_CODE']._serialized_end=4413
+ _globals['_AGENTORCHESTRATOR']._serialized_start=4416
+ _globals['_AGENTORCHESTRATOR']._serialized_end=4649
+# @@protoc_insertion_point(module_scope)
diff --git a/agent-node/src/agent_pb2_grpc.py b/agent-node/src/agent_pb2_grpc.py
new file mode 100644
index 0000000..932d45e
--- /dev/null
+++ b/agent-node/src/agent_pb2_grpc.py
@@ -0,0 +1,138 @@
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+"""Client and server classes corresponding to protobuf-defined services."""
+import grpc
+
+import agent_pb2 as agent__pb2
+
+
+class AgentOrchestratorStub(object):
+ """The Cortex Server exposes this service
+ """
+
+ def __init__(self, channel):
+ """Constructor.
+
+ Args:
+ channel: A grpc.Channel.
+ """
+ self.SyncConfiguration = channel.unary_unary(
+ '/agent.AgentOrchestrator/SyncConfiguration',
+ request_serializer=agent__pb2.RegistrationRequest.SerializeToString,
+ response_deserializer=agent__pb2.RegistrationResponse.FromString,
+ )
+ self.TaskStream = channel.stream_stream(
+ '/agent.AgentOrchestrator/TaskStream',
+ request_serializer=agent__pb2.ClientTaskMessage.SerializeToString,
+ response_deserializer=agent__pb2.ServerTaskMessage.FromString,
+ )
+ self.ReportHealth = channel.stream_stream(
+ '/agent.AgentOrchestrator/ReportHealth',
+ request_serializer=agent__pb2.Heartbeat.SerializeToString,
+ response_deserializer=agent__pb2.HealthCheckResponse.FromString,
+ )
+
+
+class AgentOrchestratorServicer(object):
+ """The Cortex Server exposes this service
+ """
+
+ def SyncConfiguration(self, request, context):
+ """1. Control Channel: Sync policies and settings (Unary)
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def TaskStream(self, request_iterator, context):
+ """2. Task Channel: Bidirectional work dispatch and reporting (Persistent)
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def ReportHealth(self, request_iterator, context):
+ """3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent)
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+
+def add_AgentOrchestratorServicer_to_server(servicer, server):
+ rpc_method_handlers = {
+ 'SyncConfiguration': grpc.unary_unary_rpc_method_handler(
+ servicer.SyncConfiguration,
+ request_deserializer=agent__pb2.RegistrationRequest.FromString,
+ response_serializer=agent__pb2.RegistrationResponse.SerializeToString,
+ ),
+ 'TaskStream': grpc.stream_stream_rpc_method_handler(
+ servicer.TaskStream,
+ request_deserializer=agent__pb2.ClientTaskMessage.FromString,
+ response_serializer=agent__pb2.ServerTaskMessage.SerializeToString,
+ ),
+ 'ReportHealth': grpc.stream_stream_rpc_method_handler(
+ servicer.ReportHealth,
+ request_deserializer=agent__pb2.Heartbeat.FromString,
+ response_serializer=agent__pb2.HealthCheckResponse.SerializeToString,
+ ),
+ }
+ generic_handler = grpc.method_handlers_generic_handler(
+ 'agent.AgentOrchestrator', rpc_method_handlers)
+ server.add_generic_rpc_handlers((generic_handler,))
+
+
+ # This class is part of an EXPERIMENTAL API.
+class AgentOrchestrator(object):
+ """The Cortex Server exposes this service
+ """
+
+ @staticmethod
+ def SyncConfiguration(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration',
+ agent__pb2.RegistrationRequest.SerializeToString,
+ agent__pb2.RegistrationResponse.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+ @staticmethod
+ def TaskStream(request_iterator,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream',
+ agent__pb2.ClientTaskMessage.SerializeToString,
+ agent__pb2.ServerTaskMessage.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+ @staticmethod
+ def ReportHealth(request_iterator,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth',
+ agent__pb2.Heartbeat.SerializeToString,
+ agent__pb2.HealthCheckResponse.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
diff --git a/agent-node/src/protos/agent.proto b/agent-node/src/protos/agent.proto
index 46b74fa..3076b89 100644
--- a/agent-node/src/protos/agent.proto
+++ b/agent-node/src/protos/agent.proto
@@ -33,6 +33,7 @@
repeated string denied_commands = 3;
repeated string sensitive_commands = 4;
string working_dir_jail = 5;
+ string skill_config_json = 6; // NEW: Map of skill settings (e.g. {"browser": {"headless": false}})
}
message RegistrationResponse {
@@ -155,6 +156,7 @@
string eval_result = 6;
repeated ConsoleMessage console_history = 7;
repeated NetworkRequest network_history = 8;
+ bool offloaded = 9; // NEW: Indicates content is stored in the sync mirror (.cortex_browser/)
}
message ConsoleMessage {
@@ -265,6 +267,10 @@
int32 chunk_index = 3;
bool is_final = 4;
string hash = 5; // Full file hash for verification on final chunk
+ int64 offset = 6; // NEW: Byte offset for random-access parallel writes
+ bool compressed = 7; // NEW: Whether the chunk is compressed (zlib)
+ int32 total_chunks = 8; // NEW: Total number of chunks expected
+ int64 total_size = 9; // NEW: Total file size in bytes
}
message SyncStatus {
diff --git a/agent-node/src/protos/agent_pb2.py b/agent-node/src/protos/agent_pb2.py
index 3075f56..bea769c 100644
--- a/agent-node/src/protos/agent_pb2.py
+++ b/agent-node/src/protos/agent_pb2.py
@@ -14,7 +14,7 @@
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 
\x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 
\x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b 
\x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"_\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 
\x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe0\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\x12\x19\n\x11skill_config_json\x18\x06 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 
\x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 
\x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xef\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\x12\x11\n\toffloaded\x18\t \x01(\x08\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 
\x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"\x83\x01\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\x12\x0e\n\x06offset\x18\x06 \x01(\x03\x12\x12\n\ncompressed\x18\x07 
\x01(\x08\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -30,67 +30,67 @@
_globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=201
_globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=252
_globals['_SANDBOXPOLICY']._serialized_start=255
- _globals['_SANDBOXPOLICY']._serialized_end=452
- _globals['_SANDBOXPOLICY_MODE']._serialized_start=418
- _globals['_SANDBOXPOLICY_MODE']._serialized_end=452
- _globals['_REGISTRATIONRESPONSE']._serialized_start=454
- _globals['_REGISTRATIONRESPONSE']._serialized_end=574
- _globals['_CLIENTTASKMESSAGE']._serialized_start=577
- _globals['_CLIENTTASKMESSAGE']._serialized_end=874
- _globals['_SKILLEVENT']._serialized_start=876
- _globals['_SKILLEVENT']._serialized_end=997
- _globals['_NODEANNOUNCE']._serialized_start=999
- _globals['_NODEANNOUNCE']._serialized_end=1030
- _globals['_BROWSEREVENT']._serialized_start=1033
- _globals['_BROWSEREVENT']._serialized_end=1168
- _globals['_SERVERTASKMESSAGE']._serialized_start=1171
- _globals['_SERVERTASKMESSAGE']._serialized_end=1487
- _globals['_TASKCANCELREQUEST']._serialized_start=1489
- _globals['_TASKCANCELREQUEST']._serialized_end=1525
- _globals['_TASKREQUEST']._serialized_start=1528
- _globals['_TASKREQUEST']._serialized_end=1737
- _globals['_BROWSERACTION']._serialized_start=1740
- _globals['_BROWSERACTION']._serialized_end=2028
- _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1894
- _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2028
- _globals['_TASKRESPONSE']._serialized_start=2031
- _globals['_TASKRESPONSE']._serialized_end=2383
- _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2263
- _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2311
- _globals['_TASKRESPONSE_STATUS']._serialized_start=2313
- _globals['_TASKRESPONSE_STATUS']._serialized_end=2373
- _globals['_BROWSERRESPONSE']._serialized_start=2386
- _globals['_BROWSERRESPONSE']._serialized_end=2606
- _globals['_CONSOLEMESSAGE']._serialized_start=2608
- _globals['_CONSOLEMESSAGE']._serialized_end=2675
- _globals['_NETWORKREQUEST']._serialized_start=2677
- _globals['_NETWORKREQUEST']._serialized_end=2781
- _globals['_WORKPOOLUPDATE']._serialized_start=2783
- _globals['_WORKPOOLUPDATE']._serialized_end=2827
- _globals['_TASKCLAIMREQUEST']._serialized_start=2829
- _globals['_TASKCLAIMREQUEST']._serialized_end=2881
- _globals['_TASKCLAIMRESPONSE']._serialized_start=2883
- _globals['_TASKCLAIMRESPONSE']._serialized_end=2952
- _globals['_HEARTBEAT']._serialized_start=2955
- _globals['_HEARTBEAT']._serialized_end=3313
- _globals['_HEALTHCHECKRESPONSE']._serialized_start=3315
- _globals['_HEALTHCHECKRESPONSE']._serialized_end=3360
- _globals['_FILESYNCMESSAGE']._serialized_start=3363
- _globals['_FILESYNCMESSAGE']._serialized_end=3591
- _globals['_SYNCCONTROL']._serialized_start=3594
- _globals['_SYNCCONTROL']._serialized_end=3893
- _globals['_SYNCCONTROL_ACTION']._serialized_start=3723
- _globals['_SYNCCONTROL_ACTION']._serialized_end=3893
- _globals['_DIRECTORYMANIFEST']._serialized_start=3895
- _globals['_DIRECTORYMANIFEST']._serialized_end=3965
- _globals['_FILEINFO']._serialized_start=3967
- _globals['_FILEINFO']._serialized_end=4035
- _globals['_FILEPAYLOAD']._serialized_start=4037
- _globals['_FILEPAYLOAD']._serialized_end=4132
- _globals['_SYNCSTATUS']._serialized_start=4135
- _globals['_SYNCSTATUS']._serialized_end=4295
- _globals['_SYNCSTATUS_CODE']._serialized_start=4229
- _globals['_SYNCSTATUS_CODE']._serialized_end=4295
- _globals['_AGENTORCHESTRATOR']._serialized_start=4298
- _globals['_AGENTORCHESTRATOR']._serialized_end=4531
+ _globals['_SANDBOXPOLICY']._serialized_end=479
+ _globals['_SANDBOXPOLICY_MODE']._serialized_start=445
+ _globals['_SANDBOXPOLICY_MODE']._serialized_end=479
+ _globals['_REGISTRATIONRESPONSE']._serialized_start=481
+ _globals['_REGISTRATIONRESPONSE']._serialized_end=601
+ _globals['_CLIENTTASKMESSAGE']._serialized_start=604
+ _globals['_CLIENTTASKMESSAGE']._serialized_end=901
+ _globals['_SKILLEVENT']._serialized_start=903
+ _globals['_SKILLEVENT']._serialized_end=1024
+ _globals['_NODEANNOUNCE']._serialized_start=1026
+ _globals['_NODEANNOUNCE']._serialized_end=1057
+ _globals['_BROWSEREVENT']._serialized_start=1060
+ _globals['_BROWSEREVENT']._serialized_end=1195
+ _globals['_SERVERTASKMESSAGE']._serialized_start=1198
+ _globals['_SERVERTASKMESSAGE']._serialized_end=1514
+ _globals['_TASKCANCELREQUEST']._serialized_start=1516
+ _globals['_TASKCANCELREQUEST']._serialized_end=1552
+ _globals['_TASKREQUEST']._serialized_start=1555
+ _globals['_TASKREQUEST']._serialized_end=1764
+ _globals['_BROWSERACTION']._serialized_start=1767
+ _globals['_BROWSERACTION']._serialized_end=2055
+ _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1921
+ _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2055
+ _globals['_TASKRESPONSE']._serialized_start=2058
+ _globals['_TASKRESPONSE']._serialized_end=2410
+ _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2290
+ _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2338
+ _globals['_TASKRESPONSE_STATUS']._serialized_start=2340
+ _globals['_TASKRESPONSE_STATUS']._serialized_end=2400
+ _globals['_BROWSERRESPONSE']._serialized_start=2413
+ _globals['_BROWSERRESPONSE']._serialized_end=2652
+ _globals['_CONSOLEMESSAGE']._serialized_start=2654
+ _globals['_CONSOLEMESSAGE']._serialized_end=2721
+ _globals['_NETWORKREQUEST']._serialized_start=2723
+ _globals['_NETWORKREQUEST']._serialized_end=2827
+ _globals['_WORKPOOLUPDATE']._serialized_start=2829
+ _globals['_WORKPOOLUPDATE']._serialized_end=2873
+ _globals['_TASKCLAIMREQUEST']._serialized_start=2875
+ _globals['_TASKCLAIMREQUEST']._serialized_end=2927
+ _globals['_TASKCLAIMRESPONSE']._serialized_start=2929
+ _globals['_TASKCLAIMRESPONSE']._serialized_end=2998
+ _globals['_HEARTBEAT']._serialized_start=3001
+ _globals['_HEARTBEAT']._serialized_end=3359
+ _globals['_HEALTHCHECKRESPONSE']._serialized_start=3361
+ _globals['_HEALTHCHECKRESPONSE']._serialized_end=3406
+ _globals['_FILESYNCMESSAGE']._serialized_start=3409
+ _globals['_FILESYNCMESSAGE']._serialized_end=3637
+ _globals['_SYNCCONTROL']._serialized_start=3640
+ _globals['_SYNCCONTROL']._serialized_end=3939
+ _globals['_SYNCCONTROL_ACTION']._serialized_start=3769
+ _globals['_SYNCCONTROL_ACTION']._serialized_end=3939
+ _globals['_DIRECTORYMANIFEST']._serialized_start=3941
+ _globals['_DIRECTORYMANIFEST']._serialized_end=4011
+ _globals['_FILEINFO']._serialized_start=4013
+ _globals['_FILEINFO']._serialized_end=4081
+ _globals['_FILEPAYLOAD']._serialized_start=4084
+ _globals['_FILEPAYLOAD']._serialized_end=4215
+ _globals['_SYNCSTATUS']._serialized_start=4218
+ _globals['_SYNCSTATUS']._serialized_end=4378
+ _globals['_SYNCSTATUS_CODE']._serialized_start=4312
+ _globals['_SYNCSTATUS_CODE']._serialized_end=4378
+ _globals['_AGENTORCHESTRATOR']._serialized_start=4381
+ _globals['_AGENTORCHESTRATOR']._serialized_end=4614
# @@protoc_insertion_point(module_scope)
diff --git a/ai-hub/app/api/routes/agent_update.py b/ai-hub/app/api/routes/agent_update.py
index 2b0923d..05a6a31 100644
--- a/ai-hub/app/api/routes/agent_update.py
+++ b/ai-hub/app/api/routes/agent_update.py
@@ -56,9 +56,9 @@
return True
# M4 Fallback: Also allow any valid invite_token currently in the DB
- from app.api.deps import get_db
- from app.models.agent_node import AgentNode
- db = next(get_db())
+ from app.db.session import SessionLocal
+ from app.db.models import AgentNode
+ db = SessionLocal()
try:
exists = db.query(AgentNode).filter(AgentNode.invite_token == token, AgentNode.is_active == True).first()
return exists is not None
diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py
index e96791a..4feab51 100644
--- a/ai-hub/app/api/routes/nodes.py
+++ b/ai-hub/app/api/routes/nodes.py
@@ -367,8 +367,8 @@
timeout_ms=request.timeout_ms,
session_id=request.session_id or "",
)
- # Push directly to the node's live gRPC outbound queue
- live.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req))
+ # Push directly to the node's live gRPC outbound queue (Priority 1 for interactive)
+ live.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1)
registry.emit(node_id, "task_start", {"command": request.command}, task_id=task_id)
except Exception as e:
logger.error(f"[nodes/dispatch] Failed to put task onto queue for {node_id}: {e}")
@@ -395,7 +395,7 @@
from app.protos import agent_pb2
cancel_req = agent_pb2.TaskCancelRequest(task_id=task_id)
- live.queue.put(agent_pb2.ServerTaskMessage(task_cancel=cancel_req))
+ live.send_message(agent_pb2.ServerTaskMessage(task_cancel=cancel_req), priority=0)
registry.emit(node_id, "task_cancel", {"task_id": task_id})
return {"status": "cancel_sent", "task_id": task_id}
@@ -612,7 +612,13 @@
# 4. Run installer with --daemon (or --non-interactive)
print("[*] Bootstrapping agent...")
-cmd = [sys.executable, "bootstrap_installer.py", "--daemon"]
+cmd = [
+ sys.executable, "bootstrap_installer.py",
+ "--daemon",
+ "--hub", "{base_url}",
+ "--token", "{node.invite_token}",
+ "--node-id", "{node_id}"
+]
if {skip_browsers}:
cmd.append("--skip-browsers")
subprocess.run(cmd)
@@ -774,7 +780,7 @@
"data": live.to_dict() if live else {"status": "offline"},
})
- q: queue.Queue = queue.Queue()
+ q: queue.Queue = queue.Queue(maxsize=500) # Capped to prevent memory leak
registry.subscribe_node(node_id, q)
async def send_events():
@@ -836,7 +842,7 @@
timeout_ms=0,
session_id=session_id
)
- live_node.queue.put(agent_pb2.ServerTaskMessage(task_request=task_req))
+ live_node.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1)
registry.emit(node_id, "task_start", {"command": cmd}, task_id=task_id)
except Exception as e:
logger.error(f"[ws/dispatch] Error: {e}")
@@ -921,7 +927,7 @@
await websocket.close(code=1011)
return
- q: queue.Queue = queue.Queue()
+ q: queue.Queue = queue.Queue(maxsize=500)
# Subscribe to each accessible node individually
for nid in accessible_ids:
registry.subscribe_node(nid, q)
@@ -1250,7 +1256,7 @@
lines += [
"",
"# Workspace sync root โ override if needed",
- "sync_root: \"/tmp/cortex-workspace\"",
+ "sync_root: \"/tmp/cortex-sync\"",
"",
"# FS Explorer root โ defaults to user home if not specified here",
"# fs_root: \"/User/username/Documents\"",
diff --git a/ai-hub/app/api/routes/sessions.py b/ai-hub/app/api/routes/sessions.py
index f931ac8..ca5b372 100644
--- a/ai-hub/app/api/routes/sessions.py
+++ b/ai-hub/app/api/routes/sessions.py
@@ -36,9 +36,20 @@
node_prefs = (user.preferences or {}).get("nodes", {})
default_nodes = node_prefs.get("default_node_ids", [])
node_config = node_prefs.get("data_source", {"source": "empty"})
-
- if default_nodes:
+
+ # M3/M6: Generate stable workspace ID for all Swarm Control sessions or if defaults exist
+ if request.feature_name == "swarm_control" or default_nodes:
new_session.sync_workspace_id = f"session-{new_session.id}-{_uuid.uuid4().hex[:8]}"
+
+ # Ensure server-side ghost mirror directory is created immediately
+ try:
+ from app.main import app
+ if hasattr(app.state, "orchestrator") and app.state.orchestrator.mirror:
+ app.state.orchestrator.mirror.get_workspace_path(new_session.sync_workspace_id)
+ except Exception as mirror_err:
+ logger.error(f"[create_session] Failed to pre-initialize server mirror: {mirror_err}")
+
+ if default_nodes:
new_session.attached_node_ids = list(default_nodes)
new_session.node_sync_status = {
nid: {"status": "pending", "last_sync": None}
diff --git a/ai-hub/app/api/routes/user.py b/ai-hub/app/api/routes/user.py
index 1dea9c6..af5f6b6 100644
--- a/ai-hub/app/api/routes/user.py
+++ b/ai-hub/app/api/routes/user.py
@@ -1,6 +1,7 @@
from fastapi import APIRouter, HTTPException, Depends, Header, Query, Request, UploadFile, File
from fastapi.responses import RedirectResponse as redirect
from sqlalchemy.orm import Session
+from app.config import settings
from app.db import models
from typing import Optional, Annotated
import logging
@@ -18,17 +19,14 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-# Minimum OIDC configuration from settings
-from app.config import settings
-OIDC_CLIENT_ID = settings.OIDC_CLIENT_ID
-OIDC_CLIENT_SECRET = settings.OIDC_CLIENT_SECRET
-OIDC_SERVER_URL = settings.OIDC_SERVER_URL
-OIDC_REDIRECT_URI = settings.OIDC_REDIRECT_URI
-
-# --- Derived OIDC Configuration ---
-OIDC_AUTHORIZATION_URL = f"{OIDC_SERVER_URL}/auth"
-OIDC_TOKEN_URL = f"{OIDC_SERVER_URL}/token"
-OIDC_USERINFO_URL = f"{OIDC_SERVER_URL}/userinfo"
+# --- Derived OIDC Configuration Helpers ---
+def get_oidc_urls():
+ server_url = settings.OIDC_SERVER_URL.rstrip("/")
+ return {
+ "auth": f"{server_url}/auth",
+ "token": f"{server_url}/token",
+ "userinfo": f"{server_url}/userinfo"
+ }
# A dependency to simulate getting the current user ID from a request header
def get_current_user_id(x_user_id: Annotated[Optional[str], Header()] = None) -> Optional[str]:
@@ -58,17 +56,18 @@
# For simplicity, we will pass it as a query parameter in the callback.
# A more robust solution would use a state parameter.
- # Use urllib.parse.urlencode to properly encode parameters
+ oidc_urls = get_oidc_urls()
params = {
"response_type": "code",
"scope": "openid profile email",
- "client_id": OIDC_CLIENT_ID,
- "redirect_uri": OIDC_REDIRECT_URI,
+ "client_id": settings.OIDC_CLIENT_ID,
+ "redirect_uri": settings.OIDC_REDIRECT_URI,
"state": frontend_callback_uri or ""
}
- auth_url = f"{OIDC_AUTHORIZATION_URL}?{urllib.parse.urlencode(params)}"
- logger.info(f"Redirecting to OIDC authorization URL: {auth_url}")
+ auth_url = f"{oidc_urls['auth']}?{urllib.parse.urlencode(params)}"
+ logger.info(f"Initiating OIDC login. Client ID: '{settings.OIDC_CLIENT_ID}', Redirect URI: '{settings.OIDC_REDIRECT_URI}', State: '{params['state']}'")
+ logger.debug(f"Full redirect URL: {auth_url}")
return redirect(url=auth_url)
@router.get("/login/callback", summary="Handle OIDC Login Callback")
@@ -85,20 +84,21 @@
"""
logger.info(f"Received callback with authorization code: {code[:10]}... and state: {state}")
+ oidc_urls = get_oidc_urls()
try:
- logger.info(f"Exchanging code for tokens at: {OIDC_TOKEN_URL}")
+ logger.info(f"Exchanging code for tokens at: {oidc_urls['token']}")
# Step 1: Exchange the authorization code for an access token and an ID token
token_data = {
"grant_type": "authorization_code",
"code": code,
- "redirect_uri": OIDC_REDIRECT_URI,
- "client_id": OIDC_CLIENT_ID,
- "client_secret": OIDC_CLIENT_SECRET,
+ "redirect_uri": settings.OIDC_REDIRECT_URI,
+ "client_id": settings.OIDC_CLIENT_ID,
+ "client_secret": settings.OIDC_CLIENT_SECRET,
}
async with httpx.AsyncClient() as client:
- logger.debug(f"Sending POST to {OIDC_TOKEN_URL} with data keys: {list(token_data.keys())}")
- token_response = await client.post(OIDC_TOKEN_URL, data=token_data, timeout=30.0)
+ logger.debug(f"Sending POST to {oidc_urls['token']} with data keys: {list(token_data.keys())}")
+ token_response = await client.post(oidc_urls['token'], data=token_data, timeout=30.0)
token_response.raise_for_status()
response_json = token_response.json()
diff --git a/ai-hub/app/api/schemas.py b/ai-hub/app/api/schemas.py
index 1307489..ee725e0 100644
--- a/ai-hub/app/api/schemas.py
+++ b/ai-hub/app/api/schemas.py
@@ -301,6 +301,7 @@
enabled: bool = True
cwd_jail: Optional[str] = None # shell only: restrict working directory
max_file_size_mb: Optional[int] = None # sync only: file size cap
+ headless: Optional[bool] = None # browser only: toggle UI visibility
sandbox: Optional[SandboxConfig] = None # NEW: shell sandbox config
class NodeSkillConfig(BaseModel):
diff --git a/ai-hub/app/core/grpc/core/journal.py b/ai-hub/app/core/grpc/core/journal.py
index dba559b..4de45cf 100644
--- a/ai-hub/app/core/grpc/core/journal.py
+++ b/ai-hub/app/core/grpc/core/journal.py
@@ -207,14 +207,14 @@
with self.lock:
return self.tasks.pop(task_id, None)
- def cleanup(self, max_age_s: int = 3600):
+ def cleanup(self, max_age_s: int = 900):
"""Purges stale tasks to prevent slow memory accumulation."""
now = time.time()
with self.lock:
to_remove = [
tid for tid, t in self.tasks.items()
- if (t["completed_at"] and (now - t["completed_at"]) > 300) # finished: keep 5m
- or (now - t["created_at"]) > max_age_s # pending: keep 1h
+ if (t["completed_at"] and (now - t["completed_at"]) > 120) # finished: keep 2m
+ or (now - t["created_at"]) > 900 # pending: keep 15m
]
for tid in to_remove:
del self.tasks[tid]
diff --git a/ai-hub/app/core/grpc/core/pool.py b/ai-hub/app/core/grpc/core/pool.py
index f53a1db..2c22207 100644
--- a/ai-hub/app/core/grpc/core/pool.py
+++ b/ai-hub/app/core/grpc/core/pool.py
@@ -1,16 +1,29 @@
import threading
+import time
class GlobalWorkPool:
"""Thread-safe pool of unassigned tasks that can be claimed by any node."""
def __init__(self):
self.lock = threading.Lock()
- self.available = {} # task_id -> payload
+ self.available = {} # task_id -> {"payload": p, "ts": time.time()}
self.on_new_work = None # Callback to notify nodes
+ threading.Thread(target=self._cleanup_loop, daemon=True, name="WorkPoolCleanup").start()
+ def _cleanup_loop(self):
+ import time
+ while True:
+ time.sleep(300)
+ now = time.time()
+ with self.lock:
+ to_remove = [tid for tid, d in self.available.items() if (now - d["ts"]) > 3600]
+ for tid in to_remove:
+ del self.available[tid]
+
def push_work(self, task_id, payload):
"""Adds new task to global discovery pool."""
+ import time
with self.lock:
- self.available[task_id] = payload
+ self.available[task_id] = {"payload": payload, "ts": time.time()}
print(f" [๐ฆ] New Shared Task: {task_id}")
if self.on_new_work:
self.on_new_work(task_id)
@@ -20,7 +33,7 @@
with self.lock:
if task_id in self.available:
print(f" [๐ฆ] Task {task_id} Claimed by {node_id}")
- return True, self.available.pop(task_id)
+ return True, self.available.pop(task_id)["payload"]
return False, None
def list_available(self):
diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py
index 86782fe..024d5b3 100644
--- a/ai-hub/app/core/grpc/services/assistant.py
+++ b/ai-hub/app/core/grpc/services/assistant.py
@@ -2,10 +2,14 @@
import json
import os
import hashlib
+import zlib
import logging
import shutil
+import threading
from app.core.grpc.utils.crypto import sign_payload, sign_browser_action
from app.protos import agent_pb2
+from app.db.session import get_db_session
+from app.db.models import Session
logger = logging.getLogger(__name__)
@@ -17,34 +21,42 @@
self.pool = pool
self.mirror = mirror
self.memberships = {} # session_id -> list(node_id)
+ self.membership_lock = threading.Lock()
def push_workspace(self, node_id, session_id):
"""Initial unidirectional push from server ghost mirror to a node."""
- node = self.registry.get_node(node_id)
- if not node or not self.mirror: return
+ if not self.mirror: return
+ # 1. Ensure Server Mirror exists immediately
+ manifest = self.mirror.generate_manifest(session_id)
+
+ # 2. Track relationship for recovery/reconciliation
+ with self.membership_lock:
+ if session_id not in self.memberships:
+ self.memberships[session_id] = []
+ if node_id not in self.memberships[session_id]:
+ self.memberships[session_id].append(node_id)
+
+ # 3. If node is online, push actual data
+ node = self.registry.get_node(node_id)
+ if not node:
+ logger.info(f"[๐๐ค] Workspace {session_id} prepared on server for offline node {node_id}")
+ return
+
print(f"[๐๐ค] Initiating Workspace Push for Session {session_id} to {node_id}")
- # Track for recovery
- if session_id not in self.memberships:
- self.memberships[session_id] = []
- if node_id not in self.memberships[session_id]:
- self.memberships[session_id].append(node_id)
-
- manifest = self.mirror.generate_manifest(session_id)
-
- # 1. Send Manifest
- node.queue.put(agent_pb2.ServerTaskMessage(
+ # Send Manifest to Node. The node will compare this with its local state
+ # and send back RECONCILE_REQUIRED for any files it is missing.
+ # This prevents the "Double Push" race where the server blasts data
+ # while the node is still trying to decide what it needs.
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
manifest=manifest
)
- ))
+ ), priority=1)
- # 2. Send File Data
- for file_info in manifest.files:
- if not file_info.is_dir:
- self.push_file(node_id, session_id, file_info.path)
+ # NOTE: Proactive parallel push removed. Manifest-driven reactive sync is cleaner.
def push_file(self, node_id, session_id, rel_path):
"""Pushes a specific file to a node (used for drift recovery)."""
@@ -58,34 +70,44 @@
print(f" [๐โ] Requested file {rel_path} not found in mirror")
return
- with open(abs_path, "rb") as f:
- full_data = f.read()
- full_hash = hashlib.sha256(full_data).hexdigest()
- f.seek(0)
-
- index = 0
- while True:
- import time
- chunk = f.read(1024 * 512) # 512KB chunks
- is_final = len(chunk) < 1024 * 512
-
- node.queue.put(agent_pb2.ServerTaskMessage(
- file_sync=agent_pb2.FileSyncMessage(
- session_id=session_id,
- file_data=agent_pb2.FilePayload(
- path=rel_path,
- chunk=chunk,
- chunk_index=index,
- is_final=is_final,
- hash=full_hash if is_final else ""
+ # Line-rate Optimization: 4MB chunks + No Software Throttling
+ hasher = hashlib.sha256()
+ file_size = os.path.getsize(abs_path)
+
+ try:
+ with open(abs_path, "rb") as f:
+ index = 0
+ while True:
+ chunk = f.read(4 * 1024 * 1024) # 4MB chunks (optimal for gRPC)
+ if not chunk: break
+
+ hasher.update(chunk)
+ offset = f.tell() - len(chunk)
+ is_final = f.tell() >= file_size
+
+ # Compress Chunk for transit
+ compressed_chunk = zlib.compress(chunk)
+
+ # Put into priority dispatcher (priority 2 for sync data)
+ node.send_message(agent_pb2.ServerTaskMessage(
+ file_sync=agent_pb2.FileSyncMessage(
+ session_id=session_id,
+ file_data=agent_pb2.FilePayload(
+ path=rel_path,
+ chunk=compressed_chunk,
+ chunk_index=index,
+ is_final=is_final,
+ hash=hasher.hexdigest() if is_final else "",
+ offset=offset,
+ compressed=True
+ )
)
- )
- ))
- time.sleep(0.02) # ~25MB/s throttle to allow interleaving with other Node messages
-
- if is_final or not chunk:
- break
- index += 1
+ ), priority=2)
+
+ if is_final: break
+ index += 1
+ except Exception as e:
+ logger.error(f"[๐๐ค] Line-rate push error for {rel_path}: {e}")
def clear_workspace(self, node_id, session_id):
"""Sends a SyncControl command to purge the local sync directory on a node, and removes from active mesh."""
@@ -95,26 +117,53 @@
node = self.registry.get_node(node_id)
if not node: return
- node.queue.put(agent_pb2.ServerTaskMessage(
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.PURGE, path=".")
)
- ))
+ ), priority=1)
def reconcile_node(self, node_id):
"""Forces a re-sync check for all sessions this node belongs to and purges dead sessions."""
print(f" [๐๐] Triggering Resync Check for {node_id}...")
active_sessions = []
- for sid, nodes in self.memberships.items():
- if node_id in nodes:
- active_sessions.append(sid)
+ try:
+ with get_db_session() as db:
+ sessions = db.query(Session).filter(
+ Session.is_archived == False,
+ Session.sync_workspace_id.isnot(None)
+ ).all()
+
+ with self.membership_lock:
+ for s in sessions:
+ attached = s.attached_node_ids or []
+ if node_id in attached:
+ active_sessions.append(s.sync_workspace_id)
+ if s.sync_workspace_id not in self.memberships:
+ self.memberships[s.sync_workspace_id] = []
+ if node_id not in self.memberships[s.sync_workspace_id]:
+ self.memberships[s.sync_workspace_id].append(node_id)
+
+ # Aggressive memory cleanup: Purge orphaned session memberships
+ current_active_workspace_ids = {s.sync_workspace_id for s in sessions}
+ with self.membership_lock:
+ to_purge = [sid for sid in self.memberships.keys() if sid not in current_active_workspace_ids]
+ for sid in to_purge:
+ del self.memberships[sid]
+ except Exception as e:
+ print(f" [๐โ ๏ธ] Failed to fetch active sessions for node reconciliation: {e}")
+ # Fallback to in-memory if DB fails
+ with self.membership_lock:
+ for sid, nodes in self.memberships.items():
+ if node_id in nodes:
+ active_sessions.append(sid)
# Send proactive cleanup payload with the active sessions whitelist
node = self.registry.get_node(node_id)
if node:
- node.queue.put(agent_pb2.ServerTaskMessage(
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id="global",
control=agent_pb2.SyncControl(
@@ -122,32 +171,41 @@
request_paths=active_sessions
)
)
- ))
+ ), priority=0)
for sid in active_sessions:
# Re-push manifest to trigger node-side drift check
self.push_workspace(node_id, sid)
+ # Add a small delay to prevent saturating the gRPC stream for multiple sessions
+ time.sleep(0.5)
def broadcast_file_chunk(self, session_id: str, sender_node_id: str, file_payload):
"""Broadcasts a file chunk received from one node to all other nodes in the mesh."""
- session_members = self.memberships.get(session_id, [])
- destinations = [n for n in session_members if n != sender_node_id]
+ with self.membership_lock:
+ session_members = self.memberships.get(session_id, [])
+ destinations = [n for n in session_members if n != sender_node_id]
if destinations:
print(f" [๐๐ข] Broadcasting {file_payload.path} from {sender_node_id} to: {', '.join(destinations)}")
- for node_id in destinations:
- node = self.registry.get_node(node_id)
- if not node:
- continue
-
- # Forward the exact same FileSyncMessage
- node.queue.put(agent_pb2.ServerTaskMessage(
- file_sync=agent_pb2.FileSyncMessage(
- session_id=session_id,
- file_data=file_payload
- )
- ))
+ def _send_to_node(nid):
+ node = self.registry.get_node(nid)
+ if node:
+ # Forward the exact same FileSyncMessage (Priority 2 for Sync Data)
+ node.send_message(agent_pb2.ServerTaskMessage(
+ file_sync=agent_pb2.FileSyncMessage(
+ session_id=session_id,
+ file_data=file_payload
+ )
+ ), priority=2)
+
+ # M6: Use registry executor if available for parallel mesh broadcast
+ if self.registry.executor:
+ for nid in destinations:
+ self.registry.executor.submit(_send_to_node, nid)
+ else:
+ for nid in destinations:
+ _send_to_node(nid)
def lock_workspace(self, node_id, session_id):
"""Disables user-side synchronization from a node during AI refactors."""
@@ -161,12 +219,12 @@
"""Requests a full directory manifest from a node for drift checking."""
node = self.registry.get_node(node_id)
if not node: return
- node.queue.put(agent_pb2.ServerTaskMessage(
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.REFRESH_MANIFEST, path=path)
)
- ))
+ ), priority=1)
def control_sync(self, node_id, session_id, action="START", path="."):
"""Sends a SyncControl command to a node (e.g. START_WATCHING, LOCK)."""
@@ -188,12 +246,12 @@
if node_id not in self.memberships[session_id]:
self.memberships[session_id].append(node_id)
- node.queue.put(agent_pb2.ServerTaskMessage(
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
control=agent_pb2.SyncControl(action=proto_action, path=path)
)
- ))
+ ), priority=1)
# ==================================================================
# Modular FS Explorer / Mesh Navigation
@@ -227,13 +285,13 @@
tid = f"fs-ls-{int(time.time()*1000)}"
event = self.journal.register(tid, node_id)
- node.queue.put(agent_pb2.ServerTaskMessage(
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=tid,
control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.LIST, path=path)
)
- ))
+ ), priority=1)
if event.wait(timeout):
res = self.journal.get_result(tid)
@@ -249,11 +307,12 @@
def _proactive_explorer_sync(self, node_id, files, session_id):
"""Starts background tasks to mirror files to Hub so dots turn green."""
- import threading
for f in files:
if f.get("is_dir"): continue
if not f.get("is_synced") and f.get("size", 0) < 1024 * 512: # Skip large files
- threading.Thread(target=self.cat, args=(node_id, f["path"], 15, session_id), daemon=True).start()
+ # M6: Use shared registry executor instead of spawning loose threads
+ if self.registry.executor:
+ self.registry.executor.submit(self.cat, node_id, f["path"], 15, session_id)
def cat(self, node_id: str, path: str, timeout=15, session_id="__fs_explorer__", force_remote: bool = False):
"""Requests file content from a node (waits for result)."""
@@ -278,13 +337,13 @@
tid = f"fs-cat-{int(time.time()*1000)}"
event = self.journal.register(tid, node_id)
- node.queue.put(agent_pb2.ServerTaskMessage(
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=tid,
control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.READ, path=path)
)
- ))
+ ), priority=1)
if event.wait(timeout):
res = self.journal.get_result(tid)
@@ -311,21 +370,34 @@
with open(dest, "wb") as f:
f.write(content)
- # Fire and forget synchronization to the edge node
- tid = f"fs-write-{int(time.time()*1000)}"
- node.queue.put(agent_pb2.ServerTaskMessage(
- file_sync=agent_pb2.FileSyncMessage(
- session_id=session_id,
- task_id=tid,
- control=agent_pb2.SyncControl(
- action=agent_pb2.SyncControl.WRITE,
- path=path,
- content=content,
- is_dir=is_dir
+ # Multi-node broadcast for sessions
+ targets = []
+ if session_id != "__fs_explorer__":
+ targets = self.memberships.get(session_id, [node_id])
+ else:
+ targets = [node_id]
+
+ print(f"[๐โ๏ธ] AI Write: {path} (Session: {session_id}) -> Dispatching to {len(targets)} nodes")
+
+ for target_nid in targets:
+ target_node = self.registry.get_node(target_nid)
+ if not target_node: continue
+
+ tid = f"fs-write-{int(time.time()*1000)}"
+ target_node.send_message(agent_pb2.ServerTaskMessage(
+ file_sync=agent_pb2.FileSyncMessage(
+ session_id=session_id,
+ task_id=tid,
+ control=agent_pb2.SyncControl(
+ action=agent_pb2.SyncControl.WRITE,
+ path=path,
+ content=content,
+ is_dir=is_dir
+ )
)
- )
- ))
- return {"success": True, "message": "Synchronized to local mirror and dispatched to node"}
+ ), priority=2)
+
+ return {"success": True, "message": f"Synchronized to local mirror and dispatched to {len(targets)} nodes"}
except Exception as e:
logger.error(f"[๐โ๏ธ] Local mirror write error: {e}")
return {"error": str(e)}
@@ -334,7 +406,7 @@
tid = f"fs-write-{int(time.time()*1000)}"
event = self.journal.register(tid, node_id)
- node.queue.put(agent_pb2.ServerTaskMessage(
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=tid,
@@ -345,7 +417,7 @@
is_dir=is_dir
)
)
- ))
+ ), priority=2)
if event.wait(timeout):
res = self.journal.get_result(tid)
@@ -409,16 +481,28 @@
elif os.path.exists(dest):
os.remove(dest)
- # Fire and forget to edge node
- tid = f"fs-rm-{int(time.time()*1000)}"
- node.queue.put(agent_pb2.ServerTaskMessage(
- file_sync=agent_pb2.FileSyncMessage(
- session_id=session_id,
- task_id=tid,
- control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=path)
- )
- ))
- return {"success": True, "message": "Removed from local mirror and dispatched delete to node"}
+ # Multi-node broadcast for sessions
+ targets = []
+ if session_id != "__fs_explorer__":
+ targets = self.memberships.get(session_id, [node_id])
+ else:
+ targets = [node_id]
+
+ print(f"[๐๐๏ธ] AI Remove: {path} (Session: {session_id}) -> Dispatching to {len(targets)} nodes")
+
+ for target_nid in targets:
+ target_node = self.registry.get_node(target_nid)
+ if not target_node: continue
+
+ tid = f"fs-rm-{int(time.time()*1000)}"
+ target_node.send_message(agent_pb2.ServerTaskMessage(
+ file_sync=agent_pb2.FileSyncMessage(
+ session_id=session_id,
+ task_id=tid,
+ control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=path)
+ )
+ ), priority=2)
+ return {"success": True, "message": f"Removed from local mirror and dispatched delete to {len(targets)} nodes"}
except Exception as e:
logger.error(f"[๐๐๏ธ] Local mirror rm error: {e}")
return {"error": str(e)}
@@ -427,13 +511,13 @@
tid = f"fs-rm-{int(time.time()*1000)}"
event = self.journal.register(tid, node_id)
- node.queue.put(agent_pb2.ServerTaskMessage(
+ node.send_message(agent_pb2.ServerTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=tid,
control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=path)
)
- ))
+ ), priority=2)
if event.wait(timeout):
res = self.journal.get_result(tid)
@@ -442,7 +526,7 @@
self.journal.pop(tid)
return {"error": "Timeout"}
- def dispatch_swarm(self, node_ids, cmd, timeout=30, session_id=None, no_abort=False):
+ def dispatch_swarm(self, node_ids, cmd, timeout=120, session_id=None, no_abort=False):
"""Dispatches a command to multiple nodes in parallel and waits for all results."""
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -462,7 +546,7 @@
return results
- def dispatch_single(self, node_id, cmd, timeout=30, session_id=None, no_abort=False):
+ def dispatch_single(self, node_id, cmd, timeout=120, session_id=None, no_abort=False):
"""Dispatches a shell command to a specific node."""
import uuid
node = self.registry.get_node(node_id)
@@ -480,7 +564,7 @@
logger.info(f"[๐ค] Dispatching shell {tid} to {node_id}")
self.registry.emit(node_id, "task_assigned", {"command": cmd, "session_id": session_id}, task_id=tid)
- node.queue.put(req)
+ node.send_message(req, priority=1)
self.registry.emit(node_id, "task_start", {"command": cmd}, task_id=tid)
# Immediate peek if timeout is 0
@@ -504,7 +588,7 @@
logger.warning(f"[โ ๏ธ] Shell task {tid} TIMEOUT after {timeout}s on {node_id}. Sending ABORT.")
try:
- node.queue.put(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=tid)))
+ node.send_message(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=tid)), priority=0)
except: pass
# Return partial result captured in buffer before popping
@@ -532,7 +616,7 @@
logger.info(f"[๐๐ค] Dispatching browser {tid} to {node_id}")
self.registry.emit(node_id, "task_assigned", {"browser_action": action.action, "url": action.url}, task_id=tid)
- node.queue.put(req)
+ node.send_message(req, priority=1)
self.registry.emit(node_id, "task_start", {"browser_action": action.action}, task_id=tid)
if event.wait(timeout):
@@ -591,7 +675,7 @@
logger.warning(f"[โ ๏ธ] Wait for task {task_id} TIMEOUT again. Sending ABORT.")
node = self.registry.get_node(node_id)
if node:
- try: node.queue.put(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=task_id)))
+ try: node.send_message(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=task_id)), priority=0)
except: pass
res = self.journal.get_result(task_id)
diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py
index 0054d64..971fb02 100644
--- a/ai-hub/app/core/services/node_registry.py
+++ b/ai-hub/app/core/services/node_registry.py
@@ -11,11 +11,14 @@
"""
import threading
import queue
+import time
import logging
import re
import uuid
+import asyncio
from datetime import datetime
from typing import Dict, Optional, List, Any
+from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)
@@ -49,7 +52,8 @@
self.node_id = node_id
self.user_id = user_id # Owner โ maps node to a Hub user
self.metadata = metadata # desc, caps (capabilities dict)
- self.queue: queue.Queue = queue.Queue() # gRPC outbound message queue
+ # Increased queue size to 1000 to handle high-concurrency file sync without dropping interactive tasks
+ self.queue: queue.PriorityQueue = queue.PriorityQueue(maxsize=1000)
self.stats: dict = {
"active_worker_count": 0,
"cpu_usage_percent": 0.0,
@@ -58,9 +62,40 @@
}
self.connected_at: datetime = datetime.utcnow()
self.last_heartbeat_at: datetime = datetime.utcnow()
- import uuid
self.session_id: str = str(uuid.uuid4())
self.terminal_history: List[str] = [] # Recent PTY lines for AI reading
+ self._registry_executor = None # Set by registry
+
+ def send_message(self, msg: Any, priority: int = 2):
+ """
+ Thread-safe and Async-safe message dispatcher.
+ priority: 0 (Admin/Control), 1 (Terminal/Interactive), 2 (File Sync)
+ """
+ item = (priority, time.time(), msg)
+
+ def _blocking_put():
+ try:
+ self.queue.put(item, block=True, timeout=2.0)
+ except queue.Full:
+ logger.warning(f"[๐โ ๏ธ] Message dropped for {self.node_id}: outbound queue FULL. Node may be unresponsive.")
+ except Exception as e:
+ logger.error(f"[๐โ] Sync error sending to {self.node_id}: {e}")
+
+ try:
+ # Check if we are in an async loop (FastAPI context)
+ loop = asyncio.get_running_loop()
+ if loop.is_running():
+ if self._registry_executor:
+ self._registry_executor.submit(_blocking_put)
+ else:
+ # Fallback to fire-and-forget thread if executor not yet ready
+ threading.Thread(target=_blocking_put, daemon=True).start()
+ return
+ except RuntimeError:
+ pass # Not in async loop
+
+ # Standard sync put (from gRPC thread)
+ _blocking_put()
def update_stats(self, stats: dict):
self.stats.update(stats)
@@ -105,16 +140,17 @@
"""
def __init__(self):
- self._lock = threading.Lock()
self._nodes: Dict[str, LiveNodeRecord] = {}
+ self._lock = threading.Lock()
+ self._connection_history: Dict[str, List[datetime]] = {}
# Per-node WS subscribers: node_id -> [queue, ...]
self._node_listeners: Dict[str, List[queue.Queue]] = {}
# Per-user WS subscribers: user_id -> [queue, ...] (ALL nodes for that user)
self._user_listeners: Dict[str, List[queue.Queue]] = {}
- # Connection history for flapping detection: node_id -> [timestamp, ...]
- self._connection_history: Dict[str, List[datetime]] = {}
- self._FLAP_THRESHOLD = 3 # Max connections
- self._FLAP_WINDOW_S = 30 # In 30 seconds
+ self._FLAP_WINDOW_S = 60
+ self._FLAP_THRESHOLD = 5
+ # Shared Hub-wide work executor to prevent thread-spawning leaks
+ self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix="RegistryWorker")
# ------------------------------------------------------------------ #
# DB Helpers #
@@ -186,6 +222,33 @@
self._connection_history.clear()
logger.info(f"[NodeRegistry] EMERGENCY: Cleared {count} nodes from memory cache.")
return count
+
+ def validate_invite_token(self, node_id: str, token: str) -> dict:
+ """
+ Directly validates an invite token against the DB.
+ Used by the gRPC server to avoid HTTP self-call deadlocks during startup.
+ """
+ from app.db.models import AgentNode
+ from app.db.session import get_db_session
+ try:
+ with get_db_session() as db:
+ node = db.query(AgentNode).filter(
+ AgentNode.node_id == node_id,
+ AgentNode.invite_token == token,
+ AgentNode.is_active == True,
+ ).first()
+ if not node:
+ return {"valid": False, "reason": "Invalid token or unknown node."}
+ return {
+ "valid": True,
+ "node_id": node.node_id,
+ "display_name": node.display_name,
+ "user_id": node.registered_by,
+ "skill_config": node.skill_config or {},
+ }
+ except Exception as e:
+ logger.error(f"[NodeRegistry] Token validation exception: {e}")
+ return {"valid": False, "reason": str(e)}
# ------------------------------------------------------------------ #
@@ -213,10 +276,11 @@
# 2. Register the live connection
record = LiveNodeRecord(node_id=node_id, user_id=user_id, metadata=metadata)
+ record._registry_executor = self.executor # Inject shared executor
self._nodes[node_id] = record
- # Persist to DB (background-safe โ session is scoped)
- self._db_upsert_node(node_id, user_id, metadata)
+ # Persist to DB asynchronously to avoid blocking gRPC stream setup during NFS lag
+ self.executor.submit(self._db_upsert_node, node_id, user_id, metadata)
logger.info(f"[๐] NodeRegistry: Registered {node_id} (owner: {user_id}) | Stats enabled")
self.emit(node_id, "node_online", record.to_dict())
@@ -236,7 +300,7 @@
node = self._nodes.pop(node_id, None)
user_id = node.user_id if node else None
- self._db_mark_offline(node_id)
+ self.executor.submit(self._db_mark_offline, node_id)
self.emit(node_id, "node_offline", {"node_id": node_id, "user_id": user_id})
logger.info(f"[๐] NodeRegistry: Deregistered {node_id}")
@@ -274,8 +338,8 @@
node.update_stats(stats)
if stats.get("cpu_usage_percent", 0) > 0 or stats.get("memory_usage_percent", 0) > 0:
logger.debug(f"[๐] Heartbeat {node_id}: CPU {stats.get('cpu_usage_percent')}% | MEM {stats.get('memory_usage_percent')}%")
- # Persist heartbeat timestamp to DB (throttle: already ~10s cadence from node)
- self._db_update_heartbeat(node_id)
+ # Persist heartbeat timestamp to DB asynchronously
+ self.executor.submit(self._db_update_heartbeat, node_id)
# Emit heartbeat event to live UI
self.emit(node_id, "heartbeat", stats)
@@ -322,16 +386,20 @@
if not is_tty_char:
node.terminal_history.append(f"$ {cmd}\n")
elif event_type == "task_stdout" and isinstance(data, str):
- # NEW: Strip ANSI codes for AI readability
+ # NEW: Strip ANSI codes and CAP size to 100KB per chunk to prevent memory bloat
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
clean_output = ansi_escape.sub('', data)
+ if len(clean_output) > 100_000:
+ clean_output = clean_output[:100_000] + "\n[... Output Truncated ...]\n"
node.terminal_history.append(clean_output)
elif event_type == "skill_event" and isinstance(data, dict):
if data.get("type") == "output":
output_data = data.get("data", "")
- # Strip ANSI codes for AI readability
+ # Strip ANSI codes and CAP size
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
clean_output = ansi_escape.sub('', output_data)
+ if len(clean_output) > 100_000:
+ clean_output = clean_output[:100_000] + "\n[... Output Truncated ...]\n"
node.terminal_history.append(clean_output)
# Keep a rolling buffer of 150 terminal interaction chunks
diff --git a/ai-hub/app/db/session.py b/ai-hub/app/db/session.py
index 6f46c4d..fb50db0 100644
--- a/ai-hub/app/db/session.py
+++ b/ai-hub/app/db/session.py
@@ -6,13 +6,21 @@
# Determine engine arguments based on the database mode from the central config
engine_args = {}
if settings.DB_MODE == "sqlite":
- # This argument is required for SQLite to allow it to be used by multiple threads,
- # which is the case in a web application like FastAPI.
- engine_args["connect_args"] = {"check_same_thread": False}
+ # This argument is required for SQLite to allow it to be used by multiple threads.
+ # We also increase the timeout for NFS environments to 30 seconds.
+ engine_args["connect_args"] = {"check_same_thread": False, "timeout": 60}
+ # M6: Scale pool size for high-concurrency background ops
+ engine_args["pool_size"] = 20
+ engine_args["max_overflow"] = 30
else:
# 'pool_pre_ping' checks if a database connection is still alive before using it.
# This prevents errors from connections that have been timed out by the DB server.
engine_args["pool_pre_ping"] = True
+ engine_args["pool_size"] = 20
+ engine_args["max_overflow"] = 30
+
+# Always pre-check connections
+engine_args["pool_pre_ping"] = True
import os
@@ -24,6 +32,16 @@
# Create the SQLAlchemy engine using the centralized URL and determined arguments
engine = create_engine(settings.DATABASE_URL, **engine_args)
+# M6: Speed up SQLite on NFS by using Write-Ahead Logging (WAL) and reducing sync level
+if settings.DB_MODE == "sqlite":
+ from sqlalchemy import event
+ @event.listens_for(engine, "connect")
+ def set_sqlite_pragma(dbapi_connection, connection_record):
+ cursor = dbapi_connection.cursor()
+ cursor.execute("PRAGMA journal_mode=WAL")
+ cursor.execute("PRAGMA synchronous=NORMAL")
+ cursor.close()
+
# SessionLocal is a factory for creating new database session objects.
# It's the standard way to interact with the database in SQLAlchemy.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
diff --git a/ai-hub/app/protos/agent.proto b/ai-hub/app/protos/agent.proto
index 46b74fa..3076b89 100644
--- a/ai-hub/app/protos/agent.proto
+++ b/ai-hub/app/protos/agent.proto
@@ -33,6 +33,7 @@
repeated string denied_commands = 3;
repeated string sensitive_commands = 4;
string working_dir_jail = 5;
+ string skill_config_json = 6; // NEW: Map of skill settings (e.g. {"browser": {"headless": false}})
}
message RegistrationResponse {
@@ -155,6 +156,7 @@
string eval_result = 6;
repeated ConsoleMessage console_history = 7;
repeated NetworkRequest network_history = 8;
+ bool offloaded = 9; // NEW: Indicates content is stored in the sync mirror (.cortex_browser/)
}
message ConsoleMessage {
@@ -265,6 +267,10 @@
int32 chunk_index = 3;
bool is_final = 4;
string hash = 5; // Full file hash for verification on final chunk
+ int64 offset = 6; // NEW: Byte offset for random-access parallel writes
+ bool compressed = 7; // NEW: Whether the chunk is compressed (zlib)
+ int32 total_chunks = 8; // NEW: Total number of chunks expected
+ int64 total_size = 9; // NEW: Total file size in bytes
}
message SyncStatus {
diff --git a/ai-hub/app/protos/agent_pb2.py b/ai-hub/app/protos/agent_pb2.py
index da2f584..9c98de4 100644
--- a/ai-hub/app/protos/agent_pb2.py
+++ b/ai-hub/app/protos/agent_pb2.py
@@ -14,7 +14,7 @@
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16\x61pp/protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 
\x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 
\x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b 
\x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"_\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 
\x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16\x61pp/protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe0\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\x12\x19\n\x11skill_config_json\x18\x06 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 
\x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 
\x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xef\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\x12\x11\n\toffloaded\x18\t \x01(\x08\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 
\x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"\xad\x01\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\x12\x0e\n\x06offset\x18\x06 \x01(\x03\x12\x12\n\ncompressed\x18\x07 
\x01(\x08\x12\x14\n\x0ctotal_chunks\x18\x08 \x01(\x05\x12\x12\n\ntotal_size\x18\t \x01(\x03\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -30,67 +30,67 @@
_globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=205
_globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=256
_globals['_SANDBOXPOLICY']._serialized_start=259
- _globals['_SANDBOXPOLICY']._serialized_end=456
- _globals['_SANDBOXPOLICY_MODE']._serialized_start=422
- _globals['_SANDBOXPOLICY_MODE']._serialized_end=456
- _globals['_REGISTRATIONRESPONSE']._serialized_start=458
- _globals['_REGISTRATIONRESPONSE']._serialized_end=578
- _globals['_CLIENTTASKMESSAGE']._serialized_start=581
- _globals['_CLIENTTASKMESSAGE']._serialized_end=878
- _globals['_SKILLEVENT']._serialized_start=880
- _globals['_SKILLEVENT']._serialized_end=1001
- _globals['_NODEANNOUNCE']._serialized_start=1003
- _globals['_NODEANNOUNCE']._serialized_end=1034
- _globals['_BROWSEREVENT']._serialized_start=1037
- _globals['_BROWSEREVENT']._serialized_end=1172
- _globals['_SERVERTASKMESSAGE']._serialized_start=1175
- _globals['_SERVERTASKMESSAGE']._serialized_end=1491
- _globals['_TASKCANCELREQUEST']._serialized_start=1493
- _globals['_TASKCANCELREQUEST']._serialized_end=1529
- _globals['_TASKREQUEST']._serialized_start=1532
- _globals['_TASKREQUEST']._serialized_end=1741
- _globals['_BROWSERACTION']._serialized_start=1744
- _globals['_BROWSERACTION']._serialized_end=2032
- _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1898
- _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2032
- _globals['_TASKRESPONSE']._serialized_start=2035
- _globals['_TASKRESPONSE']._serialized_end=2387
- _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2267
- _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2315
- _globals['_TASKRESPONSE_STATUS']._serialized_start=2317
- _globals['_TASKRESPONSE_STATUS']._serialized_end=2377
- _globals['_BROWSERRESPONSE']._serialized_start=2390
- _globals['_BROWSERRESPONSE']._serialized_end=2610
- _globals['_CONSOLEMESSAGE']._serialized_start=2612
- _globals['_CONSOLEMESSAGE']._serialized_end=2679
- _globals['_NETWORKREQUEST']._serialized_start=2681
- _globals['_NETWORKREQUEST']._serialized_end=2785
- _globals['_WORKPOOLUPDATE']._serialized_start=2787
- _globals['_WORKPOOLUPDATE']._serialized_end=2831
- _globals['_TASKCLAIMREQUEST']._serialized_start=2833
- _globals['_TASKCLAIMREQUEST']._serialized_end=2885
- _globals['_TASKCLAIMRESPONSE']._serialized_start=2887
- _globals['_TASKCLAIMRESPONSE']._serialized_end=2956
- _globals['_HEARTBEAT']._serialized_start=2959
- _globals['_HEARTBEAT']._serialized_end=3317
- _globals['_HEALTHCHECKRESPONSE']._serialized_start=3319
- _globals['_HEALTHCHECKRESPONSE']._serialized_end=3364
- _globals['_FILESYNCMESSAGE']._serialized_start=3367
- _globals['_FILESYNCMESSAGE']._serialized_end=3595
- _globals['_SYNCCONTROL']._serialized_start=3598
- _globals['_SYNCCONTROL']._serialized_end=3897
- _globals['_SYNCCONTROL_ACTION']._serialized_start=3727
- _globals['_SYNCCONTROL_ACTION']._serialized_end=3897
- _globals['_DIRECTORYMANIFEST']._serialized_start=3899
- _globals['_DIRECTORYMANIFEST']._serialized_end=3969
- _globals['_FILEINFO']._serialized_start=3971
- _globals['_FILEINFO']._serialized_end=4039
- _globals['_FILEPAYLOAD']._serialized_start=4041
- _globals['_FILEPAYLOAD']._serialized_end=4136
- _globals['_SYNCSTATUS']._serialized_start=4139
- _globals['_SYNCSTATUS']._serialized_end=4299
- _globals['_SYNCSTATUS_CODE']._serialized_start=4233
- _globals['_SYNCSTATUS_CODE']._serialized_end=4299
- _globals['_AGENTORCHESTRATOR']._serialized_start=4302
- _globals['_AGENTORCHESTRATOR']._serialized_end=4535
+ _globals['_SANDBOXPOLICY']._serialized_end=483
+ _globals['_SANDBOXPOLICY_MODE']._serialized_start=449
+ _globals['_SANDBOXPOLICY_MODE']._serialized_end=483
+ _globals['_REGISTRATIONRESPONSE']._serialized_start=485
+ _globals['_REGISTRATIONRESPONSE']._serialized_end=605
+ _globals['_CLIENTTASKMESSAGE']._serialized_start=608
+ _globals['_CLIENTTASKMESSAGE']._serialized_end=905
+ _globals['_SKILLEVENT']._serialized_start=907
+ _globals['_SKILLEVENT']._serialized_end=1028
+ _globals['_NODEANNOUNCE']._serialized_start=1030
+ _globals['_NODEANNOUNCE']._serialized_end=1061
+ _globals['_BROWSEREVENT']._serialized_start=1064
+ _globals['_BROWSEREVENT']._serialized_end=1199
+ _globals['_SERVERTASKMESSAGE']._serialized_start=1202
+ _globals['_SERVERTASKMESSAGE']._serialized_end=1518
+ _globals['_TASKCANCELREQUEST']._serialized_start=1520
+ _globals['_TASKCANCELREQUEST']._serialized_end=1556
+ _globals['_TASKREQUEST']._serialized_start=1559
+ _globals['_TASKREQUEST']._serialized_end=1768
+ _globals['_BROWSERACTION']._serialized_start=1771
+ _globals['_BROWSERACTION']._serialized_end=2059
+ _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1925
+ _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2059
+ _globals['_TASKRESPONSE']._serialized_start=2062
+ _globals['_TASKRESPONSE']._serialized_end=2414
+ _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2294
+ _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2342
+ _globals['_TASKRESPONSE_STATUS']._serialized_start=2344
+ _globals['_TASKRESPONSE_STATUS']._serialized_end=2404
+ _globals['_BROWSERRESPONSE']._serialized_start=2417
+ _globals['_BROWSERRESPONSE']._serialized_end=2656
+ _globals['_CONSOLEMESSAGE']._serialized_start=2658
+ _globals['_CONSOLEMESSAGE']._serialized_end=2725
+ _globals['_NETWORKREQUEST']._serialized_start=2727
+ _globals['_NETWORKREQUEST']._serialized_end=2831
+ _globals['_WORKPOOLUPDATE']._serialized_start=2833
+ _globals['_WORKPOOLUPDATE']._serialized_end=2877
+ _globals['_TASKCLAIMREQUEST']._serialized_start=2879
+ _globals['_TASKCLAIMREQUEST']._serialized_end=2931
+ _globals['_TASKCLAIMRESPONSE']._serialized_start=2933
+ _globals['_TASKCLAIMRESPONSE']._serialized_end=3002
+ _globals['_HEARTBEAT']._serialized_start=3005
+ _globals['_HEARTBEAT']._serialized_end=3363
+ _globals['_HEALTHCHECKRESPONSE']._serialized_start=3365
+ _globals['_HEALTHCHECKRESPONSE']._serialized_end=3410
+ _globals['_FILESYNCMESSAGE']._serialized_start=3413
+ _globals['_FILESYNCMESSAGE']._serialized_end=3641
+ _globals['_SYNCCONTROL']._serialized_start=3644
+ _globals['_SYNCCONTROL']._serialized_end=3943
+ _globals['_SYNCCONTROL_ACTION']._serialized_start=3773
+ _globals['_SYNCCONTROL_ACTION']._serialized_end=3943
+ _globals['_DIRECTORYMANIFEST']._serialized_start=3945
+ _globals['_DIRECTORYMANIFEST']._serialized_end=4015
+ _globals['_FILEINFO']._serialized_start=4017
+ _globals['_FILEINFO']._serialized_end=4085
+ _globals['_FILEPAYLOAD']._serialized_start=4088
+ _globals['_FILEPAYLOAD']._serialized_end=4261
+ _globals['_SYNCSTATUS']._serialized_start=4264
+ _globals['_SYNCSTATUS']._serialized_end=4424
+ _globals['_SYNCSTATUS_CODE']._serialized_start=4358
+ _globals['_SYNCSTATUS_CODE']._serialized_end=4424
+ _globals['_AGENTORCHESTRATOR']._serialized_start=4427
+ _globals['_AGENTORCHESTRATOR']._serialized_end=4660
# @@protoc_insertion_point(module_scope)
diff --git a/deployment/jerxie-prod/docker-compose.production.yml b/deployment/jerxie-prod/docker-compose.production.yml
index b5f137c..64d1040 100644
--- a/deployment/jerxie-prod/docker-compose.production.yml
+++ b/deployment/jerxie-prod/docker-compose.production.yml
@@ -15,11 +15,7 @@
- SUPER_ADMINS=axieyangb@gmail.com,jerxie.app@gmail.com
- SECRET_KEY=aYc2j1lYUUZXkBFFUndnleZI
-# Redirect the persistent data to the NFS volume
+# Use local storage for database to avoid NFS locking issues
volumes:
ai_hub_data:
driver: local
- driver_opts:
- type: "nfs"
- o: "addr=192.168.68.90,rw"
- device: ":/volume1/docker/ai-hub/data"
diff --git a/deployment/test-nodes/docker-compose.test-nodes.yml b/deployment/test-nodes/docker-compose.test-nodes.yml
index 58d20ee..a0d8f98 100644
--- a/deployment/test-nodes/docker-compose.test-nodes.yml
+++ b/deployment/test-nodes/docker-compose.test-nodes.yml
@@ -21,6 +21,10 @@
privileged: true
volumes:
- ./skills:/app/node_skills:ro
+ deploy:
+ resources:
+ limits:
+ memory: 512M
test-node-2:
build:
@@ -41,3 +45,7 @@
privileged: true
volumes:
- ./skills:/app/node_skills:ro
+ deploy:
+ resources:
+ limits:
+ memory: 512M
diff --git a/frontend/src/pages/NodesPage.js b/frontend/src/pages/NodesPage.js
index e414ff9..9639d9e 100644
--- a/frontend/src/pages/NodesPage.js
+++ b/frontend/src/pages/NodesPage.js
@@ -613,6 +613,56 @@
)}
+
+
Browser Skill
+ {editingNodeId === node.node_id ? (
+
+ ) : (
+
+
+ {node.skill_config?.browser?.enabled ? 'Active' : 'Disabled'}
+
+ {node.skill_config?.browser?.enabled && (
+
+ {node.skill_config?.browser?.headless === false ? 'Headed' : 'Headless'}
+
+ )}
+
+ )}
+
{/* SANDBOX POLICY CONFIGURATION โ New M6 Feature */}
{editingNodeId === node.node_id && editForm.skill_config?.shell?.enabled ? (
diff --git a/scripts/deploy_test_nodes.sh b/scripts/deploy_test_nodes.sh
index ad5d8f8..06bf74c 100755
--- a/scripts/deploy_test_nodes.sh
+++ b/scripts/deploy_test_nodes.sh
@@ -39,7 +39,7 @@
# 2. Cleanup any previous test nodes
echo "Cleaning up old test nodes..."
- echo '$PASS' | sudo -S docker ps -a --filter "name=cortex-test-node-" -q | xargs -r sudo docker rm -f
+ echo '$PASS' | sudo -S docker ps -a --filter "name=cortex-test-node-" -q | xargs -r -I % sh -c "echo '$PASS' | sudo -S docker rm -f %"
# 3. Spawn N nodes
for i in \$(seq 1 $COUNT); do
@@ -48,13 +48,22 @@
echo "[+] Starting \$CONTAINER_NAME..."
+ # Determine the correct token based on node number (matches docker-compose.test-nodes.yml)
+ if [ "\$i" -eq 1 ]; then
+ NODE_TOKEN="cortex-secret-shared-key"
+ else
+ NODE_TOKEN="ysHjZIRXeWo-YYK6EWtBsIgJ4uNBihSnZMtt0BQW3eI"
+ fi
+
echo '$PASS' | sudo -S docker run -d \\
--name "\$CONTAINER_NAME" \\
--network cortex-hub_default \\
-e AGENT_NODE_ID="\$NODE_ID" \\
-e AGENT_NODE_DESC="Scalable Test Node #\$i" \\
-e GRPC_ENDPOINT="ai_hub_service:50051" \\
- -e AGENT_SECRET_KEY="cortex-secret-shared-key" \\
+ -e AGENT_HUB_URL="http://ai_hub_service:8000" \\
+ -e AGENT_AUTH_TOKEN="\$NODE_TOKEN" \\
+ -e AGENT_SECRET_KEY="aYc2j1lYUUZXkBFFUndnleZI" \\
-e AGENT_TLS_ENABLED="false" \\
agent-node-base
done
diff --git a/scripts/register_test_nodes.py b/scripts/register_test_nodes.py
index 867819a..ae30fb0 100644
--- a/scripts/register_test_nodes.py
+++ b/scripts/register_test_nodes.py
@@ -4,7 +4,7 @@
# Ensure we can import from app
sys.path.append("/app")
-from app.db.database import SessionLocal
+from app.db.session import SessionLocal
from app.db.models import AgentNode, NodeGroupAccess
db = SessionLocal()
@@ -29,7 +29,7 @@
"display_name": "Test Node 2",
"description": "Scaled Pod 2",
"registered_by": "9a333ccd-9c3f-432f-a030-7b1e1284a436",
- "invite_token": "cortex-secret-shared-key",
+ "invite_token": "ysHjZIRXeWo-YYK6EWtBsIgJ4uNBihSnZMtt0BQW3eI",
"is_active": True,
"skill_config": {
"shell": {"enabled": True},
diff --git a/scripts/remote_deploy.sh b/scripts/remote_deploy.sh
index 3963e52..737cfa9 100755
--- a/scripts/remote_deploy.sh
+++ b/scripts/remote_deploy.sh
@@ -61,7 +61,7 @@
# 1. Sync local codebase to temporary directory on remote server
echo "Syncing local files to production [USER: $USER, HOST: $HOST]..."
-sshpass -p "$PASS" rsync -avz \
+sshpass -p "$PASS" rsync -avz --delete \
--exclude '.git' \
--exclude 'node_modules' \
--exclude 'frontend/node_modules' \
@@ -75,11 +75,10 @@
exit 1
fi
-# 2. Copy the synced files into the actual project directory replacing the old ones
+# 2. Sync from the temporary directory to the actual project directory
echo "Overwriting production project files..."
sshpass -p "$PASS" ssh -o StrictHostKeyChecking=no "$USER@$HOST" << EOF
- echo '$PASS' | sudo -S rm -rf $REMOTE_PROJ/nginx.conf
- echo '$PASS' | sudo -S cp -r ${REMOTE_TMP}* $REMOTE_PROJ/
+ echo '$PASS' | sudo -S rsync -av --delete ${REMOTE_TMP}/ $REMOTE_PROJ/
echo '$PASS' | sudo -S chown -R $USER:$USER $REMOTE_PROJ
EOF
diff --git a/skills/browser-automation-agent/logic.py b/skills/browser-automation-agent/logic.py
deleted file mode 100644
index c7d7ea7..0000000
--- a/skills/browser-automation-agent/logic.py
+++ /dev/null
@@ -1,369 +0,0 @@
-import threading
-import queue
-import time
-import json
-import re
-from playwright.sync_api import sync_playwright
-from agent_node.skills.base import BaseSkill
-from protos import agent_pb2
-
-# ============================================================
-# Role-Ref Registry
-# Inspired by Openclaw's pw-role-snapshot.ts
-# Maps `ref=eN` shorthand -> (role, name, nth) for every
-# interactive / content element on the last snapshotted page.
-# ============================================================
-
-INTERACTIVE_ROLES = {
- "button", "link", "textbox", "checkbox", "radio", "combobox",
- "listbox", "menuitem", "menuitemcheckbox", "menuitemradio",
- "option", "searchbox", "slider", "spinbutton", "switch", "tab", "treeitem",
-}
-CONTENT_ROLES = {
- "heading", "cell", "gridcell", "columnheader", "rowheader",
- "listitem", "article", "region", "main", "navigation",
-}
-STRUCTURAL_ROLES = {
- "generic", "group", "list", "table", "row", "rowgroup", "grid",
- "treegrid", "menu", "menubar", "toolbar", "tablist", "tree",
- "directory", "document", "application", "presentation", "none",
-}
-
-
-def _build_aria_snapshot(aria_text: str) -> tuple[str, dict]:
- """
- Parse Playwright's ariaSnapshot() output and annotate interactive/content
- elements with stable [ref=eN] labels that the AI can refer back to.
- Returns (annotated_snapshot, ref_map).
- """
- lines = aria_text.split("\n")
- refs = {}
- counter = [0]
- role_counts = {} # (role, name) -> count (for nth disambiguation)
- output_lines = []
-
- def next_ref():
- counter[0] += 1
- return f"e{counter[0]}"
-
- for line in lines:
- m = re.match(r'^(\s*-\s*)(\w+)(?:\s+"([^"]*)")?(.*)$', line)
- if not m:
- output_lines.append(line)
- continue
-
- prefix, role_raw, name, suffix = m.group(1), m.group(2), m.group(3), m.group(4)
- role = role_raw.lower()
-
- is_interactive = role in INTERACTIVE_ROLES
- is_content_with_name = role in CONTENT_ROLES and name
-
- if not (is_interactive or is_content_with_name):
- output_lines.append(line)
- continue
-
- # assign ref
- ref = next_ref()
- key = (role, name)
- nth = role_counts.get(key, 0)
- role_counts[key] = nth + 1
-
- refs[ref] = {"role": role, "name": name, "nth": nth if nth > 0 else None}
-
- enhanced = f"{prefix}{role_raw}"
- if name:
- enhanced += f' "{name}"'
- enhanced += f" [ref={ref}]"
- if nth > 0:
- enhanced += f" [nth={nth}]"
- if suffix:
- enhanced += suffix
- output_lines.append(enhanced)
-
- return "\n".join(output_lines), refs
-
-
-def _resolve_ref(page, ref: str, role_refs: dict):
- """Resolve a [ref=eN] string to a Playwright Locator."""
- info = role_refs.get(ref)
- if not info:
- raise ValueError(f"Unknown ref '{ref}'. Run aria_snapshot first and use a ref from that output.")
- role = info["role"]
- name = info.get("name")
- nth = info.get("nth") or 0
- if name:
- loc = page.get_by_role(role, name=name, exact=True)
- else:
- loc = page.get_by_role(role)
- if nth:
- loc = loc.nth(nth)
- return loc
-
-
-class BrowserSkill(BaseSkill):
- """
- Persistent Browser Skill โ OpenClaw-inspired role-snapshot architecture.
-
- Key innovation over the prior version:
- - `aria_snapshot` action returns a compact semantic role tree with [ref=eN] labels.
- - All `click`, `type`, `hover` actions accept either a CSS/XPath selector OR a
- ref string like 'e3', enabling the AI to address elements without fragile selectors.
- - Page errors and console output are tracked per-session and included in results.
- """
- def __init__(self, sync_mgr=None):
- self.task_queue = queue.Queue()
- # session_id -> { "context", "page", "role_refs", "console", "errors", "download_dir" }
- self.sessions = {}
- self.sync_mgr = sync_mgr
- self.lock = threading.Lock()
- threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor").start()
-
- # ------------------------------------------------------------------
- # Session Management
- # ------------------------------------------------------------------
-
- def _get_or_create_session(self, browser, sid, task, on_event):
- """Return existing session dict or create a new one."""
- with self.lock:
- if sid in self.sessions:
- return self.sessions[sid]
-
- download_dir = None
- if self.sync_mgr and task.session_id:
- download_dir = self.sync_mgr.get_session_dir(task.session_id)
- print(f" [๐๐] Mapping Browser Context to: {download_dir}")
-
- ctx = browser.new_context(accept_downloads=True)
- page = ctx.new_page()
-
- sess = {
- "context": ctx,
- "page": page,
- "role_refs": {}, # ref -> {role, name, nth}
- "console": [],
- "errors": [],
- "download_dir": download_dir,
- }
- self.sessions[sid] = sess
-
- # Listeners
- self._attach_listeners(sid, page, on_event, sess)
- return sess
-
- def _attach_listeners(self, sid, page, on_event, sess):
- # Console log capture
- def _on_console(msg):
- entry = {"level": msg.type, "text": msg.text, "ts": int(time.time() * 1000)}
- sess["console"].append(entry)
- if len(sess["console"]) > 200:
- sess["console"].pop(0)
- if on_event:
- on_event(agent_pb2.BrowserEvent(
- session_id=sid,
- console_msg=agent_pb2.ConsoleMessage(
- level=msg.type, text=msg.text, timestamp_ms=entry["ts"]
- )
- ))
-
- def _on_page_error(err):
- sess["errors"].append({"message": str(err), "ts": int(time.time() * 1000)})
- if len(sess["errors"]) > 100:
- sess["errors"].pop(0)
-
- def _on_network(req):
- resp = req.response()
- if on_event:
- on_event(agent_pb2.BrowserEvent(
- session_id=sid,
- network_req=agent_pb2.NetworkRequest(
- method=req.method, url=req.url,
- status=resp.status if resp else 0,
- resource_type=req.resource_type, latency_ms=0
- )
- ))
-
- def _on_download(dl):
- import os
- with self.lock:
- s = self.sessions.get(sid)
- if s and s.get("download_dir"):
- os.makedirs(s["download_dir"], exist_ok=True)
- target = os.path.join(s["download_dir"], dl.suggested_filename)
- print(f" [๐๐ฅ] Download: {dl.suggested_filename} -> {target}")
- dl.save_as(target)
-
- page.on("console", _on_console)
- page.on("pageerror", _on_page_error)
- page.on("requestfinished", _on_network)
- page.on("download", _on_download)
-
- # ------------------------------------------------------------------
- # Browser Actor Loop
- # ------------------------------------------------------------------
-
- def _browser_actor(self):
- print("[๐] Browser Actor Starting...", flush=True)
- pw = browser = None
- try:
- try:
- pw = sync_playwright().start()
- except Exception as pe:
- print(f"[!] Playwright failed to start: {pe}", flush=True)
- return
-
- try:
- browser = pw.chromium.launch(headless=True, args=[
- '--no-sandbox', '--disable-setuid-sandbox',
- '--disable-dev-shm-usage', '--disable-gpu'
- ])
- print("[๐] Browser Engine Online.", flush=True)
- except Exception as be:
- print(f"[!] Chromium launch failed: {be}", flush=True)
- if pw: pw.stop()
- return
- except Exception as e:
- print(f"[!] Browser Actor critical failure: {e}", flush=True)
- if pw: pw.stop()
- return
-
- while True:
- try:
- item = self.task_queue.get()
- if item is None:
- print("[๐] Browser Actor Shutting Down...", flush=True)
- break
-
- task, sandbox, on_complete, on_event = item
- action = task.browser_action
- sid = action.session_id or "default"
- action_name = agent_pb2.BrowserAction.ActionType.Name(action.action)
- print(f" [๐] {action_name} | Session: {sid}", flush=True)
-
- sess = self._get_or_create_session(browser, sid, task, on_event)
- page = sess["page"]
-
- res_data = {}
- try:
- self._dispatch_action(action, page, sess, res_data)
- except Exception as e:
- on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)
- continue
-
- # Build BrowserResponse โ include aria_snapshot result in eval_result
- br_res = agent_pb2.BrowserResponse(
- url=page.url,
- title=page.title(),
- snapshot=res_data.get("snapshot", b""),
- dom_content=res_data.get("dom_content", ""),
- a11y_tree=res_data.get("a11y_tree", ""),
- eval_result=res_data.get("eval_result", ""),
- )
- on_complete(task.task_id, {"status": 1, "browser_result": br_res}, task.trace_id)
-
- except Exception as e:
- print(f" [!] Browser Actor Error: {e}", flush=True)
- try:
- on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)
- except Exception:
- pass
-
- # Cleanup
- print("[๐] Cleaning up Browser Engine...", flush=True)
- with self.lock:
- for s in self.sessions.values():
- try: s["context"].close()
- except: pass
- self.sessions.clear()
- if browser: browser.close()
- if pw: pw.stop()
-
- # ------------------------------------------------------------------
- # Action Dispatcher
- # ------------------------------------------------------------------
-
- def _dispatch_action(self, action, page, sess, res_data):
- A = agent_pb2.BrowserAction
- role_refs = sess["role_refs"]
-
- def resolve(selector_or_ref: str):
- """Accept either a CSS selector or a ref like 'e3'."""
- s = (selector_or_ref or "").strip()
- if re.match(r'^e\d+$', s):
- return _resolve_ref(page, s, role_refs)
- return page.locator(s)
-
- if action.action == A.NAVIGATE:
- page.goto(action.url, wait_until="domcontentloaded", timeout=25000)
- # Auto-snapshot after every navigation: give AI page context immediately
- aria_raw = page.locator(":root").aria_snapshot()
- snap, refs = _build_aria_snapshot(aria_raw)
- sess["role_refs"] = refs
- # Trim to 8000 chars to avoid bloating the grpc response
- trimmed = snap[:8000] + ("\n\n[...snapshot truncated...]" if len(snap) > 8000 else "")
- stats = f"refs={len(refs)}"
- res_data["a11y_tree"] = trimmed
- res_data["eval_result"] = stats
-
- elif action.action == A.CLICK:
- target = action.selector or ""
- resolve(target).click(timeout=8000)
-
- elif action.action == A.TYPE:
- target = action.selector or ""
- resolve(target).fill(action.text, timeout=8000)
-
- elif action.action == A.SCREENSHOT:
- res_data["snapshot"] = page.screenshot(full_page=False)
-
- elif action.action == A.GET_DOM:
- res_data["dom_content"] = page.content()
-
- elif action.action == A.HOVER:
- target = action.selector or ""
- resolve(target).hover(timeout=5000)
-
- elif action.action == A.SCROLL:
- page.mouse.wheel(x=0, y=action.y or 400)
-
- elif action.action == A.EVAL:
- result = page.evaluate(action.text)
- res_data["eval_result"] = str(result)
-
- elif action.action == A.GET_A11Y:
- # OpenClaw-style role snapshot with ref labels โ the key feature!
- aria_raw = page.locator(":root").aria_snapshot()
- snap, refs = _build_aria_snapshot(aria_raw)
- sess["role_refs"] = refs # remember refs for subsequent click/type calls
-
- # Trim large snapshots (news pages can be huge)
- MAX = 10000
- if len(snap) > MAX:
- snap = snap[:MAX] + "\n\n[...snapshot truncated - use eval/scroll to see more...]"
-
- stats = {
- "total_refs": len(refs),
- "interactive": sum(1 for r in refs.values() if r["role"] in INTERACTIVE_ROLES),
- "url": page.url,
- "title": page.title(),
- }
- res_data["a11y_tree"] = snap
- res_data["eval_result"] = json.dumps(stats)
-
- elif action.action == A.CLOSE:
- with self.lock:
- s = self.sessions.pop(action.session_id or "default", None)
- if s:
- s["context"].close()
-
- # ------------------------------------------------------------------
- # Public Interface
- # ------------------------------------------------------------------
-
- def execute(self, task, sandbox, on_complete, on_event=None):
- self.task_queue.put((task, sandbox, on_complete, on_event))
-
- def cancel(self, task_id):
- return False
-
- def shutdown(self):
- self.task_queue.put(None)
diff --git a/skills/mesh-file-explorer/SKILL.md b/skills/mesh-file-explorer/SKILL.md
index 1c7f064..625fd8f 100644
--- a/skills/mesh-file-explorer/SKILL.md
+++ b/skills/mesh-file-explorer/SKILL.md
@@ -47,9 +47,22 @@
# Mesh File Explorer
-You are a file management assistant. Use this tool for high-performance file operations:
-1. **`list`**: Explore directories. If a 'session_id' is provided, it uses the zero-latency Hub mirror.
-2. **`read`**: Fetch file content. Uses local Hub mirror fast-path if available.
-3. **`write`**: Synchronously update Hub mirror and background push to node.
-4. **`delete`**: Remove from Hub and dispatch remote delete.
-Always include 'session_id' for improved performance unless you need to bypass the ghost mirror.
+You are a decentralized file management specialist. Use this tool based on the context:
+
+### 1. ๐ Standard Workspace Sync (Ghost Mirror)
+- **WHEN**: You are working on project files intended to sync across all nodes.
+- **PATH**: Use a **RELATIVE** path (e.g., `src/main.py`). NEVER use absolute paths starting with `/tmp/cortex-sync/`.
+- **SESSION**: You MUST provide the `session_id` (usually your current Ghost Mirror ID).
+- **BENEFIT**: Zero-latency write to the Hub mirror + background push to nodes.
+
+### 2. ๐ฅ๏ธ Physical Node Maintenance
+- **WHEN**: You need to interact with system files OUTSIDE the project workspace (e.g., `/etc/hosts` or personal home dirs).
+- **PATH**: Use an **ABSOLUTE** path.
+- **SESSION**: Set `session_id` to `__fs_explorer__`.
+- **BEHAVIOR**: Direct gRPC call to the physical node. Slower, but bypasses the mirror.
+
+### Actions
+- **`list`**: Explore the filesystem.
+- **`read`**: Retrieve content.
+- **`write`**: Create/Update files.
+- **`delete`**: Remove files.
diff --git a/skills/mesh-file-explorer/logic.py b/skills/mesh-file-explorer/logic.py
deleted file mode 100644
index a8bd080..0000000
--- a/skills/mesh-file-explorer/logic.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import os
-import json
-import logging
-from agent_node.skills.base import BaseSkill
-
-logger = logging.getLogger(__name__)
-
-class FileSkill(BaseSkill):
- """Provides file system navigation and inspection capabilities."""
-
- def __init__(self, sync_mgr=None):
- self.sync_mgr = sync_mgr
-
- def execute(self, task, sandbox, on_complete, on_event=None):
- """
- Executes a file-related task (list, stats).
- Payload JSON: { "action": "list", "path": "...", "recursive": false }
- """
- try:
- payload = json.loads(task.payload_json)
- action = payload.get("action", "list")
- path = payload.get("path", ".")
-
- # 1. Sandbox Jail Check
- # (In a real implementation, we'd use sandbox.check_path(path))
- # For now, we'll assume the node allows browsing its root or session dir.
-
- if action == "list":
- result = self._list_dir(path, payload.get("recursive", False))
- on_complete(task.task_id, {"status": 1, "stdout": json.dumps(result)}, task.trace_id)
- else:
- on_complete(task.task_id, {"status": 0, "stderr": f"Unknown action: {action}"}, task.trace_id)
-
- except Exception as e:
- logger.error(f"[FileSkill] Task {task.task_id} failed: {e}")
- on_complete(task.task_id, {"status": 0, "stderr": str(e)}, task.trace_id)
-
- def _list_dir(self, path, recursive=False):
- """Lists directory contents with metadata."""
- if not os.path.exists(path):
- return {"error": "Path not found"}
-
- items = []
- if recursive:
- for root, dirs, files in os.walk(path):
- for name in dirs + files:
- abs_path = os.path.join(root, name)
- rel_path = os.path.relpath(abs_path, path)
- st = os.stat(abs_path)
- items.append({
- "name": name,
- "path": rel_path,
- "is_dir": os.path.isdir(abs_path),
- "size": st.st_size,
- "mtime": st.st_mtime
- })
- else:
- for name in os.listdir(path):
- abs_path = os.path.join(path, name)
- st = os.stat(abs_path)
- items.append({
- "name": name,
- "is_dir": os.path.isdir(abs_path),
- "size": st.st_size,
- "mtime": st.st_mtime
- })
-
- return {
- "root": os.path.abspath(path),
- "items": sorted(items, key=lambda x: (not x["is_dir"], x["name"]))
- }
-
- def cancel(self, task_id):
- return False # Listing is usually fast, no cancellation needed
-
- def shutdown(self):
- pass
diff --git a/skills/mesh-terminal-control/logic.py b/skills/mesh-terminal-control/logic.py
deleted file mode 100644
index 5a138b7..0000000
--- a/skills/mesh-terminal-control/logic.py
+++ /dev/null
@@ -1,413 +0,0 @@
-import os
-import pty
-import select
-import threading
-import time
-import termios
-import struct
-import fcntl
-import tempfile
-from agent_node.skills.base import BaseSkill
-from protos import agent_pb2
-
-class ShellSkill(BaseSkill):
- """Admin Console Skill: Persistent stateful Bash via PTY."""
- def __init__(self, sync_mgr=None):
- self.sync_mgr = sync_mgr
- self.sessions = {} # session_id -> {fd, pid, thread, last_activity, ...}
- self.lock = threading.Lock()
-
- # Phase 3: Prompt Patterns for Edge Intelligence
- self.PROMPT_PATTERNS = [
- r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$", # bash/zsh: user@host:~$
- r">>>\s*$", # python
- r"\.\.\.\s*$", # python multi-line
- r">\s*$", # node/js
- ]
-
- # --- M7: Idle Session Reaper ---
- # Automatically kills dormant bash processes to free up system resources.
- self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper")
- self.reaper_thread.start()
-
- def _session_reaper(self):
- """Background thread that cleans up unused PTY sessions."""
- while True:
- time.sleep(60)
- with self.lock:
- now = time.time()
- for sid, sess in list(self.sessions.items()):
- # Avoid reaping currently active tasks
- if sess.get("active_task"):
- continue
-
- # 10 minute idle timeout
- if now - sess.get("last_activity", 0) > 600:
- print(f" [๐๐งน] Reaping idle shell session: {sid}")
- try:
- os.close(sess["fd"])
- os.kill(sess["pid"], 9)
- except: pass
- self.sessions.pop(sid, None)
-
- def _ensure_session(self, session_id, cwd, on_event):
- with self.lock:
- if session_id in self.sessions:
- self.sessions[session_id]["last_activity"] = time.time()
- return self.sessions[session_id]
-
- print(f" [๐] Initializing Persistent Shell Session: {session_id}")
- # Spawn bash in a pty
- pid, fd = pty.fork()
- if pid == 0: # Child
- # Environment prep
- os.environ["TERM"] = "xterm-256color"
-
- # Change to CWD
- if cwd and os.path.exists(cwd):
- os.chdir(cwd)
-
- # Launch shell
- shell_path = "/bin/bash"
- if not os.path.exists(shell_path):
- shell_path = "/bin/sh"
- os.execv(shell_path, [shell_path, "--login"])
-
- # Parent
- # Set non-blocking
- fl = fcntl.fcntl(fd, fcntl.F_GETFL)
- fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
-
- sess = {
- "fd": fd,
- "pid": pid,
- "last_activity": time.time(),
- "buffer_file": None,
- "tail_buffer": "",
- "active_task": None
- }
-
- def reader():
- while True:
- try:
- r, _, _ = select.select([fd], [], [], 0.1)
- if fd in r:
- data = os.read(fd, 4096)
- if not data: break
-
- decoded = data.decode("utf-8", errors="replace")
-
- # Streaming/Sync logic (Detect completion marker)
- with self.lock:
- active_tid = sess.get("active_task")
- marker = sess.get("marker")
- if active_tid and marker and sess.get("buffer_file"):
- # Phase 2: Persistence Offloading
- # Write directly to disk instead of heap memory
- sess["buffer_file"].write(decoded)
- sess["buffer_file"].flush()
-
- # Keep a tiny 4KB tail in RAM for marker detection and prompt scanning
- sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-4096:]
-
- if marker in sess["tail_buffer"]:
- # Marker found! Extract exit code
- try:
- # The tail buffer has the marker
- after_marker = sess["tail_buffer"].split(marker)[1].strip().split()
- exit_code = int(after_marker[0]) if after_marker else 0
-
- # Formulate final stdout summary from the disk file
- bf = sess["buffer_file"]
- bf.seek(0, 2)
- file_len = bf.tell()
-
- HEAD, TAIL = 10_000, 30_000
- if file_len > HEAD + TAIL:
- bf.seek(0)
- head_str = bf.read(HEAD)
- bf.seek(file_len - TAIL)
- tail_str = bf.read()
- omitted = file_len - HEAD - TAIL
- pure_stdout = head_str + f"\n\n[... {omitted:,} bytes omitted (full output safely preserved at {bf.name}) ...]\n\n" + tail_str
- else:
- bf.seek(0)
- pure_stdout = bf.read()
-
- # Slice off the marker string and anything after it from the final result
- pure_stdout = pure_stdout.split(marker)[0]
-
- sess["result"]["stdout"] = pure_stdout
- sess["result"]["status"] = 0 if exit_code == 0 else 1
-
- # Close the file handle (leaves file on disk)
- sess["buffer_file"].close()
- sess["buffer_file"] = None
-
- sess["event"].set()
- decoded = pure_stdout.split(marker)[0][-4096:] if marker in pure_stdout else pure_stdout
- except Exception as e:
- print(f" [๐โ ๏ธ] Marker parsing failed: {e}")
- sess["event"].set()
-
- # Stream terminal output back (with stealth filtering)
- if on_event:
- stealth_out = decoded
- if "__CORTEX_FIN_SH_" in decoded:
- import re
- # We remove any line that contains our internal marker to hide plumbing from user.
- # This covers both the initial command echo and the exit code output.
- stealth_out = re.sub(r'.*__CORTEX_FIN_SH_.*[\r\n]*', '', decoded)
-
- if stealth_out:
- # Phase 3: Client-Side Truncation (Stream Rate Limiting)
- # Limit real-time stream to 15KB/sec per session to prevent flooding the Hub over gRPC.
- # The full output is still safely written to the tempfile on disk.
- with self.lock:
- now = time.time()
- if now - sess.get("stream_window_start", 0) > 1.0:
- sess["stream_window_start"] = now
- sess["stream_bytes_sent"] = 0
- dropped = sess.get("stream_dropped_bytes", 0)
- if dropped > 0:
- drop_msg = f"\n[... {dropped:,} bytes truncated from live stream ...]\n"
- event = agent_pb2.SkillEvent(
- session_id=session_id, task_id=sess.get("active_task") or "", terminal_out=drop_msg
- )
- on_event(agent_pb2.ClientTaskMessage(skill_event=event))
- sess["stream_dropped_bytes"] = 0
-
- if sess.get("stream_bytes_sent", 0) + len(stealth_out) > 15_000:
- sess["stream_dropped_bytes"] = sess.get("stream_dropped_bytes", 0) + len(stealth_out)
- else:
- sess["stream_bytes_sent"] = sess.get("stream_bytes_sent", 0) + len(stealth_out)
- event = agent_pb2.SkillEvent(
- session_id=session_id,
- task_id=sess.get("active_task") or "",
- terminal_out=stealth_out
- )
- on_event(agent_pb2.ClientTaskMessage(skill_event=event))
-
- # EDGE INTELLIGENCE: Proactively signal prompt detection
- # We only check for prompts if we are actively running a task and haven't found the marker yet.
- if active_tid and not sess["event"].is_set():
- import re
- tail = sess["tail_buffer"][-100:] if len(sess["tail_buffer"]) > 100 else sess["tail_buffer"]
- for pattern in self.PROMPT_PATTERNS:
- if re.search(pattern, tail):
- # Send specific prompt signal
- # Use last 20 chars as the 'prompt' hint
- p_hint = tail[-20:].strip()
- prompt_event = agent_pb2.SkillEvent(
- session_id=session_id,
- task_id=active_tid,
- prompt=p_hint
- )
- on_event(agent_pb2.ClientTaskMessage(skill_event=prompt_event))
- break
- except (EOFError, OSError):
- break
-
- # Thread Cleanup
- print(f" [๐] Shell Session Terminated: {session_id}")
- with self.lock:
- self.sessions.pop(session_id, None)
-
- t = threading.Thread(target=reader, daemon=True, name=f"ShellReader-{session_id}")
- t.start()
- sess["thread"] = t
-
- self.sessions[session_id] = sess
- return sess
-
-
def handle_transparent_tty(self, task, on_complete, on_event=None):
    """Process raw TTY keystroke and window-resize events synchronously.

    Bypasses the threadpool/sandbox path entirely. Two payload shapes are
    recognized (JSON objects in ``task.payload_json``):
      1. ``{"tty": "<chars>"}``   -> bytes written straight to the PTY master fd.
      2. ``{"action": "resize", "cols": N, "rows": N}`` -> TIOCSWINSZ ioctl.

    Args:
        task: incoming task; ``payload_json`` holds the raw JSON string and
            ``session_id`` selects (or lazily creates) the PTY session.
        on_complete: completion callback ``(task_id, result_dict, trace_id)``.
        on_event: optional streaming callback, forwarded to the session.

    Returns:
        True if the payload was recognized and handled; False otherwise
        (including parse failures), so the caller can fall back to the
        normal execution path.  (The original implicitly returned None for
        unrecognized dict payloads — both are falsy, but explicit is better.)
    """
    cmd = task.payload_json
    session_id = task.session_id or "default-session"
    try:
        import json
        # Cheap structural pre-check before paying for a full JSON parse.
        if cmd.startswith('{') and cmd.endswith('}'):
            raw_payload = json.loads(cmd)

            if isinstance(raw_payload, dict):
                # 1. Raw keystroke forward: inject bytes into the PTY master.
                if "tty" in raw_payload:
                    raw_bytes = raw_payload["tty"]
                    sess = self._ensure_session(session_id, None, on_event)
                    os.write(sess["fd"], raw_bytes.encode("utf-8"))
                    on_complete(task.task_id, {"stdout": "", "status": 0}, task.trace_id)
                    return True

                # 2. Window resize: push new dimensions via TIOCSWINSZ.
                if raw_payload.get("action") == "resize":
                    cols = raw_payload.get("cols", 80)
                    rows = raw_payload.get("rows", 24)
                    sess = self._ensure_session(session_id, None, on_event)
                    import termios, struct, fcntl
                    s = struct.pack('HHHH', rows, cols, 0, 0)
                    fcntl.ioctl(sess["fd"], termios.TIOCSWINSZ, s)
                    print(f" [๐] Terminal Resized to {cols}x{rows}")
                    on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 0}, task.trace_id)
                    return True
    except Exception as pe:
        print(f" [๐] Transparent TTY Fail: {pe}")
    # Explicit "not handled" signal for every unrecognized payload shape.
    return False
-
def execute(self, task, sandbox, on_complete, on_event=None):
    """Dispatch a command string to the persistent PTY shell and block until done.

    Flow:
      1. Sandbox verification -- violations short-circuit with status 2.
      2. Resolve the working-directory jail for the session.
      3. ``!RAW:`` payloads are injected straight into the PTY (interactive
         control) and bypass the busy check.
      4. Standard commands are serialized per session: the command is wrapped
         as ``(cmd) ; echo <marker> $?`` and the background reader thread
         sets ``sess["event"]`` once it sees the marker in the PTY output.

    Fixes vs. previous revision: busy-check and waiter registration now
    happen under ONE lock acquisition (the old two-step version let two
    tasks race past the busy check); the completion marker uses millisecond
    resolution (second-resolution ids could collide across back-to-back
    tasks); bare ``except:`` clauses narrowed.

    Args:
        task: task with payload_json (command), session_id, task_id,
            timeout_ms and trace_id.
        sandbox: policy object exposing ``verify(cmd)`` and a ``policy`` dict.
        on_complete: callback ``(task_id, result_dict, trace_id)``.
        on_event: optional streaming callback for terminal output.
    """
    session_id = task.session_id or "default-session"
    tid = task.task_id
    try:
        cmd = task.payload_json

        # --- Legacy Full-Command Execution (Sandboxed) ---
        allowed, status_msg = sandbox.verify(cmd)
        if not allowed:
            err_msg = f"\r\n[System] Command blocked: {status_msg}\r\n"
            if on_event:
                event = agent_pb2.SkillEvent(
                    session_id=session_id, task_id=tid,
                    terminal_out=err_msg
                )
                on_event(agent_pb2.ClientTaskMessage(skill_event=event))

            return on_complete(tid, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id)

        # Resolve CWD jail (session dir wins over the policy jail).
        cwd = None
        if self.sync_mgr and task.session_id:
            cwd = self.sync_mgr.get_session_dir(task.session_id)
        elif sandbox.policy.get("WORKING_DIR_JAIL"):
            cwd = sandbox.policy["WORKING_DIR_JAIL"]
            if not os.path.exists(cwd):
                try:
                    os.makedirs(cwd, exist_ok=True)
                except OSError:
                    pass  # best-effort; the shell spawn will surface a real failure

        # Acquire (or lazily create) the persistent per-session shell.
        sess = self._ensure_session(session_id, cwd, on_event)

        # RAW mode bypasses the busy check so interactive input can reach an
        # already-running foreground process.
        if cmd.startswith("!RAW:"):
            input_str = cmd[5:] + "\n"
            print(f" [๐โจ๏ธ] RAW Input Injection: {input_str.strip()}")
            os.write(sess["fd"], input_str.encode("utf-8"))
            return on_complete(tid, {"stdout": "INJECTED", "status": 1}, task.trace_id)

        # --- Blocking Wait Logic ---
        # Millisecond-resolution id: int(time.time()) could collide for two
        # tasks started within the same second, letting stale marker output
        # trigger premature completion of the next task.
        marker_id = int(time.time() * 1000)
        marker = f"__CORTEX_FIN_SH_{marker_id}__"
        event = threading.Event()
        result_container = {"stdout": "", "status": 1}  # 1 = Success by default (node.py convention)

        # Busy check + waiter registration atomically, in a single critical
        # section, so concurrent execute() calls cannot both claim the PTY.
        with self.lock:
            if sess.get("active_task"):
                curr_tid = sess.get("active_task")
                return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 2}, task.trace_id)
            sess["active_task"] = tid
            sess["marker"] = marker
            sess["event"] = event
            # Persistent tempfile for stdout instead of a RAM buffer.
            sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False)
            sess["tail_buffer"] = ""
            sess["result"] = result_container
            sess["cancel_event"] = threading.Event()

        try:
            # Inject the command, then echo marker + exit code. The ""
            # concatenation keeps the literal marker string out of the PTY
            # input echo, preventing premature completion detection.
            full_input = f"({cmd}) ; echo \"__CORTEX_FIN_SH_\"\"{marker_id}__\" $?\n"
            os.write(sess["fd"], full_input.encode("utf-8"))

            # Poll for completion (reader found marker) OR cancellation.
            timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0
            start_time = time.time()
            while time.time() - start_time < timeout:
                if event.is_set():
                    return on_complete(tid, result_container, task.trace_id)

                if sess["cancel_event"].is_set():
                    print(f" [๐๐] Task {tid} cancelled on node.")
                    return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id)

                time.sleep(0.1)  # avoid a busy loop

            # --- Timeout: salvage whatever output reached the disk buffer ---
            print(f" [๐โ ๏ธ] Task {tid} timed out on node.")
            with self.lock:
                partial_out = ""
                if sess.get("buffer_file"):
                    try:
                        bf = sess["buffer_file"]
                        bf.seek(0, 2)
                        file_len = bf.tell()
                        # Head/tail summary keeps the report bounded; the
                        # full output stays on disk at bf.name.
                        HEAD, TAIL = 10_000, 30_000
                        if file_len > HEAD + TAIL:
                            bf.seek(0)
                            head_str = bf.read(HEAD)
                            bf.seek(file_len - TAIL)
                            tail_str = bf.read()
                            omitted = file_len - HEAD - TAIL
                            partial_out = head_str + f"\n\n[... {omitted:,} bytes omitted (full timeout output saved to {bf.name}) ...]\n\n" + tail_str
                        else:
                            bf.seek(0)
                            partial_out = bf.read()
                    except Exception:
                        partial_out = ""

            on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id)

        finally:
            # Release the session for the next task (only if still ours).
            with self.lock:
                if sess.get("active_task") == tid:
                    if sess.get("buffer_file"):
                        try:
                            sess["buffer_file"].close()
                        except Exception:
                            pass
                    sess["buffer_file"] = None
                    sess["active_task"] = None
                    sess["marker"] = None
                    sess["event"] = None
                    sess["result"] = None
                    sess["cancel_event"] = None

    except Exception as e:
        print(f" [๐โ] Execute Error for {tid}: {e}")
        on_complete(tid, {"stderr": str(e), "status": 2}, task.trace_id)
-
def cancel(self, task_id: str) -> bool:
    """Cancel an active task by sending SIGINT (Ctrl+C) to its shell session.

    Scans all sessions for the one running ``task_id``, writes ``\\x03`` to
    the PTY master fd (the PTY line discipline delivers it as SIGINT) and
    sets the session's ``cancel_event`` so the blocked execute() wait loop
    exits with ABORTED.

    Args:
        task_id: the id of the task to cancel.

    Returns:
        True if a matching session was found and signalled; False otherwise
        (the previous revision implicitly returned None when nothing matched).
    """
    with self.lock:
        for sid, sess in self.sessions.items():
            if sess.get("active_task") != task_id:
                continue
            print(f"[๐] Sending SIGINT (Ctrl+C) to shell session (Task {task_id}): {sid}")
            try:
                # \x03 (ETX / Ctrl+C) to the master FD.
                os.write(sess["fd"], b"\x03")
            except OSError:
                # fd already closed -- still unblock the waiting execute()
                # thread below instead of raising while holding the lock.
                pass
            if sess.get("cancel_event"):
                sess["cancel_event"].set()
            return True
    return False
-
-
def shutdown(self):
    """Cleanup: terminate all persistent shells.

    Closes every session's PTY master fd, SIGKILLs the child shell process
    (shells may trap/ignore SIGTERM, and this is final teardown), then
    clears the session registry. Errors from already-closed fds or
    already-dead processes are expected and suppressed.
    """
    import signal
    with self.lock:
        for sid, sess in list(self.sessions.items()):
            print(f"[๐] Cleaning up persistent shell: {sid}")
            try:
                os.close(sess["fd"])
            except OSError:
                pass  # fd may already be closed by a dying reader thread
            try:
                # signal.SIGKILL instead of the magic number 9.
                os.kill(sess["pid"], signal.SIGKILL)
            except OSError:
                pass  # process already gone (ProcessLookupError et al.)
        self.sessions.clear()