import threading
import queue
import time
import json
from playwright.sync_api import sync_playwright
from agent_node.skills.base import BaseSkill
from protos import agent_pb2
class BrowserSkill(BaseSkill):
    """The 'Antigravity Bridge': Persistent Browser Skill using a dedicated Actor thread.

    Every Playwright call is made from one background thread ("BrowserActor")
    that consumes ``task_queue``. Callers only enqueue work via ``execute``;
    results are delivered through the ``on_complete`` callback supplied with
    each task. Browser contexts are kept per ``session_id`` in ``sessions``
    until a CLOSE action or ``shutdown``.
    """

    def __init__(self):
        """Create the queue and session table, then start the actor thread."""
        self.task_queue = queue.Queue()
        self.sessions = {}  # session_id -> { "context": Context, "page": Page }
        self.lock = threading.Lock()
        threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor").start()

    def _setup_listeners(self, sid, page, on_event):
        """Tunnels browser internal events back to the Orchestrator.

        Args:
            sid: Session id stamped on every emitted ``BrowserEvent``.
            page: Playwright page to subscribe to.
            on_event: Callable receiving ``agent_pb2.BrowserEvent``; when
                falsy no listeners are registered.
        """
        if not on_event:
            return

        def forward_console(msg):
            # Live Console Redirector
            on_event(agent_pb2.BrowserEvent(
                session_id=sid,
                console_msg=agent_pb2.ConsoleMessage(
                    level=msg.type, text=msg.text, timestamp_ms=int(time.time() * 1000)
                ),
            ))

        def forward_request(req):
            # Live Network Redirector
            response = req.response()  # fetched once; may be None
            on_event(agent_pb2.BrowserEvent(
                session_id=sid,
                network_req=agent_pb2.NetworkRequest(
                    method=req.method, url=req.url,
                    status=response.status if response else 0,
                    resource_type=req.resource_type, latency_ms=0
                ),
            ))

        page.on("console", forward_console)
        page.on("requestfinished", forward_request)

    def _browser_actor(self):
        """Serializes all Playwright operations on a single dedicated thread.

        Starts Playwright and Chromium, then processes queued tasks until the
        ``None`` sentinel arrives. Each task gets exactly one ``on_complete``
        call: ``{"status": 1, "browser_result": ...}`` on success or
        ``{"stderr": ..., "status": 2}`` on failure. If the engine cannot be
        started, queued tasks are rejected with status 2 instead of being
        left unanswered.
        """
        print("[🌐] Browser Actor Starting...", flush=True)
        pw = None
        browser = None
        startup_error = None
        try:
            pw = sync_playwright().start()
            # 12-Factor/Container Optimization: Standard non-sandbox arguments
            browser = pw.chromium.launch(headless=True, args=[
                '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu'
            ])
            print("[🌐] Browser Engine Online.", flush=True)
        except Exception as e:
            startup_error = str(e)
            print(f"[!] Browser Actor Startup Fail: {e}", flush=True)

        if startup_error is not None:
            if pw:
                try:
                    pw.stop()
                except Exception as e:
                    print(f"[!] Browser Actor Cleanup Error: {e}", flush=True)
            # Keep answering callers so they are not left waiting forever.
            self._reject_tasks(f"Browser engine unavailable: {startup_error}")
            return

        while True:
            item = self.task_queue.get()
            if item is None:  # Sentinel for shutdown
                print("[🌐] Browser Actor Shutting Down...", flush=True)
                break
            try:
                task, _sandbox, on_complete, on_event = item
            except (TypeError, ValueError) as e:
                # Without a task/callback there is nobody to report to.
                print(f" [!] Browser Actor Error: {e}", flush=True)
                continue
            try:
                br_res = self._run_action(browser, task.browser_action, on_event)
                result = {"status": 1, "browser_result": br_res}
            except Exception as e:
                print(f" [!] Browser Actor Error: {e}", flush=True)
                result = {"stderr": str(e), "status": 2}
            self._report_result(on_complete, task, result)

        # Cleanup on loop exit
        print("[🌐] Cleaning up Browser Engine...", flush=True)
        with self.lock:
            for sess in self.sessions.values():
                try:
                    sess["context"].close()
                except Exception as e:
                    # Best effort: one broken context must not block the rest.
                    print(f"[!] Browser Actor Cleanup Error: {e}", flush=True)
            self.sessions.clear()
        try:
            if browser:
                browser.close()
        finally:
            if pw:
                pw.stop()

    def _reject_tasks(self, reason):
        """Fail every queued task with ``reason`` until the shutdown sentinel."""
        while True:
            item = self.task_queue.get()
            if item is None:
                return
            try:
                task, _sandbox, on_complete, _on_event = item
            except (TypeError, ValueError) as e:
                print(f" [!] Browser Actor Error: {e}", flush=True)
                continue
            self._report_result(on_complete, task, {"stderr": reason, "status": 2})

    def _report_result(self, on_complete, task, result):
        """Invoke ``on_complete`` once; a raising callback must not kill the actor."""
        try:
            on_complete(task.task_id, result, task.trace_id)
        except Exception as e:
            print(f" [!] Browser Actor Callback Error: {e}", flush=True)

    def _build_response(self, url, title, res_data):
        """Assemble a ``BrowserResponse`` from page metadata and action output."""
        return agent_pb2.BrowserResponse(
            url=url, title=title,
            snapshot=res_data.get("snapshot", b""),
            dom_content=res_data.get("dom_content", ""),
            a11y_tree=res_data.get("a11y_tree", ""),
            eval_result=res_data.get("eval_result", "")
        )

    def _run_action(self, browser, action, on_event):
        """Execute one ``BrowserAction`` and return its ``BrowserResponse``.

        Must only be called from the actor thread. Raises whatever Playwright
        raises; the caller converts that into a status-2 result.
        """
        sid = action.session_id or "default"
        print(f" [🌐] Browser Actor Processing: {agent_pb2.BrowserAction.ActionType.Name(action.action)} | Session: {sid}", flush=True)

        if action.action == agent_pb2.BrowserAction.CLOSE:
            return self._close_session(sid)

        with self.lock:
            if sid not in self.sessions:
                ctx = browser.new_context()
                try:
                    pg = ctx.new_page()
                    self._setup_listeners(sid, pg, on_event)
                except Exception:
                    ctx.close()  # do not leak a half-initialised context
                    raise
                self.sessions[sid] = {"context": ctx, "page": pg}
            page = self.sessions[sid]["page"]

        res_data = {}
        # State-Machine Logic for Actions
        if action.action == agent_pb2.BrowserAction.NAVIGATE:
            page.goto(action.url, wait_until="commit")
        elif action.action == agent_pb2.BrowserAction.CLICK:
            page.click(action.selector)
        elif action.action == agent_pb2.BrowserAction.TYPE:
            page.fill(action.selector, action.text)
        elif action.action == agent_pb2.BrowserAction.SCREENSHOT:
            res_data["snapshot"] = page.screenshot()
        elif action.action == agent_pb2.BrowserAction.GET_DOM:
            res_data["dom_content"] = page.content()
        elif action.action == agent_pb2.BrowserAction.HOVER:
            page.hover(action.selector)
        elif action.action == agent_pb2.BrowserAction.SCROLL:
            # Positional: Playwright names these delta_x/delta_y, not x/y.
            page.mouse.wheel(0, action.y)
        elif action.action == agent_pb2.BrowserAction.EVAL:
            # NOTE(review): evaluates caller-supplied script text in the page;
            # confirm that action.text only ever comes from a trusted source.
            res_data["eval_result"] = str(page.evaluate(action.text))
        elif action.action == agent_pb2.BrowserAction.GET_A11Y:
            # NOTE(review): page.accessibility appears to be deprecated in
            # recent Playwright releases — confirm against the pinned version.
            res_data["a11y_tree"] = json.dumps(page.accessibility.snapshot())

        # Results Construction
        return self._build_response(page.url, page.title(), res_data)

    def _close_session(self, sid):
        """Close and forget session ``sid``; a missing session is a no-op.

        URL and title are captured before closing because the page cannot be
        queried for its title afterwards.
        """
        with self.lock:
            sess = self.sessions.pop(sid, None)
        url, title = "", ""
        if sess:
            try:
                url = sess["page"].url
                title = sess["page"].title()
            except Exception as e:
                # Metadata is best effort; closing is what the caller asked for.
                print(f" [!] Browser Actor Error: {e}", flush=True)
            finally:
                sess["context"].close()
        return self._build_response(url, title, {})

    def execute(self, task, sandbox, on_complete, on_event=None):
        """Queue ``task`` for the actor thread and return immediately.

        Args:
            task: Object exposing ``browser_action``, ``task_id`` and ``trace_id``.
            sandbox: Carried through the queue; not used by the actor.
            on_complete: Called as ``on_complete(task_id, result_dict, trace_id)``.
            on_event: Optional sink for ``BrowserEvent`` messages; only wired
                up when the task's session is first created.
        """
        self.task_queue.put((task, sandbox, on_complete, on_event))

    def cancel(self, task_id):
        """Cancellation is not implemented; always returns ``False``."""
        return False

    def shutdown(self):
        """Triggers graceful shutdown of the browser engine."""
        self.task_queue.put(None)