Newer
Older
cortex-hub / ai-hub / integration_tests / conftest.py
import httpx
import pytest
import pytest_asyncio

# Root URL that the fixtures in this file point at; returned by the
# `base_url` fixture and used as the `base_url` of the shared HTTP client.
BASE_URL = "http://127.0.0.1:8001"

# NOTE(review): this fixture is synchronous; `pytest_asyncio.fixture` is
# intended for async fixtures, so a plain `pytest.fixture` would presumably
# suffice here — confirm before changing.
@pytest_asyncio.fixture(scope="session")
def base_url() -> str:
    """
    Fixture to provide the base URL for the tests.

    Returns:
        The module-level ``BASE_URL`` string, unchanged.
    """
    return BASE_URL

@pytest_asyncio.fixture(scope="function")
async def http_client():
    """
    Provide a fresh async HTTP client, bound to BASE_URL, for each test.

    The fixture is function-scoped: a new client is opened for every test
    and closed again when the test is finished. Closing the client inside
    the fixture is meant to prevent "Event loop is closed" errors.

    Yields:
        An ``httpx.AsyncClient`` with a 60 second timeout.
    """
    # The context manager closes the client on exit, whether or not an
    # exception propagates through the fixture.
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60.0) as client:
        yield client

@pytest_asyncio.fixture(scope="function")
async def session_id(http_client):
    """
    Create a new session through the API and hand its ID to the test.

    The session is created with a POST to ``/sessions/`` and the ``id``
    field of the JSON response is yielded. Nothing is torn down afterwards:
    the created session is left in place.

    Yields:
        The ``id`` value from the session-creation response.
    """
    # Post to the trailing-slash form of the route.
    created = await http_client.post(
        "/sessions/",
        json={"user_id": "integration_tester", "model": "deepseek"},
    )
    assert created.status_code == 200

    yield created.json()["id"]
    # Intentionally no teardown: sessions are not deleted here, presumably
    # because their lifetime is managed by the backing database.

@pytest_asyncio.fixture(scope="function")
async def document_id(http_client):
    """
    Create a new document before a test and delete it again afterwards.

    The document is created with a POST to ``/documents/``. The ID is parsed
    from the ``message`` field of the JSON response, taking whatever follows
    the last occurrence of ``" with ID "`` and converting it to ``int``.

    Yields:
        The integer ID of the newly created document.

    Raises:
        AssertionError: if creation or deletion does not return HTTP 200.
        pytest fails the test (via ``pytest.fail``) if the response body is
        not valid JSON or no integer ID can be parsed from its message.
    """
    doc_data = {"title": "Lifecycle Test Doc", "text": "This doc will be listed and deleted."}
    # Post to the trailing-slash form of the route.
    response = await http_client.post("/documents/", json=doc_data)
    assert response.status_code == 200
    try:
        message = response.json().get("message", "")
        document_id = int(message.split(" with ID ")[-1])
    except (AttributeError, ValueError):
        # ValueError: body is not JSON, or the text after the marker is not
        # an integer. AttributeError: body is not a JSON object, or
        # "message" is null. (split()[-1] itself can never raise IndexError.)
        pytest.fail("Could not parse document ID from response message.")

    yield document_id

    # Teardown: delete the document after the test.
    # NOTE(review): if a test deletes the document itself, this assertion
    # will presumably fail unless the API answers 200 for a missing
    # document — confirm the intended behavior.
    delete_response = await http_client.delete(f"/documents/{document_id}")
    assert delete_response.status_code == 200


@pytest_asyncio.fixture(scope="function")
async def websocket_client(base_url, session_id):
    """
    Provide a connected WebSocket client for the
    ``/ws/workspace/{session_id}`` endpoint.

    The WebSocket URL is derived from ``base_url`` by swapping the leading
    ``http`` scheme prefix for ``ws`` (so ``https`` becomes ``wss``). After
    connecting, the first server message is read and printed before the
    socket is yielded, so the test starts with that message already consumed.

    Yields:
        The connected WebSocket object.

    The WebSocket and the underlying HTTP client are both closed when the
    fixture finishes, including when connecting or the initial read fails.
    """
    # Only rewrite the scheme prefix; an unbounded replace would also alter
    # any later occurrence of "http" in the host or path.
    ws_url = base_url.replace("http", "ws", 1)

    # `async with` guarantees the client is closed even if the connection
    # attempt or the initial receive raises before the fixture yields.
    async with httpx.AsyncClient() as client:
        # NOTE(review): httpx's documented AsyncClient API does not include
        # `websocket_connect`; this call presumably needs a WebSocket-capable
        # client (e.g. an httpx WebSocket extension or a dedicated WebSocket
        # library) — confirm against the installed dependencies.
        async with client.websocket_connect(f"{ws_url}/ws/workspace/{session_id}") as websocket:
            # The first message from the server should be the
            # 'connection_established' message; reading it ensures the
            # connection is fully established before yielding.
            initial_message = await websocket.receive_text()
            print(f"\nReceived initial WS message: {initial_message}")

            yield websocket