import asyncio
import os
import sys
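# Make the "ai-hub" directory importable so that `app.*` modules (browser client, protos) resolve.
# NOTE: the path is built from os.getcwd(), so run this script from the directory that contains "ai-hub".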
sys.path.append(os.path.join(os.getcwd(), "ai-hub"))
from app.core.services.browser_client import BrowserServiceClient
async def test_browser_flow(endpoint: str = "192.168.68.113:50052") -> None:
    """Run a manual end-to-end smoke test against the browser service.

    The flow uses a freshly generated session id and performs these steps:

    1. Navigate to a Wikipedia article.
    2. Request a page snapshot and print the reported title, URL and DOM path.
    3. Close the session through the client's gRPC stub (always attempted).

    Progress is reported with ``print``. Exceptions raised by the navigate or
    snapshot steps are caught and printed rather than re-raised, so a failure
    is visible only in the output.

    Args:
        endpoint: Address passed to ``BrowserServiceClient``. Defaults to the
            remote browser service this script was originally written against.
    """
    print("🚀 Starting AI-Hub Mock Flow Test...")

    # Connect to the remote production browser service (unless overridden).
    client = BrowserServiceClient(endpoint=endpoint)
    # Random suffix (8 hex chars) keeps separate runs in separate sessions.
    session_id = f"test-ai-flow-{os.urandom(4).hex()}"

    # Simple event handler that surfaces the streamed "thoughts".
    async def on_event(event):
        # NOTE(review): assumes events are dict-like (results below are read
        # with .get as well) - confirm against BrowserServiceClient.
        # .get() keeps an event without these keys from raising KeyError.
        if event.get("type") == "subagent_thought":
            print(f" [STREAM] {event.get('content', '')}")

    try:
        # Step 1: Browse Wikipedia (less likely to trigger a captcha than a
        # Google search).
        print("\n1️⃣ Action: Navigate to Wikipedia")
        target_url = "https://en.wikipedia.org/wiki/San_Jose,_California"
        nav_res = await client.navigate(
            target_url, session_id=session_id, on_event=on_event
        )
        print(
            f"✅ Navigate Result: {nav_res.get('title')} "
            f"(Success: {nav_res.get('success')})"
        )

        # Step 2: Get a snapshot (mimics the AI "understanding" the page).
        print("\n2️⃣ Action: Get Page Snapshot")
        snap_res = await client.get_snapshot(
            session_id=session_id, on_event=on_event
        )
        if snap_res.get("success"):
            print("✅ Snapshot successful.")
            print(f" - Title: {snap_res.get('title')}")
            print(f" - URL: {snap_res.get('url')}")
            print(f" - DOM Path (Remote): {snap_res.get('dom_path')}")
            # Note: we can't read SHM locally from a remote service unless we
            # use FS sync, but we can verify the service reported a path.
            if snap_res.get("dom_path"):
                print("✨ Service generated a DOM snapshot path.")
        else:
            print(f"❌ Snapshot failed: {snap_res.get('error')}")

        # Step 3: Extract specifics (mimic AI eval).
        # Note: server.py currently doesn't implement 'eval' via gRPC
        # Navigate/Snapshot, so the title returned by Navigate is the only
        # content check available here.
    except Exception as e:
        # Top-level boundary of a manual test script: report and fall through
        # to cleanup instead of propagating.
        print(f"❌ Test Crashed: {e}")
    finally:
        # Step 4: Cleanup - attempted whether or not the steps above worked.
        print("\n🏁 Cleaning up session...")
        try:
            # Imported here, inside the try, so that a missing protos package
            # is reported as a failed close instead of aborting the script.
            from app.protos import browser_pb2

            await client.stub.CloseSession(
                browser_pb2.CloseRequest(session_id=session_id)
            )
            print("✅ Session closed.")
        except Exception as e:
            # Best effort: a failed close must not mask the test output.
            print(f"⚠️ Close session failed: {e}")
# Script entry point: only runs the flow when executed directly, not on import.
if __name__ == "__main__":
    flow = test_browser_flow()
    # asyncio.run creates an event loop, runs the coroutine to completion and
    # closes the loop afterwards.
    asyncio.run(flow)