import asyncio
import os
import sys
# Make <cwd>/ai-hub importable so the `app` package (client and protos) resolves.
sys.path.append(os.path.join(os.getcwd(), "ai-hub"))
from app.core.services.browser_client import BrowserServiceClient
async def test_comprehensive_browser_flow():
    """Drive a live browser service through a full navigate-to-eval flow.

    Within a single randomly named session this runs, in order: navigate,
    type, get_snapshot, scroll and eval, printing the fields of each
    result. The session is closed in a ``finally`` step regardless of
    whether the steps succeeded.

    The target endpoint defaults to ``192.168.68.113:50052`` and may be
    overridden with the ``BROWSER_SERVICE_ENDPOINT`` environment variable.

    Exceptions raised by any step are printed together with their
    traceback and are not re-raised; a failure to close the session is
    reported the same way.
    """
    print("🚀 Starting Comprehensive Browser Service Test...")

    # Default is the address the script has always targeted; the env var
    # lets the same flow run against another instance without code edits.
    endpoint = os.environ.get("BROWSER_SERVICE_ENDPOINT") or "192.168.68.113:50052"
    client = BrowserServiceClient(endpoint=endpoint)
    # Random suffix keeps concurrent or repeated runs in separate sessions.
    session_id = f"comp-test-{os.urandom(4).hex()}"

    async def on_event(event):
        """Echo streamed sub-agent thoughts; ignore every other event."""
        # .get() so an event without "type"/"content" is skipped or printed
        # empty rather than raising KeyError inside the streaming callback.
        if event.get("type") == "subagent_thought":
            print(f" [STREAM] {event.get('content', '')}")

    try:
        # 1. Navigate
        print("\n1️⃣ Action: Navigate to Google")
        res = await client.navigate(
            "https://www.google.com", session_id=session_id, on_event=on_event
        )
        print(f" Success: {res.get('success')}, Title: {res.get('title')}")

        # 2. Type & Search
        print("\n2️⃣ Action: Type & Search")
        res = await client.type(
            "latest AI news",
            selector="textarea[name='q']",
            session_id=session_id,
            on_event=on_event,
        )
        print(f" Success: {res.get('success')}, URL now: {res.get('url')}")

        # 3. Snapshot with Screenshot
        print("\n3️⃣ Action: Get Snapshot (with screenshot & A11y)")
        res = await client.get_snapshot(session_id=session_id, on_event=on_event)
        print(f" Success: {res.get('success')}")
        print(f" A11y Path: {res.get('a11y_path')}")
        print(f" Screenshot URL: {res.get('screenshot_url')}")
        # Raw bytes are optional in the result; only report them if present.
        if res.get("_screenshot_bytes"):
            print(f" Captured {len(res['_screenshot_bytes'])} bytes of screenshot data.")

        # 4. Scroll
        print("\n4️⃣ Action: Scroll Down")
        res = await client.scroll(delta_y=500, session_id=session_id, on_event=on_event)
        print(f" Success: {res.get('success')}")

        # 5. Eval
        print("\n5️⃣ Action: Evaluate JS")
        res = await client.eval("document.title", session_id=session_id, on_event=on_event)
        print(f" Success: {res.get('success')}, Eval Result: {res.get('eval_result')}")
    except Exception as e:
        # NOTE(review): the failure is reported but swallowed, so the script
        # still exits with status 0 - confirm whether that is intended.
        print(f"❌ Test Failed: {e}")
        import traceback
        traceback.print_exc()
    finally:
        print("\n🛑 Cleanup...")
        try:
            # Imported here so a missing/broken proto module is reported as
            # a cleanup failure instead of preventing the test from running.
            from app.protos import browser_pb2
            await client.stub.CloseSession(browser_pb2.CloseRequest(session_id=session_id))
            print("✅ Session closed.")
        except Exception as e:
            # Best effort: a failed close must not mask the test outcome.
            print(f"⚠️ Cleanup failed: {e}")
if __name__ == "__main__":
    # Script entry point: build the coroutine, then run it to completion
    # on a fresh event loop.
    flow = test_comprehensive_browser_flow()
    asyncio.run(flow)