cortex-hub / ai-hub / integration_tests / test_llm_chat.py
import json
import os

import httpx
import pytest

BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1")

def _headers():
    return {
        "X-User-ID": os.environ.get("SYNC_TEST_USER_ID", "")
    }
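
# From the stream parser below, each SSE line is assumed to carry one of the
# following payloads (a sketch inferred from the handler, not a confirmed
# wire spec for the service):
#   data: {"type": "content", "content": "<partial token text>"}
#   data: {"type": "error", "content": "<backend error message>"}
#   data: {"type": "done"}
#   data: [DONE]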

def test_create_session_and_chat_gemini():
    """
    Test successfully spinning up a fresh session and pushing an LLM prompt
    over the live SSE connection to verify bidirectional inference health.
    """
    user_id = os.environ.get("SYNC_TEST_USER_ID", "")
    assert user_id, "SYNC_TEST_USER_ID not set in the environment (expected from conftest)."
    
    with httpx.Client(timeout=30.0) as client:
        # Step 1: Create a new session bound to Gemini
        session_payload = {
            "user_id": user_id,
            "provider_name": "gemini",
            "feature_name": "default"
        }
        r_sess = client.post(f"{BASE_URL}/sessions/", headers=_headers(), json=session_payload)
        assert r_sess.status_code == 200, f"Failed to create session: {r_sess.text}"
        
        session_data = r_sess.json()
        session_id = session_data["id"]
        assert session_id, "Session ID not returned."
        
        # Step 2: Send a chat prompt instructing the LLM to echo back a phrase.
        # This exercises litellm routing, group RBAC, the context window, and SSE streaming in one pass.
        chat_payload = {
            "prompt": "I am testing your response capabilities. Please respond with exactly the text 'CORTEX HUB LIVE' and nothing else.",
            "provider_name": "gemini",
            "load_faiss_retriever": False
        }
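
        # For manual debugging, the same stream can be inspected with curl
        # (a sketch using only the route and header exercised by this test;
        # -N disables curl's output buffering so events appear as they arrive):
        #   curl -N -X POST "$BASE_URL/sessions/<session_id>/chat" \
        #        -H "X-User-ID: $SYNC_TEST_USER_ID" \
        #        -H "Content-Type: application/json" \
        #        -d '{"prompt": "ping", "provider_name": "gemini", "load_faiss_retriever": false}'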
        
        full_response = ""
        # The endpoint returns an SSE stream; accumulate content events line by line.
        with client.stream("POST", f"{BASE_URL}/sessions/{session_id}/chat", headers=_headers(), json=chat_payload) as r_chat:
            assert r_chat.status_code == 200, "Chat request failed to initialize."
            
            for line in r_chat.iter_lines():
                if line.startswith("data: "):
                    data_str = line[len("data: "):]
                    # The stream may close with a bare [DONE] sentinel instead of a JSON event.
                    if data_str == "[DONE]":
                        break
                        
                    try:
                        event = json.loads(data_str)
                        event_type = event.get("type")
                        
                        if event_type == "content":
                            full_response += event.get("content", "")
                        elif event_type == "error":
                            pytest.fail(f"LLM backend emitted an error: {event.get('content')}")
                        elif event_type == "done":
                            break
                    except json.JSONDecodeError:
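                        # Tolerate keep-alive or comment lines that are not JSON payloads.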
                        pass
        
        full_response = full_response.strip()
        assert len(full_response) > 0, "LLM returned an empty response."
        assert "CORTEX HUB LIVE" in full_response.upper(), f"LLM did not follow the echo instruction. Actual response: {full_response}"