Newer
Older
cortex-hub / ai-hub / app / api / routes / sessions.py
from fastapi import APIRouter, HTTPException, Depends, File, UploadFile, Response
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from app.api.dependencies import ServiceContainer, get_db
from app.api import schemas
from typing import AsyncGenerator, List, Optional
from app.db import models
from app.core.pipelines.validator import Validator
import os
import shutil

def create_sessions_router(services: ServiceContainer) -> APIRouter:
    router = APIRouter(prefix="/sessions", tags=["Sessions"])

    @router.post("/", response_model=schemas.Session, summary="Create a New Chat Session")
    def create_session(
        request: schemas.SessionCreate,
        db: Session = Depends(get_db)
    ):
        """
        Create a chat session and auto-attach the user's default nodes.

        Responds with 400 when user_id or provider_name is missing. If the
        user's preferences list default node ids, they are attached to the new
        session with a freshly generated sync_workspace_id and a 'pending'
        sync status, and each node is notified on a best-effort basis.
        """
        if request.user_id is None or request.provider_name is None:
            raise HTTPException(status_code=400, detail="user_id and provider_name are required to create a session.")
        try:
            new_session = services.session_service.create_session(
                db=db,
                user_id=request.user_id,
                provider_name=request.provider_name,
                feature_name=request.feature_name,
                stt_provider_name=request.stt_provider_name,
                tts_provider_name=request.tts_provider_name
            )

            # M3: Auto-attach user's default nodes from preferences
            import uuid as _uuid
            user = db.query(models.User).filter(models.User.id == request.user_id).first()
            # `or {}` / `or []` also cover keys that exist but are stored as null,
            # which would otherwise raise AttributeError on the chained .get().
            node_prefs = ((user.preferences or {}).get("nodes") or {}) if user else {}
            default_nodes = node_prefs.get("default_node_ids") or []
            if default_nodes:
                new_session.sync_workspace_id = f"session-{new_session.id}-{_uuid.uuid4().hex[:8]}"
                new_session.attached_node_ids = list(default_nodes)
                new_session.node_sync_status = {
                    nid: {"status": "pending", "last_sync": None}
                    for nid in default_nodes
                }
                db.commit()
                db.refresh(new_session)

                # Notify live nodes that they've been attached. This is
                # best-effort: a failing notification must not fail the
                # request, but it is reported instead of silently dropped.
                registry = services.node_registry_service
                for nid in default_nodes:
                    try:
                        registry.emit(nid, "info", {
                            "message": f"Auto-attached to session {new_session.id}",
                            "workspace_id": new_session.sync_workspace_id,
                        })
                    except Exception as notify_err:
                        print(f"[⚠️] Failed to notify node {nid} of auto-attach: {notify_err}")

            return new_session
        except HTTPException:
            # Keep status codes chosen by the service layer, matching the
            # other handlers in this router.
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to create session: {e}") from e


    @router.post("/{session_id}/chat", response_model=schemas.ChatResponse, summary="Send a Message in a Session")
    async def chat_in_session(
        session_id: int,
        request: schemas.ChatRequest,
        db: Session = Depends(get_db)
    ):
        """
        Send a prompt to the session's RAG pipeline and return the answer,
        the provider that produced it, and the id of the stored message.
        """
        try:
            response_text, provider_used, message_id = await services.rag_service.chat_with_rag(
                db=db,
                session_id=session_id,
                prompt=request.prompt,
                provider_name=request.provider_name,
                load_faiss_retriever=request.load_faiss_retriever,
                user_service=services.user_service
            )
            return schemas.ChatResponse(answer=response_text, provider_used=provider_used, message_id=message_id)
        except HTTPException:
            # Propagate deliberate HTTP errors (e.g. a 404 from the service)
            # instead of rewrapping them as a 500, like the sibling handlers do.
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"An error occurred during chat: {e}") from e

    @router.get("/{session_id}/messages", response_model=schemas.MessageHistoryResponse, summary="Get Session Chat History")
    def get_session_messages(session_id: int, db: Session = Depends(get_db)):
        # Returns the session's message history, flagging every message whose
        # recorded audio file is still present on disk. Responds with 404 when
        # the history lookup yields None and with 500 on any other failure.

        def _with_audio_flags(record):
            # Serialise one message and add audio metadata when the file exists.
            payload = schemas.Message.model_validate(record).model_dump()
            if record.audio_path and os.path.exists(record.audio_path):
                payload.update(
                    has_audio=True,
                    audio_url=f"/sessions/messages/{record.id}/audio",
                )
            return payload

        try:
            history = services.rag_service.get_message_history(db=db, session_id=session_id)
            if history is None:
                raise HTTPException(status_code=404, detail=f"Session with ID {session_id} not found.")

            return schemas.MessageHistoryResponse(
                session_id=session_id,
                messages=[_with_audio_flags(record) for record in history],
            )
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"An error occurred: {e}")
            
    @router.get("/{session_id}/tokens", response_model=schemas.SessionTokenUsageResponse, summary="Get Session Token Usage")
    def get_session_token_usage(session_id: int, db: Session = Depends(get_db)):
        """
        Report how many tokens the session's message history occupies,
        the token limit resolved for the session's provider, and the usage
        as a percentage of that limit.
        """
        try:
            session = db.query(models.Session).filter(models.Session.id == session_id).first()
            if not session:
                raise HTTPException(status_code=404, detail=f"Session with ID {session_id} not found.")

            # get_message_history may return None (the history endpoint checks
            # for it); treat that, and messages without content, as empty text
            # rather than failing with a TypeError.
            messages = services.rag_service.get_message_history(db=db, session_id=session_id) or []
            combined_text = " ".join(m.content or "" for m in messages)

            # Resolve dynamic token limit from model info
            from app.core.providers.factory import get_model_limit
            token_limit = get_model_limit(session.provider_name)

            validator = Validator(token_limit=token_limit)
            token_count = len(validator.encoding.encode(combined_text))
            # Guard the division so a non-positive limit reports 0% usage.
            percentage = round((token_count / token_limit) * 100, 2) if token_limit > 0 else 0.0

            return schemas.SessionTokenUsageResponse(
                token_count=token_count,
                token_limit=token_limit,
                percentage=percentage
            )
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"An error occurred: {e}") from e

    @router.get("/", response_model=List[schemas.Session], summary="Get All Chat Sessions")
    def get_sessions(
        user_id: str,
        feature_name: str = "default",
        db: Session = Depends(get_db)
    ):
        # Lists the user's non-archived sessions for one feature, newest first.
        try:
            active_sessions = (
                db.query(models.Session)
                .filter(
                    models.Session.user_id == user_id,
                    models.Session.feature_name == feature_name,
                    # `==` (not `is`) is required to build the SQL expression.
                    models.Session.is_archived == False,  # noqa: E712
                )
                .order_by(models.Session.created_at.desc())
            )
            return active_sessions.all()
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to fetch sessions: {e}")

    @router.get("/{session_id}", response_model=schemas.Session, summary="Get a Single Session")
    def get_session(session_id: int, db: Session = Depends(get_db)):
        # Fetches one non-archived session by id; 404 when there is none.
        try:
            active_only = (
                models.Session.id == session_id,
                models.Session.is_archived == False,  # noqa: E712 - SQL expression
            )
            found = db.query(models.Session).filter(*active_only).first()
            if found:
                return found
            raise HTTPException(status_code=404, detail="Session not found.")
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to fetch session: {e}")

    @router.patch("/{session_id}", response_model=schemas.Session, summary="Update a Chat Session")
    def update_session(session_id: int, session_update: schemas.SessionUpdate, db: Session = Depends(get_db)):
        # Applies a partial update to a non-archived session: only the fields
        # that are not None in the payload are written, then the row is
        # committed and refreshed. 404 when the session does not exist.
        updatable_fields = ("title", "provider_name", "stt_provider_name", "tts_provider_name")
        try:
            target = db.query(models.Session).filter(
                models.Session.id == session_id,
                models.Session.is_archived == False  # noqa: E712 - SQL expression
            ).first()
            if not target:
                raise HTTPException(status_code=404, detail="Session not found.")

            for field_name in updatable_fields:
                new_value = getattr(session_update, field_name)
                if new_value is not None:
                    setattr(target, field_name, new_value)

            db.commit()
            db.refresh(target)
            return target
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to update session: {e}")

    @router.delete("/{session_id}", summary="Delete a Chat Session")
    def delete_session(session_id: int, db: Session = Depends(get_db)):
        # Soft delete: the row is kept and only flagged as archived.
        try:
            target = db.query(models.Session).filter(models.Session.id == session_id).first()
            if target:
                target.is_archived = True
                db.commit()
                return {"message": "Session deleted successfully."}
            raise HTTPException(status_code=404, detail="Session not found.")
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to delete session: {e}")

    @router.delete("/", summary="Delete All Sessions for Feature")
    def delete_all_sessions(user_id: str, feature_name: str = "default", db: Session = Depends(get_db)):
        # Soft-deletes every session the user owns for the given feature by
        # flagging each row as archived, then commits once.
        try:
            owned_sessions = db.query(models.Session).filter(
                models.Session.user_id == user_id,
                models.Session.feature_name == feature_name,
            )
            for record in owned_sessions.all():
                record.is_archived = True
            db.commit()
            return {"message": "All sessions deleted successfully."}
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to delete all sessions: {e}")

    @router.post("/messages/{message_id}/audio", summary="Upload audio for a specific message")
    async def upload_message_audio(message_id: int, file: UploadFile = File(...), db: Session = Depends(get_db)):
        """
        Store the uploaded audio file for a message and record its path.

        Responds with 404 when the message does not exist.
        """
        try:
            message = db.query(models.Message).filter(models.Message.id == message_id).first()
            if not message:
                raise HTTPException(status_code=404, detail="Message not found.")

            # Create data directory if not exists
            audio_dir = "/app/data/audio"
            os.makedirs(audio_dir, exist_ok=True)

            # Save file (one file per message; a re-upload overwrites it)
            file_path = f"{audio_dir}/message_{message_id}.wav"
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            # Update database
            message.audio_path = file_path
            db.commit()

            return {"message": "Audio uploaded successfully.", "audio_path": file_path}
        except HTTPException:
            # Without this clause the 404 raised above was caught by the
            # generic handler below and returned to the client as a 500.
            raise
        except Exception as e:
            print(f"Error uploading audio: {e}")
            raise HTTPException(status_code=500, detail=f"Failed to upload audio: {e}") from e

    @router.get("/messages/{message_id}/audio", summary="Get audio for a specific message")
    async def get_message_audio(message_id: int, db: Session = Depends(get_db)):
        # Streams the WAV file recorded for a message. 404 when the message is
        # unknown, has no audio path, or the file is no longer on disk.
        try:
            record = db.query(models.Message).filter(models.Message.id == message_id).first()
            stored_path = record.audio_path if record else None
            if not stored_path:
                raise HTTPException(status_code=404, detail="Audio not found for this message.")

            file_present = os.path.exists(stored_path)
            if not file_present:
                raise HTTPException(status_code=404, detail="Audio file missing on disk.")

            return FileResponse(stored_path, media_type="audio/wav")
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to get audio: {e}")

    # ==================================================================
    # M3: Session ↔ Node Attachment
    # ==================================================================

    @router.post("/{session_id}/nodes", response_model=schemas.SessionNodeStatusResponse,
                 summary="Attach Nodes to Session")
    def attach_nodes_to_session(
        session_id: int,
        request: schemas.NodeAttachRequest,
        db: Session = Depends(get_db)
    ):
        """
        Attach one or more Agent Nodes to a chat session.
        - Generates a stable sync_workspace_id on first attachment (used as Ghost Mirror session_id).
        - Records per-node sync status initialized to 'pending'.
        - Emits an 'info' event for each newly attached node via the NodeRegistryService.
        - Triggers the initial workspace sync for new nodes according to request.config.
        """
        import uuid

        session = db.query(models.Session).filter(
            models.Session.id == session_id,
            models.Session.is_archived == False
        ).first()
        if not session:
            raise HTTPException(status_code=404, detail="Session not found.")

        # Generate stable workspace ID on first attachment
        if not session.sync_workspace_id:
            session.sync_workspace_id = f"session-{session_id}-{uuid.uuid4().hex[:8]}"

        # Merge node list, skipping ids that are already attached AND ids that
        # are repeated within this request (order of first appearance is kept).
        current_nodes = list(session.attached_node_ids or [])
        seen = set(current_nodes)
        new_nodes = []
        for nid in request.node_ids:
            if nid not in seen:
                seen.add(nid)
                new_nodes.append(nid)
        current_nodes.extend(new_nodes)
        session.attached_node_ids = current_nodes

        # Initialize sync status for new nodes
        sync_status = dict(session.node_sync_status or {})
        for nid in new_nodes:
            sync_status[nid] = {"status": "pending", "last_sync": None}

        session.node_sync_status = sync_status
        db.commit()
        db.refresh(session)

        # Emit events to the live UI stream only after the attachment has been
        # committed, so nodes are never told about an attachment that failed
        # to persist. Best-effort: failures are reported, not raised.
        for nid in new_nodes:
            try:
                services.node_registry_service.emit(
                    nid, "info",
                    {"message": f"Attached to session {session_id}", "workspace_id": session.sync_workspace_id},
                )
            except Exception as notify_err:
                print(f"[⚠️] Failed to notify node {nid} of attachment: {notify_err}")

        # Trigger actual workspace sync commands via gRPC. The assistant is
        # reached through the orchestrator stored on app.state; the import is
        # done here, at call time, rather than at module import.
        try:
            from app.main import app
            assistant = app.state.orchestrator.assistant

            config = request.config or schemas.NodeWorkspaceConfig(source="empty")

            for nid in new_nodes:
                if config.source == "server":
                    # Server -> Node: Push everything from workspace
                    assistant.push_workspace(nid, session.sync_workspace_id)
                elif config.source == "node_local":
                    # Node -> Server: Request manifest to start syncing from Node
                    assistant.request_manifest(nid, session.sync_workspace_id, path=config.path or ".")
                    # Also tell it to start watching
                    assistant.control_sync(nid, session.sync_workspace_id, action="START", path=config.path or ".")

        except Exception as e:
            # A failed sync trigger must not undo the already-persisted attachment.
            print(f"[⚠️] Failed to trigger session node sync: {e}")

        return schemas.SessionNodeStatusResponse(
            session_id=session_id,
            sync_workspace_id=session.sync_workspace_id,
            nodes=[
                schemas.NodeSyncStatusEntry(
                    node_id=nid,
                    status=sync_status.get(nid, {}).get("status", "pending"),
                    last_sync=sync_status.get(nid, {}).get("last_sync"),
                )
                for nid in session.attached_node_ids
            ]
        )

    @router.delete("/{session_id}/nodes/{node_id}", summary="Detach Node from Session")
    def detach_node_from_session(
        session_id: int,
        node_id: str,
        db: Session = Depends(get_db)
    ):
        # Removes one node from a non-archived session together with its sync
        # status entry. 404 when the session is missing or the node is not
        # attached to it.
        active_session = db.query(models.Session).filter(
            models.Session.id == session_id,
            models.Session.is_archived == False  # noqa: E712 - SQL expression
        ).first()
        if not active_session:
            raise HTTPException(status_code=404, detail="Session not found.")

        attached = list(active_session.attached_node_ids or [])
        if node_id not in attached:
            raise HTTPException(status_code=404, detail=f"Node '{node_id}' not attached to this session.")

        # Drop the first occurrence only; fresh objects are assigned back
        # instead of mutating the stored list/dict in place.
        del attached[attached.index(node_id)]
        active_session.attached_node_ids = attached

        active_session.node_sync_status = {
            nid: entry
            for nid, entry in (active_session.node_sync_status or {}).items()
            if nid != node_id
        }

        db.commit()
        return {"message": f"Node '{node_id}' detached from session {session_id}."}

    @router.get("/{session_id}/nodes", response_model=schemas.SessionNodeStatusResponse,
                summary="Get Session Node Status")
    def get_session_nodes(session_id: int, db: Session = Depends(get_db)):
        """
        Returns all nodes attached to a session and their current sync status.
        Merges persisted sync_status with live connection state from the registry.
        """
        active_session = db.query(models.Session).filter(
            models.Session.id == session_id,
            models.Session.is_archived == False  # noqa: E712 - SQL expression
        ).first()
        if not active_session:
            raise HTTPException(status_code=404, detail="Session not found.")

        node_registry = services.node_registry_service
        persisted_status = active_session.node_sync_status or {}

        def _build_entry(nid):
            # The registry is consulted for every node, before the stored entry.
            live_node = node_registry.get_node(nid)
            stored = persisted_status.get(nid, {})
            # A stored 'pending' is reported as 'connected' while the node is
            # live; anything else is passed through ('pending' if unset).
            upgraded = live_node and stored.get("status") == "pending"
            return schemas.NodeSyncStatusEntry(
                node_id=nid,
                status="connected" if upgraded else stored.get("status", "pending"),
                last_sync=stored.get("last_sync"),
                error=stored.get("error"),
            )

        return schemas.SessionNodeStatusResponse(
            session_id=session_id,
            sync_workspace_id=active_session.sync_workspace_id,
            nodes=[_build_entry(nid) for nid in (active_session.attached_node_ids or [])],
        )

    return router