# cortex-hub / tests / test_browser_features.py
import asyncio
import os
import sys

# Put "<current working directory>/ai-hub" on the import path so the `app.*`
# imports used by this script can be resolved. The location is derived from
# the working directory, so the script must be launched from the directory
# that contains "ai-hub".
_ai_hub_dir = os.path.join(os.getcwd(), "ai-hub")
sys.path.append(_ai_hub_dir)

from app.core.services.browser_client import BrowserServiceClient

async def test_comprehensive_browser_flow():
    """Drive navigate, type, snapshot, scroll and eval against the browser service.

    The outcome of every step is printed (it is not asserted). The service
    endpoint defaults to the address this test has always used and can be
    overridden with the ``BROWSER_SERVICE_ENDPOINT`` environment variable.

    A best-effort ``CloseSession`` call is always attempted at the end; a
    failure there is only reported, never raised.

    Raises:
        Exception: any exception raised by one of the steps. It is reported
            and then re-raised after cleanup so the failure is visible to the
            test runner (or to the exit status when run as a script) instead
            of being silently swallowed.
    """
    print("🚀 Starting Comprehensive Browser Service Test...")

    # Connect to the remote production browser service unless an override is
    # supplied; an empty override falls back to the default as well.
    endpoint = os.environ.get("BROWSER_SERVICE_ENDPOINT") or "192.168.68.113:50052"
    client = BrowserServiceClient(endpoint=endpoint)
    # Random suffix keeps concurrent or repeated runs from sharing a session id.
    session_id = f"comp-test-{os.urandom(4).hex()}"

    async def on_event(event):
        # Echo only streamed sub-agent thoughts; every other event is ignored.
        if event["type"] == "subagent_thought":
            print(f"  [STREAM] {event['content']}")

    try:
        # 1. Navigate
        print("\n1️⃣ Action: Navigate to Google")
        res = await client.navigate("https://www.google.com", session_id=session_id, on_event=on_event)
        print(f"   Success: {res.get('success')}, Title: {res.get('title')}")

        # 2. Type & Search
        print("\n2️⃣ Action: Type & Search")
        res = await client.type("latest AI news", selector="textarea[name='q']", session_id=session_id, on_event=on_event)
        print(f"   Success: {res.get('success')}, URL now: {res.get('url')}")

        # 3. Snapshot with Screenshot
        print("\n3️⃣ Action: Get Snapshot (with screenshot & A11y)")
        res = await client.get_snapshot(session_id=session_id, on_event=on_event)
        print(f"   Success: {res.get('success')}")
        print(f"   A11y Path: {res.get('a11y_path')}")
        print(f"   Screenshot URL: {res.get('screenshot_url')}")
        if res.get("_screenshot_bytes"):
            print(f"   Captured {len(res['_screenshot_bytes'])} bytes of screenshot data.")

        # 4. Scroll
        print("\n4️⃣ Action: Scroll Down")
        res = await client.scroll(delta_y=500, session_id=session_id, on_event=on_event)
        print(f"   Success: {res.get('success')}")

        # 5. Eval
        print("\n5️⃣ Action: Evaluate JS")
        res = await client.eval("document.title", session_id=session_id, on_event=on_event)
        print(f"   Success: {res.get('success')}, Eval Result: {res.get('eval_result')}")

    except Exception as e:
        print(f"❌ Test Failed: {e}")
        # Propagate instead of swallowing: the re-raised exception carries the
        # traceback, and the `finally` block below still runs first.
        raise
    finally:
        print("\n🛑 Cleanup...")
        try:
            # Imported here so a missing/broken proto module only degrades
            # cleanup rather than preventing the test from starting.
            from app.protos import browser_pb2
            await client.stub.CloseSession(browser_pb2.CloseRequest(session_id=session_id))
            print("✅ Session closed.")
        except Exception as e:
            # Cleanup is best-effort; report and move on.
            print(f"⚠️ Cleanup failed: {e}")

if __name__ == "__main__":
    # Script entry point: build the coroutine for the browser flow and hand
    # it to asyncio.run so it executes to completion.
    flow = test_comprehensive_browser_flow()
    asyncio.run(flow)