Newer
Older
cortex-hub / ai-hub / app / core / services / workspace.py
import dspy
import json
import uuid
import ast  # Import the Abstract Syntax Trees module
from typing import Dict, Any, Callable, Awaitable, List
from fastapi import WebSocket
from app.core.providers.factory import get_llm_provider
from app.core.pipelines.file_selector import CodeRagFileSelector

# A type hint for our handler functions
MessageHandler = Callable[[WebSocket, Dict[str, Any]], Awaitable[None]]

class WorkspaceService:
    """
    Manages the full lifecycle of an AI workspace session, including
    handling various message types and dispatching them to the correct handlers.
    """
    def __init__(self):
        # The dispatcher map: keys are message types, values are handler functions
        self.message_handlers: Dict[str, MessageHandler] = {
            "select_folder_response": self.handle_select_folder_response,
            "list_directory_response": self.handle_list_directory_response,
            "file_content_response": self.handle_files_content_response,
            "execute_command_response": self.handle_command_output,
            # Add more message types here as needed
        }
        # Centralized map of commands that can be sent to the client
        self.command_map: Dict[str, Dict[str, Any]] = {
            "list_directory": {"type": "list_directory", "description": "Request a list of files and folders in the current directory."},
            "get_file_content": {"type": "get_file_content", "description": "Request the content of a specific file."},
            "execute_command": {"type": "execute_command", "description": "Request to execute a shell command."},
            # Define more commands here
        }
        # Per-websocket session state management
        self.sessions: Dict[str, Dict[str, Any]] = {}

    def generate_request_id(self) -> str:
        """Generates a unique request ID."""
        return str(uuid.uuid4())
        
    async def send_command(self, websocket: WebSocket, command_name: str, data: Dict[str, Any] = {}):
        """Helper to send a command to the client with a unique request_id and round number."""
        if command_name not in self.command_map:
            raise ValueError(f"Unknown command: {command_name}")
        
        request_id = self.generate_request_id()
        session_state = self.sessions.get(websocket.scope["client"], {"round": 0})
        session_state["round"] += 1
        
        message_to_send = {
            "type": self.command_map[command_name]["type"],
            "request_id": request_id,
            "round": session_state["round"],
            **data,
        }
        
        await websocket.send_text(json.dumps(message_to_send))
        print(f"Sent command '{command_name}' to client (request_id: {request_id}, round: {session_state['round']})")
        
        self.sessions[websocket.scope["client"]] = session_state

    async def dispatch_message(self, websocket: WebSocket, message: Dict[str, Any]):
        """
        Routes an incoming message to the appropriate handler based on its 'type'.
        Retrieves session state to maintain context.
        """
        message_type = message.get("type")
        request_id = message.get("request_id")
        round_num = message.get("round")
        
        # In a real-world app, you'd retrieve historical data based on request_id or session_id
        # For this example, we'll just print it.
        print(f"Received message of type '{message_type}' (request_id: {request_id}, round: {round_num})")
        
        handler = self.message_handlers.get(message_type)
        if handler:
            await handler(websocket, message)
        else:
            print(f"Warning: No handler found for message type: {message_type}")
            await websocket.send_text(json.dumps({"type": "error", "content": f"Unknown message type: {message_type}"}))

    async def handle_select_folder_response(self, websocket:WebSocket, data: Dict[str, Any]):
        """Handles the client's response to a select folder response."""
        path = data.get("path")
        request_id = data.get("request_id")
        
        print(f"Received folder selected (request_id: {request_id}): Path: {path}")
        
        # After a folder is selected, the next step is to list its contents.
        # This now uses the send_command helper.
        await self.send_command(websocket, "list_directory", data={"path": path})

    async def handle_list_directory_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the client's response to a list_directory request."""
        files = data.get("files", [])
        provider_name = data.get("provider_name", "gemini")
        
        llm_provider = get_llm_provider(provider_name)
        cfs = CodeRagFileSelector()
        
        with dspy.context(lm=llm_provider):
            raw_answer_text  = await cfs(
                question="Please help to refactor my code",
                # The history will be retrieved from a database in a real application
                # For this example, we'll pass an empty history
                history="",
                file_list=files
            )
        
        try:
            answer_text = ast.literal_eval(raw_answer_text)
            if not isinstance(answer_text, list):
                raise ValueError("Parsed result is not a list.")
        except (ValueError, SyntaxError) as e:
    # Handle cases where the LLM output is not a valid list string.
            print(f"Error parsing LLM output: {e}")
            answer_text = []  # Default to an empty list to prevent errors.
            await websocket.send_text(json.dumps({
                "type": "thinking_log",
                "content": f"Warning: AI's file list could not be parsed. Error: {e}"
            }))
            return

        await websocket.send_text(json.dumps({
            "type": "thinking_log",
            "content": f"AI selected files: {answer_text}. Now requesting file content."
        }))
        
        # After getting the AI's selected files, we send a command to the client to get their content.
        await self.send_command(websocket, "get_file_content", data={"filenames": answer_text})

    async def handle_files_content_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the content of a list of files sent by the client."""
        # The client is expected to send a list of file objects
        # Each object should have 'filename' and 'content' keys.
        files_data: List[Dict[str, str]] = data.get("files", [])
        request_id = data.get("request_id")
        
        if not files_data:
            print(f"Warning: No files data received for request_id: {request_id}")
            return
            
        print(f"Received content for {len(files_data)} files (request_id: {request_id}).")
        
        for file_info in files_data:
            filename = file_info.get("filename")
            content = file_info.get("content")
            
            if filename and content:
                print(f"Processing content for '{filename}'. Content length: {len(content)}")
                # The AI would analyze this content to determine the next action, e.g.,
                # generate a plan, perform a refactoring, or ask for more information.
                await websocket.send_text(json.dumps({
                    "type": "thinking_log",
                    "content": f"Analyzing the content of file: {filename}"
                }))
            else:
                print(f"Warning: Malformed file data in response for request_id: {request_id}")
        
    async def handle_command_output(self, websocket: WebSocket, data: Dict[str, Any]):
        """Handles the output from a command executed by the client."""
        command = data.get("command")
        output = data.get("output")
        request_id = data.get("request_id")
        
        print(f"Received output for command '{command}' (request_id: {request_id}). Output: {output}")
        
        # The AI would process the command output to determine the next step
        await websocket.send_text(json.dumps({
            "type": "thinking_log",
            "content": f"Command '{command}' completed. Analyzing output."
        }))