# cortex-hub / browser-service / src / extraction / a11y.py
import logging
import json
import os
from .scripts import INTERACTIVE_ROLES, CONTENT_ROLES, LANDMARKS, JS_A11Y_EXTRACTOR

logger = logging.getLogger(__name__)

class A11yProcessor:
    """Collects accessibility-relevant elements from a browser page.

    Elements come from two sources: evaluating ``JS_A11Y_EXTRACTOR`` inside
    each frame, or reading the full AX tree over a CDP session. Elements that
    share the same ``role:name`` key are disambiguated with a zero-based
    ``nth`` counter kept in a caller-visible ``tracker`` dict.

    NOTE(review): ``page`` is presumably a Playwright ``Page`` (the code uses
    ``main_frame``, ``frames``, ``evaluate`` and ``context.new_cdp_session``)
    -- confirm against the caller.
    """

    # CDP roles that get_cdp_tree never reports, even when they carry a name.
    _CDP_SKIPPED_ROLES = (
        "WebArea",
        "StaticText",
        "GenericContainer",
        "List",
        "LayoutTable",
    )

    def __init__(self, page, session_id):
        """Store the page to inspect and the id of the owning session."""
        self.page = page
        self.session_id = session_id

    @staticmethod
    def _claim_nth(tracker, role, name):
        """Return the next zero-based index for ``role:name`` and bump the counter."""
        key = f"{role}:{name or ''}"
        nth = tracker.get(key, 0)
        tracker[key] = nth + 1
        return nth

    async def get_all_elements(self):
        """Orchestrates element collection across all frames (iframes).

        Returns:
            A flat list of element dicts. The main frame's elements come
            first, followed by those of every other frame in ``page.frames``
            order. ``ref`` numbering and ``nth`` counters are shared across
            all frames.
        """
        flat_list = []
        tracker = {}  # global tracker for all frames

        # 1. Primary frame (main)
        main_frame = self.page.main_frame
        await self._process_frame(main_frame, flat_list, tracker)

        # 2. Child frames -- best effort: one broken iframe must not discard
        # what was already collected from the others.
        for frame in self.page.frames:
            if frame == main_frame:
                continue
            try:
                await self._process_frame(frame, flat_list, tracker)
            except Exception as fe:
                logger.debug("Failed to process iframe %s: %s", frame.url, fe)

        return flat_list

    async def _process_frame(self, frame, flat_list, tracker):
        """Processes a single frame for accessibility elements.

        Evaluates ``JS_A11Y_EXTRACTOR`` in ``frame`` and appends every returned
        element dict to ``flat_list`` after tagging it with ``frame_url``,
        ``ref`` (``e<position in flat_list>``) and ``nth``.

        Args:
            frame: Frame to evaluate the extractor in.
            flat_list: Output list, mutated in place.
            tracker: ``role:name`` -> count mapping, mutated in place.

        A failing evaluation is logged at debug level and leaves ``flat_list``
        untouched; it is never raised to the caller.
        """
        frame_url = frame.url

        # No native accessibility snapshot is attempted here: the original
        # author noted that Playwright snapshots usually cover the whole page,
        # so per-frame data is gathered through JS extraction instead.
        try:
            elements = await frame.evaluate(JS_A11Y_EXTRACTOR)
        except Exception as e:
            logger.debug("JS extraction failed for frame %s: %s", frame_url, e)
            return

        if not isinstance(elements, (list, tuple)):
            # Nothing usable came back (e.g. None); keep this best-effort.
            return

        for el in elements:
            if not isinstance(el, dict):
                # A malformed entry must not cost us the rest of the frame.
                continue

            # Enrich with frame info
            el["frame_url"] = frame_url
            el["nth"] = self._claim_nth(tracker, el.get("role"), el.get("name"))
            el["ref"] = f"e{len(flat_list) + 1}"
            flat_list.append(el)

    def flatten_tree(self, node, flat_list, tracker, depth=0, frame_url=None):
        """Recursively flattens a tree with frame awareness.

        Visits ``node`` and its ``children`` in pre-order. A node is appended
        to ``flat_list`` (and tagged in place with ``ref``, ``nth`` and, when
        given, ``frame_url``) if its role is in ``INTERACTIVE_ROLES``, or if
        its role is in ``CONTENT_ROLES`` and it has a non-empty name.

        Args:
            node: Tree node dict with optional ``role``, ``name``, ``children``.
            flat_list: Output list, mutated in place.
            tracker: ``role:name`` -> count mapping, mutated in place.
            depth: Current recursion depth; only forwarded to child calls.
            frame_url: URL recorded on every collected node when truthy.
        """
        role = node.get("role")
        name = node.get("name")

        if role in INTERACTIVE_ROLES or (role in CONTENT_ROLES and name):
            node["nth"] = self._claim_nth(tracker, role, name)
            node["ref"] = f"e{len(flat_list) + 1}"
            if frame_url:
                node["frame_url"] = frame_url
            flat_list.append(node)

        # ``children`` may be absent or explicitly None; both mean "leaf".
        for child in node.get("children") or []:
            self.flatten_tree(child, flat_list, tracker, depth + 1, frame_url)

    async def get_cdp_tree(self, tracker):
        """Captures AXTree via CDP for deeper interaction metadata (Main Page Only).

        Args:
            tracker: ``role:name`` -> count mapping, mutated in place.

        Returns:
            A list of ``{"role", "name", "nth", "backendDOMNodeId"}`` dicts for
            every named node whose role is in ``INTERACTIVE_ROLES`` or
            ``LANDMARKS`` and not in ``_CDP_SKIPPED_ROLES``. Returns ``[]`` if
            the capture fails for any reason (the failure is logged as a
            warning).
        """
        client = None
        try:
            client = await self.page.context.new_cdp_session(self.page)
            await client.send("Accessibility.enable")
            cdp_res = await client.send("Accessibility.getFullAXTree")

            flat_a11y = []
            for node in cdp_res.get("nodes") or []:
                role_data = node.get("role", {})
                name_data = node.get("name", {})
                role = role_data.get("value") if isinstance(role_data, dict) else None
                name = name_data.get("value") if isinstance(name_data, dict) else None

                if not (role and name) or role in self._CDP_SKIPPED_ROLES:
                    continue
                if role in INTERACTIVE_ROLES or role in LANDMARKS:
                    flat_a11y.append({
                        "role": role,
                        "name": name,
                        "nth": self._claim_nth(tracker, role, name),
                        "backendDOMNodeId": node.get("backendDOMNodeId"),
                    })
            return flat_a11y
        except Exception as e:
            logger.warning("CDP a11y capture failed: %s", e)
            return []
        finally:
            # Each call opens a fresh CDP session; release it so repeated
            # captures do not pile up sessions on the page.
            if client is not None:
                try:
                    await client.detach()
                except Exception as e:
                    logger.debug("Failed to detach CDP session: %s", e)

    async def get_js_fallback(self):
        """Spatial element discovery via JS walking (Main Page Only).

        Returns:
            Whatever ``JS_A11Y_EXTRACTOR`` yields when evaluated on the page,
            or ``[]`` when it yields nothing or the evaluation fails (failures
            are logged as warnings).
        """
        try:
            elements = await self.page.evaluate(JS_A11Y_EXTRACTOR)
        except Exception as e:
            logger.warning("JS A11y extraction failed: %s", e)
            return []
        return [] if elements is None else elements