from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from app.db import models
from app.api import schemas
from app.api.dependencies import ServiceContainer, get_current_user, get_db
def create_skills_router(services: ServiceContainer) -> APIRouter:
router = APIRouter(prefix="/skills", tags=["Skills"])
@router.get("/", response_model=List[schemas.SkillResponse])
def list_skills(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
feature: Optional[str] = Query(None, description="Filter skills by feature (e.g., 'chat', 'voice')")
):
"""List all skills accessible to the user."""
# Start queries
system_query = db.query(models.Skill).filter(models.Skill.is_system == True)
user_query = db.query(models.Skill).filter(
models.Skill.owner_id == current_user.id,
models.Skill.is_system == False
)
# Policy: Only show enabled skills to non-admins
if current_user.role != 'admin':
system_query = system_query.filter(models.Skill.is_enabled == True)
user_query = user_query.filter(models.Skill.is_enabled == True)
# Target feature filtering (PostgreSQL JSONB contains or standard JSON)
if feature:
# Using standard list comparison as fallback or JSONB contains
system_skills = [s for s in system_query.all() if feature in (s.features or [])]
user_skills = [s for s in user_query.all() if feature in (s.features or [])]
else:
system_skills = system_query.all()
user_skills = user_query.all()
# Skills shared with the user's group via Group Policy
group_skills = []
if current_user.group and current_user.group.policy:
group_skill_names = current_user.group.policy.get("skills", [])
if group_skill_names:
g_query = db.query(models.Skill).filter(
models.Skill.name.in_(group_skill_names),
models.Skill.owner_id != current_user.id,
models.Skill.is_system == False
)
if current_user.role != 'admin':
g_query = g_query.filter(models.Skill.is_enabled == True)
if feature:
group_skills = [s for s in g_query.all() if feature in (s.features or [])]
else:
group_skills = g_query.all()
return system_skills + user_skills + group_skills
@router.post("/", response_model=schemas.SkillResponse)
def create_skill(
skill: schemas.SkillCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Create a new skill."""
existing = db.query(models.Skill).filter(models.Skill.name == skill.name).first()
if existing:
raise HTTPException(status_code=400, detail="Skill with this name already exists")
db_skill = models.Skill(
name=skill.name,
description=skill.description,
skill_type=skill.skill_type,
config=skill.config,
system_prompt=skill.system_prompt,
is_enabled=skill.is_enabled,
features=skill.features,
owner_id=current_user.id,
is_system=skill.is_system if current_user.role == 'admin' else False
)
db.add(db_skill)
db.commit()
db.refresh(db_skill)
return db_skill
@router.put("/{skill_id}", response_model=schemas.SkillResponse)
def update_skill(
skill_id: int,
skill_update: schemas.SkillUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Update an existing skill. User must be admin or the owner."""
db_skill = db.query(models.Skill).filter(models.Skill.id == skill_id).first()
if not db_skill:
raise HTTPException(status_code=404, detail="Skill not found")
if db_skill.owner_id != current_user.id and current_user.role != 'admin':
raise HTTPException(status_code=403, detail="Not authorized to update this skill")
if skill_update.name is not None and skill_update.name != db_skill.name:
existing = db.query(models.Skill).filter(models.Skill.name == skill_update.name).first()
if existing:
raise HTTPException(status_code=400, detail="Skill with this name already exists")
for key, value in skill_update.model_dump(exclude_unset=True).items():
if key == 'is_system' and current_user.role != 'admin':
continue
setattr(db_skill, key, value)
db.commit()
db.refresh(db_skill)
return db_skill
@router.delete("/{skill_id}")
def delete_skill(
skill_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Delete a skill."""
db_skill = db.query(models.Skill).filter(models.Skill.id == skill_id).first()
if not db_skill:
raise HTTPException(status_code=404, detail="Skill not found")
if db_skill.owner_id != current_user.id and current_user.role != 'admin':
raise HTTPException(status_code=403, detail="Not authorized to delete this skill")
db.delete(db_skill)
db.commit()
return {"message": "Skill deleted"}
return router