from typing import Dict, Any, Optional
from app.db import file_retriever_models
from sqlalchemy.orm import Session, joinedload
import uuid
class FileRetriever:
"""
A retriever specifically for accessing file and directory content
based on a FileRetrievalRequest ID.
"""
def retrieve_by_request_id(self, db: Session, request_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieves a FileRetrievalRequest and all its associated files from the database,
returning the data in a well-formatted JSON-like dictionary.
Args:
db: The SQLAlchemy database session.
request_id: The UUID of the FileRetrievalRequest.
Returns:
A dictionary containing the request and file data, or None if the request is not found.
"""
try:
# Convert string request_id to UUID object for the query
request_uuid = uuid.UUID(request_id)
except ValueError:
print(f"Invalid UUID format for request_id: {request_id}")
return None
# Fetch the request and its related files in a single query using join
request = db.query(file_retriever_models.FileRetrievalRequest).filter(
file_retriever_models.FileRetrievalRequest.id == request_uuid
).options(
# Eagerly load the retrieved_files to avoid N+1 query problem
joinedload(file_retriever_models.FileRetrievalRequest.retrieved_files)
).first()
if not request:
return None
# Build the dictionary to represent the JSON structure
retrieved_data = {
"request_id": str(request.id),
"question": request.question,
"directory_path": request.directory_path,
"session_id": request.session_id,
"created_at": request.created_at.isoformat() if request.created_at else None,
"retrieved_files": []
}
for file in request.retrieved_files:
if file.content:
# For files with content, show the full detailed structure
file_data = {
"file_path": file.file_path,
"content": file.content,
"id": str(file.id),
"name": file.file_name,
"type": file.type,
"last_updated": file.last_updated.isoformat() if file.last_updated else None,
"created_at": file.created_at.isoformat() if file.created_at else None,
}
else:
# For empty files, use a compact representation
file_data = {
"file_path": file.file_path,
"type": file.type
}
retrieved_data["retrieved_files"].append(file_data)
return retrieved_data