import httpx
import pytest_asyncio
BASE_URL = "http://127.0.0.1:8001"
@pytest_asyncio.fixture(scope="session")
def base_url():
"""Fixture to provide the base URL for the tests."""
return BASE_URL
@pytest_asyncio.fixture(scope="function")
async def http_client():
"""
Fixture to provide an async HTTP client for all tests in the session.
A new client is created and closed properly using a try/finally block
to prevent "Event loop is closed" errors.
"""
client = httpx.AsyncClient(base_url=BASE_URL, timeout=60.0)
try:
yield client
finally:
await client.aclose()
@pytest_asyncio.fixture(scope="function")
async def session_id(http_client):
"""
Creates a new session before a test and cleans it up after.
Returns the session ID.
"""
payload = {"user_id": "integration_tester", "model": "deepseek"}
# The URL has been updated to include the trailing slash
response = await http_client.post("/sessions/", json=payload)
assert response.status_code == 200
session_id = response.json()["id"]
yield session_id
# No explicit session deletion is needed for this example,
# as sessions are typically managed by a database lifecycle.
@pytest_asyncio.fixture(scope="function")
async def document_id(http_client):
"""
Creates a new document before a test and ensures it's deleted afterward.
Returns the document ID.
"""
doc_data = {"title": "Lifecycle Test Doc", "text": "This doc will be listed and deleted."}
# The URL has been updated to include the trailing slash
response = await http_client.post("/documents/", json=doc_data)
assert response.status_code == 200
try:
message = response.json().get("message", "")
document_id = int(message.split(" with ID ")[-1])
except (ValueError, IndexError):
pytest.fail("Could not parse document ID from response message.")
yield document_id
# Teardown: Delete the document after the test
delete_response = await http_client.delete(f"/documents/{document_id}")
assert delete_response.status_code == 200
@pytest_asyncio.fixture(scope="function")
async def websocket_client(base_url, session_id):
"""
Fixture to provide an active, connected WebSocket client for testing the
/ws/workspace/{session_id} endpoint.
The client will be disconnected and closed at the end of the test.
"""
# Replace 'http' with 'ws' for the WebSocket URL scheme
ws_url = base_url.replace("http", "ws")
# We use httpx.AsyncClient to handle the WebSocket connection.
client = httpx.AsyncClient()
# Context manager handles the connection lifecycle (connect, disconnect, close)
async with client.websocket_connect(f"{ws_url}/ws/workspace/{session_id}") as websocket:
# The first message from the server should be the 'connection_established'
# message after the initial accept(). We read it to ensure the connection
# is fully established before yielding.
initial_message = await websocket.receive_text()
print(f"\nReceived initial WS message: {initial_message}")
# The fixture yields the active WebSocket object
yield websocket
# When the context manager exits, the websocket connection is automatically closed.
await client.aclose()