import threading
import queue
import time
import json
import re
try:
from playwright.sync_api import sync_playwright
except ImportError:
sync_playwright = None
from agent_node.skills.base import BaseSkill
from protos import agent_pb2
try:
from agent_node import config as node_config
except ImportError:
node_config = None
# ============================================================
# Role-Ref Registry
# Inspired by Openclaw's pw-role-snapshot.ts
# Maps `ref=eN` shorthand -> (role, name, nth) for every
# interactive / content element on the last snapshotted page.
# ============================================================
INTERACTIVE_ROLES = {
"button", "link", "textbox", "checkbox", "radio", "combobox",
"listbox", "menuitem", "menuitemcheckbox", "menuitemradio",
"option", "searchbox", "slider", "spinbutton", "switch", "tab", "treeitem",
}
CONTENT_ROLES = {
"heading", "cell", "gridcell", "columnheader", "rowheader",
"listitem", "article", "region", "main", "navigation",
}
STRUCTURAL_ROLES = {
"generic", "group", "list", "table", "row", "rowgroup", "grid",
"treegrid", "menu", "menubar", "toolbar", "tablist", "tree",
"directory", "document", "application", "presentation", "none",
}
def _build_aria_snapshot(aria_text: str) -> tuple[str, dict]:
"""
Parse Playwright's ariaSnapshot() output and annotate interactive/content
elements with stable [ref=eN] labels that the AI can refer back to.
Returns (annotated_snapshot, ref_map).
"""
lines = aria_text.split("\n")
refs = {}
counter = [0]
role_counts = {} # (role, name) -> count (for nth disambiguation)
output_lines = []
def next_ref():
counter[0] += 1
return f"e{counter[0]}"
for line in lines:
m = re.match(r'^(\s*-\s*)(\w+)(?:\s+"([^"]*)")?(.*)$', line)
if not m:
output_lines.append(line)
continue
prefix, role_raw, name, suffix = m.group(1), m.group(2), m.group(3), m.group(4)
role = role_raw.lower()
is_interactive = role in INTERACTIVE_ROLES
is_content_with_name = role in CONTENT_ROLES and name
if not (is_interactive or is_content_with_name):
output_lines.append(line)
continue
# assign ref
ref = next_ref()
key = (role, name)
nth = role_counts.get(key, 0)
role_counts[key] = nth + 1
refs[ref] = {"role": role, "name": name, "nth": nth if nth > 0 else None}
enhanced = f"{prefix}{role_raw}"
if name:
enhanced += f' "{name}"'
enhanced += f" [ref={ref}]"
if nth > 0:
enhanced += f" [nth={nth}]"
if suffix:
enhanced += suffix
output_lines.append(enhanced)
return "\n".join(output_lines), refs
def _resolve_ref(page, ref: str, role_refs: dict):
"""Resolve a [ref=eN] string to a Playwright Locator."""
info = role_refs.get(ref)
if not info:
raise ValueError(f"Unknown ref '{ref}'. Run aria_snapshot first and use a ref from that output.")
role = info["role"]
name = info.get("name")
nth = info.get("nth") or 0
if name:
loc = page.get_by_role(role, name=name, exact=True)
else:
loc = page.get_by_role(role)
if nth:
loc = loc.nth(nth)
return loc
class BrowserSkill(BaseSkill):
    """
    Persistent Browser Skill — OpenClaw-inspired role-snapshot architecture.
    Key innovation over the prior version:
    - `aria_snapshot` action returns a compact semantic role tree with [ref=eN] labels.
    - All `click`, `type`, `hover` actions accept either a CSS/XPath selector OR a
      ref string like 'e3', enabling the AI to address elements without fragile selectors.
    - Page errors and console output are tracked per-session and included in results.
    - Supports headed (UI visible) mode for OIDC/OAuth login flows, toggled live from the hub.
    """
    def __init__(self, sync_mgr=None):
        # Single-consumer queue feeding the immortal actor thread below.
        self.task_queue = queue.Queue()
        # session_id -> { "context", "page", "role_refs", "console", "errors", "download_dir" }
        self.sessions = {}
        self.sync_mgr = sync_mgr
        # Guards self.sessions: mutated by the actor thread, read from
        # Playwright event-callback threads (e.g. downloads).
        self.lock = threading.Lock()
        self._headless = self._read_headless_config()
        self._actor_thread = threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor")
        self._actor_thread.start()
    def is_available(self) -> bool:
        """True when the Playwright package is importable on this node."""
        return sync_playwright is not None
    def _read_headless_config(self) -> bool:
        """Read headless preference from node config (default: True)."""
        if node_config:
            return getattr(node_config, 'BROWSER_HEADLESS', True)
        return True
    def apply_config(self, skill_config: dict):
        """
        Called by the node when the Hub pushes a skill config update.
        skill_config example: {"browser": {"headless": false}}
        If headless mode changed, gracefully restarts the browser engine.
        """
        browser_cfg = skill_config.get("browser", {})
        if sync_playwright is None:
            return
        # Also respect the global node_config toggle
        new_headless = browser_cfg.get("headless", self._read_headless_config())
        if new_headless == self._headless:
            # Only log if specifically requested to headed mode or if we are indeed headed
            if not self._headless:
                print(f" [🌐] Browser mode remains: headed")
            return  # No change
        mode_str = "headless -> headed" if not new_headless else "headed -> headless"
        print(f" [🌐] Browser mode changing: {mode_str}")
        self._headless = new_headless
        # Signal the actor to restart with the new mode
        self.task_queue.put("__restart__")
    # ------------------------------------------------------------------
    # Session Management
    # ------------------------------------------------------------------
    def _get_or_create_session(self, browser, sid, task, on_event):
        """Return existing session dict or create a new one."""
        with self.lock:
            if sid in self.sessions:
                return self.sessions[sid]
            download_dir = None
            if self.sync_mgr and task.session_id:
                download_dir = self.sync_mgr.get_session_dir(task.session_id)
                print(f" [🌐📁] Mapping Browser Context to: {download_dir}")
            ctx = browser.new_context(accept_downloads=True)
            page = ctx.new_page()
            sess = {
                "context": ctx,
                "page": page,
                "role_refs": {},  # ref -> {role, name, nth}
                "console": [],
                "errors": [],
                "download_dir": download_dir,
            }
            self.sessions[sid] = sess
            # Listeners
            self._attach_listeners(sid, page, on_event, sess)
            return sess
    def _attach_listeners(self, sid, page, on_event, sess):
        """Wire console, page-error, network, and download listeners into the session."""
        # Console log capture (ring buffer of the last 200 entries)
        def _on_console(msg):
            entry = {"level": msg.type, "text": msg.text, "ts": int(time.time() * 1000)}
            sess["console"].append(entry)
            if len(sess["console"]) > 200:
                sess["console"].pop(0)
            if on_event:
                on_event(agent_pb2.BrowserEvent(
                    session_id=sid,
                    console_msg=agent_pb2.ConsoleMessage(
                        level=msg.type, text=msg.text, timestamp_ms=entry["ts"]
                    )
                ))
        def _on_page_error(err):
            # Uncaught in-page exceptions, capped at the last 100 entries.
            sess["errors"].append({"message": str(err), "ts": int(time.time() * 1000)})
            if len(sess["errors"]) > 100:
                sess["errors"].pop(0)
        def _on_network(req):
            resp = req.response()
            if on_event:
                on_event(agent_pb2.BrowserEvent(
                    session_id=sid,
                    network_req=agent_pb2.NetworkRequest(
                        method=req.method, url=req.url,
                        status=resp.status if resp else 0,
                        resource_type=req.resource_type, latency_ms=0
                    )
                ))
        def _on_download(dl):
            import os
            with self.lock:
                s = self.sessions.get(sid)
                if s and s.get("download_dir"):
                    os.makedirs(s["download_dir"], exist_ok=True)
                    target = os.path.join(s["download_dir"], dl.suggested_filename)
                    print(f" [🌐📥] Download: {dl.suggested_filename} -> {target}")
                    dl.save_as(target)
        page.on("console", _on_console)
        page.on("pageerror", _on_page_error)
        page.on("requestfinished", _on_network)
        page.on("download", _on_download)
    # ------------------------------------------------------------------
    # Browser Actor Loop
    # ------------------------------------------------------------------
    def _browser_actor(self):
        """
        Immortal worker thread for browser operations.
        Processes the task queue and manages the Playwright lifecycle.
        """
        if sync_playwright is None:
            return
        print("[🌐] Browser Actor Starting...", flush=True)
        pw = browser = None
        def _cleanup_internal():
            # Tear down every session, then the browser and Playwright driver.
            nonlocal pw, browser
            print("[🌐] Cleaning up Browser Engine...", flush=True)
            with self.lock:
                for s in self.sessions.values():
                    try:
                        s["context"].close()
                    except Exception:
                        pass
                self.sessions.clear()
            if browser:
                try:
                    browser.close()
                except Exception:
                    pass
            if pw:
                try:
                    pw.stop()
                except Exception:
                    pass
            browser = pw = None
        while True:
            item = self.task_queue.get()
            if item is None:
                # Shutdown sentinel from shutdown()
                _cleanup_internal()
                print("[🌐] Browser Actor Shutting Down.", flush=True)
                break
            # Handle Restart Signal
            if item == "__restart__":
                print("[🌐] Browser Actor Restarting (Mode Change)...", flush=True)
                _cleanup_internal()
                continue
            task, sandbox, on_complete, on_event = item
            # --- Lazy Initialization ---
            if not pw or not browser:
                try:
                    pw = sync_playwright().start()
                    headless = self._headless
                    launch_mode = "headless" if headless else "headed"
                    print(f"[🌐] Launching Chromium in {launch_mode} mode...", flush=True)
                    args = ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
                    if headless:
                        args.append('--disable-gpu')
                    browser = pw.chromium.launch(headless=headless, args=args)
                    print("[🌐] Browser Engine Online.", flush=True)
                except Exception as e:
                    print(f"[!] Browser Setup Failed: {e}", flush=True)
                    # Report back to AI immediately so it doesn't hang
                    on_complete(task.task_id, {"stderr": f"Playwright/Chromium Error: {e}", "status": 1}, task.trace_id)
                    _cleanup_internal()
                    continue
            # --- Task Execution ---
            try:
                action = task.browser_action
                sid = action.session_id or "default"
                action_name = agent_pb2.BrowserAction.ActionType.Name(action.action)
                print(f" [🌐] {action_name} | Session: {sid}", flush=True)
                sess = self._get_or_create_session(browser, sid, task, on_event)
                page = sess.get("page")
                res_data = {}
                try:
                    self._dispatch_action(action, page, sess, res_data)
                    # Support for offloading large results to files
                    self._maybe_offload(sess, res_data, on_event)
                except Exception as e:
                    on_complete(task.task_id, {"stderr": str(e), "status": 1}, task.trace_id)
                    continue
                try:
                    # Build BrowserResponse
                    br_res = agent_pb2.BrowserResponse(
                        url=page.url if page else "",
                        title=page.title() if page else "",
                        snapshot=res_data.get("snapshot", b""),
                        dom_content=res_data.get("dom_content", ""),
                        a11y_tree=res_data.get("a11y_tree", ""),
                        eval_result=res_data.get("eval_result", ""),
                        offloaded=res_data.get("offloaded", False),
                    )
                    on_complete(task.task_id, {"status": 0, "browser_result": br_res}, task.trace_id)
                except Exception as ex:
                    print(f" [!] Error building response: {ex}", flush=True)
                    on_complete(task.task_id, {"stderr": f"Result parsing error: {ex}", "status": 1}, task.trace_id)
            except Exception as e:
                print(f" [!] Browser Actor Loop Exception: {e}", flush=True)
                try:
                    on_complete(task.task_id, {"stderr": f"Actor internal error: {e}", "status": 1}, task.trace_id)
                except Exception:
                    pass
    def _maybe_offload(self, sess, res_data, on_event):
        """Offload large strings/bytes to files in the sync directory."""
        download_dir = sess.get("download_dir")
        if not download_dir:
            return
        import os
        offload_dir = os.path.join(download_dir, ".cortex_browser")
        os.makedirs(offload_dir, exist_ok=True)
        OFFLOAD_THRESHOLD = 128 * 1024  # 128KB
        offloaded = False
        files_to_push = []
        # 1. Aria Snapshot
        a11y = res_data.get("a11y_tree", "")
        if a11y and len(a11y) > OFFLOAD_THRESHOLD:
            rel_path = ".cortex_browser/last_a11y.txt"
            abs_path = os.path.join(offload_dir, "last_a11y.txt")
            with open(abs_path, "w") as f:
                f.write(a11y)
            res_data["a11y_tree"] = f"[OFFLOADED to {rel_path}]"
            files_to_push.append((rel_path, a11y.encode('utf-8')))
            offloaded = True
        # 2. DOM Content
        dom = res_data.get("dom_content", "")
        if dom and len(dom) > OFFLOAD_THRESHOLD:
            rel_path = ".cortex_browser/last_dom.html"
            abs_path = os.path.join(offload_dir, "last_dom.html")
            with open(abs_path, "w") as f:
                f.write(dom)
            res_data["dom_content"] = f"[OFFLOADED to {rel_path}]"
            files_to_push.append((rel_path, dom.encode('utf-8')))
            offloaded = True
        # 3. Screenshot
        snap = res_data.get("snapshot", b"")
        if snap and len(snap) > OFFLOAD_THRESHOLD:
            rel_path = ".cortex_browser/last_screenshot.png"
            abs_path = os.path.join(offload_dir, "last_screenshot.png")
            with open(abs_path, "wb") as f:
                f.write(snap)
            res_data["snapshot"] = f"[OFFLOADED to {rel_path}]".encode('utf-8')
            files_to_push.append((rel_path, snap))
            offloaded = True
        if offloaded:
            res_data["offloaded"] = True
            # Proactively push files via event bus so they reach Hub BEFORE TaskResponse
            if on_event:
                for rel_p, data in files_to_push:
                    self._push_sync_event(sess, rel_p, data, on_event)
            print(f" [🌐📁] Browser Result Offloaded and Synced to Hub: {offload_dir}")
    def _push_sync_event(self, sess, rel_path, data, on_event):
        """Manually chunk data into FileSyncMessage events to ensure Hub-side availability."""
        import os
        import hashlib
        full_hash = hashlib.sha256(data).hexdigest()
        chunk_size = 1024 * 256  # 256KB chunks
        # We need the session_id that the node uses for syncing.
        # In BrowserSkill, sess["download_dir"] is SYNC_DIR / session_id,
        # so the basename of the download dir IS the session_id.
        download_dir = sess.get("download_dir")
        session_id = os.path.basename(download_dir)
        t_id = f"br-sync-{int(time.time()*1000)}"
        for i in range(0, len(data), chunk_size):
            chunk = data[i:i+chunk_size]
            is_final = (i + chunk_size) >= len(data)
            payload = agent_pb2.FilePayload(
                path=rel_path,
                chunk=chunk,
                chunk_index=i // chunk_size,
                is_final=is_final,
                # Hash only rides on the final chunk for end-to-end verification.
                hash=full_hash if is_final else ""
            )
            msg = agent_pb2.ClientTaskMessage(
                file_sync=agent_pb2.FileSyncMessage(
                    session_id=session_id,
                    file_data=payload,
                    task_id=t_id
                )
            )
            on_event(msg)
    # ------------------------------------------------------------------
    # Action Dispatcher
    # ------------------------------------------------------------------
    def _get_aria_snapshot_safe(self, page):
        """Safe aria_snapshot with fallback for older Playwright versions."""
        try:
            if hasattr(page.locator(":root"), "aria_snapshot"):
                # Use a locator-level timeout of 10s to prevent hanging the actor thread
                return page.locator(":root").aria_snapshot(timeout=10000)
            # Fallback for Playwright < 1.44
            print(" [🌐⚠️] aria_snapshot() not available in this Playwright version. Using fallback.")
            return "- application \"Browser\"\n - generic \"Accessibility Tree Unavailable (Playwright < 1.44)\""
        except Exception as e:
            print(f" [🌐❌] Error generating aria snapshot: {e}")
            return f"- error \"Failed to generate accessibility tree (Timeout or version issue): {e}\""
    def _dispatch_action(self, action, page, sess, res_data):
        """Execute one BrowserAction against the session page, filling res_data."""
        A = agent_pb2.BrowserAction
        role_refs = sess["role_refs"]
        def resolve(selector_or_ref: str):
            """Accept either a CSS selector or a ref like 'e3'."""
            s = (selector_or_ref or "").strip()
            if re.match(r'^e\d+$', s):
                return _resolve_ref(page, s, role_refs)
            return page.locator(s)
        if action.action == A.NAVIGATE:
            # PERFORMANCE: Remove auto-snapshot on navigation for Mac Mini stability.
            # Large Aria trees cause massive CPU bursts and gRPC timeouts.
            page.goto(action.url, wait_until="domcontentloaded", timeout=45000)
            res_data["eval_result"] = "Navigation successful."
        elif action.action == A.CLICK:
            target = action.selector or ""
            resolve(target).click(timeout=8000)
        elif action.action == A.TYPE:
            target = action.selector or ""
            resolve(target).fill(action.text, timeout=8000)
        elif action.action == A.SCREENSHOT:
            res_data["snapshot"] = page.screenshot(full_page=False)
        elif action.action == A.GET_DOM:
            res_data["dom_content"] = page.content()
        elif action.action == A.HOVER:
            target = action.selector or ""
            resolve(target).hover(timeout=5000)
        elif action.action == A.SCROLL:
            page.mouse.wheel(x=0, y=action.y or 400)
        elif action.action == A.EVAL:
            result = page.evaluate(action.text)
            res_data["eval_result"] = str(result)
        elif action.action == A.GET_A11Y:
            aria_raw = self._get_aria_snapshot_safe(page)
            snap, refs = _build_aria_snapshot(aria_raw)
            sess["role_refs"] = refs
            # Trim large snapshots (news pages can be huge)
            MAX = 10000
            if len(snap) > MAX:
                snap = snap[:MAX] + "\n\n[...snapshot truncated - use eval/scroll to see more...]"
            stats = {
                "total_refs": len(refs),
                "interactive": sum(1 for r in refs.values() if r["role"] in INTERACTIVE_ROLES),
                "url": page.url,
                "title": page.title(),
            }
            res_data["a11y_tree"] = snap
            res_data["eval_result"] = json.dumps(stats)
        elif action.action == A.CLOSE:
            with self.lock:
                s = self.sessions.pop(action.session_id or "default", None)
                if s:
                    s["context"].close()
    # ------------------------------------------------------------------
    # Public Interface
    # ------------------------------------------------------------------
    def execute(self, task, sandbox, on_complete, on_event=None):
        """Enqueue a browser task; the actor thread picks it up asynchronously."""
        self.task_queue.put((task, sandbox, on_complete, on_event))
    def cancel(self, task_id):
        # Browser actions are short-lived and serialized; cancellation unsupported.
        return False
    def shutdown(self):
        # None is the actor's shutdown sentinel.
        self.task_queue.put(None)