Newer
Older
cortex-hub / ai-hub / app / core / services / workspace.py
import asyncio
import json
from typing import Dict, Any, Callable, Awaitable
from fastapi import WebSocket
from sqlalchemy.orm import Session

# Type alias for message handler callables: each takes a WebSocket and a
# message dict (string keys, arbitrary values) and returns an awaitable that
# resolves to None. Used below as the value type of the dispatcher map.
MessageHandler = Callable[[WebSocket, Dict[str, Any]], Awaitable[None]]

class WorkspaceService:
    """
    Manages the full lifecycle of an AI workspace session, including
    handling various message types and dispatching them to the correct handlers.

    Incoming messages are plain dicts carrying a ``"type"`` key; the value is
    looked up in ``self.message_handlers`` and the matching coroutine is
    awaited with the originating websocket and the message itself.
    """
    def __init__(self):
        # The dispatcher map: keys are message types, values are handler functions
        self.message_handlers: Dict[str, MessageHandler] = {
            "select_folder": self.handle_select_folder_response,
            "list_directory_response": self.handle_list_directory_response,
            "file_content_response": self.handle_file_content_response,
            "execute_command_response": self.handle_command_output,
            # Add more message types here as needed
        }

    async def _send_json(self, websocket: WebSocket, payload: Dict[str, Any]) -> None:
        """Serializes ``payload`` to JSON and sends it as a text frame."""
        await websocket.send_text(json.dumps(payload))

    async def dispatch_message(self, websocket: WebSocket, message: Dict[str, Any]):
        """
        Routes an incoming message to the appropriate handler based on its 'type'.

        Args:
            websocket: Connection the message belongs to; replies are sent on it.
            message: Decoded message; its ``"type"`` value selects the handler.

        When no handler is registered for the type (including a missing
        ``"type"`` key), a warning is printed and an ``"error"`` message is
        sent back instead of raising.
        """
        message_type = message.get("type")
        handler = self.message_handlers.get(message_type)
        if handler is None:
            print(f"Warning: No handler found for message type: {message_type}")
            await self._send_json(
                websocket,
                {"type": "error", "content": f"Unknown message type: {message_type}"},
            )
            return
        await handler(websocket, message)

    async def handle_select_folder_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """
        Handles the client's notification that a folder was selected.

        Logs the selection and immediately asks the client for the directory
        listing, reusing the incoming ``request_id``.
        """
        path = data.get("path")
        request_id = data.get("request_id")
        print(f"Received folder selected (request_id: {request_id}): Path: {path}")
        # NOTE(review): the selected path is not forwarded in the request below;
        # presumably the client resolves the folder from request_id - confirm.
        await self._send_json(websocket, {
            "type": "list_directory",
            "request_id": request_id,
        })

    async def handle_list_directory_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """
        Handles the client's response to a list_directory request.

        Logs the listing, then sends a ``"list"`` message naming the first
        file. An empty, missing or null ``files`` entry no longer raises; a
        ``"list"`` message stating that no files were found is sent instead.
        """
        # This is where the AI logic would pick up after getting the file list
        # `or []` also covers an explicit null, which .get()'s default would not.
        files = data.get("files") or []
        folders = data.get("folders") or []
        request_id = data.get("request_id")
        print(f"Received directory listing (request_id: {request_id}): Files: {files}, Folders: {folders}")

        if not files:
            # Nothing to index into; tell the client rather than raising.
            await self._send_json(websocket, {
                "type": "list",
                "content": "No files found in the selected directory.",
            })
            return

        await self._send_json(websocket, {
            "type": "list",
            "content": f"Analyzing the content of file: {files[0]}",
        })

    async def handle_file_content_response(self, websocket: WebSocket, data: Dict[str, Any]):
        """
        Handles the content of a file sent by the client.

        Logs the filename and content length (0 when ``content`` is missing
        or null), then sends a ``"thinking_log"`` message for the file.
        """
        filename = data.get("filename")
        content = data.get("content")
        request_id = data.get("request_id")

        # A missing/null content must not break the log line below.
        content_length = len(content) if content is not None else 0
        print(f"Received content for '{filename}' (request_id: {request_id}). Content length: {content_length}")

        await self._send_json(websocket, {
            "type": "thinking_log",
            "content": f"Analyzing the content of file: {filename}",
        })

    async def handle_command_output(self, websocket: WebSocket, data: Dict[str, Any]):
        """
        Handles the output from a command executed by the client.

        Logs the command and its output, then sends a ``"thinking_log"``
        message announcing that the command completed.
        """
        command = data.get("command")
        output = data.get("output")
        request_id = data.get("request_id")

        print(f"Received output for command '{command}' (request_id: {request_id}). Output: {output}")

        # The AI would process the command output to determine the next step
        await self._send_json(websocket, {
            "type": "thinking_log",
            "content": f"Command '{command}' completed. Analyzing output.",
        })