import logging
import uuid
import os
import re
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright
# from playwright_stealth import stealth
logger = logging.getLogger(__name__)
class BrowserManager:
    """Owns a shared headless Chromium instance plus per-session contexts/pages.

    Fetching strategy: try a cheap static HTTP fetch first; if the result
    looks like a JavaScript app shell (see ``_needs_js_render``), fall back
    to rendering the page in the real browser.
    """

    # One UA string shared by the static (aiohttp) path and the browser
    # contexts so both fetch modes present the same client fingerprint.
    _USER_AGENT = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
    )

    def __init__(self):
        self._playwright = None  # Playwright driver handle, set by init()
        self._browser = None  # shared Chromium instance, set by init()
        self.contexts = {}  # session_id -> BrowserContext
        self.pages = {}  # session_id -> Page

    async def init(self):
        """(Re)start the Playwright driver and launch headless Chromium."""
        if self._playwright:
            # Best-effort stop of a previous (possibly already dead) driver;
            # a failure here must not prevent the restart below.
            try:
                await self._playwright.stop()
            except Exception:
                pass
        self._playwright = await async_playwright().start()
        self._browser = await self._playwright.chromium.launch(
            headless=True,
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                # Hides the navigator.webdriver automation hint.
                '--disable-blink-features=AutomationControlled'
            ]
        )
        logger.info("Playwright initialized and Chromium launched with stealth args.")

    async def ensure_browser(self):
        """Restart the browser if it was never started or has disconnected."""
        if not self._browser or not self._browser.is_connected():
            logger.warning("Browser disconnected or uninitialized. Restarting...")
            await self.init()

    async def get_page(self, session_id):
        """Return a live Page for ``session_id``, rebuilding context/page if stale."""
        await self.ensure_browser()
        page = self.pages.get(session_id)
        is_stale = False
        if page:
            try:
                if page.is_closed():
                    is_stale = True
            except Exception:
                # Any error while probing the page means it is unusable.
                is_stale = True
        else:
            is_stale = True
        if is_stale:
            logger.info(f"Session {session_id} is stale or new. Creating context...")
            # Close (not just forget) stale objects so browser contexts
            # don't leak across reconnects.
            self.pages.pop(session_id, None)
            stale_context = self.contexts.pop(session_id, None)
            if stale_context is not None:
                try:
                    await stale_context.close()  # also closes its pages
                except Exception:
                    pass
            context = await self._browser.new_context(
                viewport={'width': 1280, 'height': 800},
                user_agent=self._USER_AGENT
            )
            page = await context.new_page()
            # Stealth patching disabled temporarily due to module call error.
            self.contexts[session_id] = context
            self.pages[session_id] = page
        return page

    async def _static_fetch(self, url, timeout=15, extract_markdown=True):
        """Fast static fetch via HTTP + BeautifulSoup + markdown extraction.

        This is the fast path. If the result looks like a JS app shell, the
        caller falls back to Playwright rendering.

        Returns a dict: on success ``{url, success, html, title,
        content_markdown}``; on failure ``{url, success, error, fetch_mode}``.
        """
        from src.extraction.markdown import MarkdownExtractor
        headers = {"User-Agent": self._USER_AGENT}
        try:
            timeout_obj = aiohttp.ClientTimeout(total=timeout)
            async with aiohttp.ClientSession(timeout=timeout_obj, headers=headers) as session:
                async with session.get(url, allow_redirects=True) as resp:
                    resp.raise_for_status()
                    html = await resp.text()
        except Exception as e:
            return {"url": url, "success": False, "error": str(e), "fetch_mode": "static"}
        title = ""
        markdown = ""
        try:
            soup = BeautifulSoup(html, "html.parser")
            title_tag = soup.find("title")
            if title_tag:
                title = title_tag.get_text(strip=True)
        except Exception:
            pass  # title is best-effort
        if extract_markdown:
            try:
                extractor = MarkdownExtractor()
                markdown = extractor.extract(html)
            except Exception:
                markdown = ""  # extraction is best-effort too
        return {
            "url": url,
            "success": True,
            "html": html,
            "title": title,
            "content_markdown": markdown,
        }

    def _needs_js_render(self, html: str, markdown: str, title: str) -> bool:
        """Heuristic to determine whether HTML likely requires JS rendering.

        ``title`` is currently unused but kept for interface stability.
        """
        if not html:
            return True
        stripped_md = (markdown or "").strip()
        if not stripped_md or stripped_md == "No readable content found.":
            return True
        word_count = len(re.findall(r"\w+", stripped_md))
        if word_count < 40:
            return True
        try:
            soup = BeautifulSoup(html, "html.parser")
            # Typical JS app shell: a well-known root node with (almost) no text.
            root = soup.find(id=re.compile(r"^(app|root|__next|react-app|ember-app|gatsby-root)$", re.I))
            if root and len(root.get_text(strip=True)) < 20:
                return True
            noscript = soup.find("noscript")
            if noscript:
                noscript_text = noscript.get_text(" ", strip=True)
                if re.search(r"enable javascript|javascript is required|please enable javascript", noscript_text, re.I):
                    return True
        except Exception:
            pass  # soup probing is advisory only
        # If page is script-heavy and content is light, assume JS needed.
        script_tags = re.findall(r"<script[\s>]", html, flags=re.I)
        if len(script_tags) > 10 and word_count < 100:
            return True
        return False

    async def parallel_fetch(self, urls, max_concurrent=5, extract_markdown=True):
        """Fetch multiple URLs in parallel, static-first with browser fallback.

        Returns a list of result dicts in the same order as ``urls``.
        """
        await self.ensure_browser()
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_one(url):
            async with semaphore:
                logger.info(f"Worker fetching: {url}")
                # Fast static fetch path (no browser work needed).
                static_result = await self._static_fetch(url, extract_markdown=extract_markdown)
                if static_result.get("success"):
                    if not self._needs_js_render(
                        static_result.get("html", ""),
                        static_result.get("content_markdown", ""),
                        static_result.get("title", ""),
                    ):
                        logger.info(f"Static fetch sufficient for: {url} (fetch_mode=static)")
                        return {
                            "url": url,
                            "title": static_result.get("title", ""),
                            "content_markdown": static_result.get("content_markdown", ""),
                            "success": True,
                            "fetch_mode": "static",
                        }
                    else:
                        logger.info(f"Static fetch looked like JS shell, falling back to browser for: {url} (fetch_mode=js)")
                else:
                    logger.info(f"Static fetch failed for {url}: {static_result.get('error')}. Falling back to browser.")
                # Fall back to Playwright rendering. A separate context per
                # fetch keeps cookies/storage isolated between URLs.
                context = await self._browser.new_context(
                    viewport={'width': 1280, 'height': 800},
                    user_agent=self._USER_AGENT
                )
                page = await context.new_page()
                try:
                    await page.goto(url, wait_until="domcontentloaded", timeout=20000)
                    await asyncio.sleep(1)  # grace period for JS dynamic content
                    title = await page.title()
                    content = ""
                    if extract_markdown:
                        html = await page.content()
                        # Keep existing extractor behavior.
                        from src.extraction.markdown import MarkdownExtractor
                        extractor = MarkdownExtractor()
                        content = extractor.extract(html)
                    return {
                        "url": url,
                        "title": title,
                        "content_markdown": content,
                        "success": True,
                        "fetch_mode": "js"
                    }
                except Exception as e:
                    logger.warning(f"Failed to fetch {url}: {e} (fetch_mode=js)")
                    return {
                        "url": url,
                        "success": False,
                        "error": str(e),
                        "fetch_mode": "js"
                    }
                finally:
                    await context.close()

        tasks = [fetch_one(url) for url in urls]
        return await asyncio.gather(*tasks)

    async def close_session(self, session_id):
        """Close and forget the page/context for ``session_id`` (best-effort).

        A failing close (e.g. already-closed page) no longer leaves stale
        entries behind in the tracking dicts.
        """
        page = self.pages.pop(session_id, None)
        if page is not None:
            try:
                await page.close()
            except Exception:
                pass
        context = self.contexts.pop(session_id, None)
        if context is not None:
            try:
                await context.close()
            except Exception:
                pass