import ast  # Abstract Syntax Trees module; used for safe literal parsing of LLM output
import json
import logging
import uuid
from typing import Any, Awaitable, Callable, Dict, Hashable, List, Optional

import dspy
from fastapi import WebSocket, Depends
from sqlalchemy.orm import Session, joinedload

from app.db import models
from app.db.session import SessionLocal
from app.core.providers.factory import get_llm_provider
from app.core.pipelines.file_selector import CodeRagFileSelector
from app.core.pipelines.dspy_rag import DspyRagPipeline

# A type hint for our handler functions
MessageHandler = Callable[[WebSocket, Dict[str, Any]], Awaitable[None]]

# Configure logging
logger = logging.getLogger(__name__)


class WorkspaceService:
    """
    Manages the full lifecycle of an AI workspace session, including
    handling various message types and dispatching them to the correct handlers.

    Incoming messages are dictionaries carrying a ``"type"`` key; the value is
    looked up in ``message_handlers`` and the matching coroutine is awaited.
    Outgoing commands are sent through ``send_command``, which stamps each one
    with a fresh ``request_id`` and a per-connection ``round`` counter.
    """

    def __init__(self):
        # The dispatcher map: keys are message types, values are handler functions
        self.message_handlers: Dict[str, MessageHandler] = {
            "select_folder_response": self.handle_select_folder_response,
            "list_directory_response": self.handle_list_directory_response,
            "file_content_response": self.handle_files_content_response,
            "execute_command_response": self.handle_command_output,
            "chat_message": self.handle_chat_message,
            # Add more message types here as needed
        }
        # Centralized map of commands that can be sent to the client
        self.command_map: Dict[str, Dict[str, Any]] = {
            "list_directory": {
                "type": "list_directory",
                "description": "Request a list of files and folders in the current directory.",
            },
            "get_file_content": {
                "type": "get_file_content",
                "description": "Request the content of a specific file.",
            },
            "execute_command": {
                "type": "execute_command",
                "description": "Request to execute a shell command.",
            },
            # Define more commands here
        }
        # Per-websocket session state, keyed by the connection's client address
        # (see _session_key). Each value currently holds only a "round" counter.
        self.sessions: Dict[Hashable, Dict[str, Any]] = {}
        # NOTE(review): a single database session is opened here and reused by
        # every chat message this service handles; nothing in this class
        # closes it.
        self.db = SessionLocal()

    @staticmethod
    def _session_key(websocket: WebSocket) -> Hashable:
        """Return a hashable key identifying the websocket's client address.

        The ASGI ``client`` scope entry is an optional two-item iterable, so it
        may be absent, ``None``, or a list. A list cannot be used as a dict
        key, hence the normalization to a tuple.
        """
        client = websocket.scope.get("client")
        return None if client is None else tuple(client)

    def generate_request_id(self) -> str:
        """Generates a unique request ID (a random UUID4 rendered as a string)."""
        return str(uuid.uuid4())

    async def send_command(
        self,
        websocket: WebSocket,
        command_name: str,
        data: Optional[Dict[str, Any]] = None,
    ):
        """Helper to send a command to the client with a unique request_id and round number.

        Args:
            websocket: Connection the command is written to.
            command_name: Key into ``command_map``.
            data: Extra fields merged into the outgoing message. They are
                spread last, so a key named ``type``, ``request_id`` or
                ``round`` overrides the generated value. Defaults to no
                extra fields.

        Raises:
            ValueError: If ``command_name`` is not in ``command_map``.
        """
        if command_name not in self.command_map:
            raise ValueError(f"Unknown command: {command_name}")

        request_id = self.generate_request_id()
        # Create the per-connection state on first use, then advance its round.
        session_state = self.sessions.setdefault(
            self._session_key(websocket), {"round": 0}
        )
        session_state["round"] += 1

        message_to_send = {
            "type": self.command_map[command_name]["type"],
            "request_id": request_id,
            "round": session_state["round"],
            **(data or {}),
        }
        await websocket.send_text(json.dumps(message_to_send))
        logger.info(
            "Sent command '%s' to client (request_id: %s, round: %s)",
            command_name,
            request_id,
            session_state["round"],
        )

    async def dispatch_message(self, websocket: WebSocket, message: Dict[str, Any]):
        """
        Routes an incoming message to the appropriate handler based on its 'type'.

        When no handler is registered for the type, an ``error`` message is
        sent back to the client instead.

        Args:
            websocket: Connection the message arrived on.
            message: Decoded message; its ``"type"`` value selects the handler.
        """
        message_type = message.get("type")
        # The message may also carry "request_id" and "round"; they are not
        # used for routing. In a real-world app they could be used to retrieve
        # historical data for the request or session.
        logger.info("Received message: %s", message)

        # A decoded JSON value can be a list or dict, which cannot be used as
        # a dictionary key; treat any non-string type as unknown.
        handler = (
            self.message_handlers.get(message_type)
            if isinstance(message_type, str)
            else None
        )
        if handler is None:
            logger.warning(
                "Warning: No handler found for message type: %s", message_type
            )
            await websocket.send_text(json.dumps({
                "type": "error",
                "content": f"Unknown message type: {message_type}"
            }))
            return

        await handler(websocket, message)

    async def handle_select_folder_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the client's response to a select folder response.

        Args:
            websocket: Connection to reply on.
            data: Message whose ``"path"`` and ``"request_id"`` keys are read.
        """
        path = data.get("path")
        request_id = data.get("request_id")
        logger.info(
            "Received folder selected (request_id: %s): Path: %s", request_id, path
        )
        # After a folder is selected, the next step is to list its contents.
        await self.send_command(websocket, "list_directory", data={"path": path})

    async def handle_list_directory_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the client's response to a list_directory request.

        Passes the listed files to ``CodeRagFileSelector``, parses its answer
        as a Python list literal, and then requests the content of the
        selected files from the client. If the answer cannot be parsed, a
        ``thinking_log`` warning is sent and no file content is requested.

        Args:
            websocket: Connection to reply on.
            data: Message whose ``"files"`` and ``"provider_name"`` keys are
                read (``provider_name`` defaults to ``"gemini"``).
        """
        files = data.get("files", [])
        provider_name = data.get("provider_name", "gemini")
        llm_provider = get_llm_provider(provider_name)
        cfs = CodeRagFileSelector()
        with dspy.context(lm=llm_provider):
            raw_answer_text = await cfs(
                question="Please help to refactor my code",
                # The history will be retrieved from a database in a real application
                history="",
                file_list=files
            )

        try:
            answer_text = ast.literal_eval(raw_answer_text)
            if not isinstance(answer_text, list):
                raise ValueError("Parsed result is not a list.")
            # literal_eval can yield bytes, sets, numbers, ...; only strings
            # are usable as filenames and safely JSON-serializable below.
            if not all(isinstance(item, str) for item in answer_text):
                raise ValueError("Parsed list contains non-string entries.")
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
            # Handle cases where the LLM output is not a valid list string.
            # literal_eval is documented to raise any of these on malformed input.
            logger.warning("Error parsing LLM output: %s", e)
            await websocket.send_text(json.dumps({
                "type": "thinking_log",
                "content": f"Warning: AI's file list could not be parsed. Error: {e}"
            }))
            return

        await websocket.send_text(json.dumps({
            "type": "thinking_log",
            "content": f"AI selected files: {answer_text}. Now requesting file content."
        }))

        # After getting the AI's selected files, we send a command to the client to get their content.
        await self.send_command(websocket, "get_file_content", data={"filenames": answer_text})

    async def handle_files_content_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the content of a list of files sent by the client.

        For every well-formed entry a ``thinking_log`` message naming the file
        is sent back; malformed entries are logged and skipped.

        Args:
            websocket: Connection to reply on.
            data: Message whose ``"files"`` and ``"request_id"`` keys are
                read. The client is expected to send a list of file objects,
                each with 'filename' and 'content' keys.
        """
        files_data: List[Dict[str, str]] = data.get("files", [])
        request_id = data.get("request_id")

        if not files_data:
            logger.warning(
                "Warning: No files data received for request_id: %s", request_id
            )
            return

        logger.info(
            "Received content for %s files (request_id: %s).",
            len(files_data),
            request_id,
        )

        for file_info in files_data:
            is_mapping = isinstance(file_info, dict)
            filename = file_info.get("filename") if is_mapping else None
            content = file_info.get("content") if is_mapping else None

            # An empty string is valid content (an empty file); only a missing
            # or non-string content, or a missing filename, is malformed.
            if not filename or not isinstance(content, str):
                logger.warning(
                    "Warning: Malformed file data in response for request_id: %s",
                    request_id,
                )
                continue

            logger.info(
                "Processing content for '%s'. Content length: %s",
                filename,
                len(content),
            )
            # The AI would analyze this content to determine the next action, e.g.,
            # generate a plan, perform a refactoring, or ask for more information.
            await websocket.send_text(json.dumps({
                "type": "thinking_log",
                "content": f"Analyzing the content of file: {filename}"
            }))

    async def handle_command_output(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the output from a command executed by the client.

        Args:
            websocket: Connection to reply on.
            data: Message whose ``"command"``, ``"output"`` and
                ``"request_id"`` keys are read.
        """
        command = data.get("command")
        output = data.get("output")
        request_id = data.get("request_id")

        logger.info(
            "Received output for command '%s' (request_id: %s). Output: %s",
            command,
            request_id,
            output,
        )

        # The AI would process the command output to determine the next step
        await websocket.send_text(json.dumps({
            "type": "thinking_log",
            "content": f"Command '{command}' completed. Analyzing output."
        }))

    def _save_message(self, session_id: Any, sender: str, content: Any) -> Any:
        """Persist one chat message for ``session_id`` and return the refreshed row."""
        message = models.Message(session_id=session_id, sender=sender, content=content)
        self.db.add(message)
        self.db.commit()
        self.db.refresh(message)
        return message

    async def handle_chat_message(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles incoming chat messages from the client.

        Stores the user's message, runs ``DspyRagPipeline`` with the session's
        message history, stores the assistant's answer, and sends it back as a
        ``chat_message``. An ``error`` message is sent instead when
        ``session_id`` is missing or does not match a stored session.

        Args:
            websocket: Connection to reply on.
            data: Message whose ``"content"``, ``"provider_name"`` (default
                ``"gemini"``) and ``"session_id"`` keys are read.

        Raises:
            Exception: Any error from the database or the pipeline is
                re-raised after the shared database session is rolled back.
        """
        # TODO: Enhance this function to process the chat message and determine the next action.
        prompt = data.get("content")
        provider_name = data.get("provider_name", "gemini")
        session_id = data.get("session_id")

        if session_id is None:
            await websocket.send_text(json.dumps({
                "type": "error",
                "content": "Error: session_id is required for chat messages."
            }))
            return

        try:
            session = self.db.query(models.Session).options(
                joinedload(models.Session.messages)
            ).filter(models.Session.id == session_id).first()

            if not session:
                await websocket.send_text(json.dumps({
                    "type": "error",
                    "content": f"Error: Session with ID {session_id} not found."
                }))
                return

            self._save_message(session_id, "user", prompt)

            llm_provider = get_llm_provider(provider_name)
            chat = DspyRagPipeline(retrievers=[])
            with dspy.context(lm=llm_provider):
                answer_text = await chat(question=prompt, history=session.messages, db=self.db)

            # Save assistant's response
            self._save_message(session_id, "assistant", answer_text)
        except Exception:
            # self.db is shared by every later message; without a rollback a
            # failed flush/commit would leave it unusable for all of them.
            self.db.rollback()
            raise

        # Send the response back to the client; the client-side
        # `handleChatMessage` handler is expected to process this message.
        await websocket.send_text(json.dumps({
            "type": "chat_message",
            "content": answer_text
        }))
        logger.info("Sent chat response to client: %s", answer_text)