# cortex-hub / browser-service / src / core / browser.py
import logging
import uuid
import os
import re
import asyncio

import aiohttp
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright
# from playwright_stealth import stealth

logger = logging.getLogger(__name__)

class BrowserManager:
    """Manages one shared headless Chromium instance plus per-session state.

    Each ``session_id`` maps to its own isolated browser context and a single
    page within it. A fast static-HTTP fetch path (aiohttp + BeautifulSoup) is
    tried before falling back to full Playwright rendering.
    """

    def __init__(self):
        self._playwright = None  # Playwright driver handle, set by init()
        self._browser = None     # shared Chromium instance, set by init()
        self.contexts = {}  # session_id -> BrowserContext
        self.pages = {}     # session_id -> Page

    async def init(self):
        """(Re)start Playwright and launch headless Chromium.

        Safe to call repeatedly: any existing driver is stopped first.
        Stopping is best-effort — a dead driver's stop error is logged
        and ignored rather than blocking the restart.
        """
        if self._playwright:
            try:
                await self._playwright.stop()
            except Exception:
                # The old driver may already be gone; restart regardless.
                logger.debug("Ignoring error while stopping stale Playwright driver.", exc_info=True)
        self._playwright = await async_playwright().start()
        self._browser = await self._playwright.chromium.launch(
            headless=True,
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-blink-features=AutomationControlled'
            ]
        )
        logger.info("Playwright initialized and Chromium launched with stealth args.")

    async def ensure_browser(self):
        """Restart the browser if it was never started or has disconnected."""
        if not self._browser or not self._browser.is_connected():
            logger.warning("Browser disconnected or uninitialized. Restarting...")
            await self.init()

    async def get_page(self, session_id):
        """Return a live page for ``session_id``, creating context+page if needed.

        A session is considered stale when its page is missing, closed, or the
        liveness probe itself raises (e.g. the underlying context died).
        Stale sessions get a fresh isolated context.
        """
        await self.ensure_browser()

        page = self.pages.get(session_id)
        is_stale = False
        if page:
            try:
                if page.is_closed():
                    is_stale = True
            except Exception:
                # If we cannot even query the page, treat it as dead.
                is_stale = True
        else:
            is_stale = True

        if is_stale:
            logger.info(f"Session {session_id} is stale or new. Creating context...")
            # Drop any dead references before rebuilding the session.
            self.pages.pop(session_id, None)
            self.contexts.pop(session_id, None)

            context = await self._browser.new_context(
                viewport={'width': 1280, 'height': 800},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
            )
            page = await context.new_page()

            # Apply stealth (synchronous) - Disabled temporarily due to module call error
            # playwright_stealth.stealth(page)

            self.contexts[session_id] = context
            self.pages[session_id] = page

        return page

    async def _static_fetch(self, url, timeout=15, extract_markdown=True):
        """Fast static fetch via HTTP + BeautifulSoup + markdown extraction.

        This is the fast path. If the result looks like a JS app shell, the
        caller falls back to Playwright rendering.

        Returns a dict with ``success``/``fetch_mode`` keys; on success it also
        carries ``html``, ``title`` and ``content_markdown``, on failure an
        ``error`` string instead.
        """
        # Local import: avoids a circular import with the extraction package.
        from src.extraction.markdown import MarkdownExtractor

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
        }

        try:
            timeout_obj = aiohttp.ClientTimeout(total=timeout)
            async with aiohttp.ClientSession(timeout=timeout_obj, headers=headers) as session:
                async with session.get(url, allow_redirects=True) as resp:
                    resp.raise_for_status()
                    html = await resp.text()
        except Exception as e:
            return {"url": url, "success": False, "error": str(e), "fetch_mode": "static"}

        title = ""
        markdown = ""
        try:
            soup = BeautifulSoup(html, "html.parser")
            title_tag = soup.find("title")
            if title_tag:
                title = title_tag.get_text(strip=True)
        except Exception:
            # Title extraction is best-effort; an unparseable page keeps "".
            pass

        if extract_markdown:
            try:
                extractor = MarkdownExtractor()
                markdown = extractor.extract(html)
            except Exception:
                markdown = ""

        return {
            "url": url,
            "success": True,
            "html": html,
            "title": title,
            "content_markdown": markdown,
            # Keep the success payload consistent with the failure payload.
            "fetch_mode": "static",
        }

    def _needs_js_render(self, html: str, markdown: str, title: str) -> bool:
        """Heuristic to determine whether HTML likely requires JS rendering.

        Signals, in order: empty HTML, empty/placeholder markdown, very short
        markdown, an empty SPA root element, a "please enable JavaScript"
        noscript notice, and finally a script-heavy page with little content.
        ``title`` is currently unused but kept for interface stability.
        """
        if not html:
            return True

        stripped_md = (markdown or "").strip()
        if not stripped_md or stripped_md == "No readable content found.":
            return True

        word_count = len(re.findall(r"\w+", stripped_md))
        if word_count < 40:
            return True

        try:
            soup = BeautifulSoup(html, "html.parser")

            # Typical JS app shell placeholder
            root = soup.find(id=re.compile(r"^(app|root|__next|react-app|ember-app|gatsby-root)$", re.I))
            if root and len(root.get_text(strip=True)) < 20:
                return True

            noscript = soup.find("noscript")
            if noscript:
                noscript_text = noscript.get_text(" ", strip=True)
                if re.search(r"enable javascript|javascript is required|please enable javascript", noscript_text, re.I):
                    return True
        except Exception:
            # DOM heuristics are best-effort; fall through to the regex check.
            pass

        # If page is script-heavy and content is light, assume JS needed
        script_tags = re.findall(r"<script[\s>]", html, flags=re.I)
        if len(script_tags) > 10 and word_count < 100:
            return True

        return False

    async def parallel_fetch(self, urls, max_concurrent=5, extract_markdown=True):
        """Fetch multiple URLs in parallel, bounded by ``max_concurrent``.

        Each URL first tries the cheap static path; only pages that look like
        JS app shells (or failed static fetches) are rendered with Playwright,
        each in its own throwaway context for isolation.
        """
        await self.ensure_browser()
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_one(url):
            async with semaphore:
                logger.info(f"Worker fetching: {url}")

                # Fast static fetch path (no browser start)
                static_result = await self._static_fetch(url, extract_markdown=extract_markdown)
                if static_result.get("success"):
                    if not self._needs_js_render(
                        static_result.get("html", ""),
                        static_result.get("content_markdown", ""),
                        static_result.get("title", ""),
                    ):
                        logger.info(f"Static fetch sufficient for: {url} (fetch_mode=static)")
                        return {
                            "url": url,
                            "title": static_result.get("title", ""),
                            "content_markdown": static_result.get("content_markdown", ""),
                            "success": True,
                            "fetch_mode": "static",
                        }
                    else:
                        logger.info(f"Static fetch looked like JS shell, falling back to browser for: {url} (fetch_mode=js)")
                else:
                    logger.info(f"Static fetch failed for {url}: {static_result.get('error')}. Falling back to browser.")

                # Fall back to Playwright rendering
                # Separate context for each fetch for isolation
                context = await self._browser.new_context(
                    viewport={'width': 1280, 'height': 800},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
                )
                page = await context.new_page()
                try:
                    await page.goto(url, wait_until="domcontentloaded", timeout=20000)
                    await asyncio.sleep(1)  # Wait for JS dynamic content
                    title = await page.title()

                    content = ""
                    if extract_markdown:
                        html = await page.content()
                        # Keep existing extractor behavior
                        from src.extraction.markdown import MarkdownExtractor
                        extractor = MarkdownExtractor()
                        content = extractor.extract(html)

                    return {
                        "url": url,
                        "title": title,
                        "content_markdown": content,
                        "success": True,
                        "fetch_mode": "js"
                    }
                except Exception as e:
                    logger.warning(f"Failed to fetch {url}: {e} (fetch_mode=js)")
                    return {
                        "url": url,
                        "success": False,
                        "error": str(e),
                        "fetch_mode": "js"
                    }
                finally:
                    # Always tear down the per-fetch context, even on failure.
                    await context.close()

        tasks = [fetch_one(url) for url in urls]
        return await asyncio.gather(*tasks)

    async def close_session(self, session_id):
        """Close and forget the page/context for ``session_id``.

        Best-effort: bookkeeping entries are always removed, and errors from
        closing an already-dead page/context are logged and swallowed so a
        half-dead session can still be cleaned up.
        """
        page = self.pages.pop(session_id, None)
        if page is not None:
            try:
                await page.close()
            except Exception:
                logger.debug(f"Ignoring error closing page for session {session_id}.", exc_info=True)
        context = self.contexts.pop(session_id, None)
        if context is not None:
            try:
                await context.close()
            except Exception:
                logger.debug(f"Ignoring error closing context for session {session_id}.", exc_info=True)