import asyncio
import logging
import os
import uuid

from playwright.async_api import async_playwright
# from playwright_stealth import stealth
# Module-level logger, named after this module per the stdlib logging convention.
logger = logging.getLogger(__name__)
class BrowserManager:
    """Manages one shared headless Chromium plus per-session browser contexts.

    Sessions are keyed by an opaque ``session_id``; each live session owns one
    isolated ``BrowserContext`` and one ``Page``, tracked in ``self.contexts``
    and ``self.pages``.
    """

    # Desktop Chrome user agent presented by every context we create.
    _USER_AGENT = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
    )
    # Default window size for every context.
    _VIEWPORT = {'width': 1280, 'height': 800}

    def __init__(self):
        self._playwright = None  # playwright driver handle (None until init())
        self._browser = None     # shared Chromium instance (None until init())
        self.contexts = {}       # session_id -> BrowserContext
        self.pages = {}          # session_id -> Page

    async def init(self):
        """(Re)start Playwright and launch a headless Chromium.

        Safe to call repeatedly: any previous driver is stopped first, which
        also closes the browser it launched.
        """
        if self._playwright:
            try:
                await self._playwright.stop()
            except Exception:
                # Best-effort teardown; the old driver may already be dead.
                # (Bug fix: was a bare `except:` that also swallowed
                # KeyboardInterrupt/SystemExit.)
                logger.debug("Ignoring error while stopping old Playwright.",
                             exc_info=True)
        self._playwright = await async_playwright().start()
        self._browser = await self._playwright.chromium.launch(
            headless=True,
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-blink-features=AutomationControlled'
            ]
        )
        logger.info("Playwright initialized and Chromium launched with stealth args.")

    async def ensure_browser(self):
        """Restart the browser if it was never started or has disconnected."""
        if not self._browser or not self._browser.is_connected():
            logger.warning("Browser disconnected or uninitialized. Restarting...")
            await self.init()

    async def _new_context(self):
        """Create a fresh context with the standard viewport and user agent.

        Single source of truth for the kwargs previously duplicated in
        ``get_page`` and ``parallel_fetch``.
        """
        return await self._browser.new_context(
            viewport=dict(self._VIEWPORT),  # copy so callers can't mutate the default
            user_agent=self._USER_AGENT,
        )

    async def get_page(self, session_id):
        """Return the live page for *session_id*, creating a session if needed.

        A session is considered stale when it has no page, the page is closed,
        or the liveness probe itself fails.
        """
        await self.ensure_browser()
        page = self.pages.get(session_id)
        is_stale = page is None
        if page is not None:
            try:
                is_stale = page.is_closed()
            except Exception:
                # Probe failed -> treat as stale. (Bug fix: was a bare except.)
                is_stale = True
        if is_stale:
            logger.info(f"Session {session_id} is stale or new. Creating context...")
            self.pages.pop(session_id, None)
            stale_context = self.contexts.pop(session_id, None)
            if stale_context is not None:
                # Bug fix: the old code dropped the reference without closing
                # it, leaking the abandoned context.
                try:
                    await stale_context.close()
                except Exception:
                    logger.debug("Ignoring error closing stale context.",
                                 exc_info=True)
            context = await self._new_context()
            page = await context.new_page()
            # Apply stealth (synchronous) - Disabled temporarily due to module call error
            # playwright_stealth.stealth(page)
            self.contexts[session_id] = context
            self.pages[session_id] = page
        return page

    async def parallel_fetch(self, urls, max_concurrent=5, extract_markdown=True):
        """Fetches multiple URLs in parallel using a pool of pages.

        Each URL gets its own short-lived context for isolation; at most
        *max_concurrent* fetches run at once. Returns a list of result dicts
        in the same order as *urls*: on success
        ``{"url", "title", "content_markdown", "success": True}``, on failure
        ``{"url", "success": False, "error"}``.
        """
        # Lazy project-local import keeps module import cheap for callers
        # that never fetch.
        from src.extraction.markdown import MarkdownExtractor
        await self.ensure_browser()
        extractor = MarkdownExtractor()
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_one(url):
            # One worker: own context + page, always torn down in finally.
            async with semaphore:
                logger.info(f"Worker fetching: {url}")
                # Separate context for each fetch for isolation
                context = await self._new_context()
                page = await context.new_page()
                try:
                    await page.goto(url, wait_until="domcontentloaded", timeout=20000)
                    await asyncio.sleep(1)  # Wait for JS dynamic content
                    title = await page.title()
                    content = ""
                    if extract_markdown:
                        html = await page.content()
                        content = extractor.extract(html)
                    return {
                        "url": url,
                        "title": title,
                        "content_markdown": content,
                        "success": True
                    }
                except Exception as e:
                    logger.warning(f"Failed to fetch {url}: {e}")
                    return {
                        "url": url,
                        "success": False,
                        "error": str(e)
                    }
                finally:
                    # Closing the context also closes its page.
                    await context.close()

        return await asyncio.gather(*(fetch_one(url) for url in urls))

    async def close_session(self, session_id):
        """Tear down the page/context for *session_id* (no-op if unknown).

        Bug fix: entries are popped from the bookkeeping dicts up front and
        close errors are logged and swallowed, so a failing ``page.close()``
        no longer leaks the context and both dict entries.
        """
        page = self.pages.pop(session_id, None)
        context = self.contexts.pop(session_id, None)
        if page is not None:
            try:
                await page.close()
            except Exception:
                logger.debug("Error closing page for session %s",
                             session_id, exc_info=True)
        if context is not None:
            try:
                await context.close()
            except Exception:
                logger.debug("Error closing context for session %s",
                             session_id, exc_info=True)