Newer
Older
cortex-hub / tests / test_browser_service_remote.py
import grpc
import sys
import os
import uuid

# Add ai-hub to path to find protos
sys.path.append('/app/ai-hub')
from app.protos import browser_pb2, browser_pb2_grpc

def test_browser_service(host='192.168.68.113', port='50052', timeout=30.0):
    """Smoke-test a remote Browser Service over an insecure gRPC channel.

    Issues three unary calls in order against ``host:port`` using a freshly
    generated session id: ``Navigate`` (to https://google.com),
    ``GetSnapshot`` (with ``include_dom=True``) and ``CloseSession``.
    Progress and the fields of each response are printed to stdout.

    Failures are reported, not raised: a ``grpc.RpcError`` or any other
    exception from the calls is caught and printed, and the remaining steps
    are skipped. NOTE(review): because nothing propagates, a test runner that
    collects this function will report success even when the service is
    unreachable -- confirm that this is intended.

    Args:
        host: Hostname or IP address of the Browser Service.
        port: TCP port of the Browser Service.
        timeout: Per-call deadline in seconds, applied to every RPC so that
            an unresponsive server cannot block the run indefinitely. Pass
            ``None`` to wait without a deadline.
    """
    print(f"📡 Connecting to Browser Service at {host}:{port}...")

    # Using the channel as a context manager closes it on every exit path,
    # including a failure while constructing the stub.
    with grpc.insecure_channel(f'{host}:{port}') as channel:
        stub = browser_pb2_grpc.BrowserServiceStub(channel)

        # Short random suffix keeps concurrent or repeated runs from sharing
        # a session id.
        session_id = f"test-session-{uuid.uuid4().hex[:8]}"

        try:
            # 1. Test Navigate
            print(f"🌐 Testing Navigate (Session: {session_id})...")
            nav_req = browser_pb2.NavigateRequest(
                url="https://google.com",
                session_id=session_id
            )
            resp = stub.Navigate(nav_req, timeout=timeout)
            print(f"✅ Navigate Response: Status={resp.status}, URL={resp.url}, Title='{resp.title}'")

            # 2. Test GetSnapshot
            print("📸 Testing GetSnapshot...")
            snap_req = browser_pb2.SnapshotRequest(
                session_id=session_id,
                include_dom=True
            )
            resp = stub.GetSnapshot(snap_req, timeout=timeout)
            print(f"✅ Snapshot Response: Status={resp.status}, DOM Path={resp.dom_path}")

            # 3. Test Close session (if implemented)
            print("🛑 Testing CloseSession...")
            close_req = browser_pb2.CloseRequest(session_id=session_id)
            resp = stub.CloseSession(close_req, timeout=timeout)
            print(f"✅ Close Response: Success={resp.success}")

        except grpc.RpcError as e:
            # Covers transport and server-side failures, including
            # DEADLINE_EXCEEDED when ``timeout`` elapses.
            print(f"❌ gRPC Error: {e.code()} - {e.details()}")
        except Exception as e:
            print(f"❌ Error: {e}")

if __name__ == "__main__":
    # Script entry point: the target machine can be overridden through the
    # REMOTE_HOST environment variable; the port keeps the function default.
    remote_host = os.environ.get('REMOTE_HOST', '192.168.68.113')
    test_browser_service(host=remote_host)