import ast  # Import the Abstract Syntax Trees module
import json
import uuid
from typing import Any, Awaitable, Callable, Dict, List, Optional

import dspy
from fastapi import WebSocket

from app.core.providers.factory import get_llm_provider
from app.core.pipelines.file_selector import CodeRagFileSelector

# A type hint for our handler functions
MessageHandler = Callable[[WebSocket, Dict[str, Any]], Awaitable[None]]


class WorkspaceService:
    """
    Manages the full lifecycle of an AI workspace session, including
    handling various message types and dispatching them to the correct handlers.

    Incoming messages are routed through ``message_handlers`` (keyed by the
    message's ``type``); outgoing commands are looked up in ``command_map``
    and sent with a fresh ``request_id`` and a per-connection round counter.
    """

    def __init__(self):
        # The dispatcher map: keys are message types, values are handler functions
        self.message_handlers: Dict[str, MessageHandler] = {
            "select_folder_response": self.handle_select_folder_response,
            "list_directory_response": self.handle_list_directory_response,
            "file_content_response": self.handle_files_content_response,
            "execute_command_response": self.handle_command_output,
            # Add more message types here as needed
        }
        # Centralized map of commands that can be sent to the client
        self.command_map: Dict[str, Dict[str, Any]] = {
            "list_directory": {
                "type": "list_directory",
                "description": "Request a list of files and folders in the current directory.",
            },
            "get_file_content": {
                "type": "get_file_content",
                "description": "Request the content of a specific file.",
            },
            "execute_command": {
                "type": "execute_command",
                "description": "Request to execute a shell command.",
            },
            # Define more commands here
        }
        # Per-websocket session state management. Keys are derived from the
        # connection's ``scope["client"]`` entry (see ``_session_key``), which
        # is not a string, hence the ``Any`` key type.
        self.sessions: Dict[Any, Dict[str, Any]] = {}

    def generate_request_id(self) -> str:
        """Generates a unique request ID."""
        return str(uuid.uuid4())

    @staticmethod
    def _session_key(websocket: WebSocket) -> Any:
        """Return a hashable key identifying the websocket's client.

        The ASGI spec only promises an iterable for ``scope["client"]``; some
        servers/test clients supply a list, which cannot be used as a dict
        key. Converting to a tuple leaves an existing tuple value unchanged,
        so keys stay compatible with code that indexes ``sessions`` using a
        tuple ``scope["client"]`` directly. A missing client maps to ``None``.
        """
        client = websocket.scope.get("client")
        return tuple(client) if client is not None else None

    async def send_command(
        self,
        websocket: WebSocket,
        command_name: str,
        data: Optional[Dict[str, Any]] = None,
    ):
        """Helper to send a command to the client with a unique request_id and round number.

        Args:
            websocket: Connection the command is sent over.
            command_name: Key into ``command_map``.
            data: Extra fields merged into the outgoing message. Because they
                are merged last, a key named ``type``, ``request_id`` or
                ``round`` overrides the generated value.

        Raises:
            ValueError: If ``command_name`` is not in ``command_map``.
        """
        if command_name not in self.command_map:
            raise ValueError(f"Unknown command: {command_name}")

        # ``None`` instead of ``{}`` as the default: a dict literal in the
        # signature is created once and shared by every call.
        payload = {} if data is None else data

        request_id = self.generate_request_id()
        session_key = self._session_key(websocket)
        session_state = self.sessions.get(session_key, {"round": 0})
        session_state["round"] += 1

        message_to_send = {
            "type": self.command_map[command_name]["type"],
            "request_id": request_id,
            "round": session_state["round"],
            **payload,
        }
        await websocket.send_text(json.dumps(message_to_send))
        print(f"Sent command '{command_name}' to client (request_id: {request_id}, round: {session_state['round']})")
        # A brand-new session is only recorded once the send has succeeded.
        self.sessions[session_key] = session_state

    async def dispatch_message(self, websocket: WebSocket, message: Dict[str, Any]):
        """
        Routes an incoming message to the appropriate handler based on its 'type'.

        If no handler is registered for the type, an ``error`` message is
        sent back to the client instead.
        """
        message_type = message.get("type")
        request_id = message.get("request_id")
        round_num = message.get("round")

        # In a real-world app, you'd retrieve historical data based on request_id or session_id
        # For this example, we'll just print it.
        print(f"Received message of type '{message_type}' (request_id: {request_id}, round: {round_num})")

        # The type comes from the client; a non-string JSON value (e.g. a
        # list) is unhashable and would make ``dict.get`` raise TypeError, so
        # treat anything that is not a string as an unknown type.
        handler = self.message_handlers.get(message_type) if isinstance(message_type, str) else None
        if handler:
            await handler(websocket, message)
        else:
            print(f"Warning: No handler found for message type: {message_type}")
            await websocket.send_text(json.dumps({"type": "error", "content": f"Unknown message type: {message_type}"}))

    async def handle_select_folder_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the client's response to a select folder response."""
        path = data.get("path")
        request_id = data.get("request_id")
        print(f"Received folder selected (request_id: {request_id}): Path: {path}")

        # After a folder is selected, the next step is to list its contents.
        # This now uses the send_command helper.
        await self.send_command(websocket, "list_directory", data={"path": path})

    @staticmethod
    def _parse_file_list(raw_answer: Any) -> List[Any]:
        """Parse the selector's answer as a Python list literal.

        Raises:
            ValueError: If the literal parses but is not a list. Errors raised
                by ``ast.literal_eval`` for malformed input propagate as-is.
        """
        parsed = ast.literal_eval(raw_answer)
        if not isinstance(parsed, list):
            raise ValueError("Parsed result is not a list.")
        return parsed

    async def handle_list_directory_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the client's response to a list_directory request.

        Runs the file selector over the reported files, then either asks the
        client for the selected files' content or, if the selector's answer
        cannot be parsed as a list, sends a warning ``thinking_log`` and stops.
        """
        files = data.get("files", [])
        provider_name = data.get("provider_name", "gemini")
        llm_provider = get_llm_provider(provider_name)
        cfs = CodeRagFileSelector()

        with dspy.context(lm=llm_provider):
            raw_answer_text = await cfs(
                question="Please help to refactor my code",
                # The history will be retrieved from a database in a real application
                # For this example, we'll pass an empty history
                history="",
                file_list=files,
            )

        try:
            answer_text = self._parse_file_list(raw_answer_text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
            # Handle cases where the LLM output is not a valid list string.
            # literal_eval is documented to raise any of these on malformed input.
            print(f"Error parsing LLM output: {e}")
            await websocket.send_text(json.dumps({
                "type": "thinking_log",
                "content": f"Warning: AI's file list could not be parsed. Error: {e}"
            }))
            return

        await websocket.send_text(json.dumps({
            "type": "thinking_log",
            "content": f"AI selected files: {answer_text}. Now requesting file content."
        }))

        # After getting the AI's selected files, we send a command to the client to get their content.
        await self.send_command(websocket, "get_file_content", data={"filenames": answer_text})

    async def handle_files_content_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the content of a list of files sent by the client.

        Sends one ``thinking_log`` message per well-formed file entry and
        prints a warning for each entry lacking a filename or content.
        """
        # The client is expected to send a list of file objects
        # Each object should have 'filename' and 'content' keys.
        files_data: List[Dict[str, str]] = data.get("files", [])
        request_id = data.get("request_id")

        if not files_data:
            print(f"Warning: No files data received for request_id: {request_id}")
            return

        print(f"Received content for {len(files_data)} files (request_id: {request_id}).")

        for file_info in files_data:
            filename = file_info.get("filename")
            content = file_info.get("content")

            # Only a *missing* content is malformed; an empty file legitimately
            # has content == "" and must still be processed.
            if filename and content is not None:
                print(f"Processing content for '{filename}'. Content length: {len(content)}")
                # The AI would analyze this content to determine the next action, e.g.,
                # generate a plan, perform a refactoring, or ask for more information.
                await websocket.send_text(json.dumps({
                    "type": "thinking_log",
                    "content": f"Analyzing the content of file: {filename}"
                }))
            else:
                print(f"Warning: Malformed file data in response for request_id: {request_id}")

    async def handle_command_output(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the output from a command executed by the client."""
        command = data.get("command")
        output = data.get("output")
        request_id = data.get("request_id")

        print(f"Received output for command '{command}' (request_id: {request_id}). Output: {output}")

        # The AI would process the command output to determine the next step
        await websocket.send_text(json.dumps({
            "type": "thinking_log",
            "content": f"Command '{command}' completed. Analyzing output."
        }))