import httpx
import pytest
import pytest_asyncio
BASE_URL = "http://127.0.0.1:8001"
def pytest_configure(config):
config.addinivalue_line(
"markers",
"requires_nodes: tests that need live agent nodes; skipped if hub/nodes are offline",
)
def pytest_collection_modifyitems(config, items):
"""
Auto-skip any test marked with @pytest.mark.requires_nodes when the Hub
or its nodes are unreachable.
"""
import os
base = os.getenv("SYNC_TEST_BASE_URL", BASE_URL)
skip = pytest.mark.skip(reason="Hub/nodes not reachable — skipping file sync integration tests")
try:
import httpx as _httpx
# Probe the hub with a simple endpoint; any 2xx/4xx means the server is up
with _httpx.Client(base_url=base, timeout=5.0) as c:
r = c.get("/", follow_redirects=True)
hub_up = r.status_code < 500
except Exception:
hub_up = False
if not hub_up:
for item in items:
if item.get_closest_marker("requires_nodes"):
item.add_marker(skip)
@pytest_asyncio.fixture(scope="session")
def base_url():
"""Fixture to provide the base URL for the tests."""
return BASE_URL
@pytest_asyncio.fixture(scope="function")
async def http_client():
"""
Fixture to provide an async HTTP client for all tests in the session.
A new client is created and closed properly using a try/finally block
to prevent "Event loop is closed" errors.
"""
client = httpx.AsyncClient(base_url=BASE_URL, timeout=60.0)
try:
yield client
finally:
await client.aclose()
@pytest_asyncio.fixture(scope="function")
async def session_id(http_client):
"""
Creates a new session before a test and cleans it up after.
Returns the session ID.
"""
payload = {"user_id": "integration_tester", "model": "deepseek"}
# The URL has been updated to include the trailing slash
response = await http_client.post("/sessions/", json=payload)
assert response.status_code == 200
session_id = response.json()["id"]
yield session_id
# No explicit session deletion is needed for this example,
# as sessions are typically managed by a database lifecycle.
@pytest_asyncio.fixture(scope="function")
async def document_id(http_client):
"""
Creates a new document before a test and ensures it's deleted afterward.
Returns the document ID.
"""
doc_data = {"title": "Lifecycle Test Doc", "text": "This doc will be listed and deleted."}
# The URL has been updated to include the trailing slash
response = await http_client.post("/documents/", json=doc_data)
assert response.status_code == 200
try:
message = response.json().get("message", "")
document_id = int(message.split(" with ID ")[-1])
except (ValueError, IndexError):
pytest.fail("Could not parse document ID from response message.")
yield document_id
# Teardown: Delete the document after the test
delete_response = await http_client.delete(f"/documents/{document_id}")
assert delete_response.status_code == 200