from typing import List, Dict, Any from sqlalchemy.orm import Session from sqlalchemy.exc import SQLAlchemyError import dspy from app.core.vector_store import FaissVectorStore from app.db import models from app.core.retrievers import Retriever from app.core.llm_providers import get_llm_provider from app.core.pipelines.dspy_rag import DspyRagPipeline, DSPyLLMProvider class RAGService: """ Service class for managing the RAG (Retrieval-Augmented Generation) pipeline. This class acts as a high-level orchestrator. """ def __init__(self, vector_store: FaissVectorStore, retrievers: List[Retriever]): self.vector_store = vector_store self.retrievers = retrievers def add_document(self, db: Session, doc_data: Dict[str, Any]) -> int: """ Adds a document to both the database and the vector store. """ try: document_db = models.Document(**doc_data) db.add(document_db) db.commit() db.refresh(document_db) faiss_index = self.vector_store.add_document(document_db.text) vector_metadata = models.VectorMetadata( document_id=document_db.id, faiss_index=faiss_index, embedding_model="mock_embedder" ) db.add(vector_metadata) db.commit() print(f"Document with ID {document_db.id} successfully added.") return document_db.id except SQLAlchemyError as e: db.rollback() # **FIXED LINE**: Added the missing '})' print(f"Database error while adding document: {e}") raise except Exception as e: db.rollback() print(f"An unexpected error occurred: {e}") raise async def chat_with_rag(self, db: Session, prompt: str, model: str) -> str: """ Generates a response to a user prompt by orchestrating the RAG pipeline. """ print(f"Received Prompt: {prompt}") if not prompt or not prompt.strip(): raise ValueError("The prompt cannot be null, empty, or contain only whitespace.") llm_provider_instance = get_llm_provider(model) dspy_llm_provider = DSPyLLMProvider(provider=llm_provider_instance, model_name=model) dspy.configure(lm=dspy_llm_provider) rag_pipeline = DspyRagPipeline(retrievers=self.retrievers) answer = await rag_pipeline.forward(question=prompt, db=db) return answer def get_all_documents(self, db: Session) -> List[models.Document]: """ Retrieves all documents from the database. """ try: return db.query(models.Document).order_by(models.Document.created_at.desc()).all() except SQLAlchemyError as e: print(f"Database error while retrieving documents: {e}") raise def delete_document(self, db: Session, document_id: int) -> int: """ Deletes a document and its associated vector metadata from the database. Returns the ID of the deleted document, or None if not found. """ try: doc_to_delete = db.query(models.Document).filter(models.Document.id == document_id).first() if not doc_to_delete: return None db.delete(doc_to_delete) db.commit() return document_id except SQLAlchemyError as e: db.rollback() print(f"Database error while deleting document: {e}") raise