import os import aiohttp import asyncio import logging import mimetypes from typing import Optional from fastapi import HTTPException from app.core.providers.base import STTProvider # Configure logging logger = logging.getLogger(__name__) class GoogleSTTProvider(STTProvider): """Concrete STT provider for Google Gemini API.""" def __init__( self, api_key: Optional[str] = None, model_name: str = "gemini-2.5-flash" ): self.api_key = api_key or os.getenv("GEMINI_API_KEY") if not self.api_key: raise ValueError("GEMINI_API_KEY environment variable not set or provided.") self.model_name = model_name self.api_url = f"https://generativelanguage.googleapis.com/v1beta/models/{model_name}:generateContent" self.upload_url_base = "https://generativelanguage.googleapis.com/upload/v1beta/files" logger.debug(f"Initialized GoogleSTTProvider with model: {self.model_name}") async def transcribe_audio(self, audio_data: bytes) -> str: logger.debug("Starting transcription process.") mime_type = mimetypes.guess_type("audio.wav")[0] or "application/octet-stream" num_bytes = len(audio_data) logger.debug(f"Detected MIME type: {mime_type}, size: {num_bytes} bytes.") try: async with aiohttp.ClientSession() as session: # Step 1: Start resumable upload logger.debug("Starting resumable upload...") start_headers = { "x-goog-api-key": self.api_key, "X-Goog-Upload-Protocol": "resumable", "X-Goog-Upload-Command": "start", "X-Goog-Upload-Header-Content-Length": str(num_bytes), "X-Goog-Upload-Header-Content-Type": mime_type, "Content-Type": "application/json", } start_payload = {"file": {"display_name": "AUDIO"}} async with session.post( self.upload_url_base, headers=start_headers, json=start_payload ) as resp: logger.debug(f"Upload start response status: {resp.status}") resp.raise_for_status() upload_url = resp.headers.get("X-Goog-Upload-URL") if not upload_url: raise HTTPException(status_code=500, detail="No upload URL returned from Google API.") logger.debug(f"Received upload URL: {upload_url}") # Step 2: Upload the file logger.debug("Uploading audio file...") upload_headers = { "Content-Length": str(num_bytes), "X-Goog-Upload-Offset": "0", "X-Goog-Upload-Command": "upload, finalize", } async with session.post(upload_url, headers=upload_headers, data=audio_data) as resp: logger.debug(f"File upload response status: {resp.status}") resp.raise_for_status() file_info = await resp.json() file_name = file_info["file"]["name"].split("/")[-1] file_uri = f"https://generativelanguage.googleapis.com/v1beta/files/{file_name}" logger.debug(f"Uploaded file URI: {file_uri}") # Step 3: Request transcription logger.debug("Requesting transcription from Gemini API...") transcription_headers = { "x-goog-api-key": self.api_key, "Content-Type": "application/json", } transcription_payload = { "contents": [ { "parts": [ { "fileData": { "mimeType": mime_type, "fileUri": file_uri } }, {"text": "Transcribe this audio file."} ] } ] } async with session.post( self.api_url, headers=transcription_headers, json=transcription_payload ) as resp: logger.debug(f"Transcription request status: {resp.status}") resp.raise_for_status() data = await resp.json() # Step 4: Extract text try: transcript = data["candidates"][0]["content"]["parts"][0]["text"] logger.debug(f"Successfully extracted transcript: '{transcript[:50]}...'") return transcript except (KeyError, IndexError) as e: logger.error(f"Malformed API response: {e}. Full response: {data}") raise HTTPException(status_code=500, detail="Malformed API response from Gemini.") except aiohttp.ClientError as e: logger.error(f"Aiohttp client error occurred: {e}") raise HTTPException(status_code=500, detail=f"API request failed: {e}") except Exception as e: logger.error(f"Unexpected error occurred during transcription: {e}") raise HTTPException(status_code=500, detail=f"Failed to transcribe audio: {e}")