Newer
Older
cortex-hub / ai-hub / app / core / llm_providers.py
import os
import httpx
import logging
import json
from abc import ABC, abstractmethod
from openai import OpenAI
from typing import final

# --- 0. Configure Logging ---
# Set up basic logging to print INFO level messages to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(funcName)s] - %(message)s'
)


# --- 1. Load Configuration from Environment ---
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL_NAME", "deepseek-chat")
GEMINI_MODEL = os.getenv("GEMINI_MODEL_NAME", "gemini-1.5-flash-latest")


# --- 2. Initialize API Clients and URLs ---
deepseek_client = OpenAI(api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com")
GEMINI_URL = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}"


# --- 3. Provider Interface and Implementations ---

class LLMProvider(ABC):
    """Abstract base class ('Interface') for all LLM providers."""
    @abstractmethod
    async def generate_response(self, prompt: str) -> str:
        """Generates a response from the LLM."""
        pass

@final
class DeepSeekProvider(LLMProvider):
    """Provider for the DeepSeek API."""
    def __init__(self, model_name: str):
        self.model = model_name
        logging.info(f"DeepSeekProvider initialized with model: {self.model}")

    async def generate_response(self, prompt: str) -> str:
        # Construct the request payload
        messages_payload = [
            # {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ]
        
        # Log the payload before sending the request
        logging.info(f"--- DeepSeek Request Payload ---\n{json.dumps(messages_payload, indent=2)}")

        try:
            chat_completion = deepseek_client.chat.completions.create(
                model=self.model,
                messages=messages_payload,
                stream=False
            )
            
            # Log the full, raw response object from the API
            logging.info(f"--- DeepSeek Raw Response ---\n{chat_completion.model_dump_json(indent=2)}")
            
            return chat_completion.choices[0].message.content
        except Exception as e:
            logging.error("DeepSeek Provider Error", exc_info=True) # exc_info=True logs the traceback
            raise

@final
class GeminiProvider(LLMProvider):
    """Provider for the Google Gemini API."""
    def __init__(self, api_url: str):
        self.url = api_url
        logging.info(f"GeminiProvider initialized for URL: {self.url.split('?')[0]}")

    async def generate_response(self, prompt: str) -> str:
        # Construct the request payload
        payload = {"contents": [{"parts": [{"text": prompt}]}]}
        headers = {"Content-Type": "application/json"}
        
        # Log the payload before sending the request
        logging.info(f"--- Gemini Request Payload ---\n{json.dumps(payload, indent=2)}")
        
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(self.url, json=payload, headers=headers)
                
                # Log the raw response text, which is crucial for debugging any errors
                logging.info(f"--- Gemini Raw Response ---\n{response.text}")

                response.raise_for_status() # Raise an exception for non-2xx status codes
                data = response.json()
                return data['candidates'][0]['content']['parts'][0]['text']
        except (httpx.HTTPStatusError, KeyError, IndexError) as e:
            logging.error("Gemini Provider Error", exc_info=True)
            raise

# --- 4. The Factory Function ---

_providers = {
    "deepseek": DeepSeekProvider(model_name=DEEPSEEK_MODEL),
    "gemini": GeminiProvider(api_url=GEMINI_URL)
}

def get_llm_provider(model_name: str) -> LLMProvider:
    """Factory function to get the appropriate, pre-configured LLM provider."""
    provider = _providers.get(model_name)
    if not provider:
        raise ValueError(f"Unsupported model provider: '{model_name}'. Supported providers are: {list(_providers.keys())}")
    return provider