from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends, Header, Query, Request, UploadFile, File
from fastapi.responses import RedirectResponse as redirect
from sqlalchemy.orm import Session
from app.config import settings
from app.db import models
from typing import Optional, Annotated
import logging
import os
import time
import secrets
import httpx
import jwt
import urllib.parse
# Short-lived CLI auth state store: state_token → {redirect_uri, expires_at}
# Entries are written by the cli-init route (300-second lifetime) and removed
# with pop() by the OIDC callback or the cli-token exchange route, so each
# token can be redeemed at most once.
# NOTE(review): this is a plain module-level dict, i.e. per-process memory —
# presumably a single-worker deployment is assumed; confirm before scaling out.
_cli_state_store: dict = {}
# Correctly import from your application's schemas and dependencies
from app.api.dependencies import ServiceContainer, get_db, get_current_user, get_optional_user
from app.api import schemas
from app.core.services.user import login_required, verify_password, hash_password
from app.core.grpc.utils.crypto import encrypt_value, decrypt_value
# Setup logging
# NOTE(review): basicConfig() is executed when this module is imported and
# targets the root logger; it is a no-op if the root logger already has handlers.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger used by the route handlers in this file.
logger = logging.getLogger(__name__)
# --- Derived OIDC Configuration Helpers ---
def get_oidc_urls():
    """Derive the OIDC endpoint URLs from ``settings.OIDC_SERVER_URL``.

    Trailing slashes on the configured server URL are removed before the
    endpoint name is appended.

    Returns:
        A dict with the keys ``"auth"``, ``"token"`` and ``"userinfo"``, each
        mapped to ``<server_url>/<key>``.
    """
    base = settings.OIDC_SERVER_URL.rstrip("/")
    return {endpoint: f"{base}/{endpoint}" for endpoint in ("auth", "token", "userinfo")}
# A dependency to simulate getting the current user ID from a request header
def get_current_user_id(x_user_id: Annotated[Optional[str], Header()] = None) -> Optional[str]:
    """
    Resolve the caller's user ID from the X-User-ID request header.

    Returns the header value when it is present and non-empty; otherwise the
    literal string "anonymous". Despite the Optional return annotation, this
    function therefore never returns None.
    """
    # NOTE(review): the header value is passed through without any verification
    # in this function — callers relying on it for authorization should confirm
    # that something upstream authenticates the header.
    if x_user_id:
        return x_user_id
    return "anonymous"
def create_users_router(services: ServiceContainer) -> APIRouter:
router = APIRouter(prefix="/users", tags=["Users"])
@router.get("/login", summary="Initiate OIDC Login Flow")
async def login_redirect(
    request: Request,
    frontend_callback_uri: Optional[str] = Query(None, description="The frontend URI to redirect back to after OIDC provider.")
):
    """
    Start the OIDC login flow.

    Asks the auth service for the provider login URL (passing along the
    optional frontend callback URI) and redirects the browser there.
    """
    return redirect(
        url=await services.auth_service.generate_login_url(frontend_callback_uri)
    )
@router.get("/login/callback", summary="Handle OIDC Login Callback")
async def login_callback(
    request: Request,
    code: str = Query(..., description="Authorization code from OIDC provider"),
    state: str = Query(..., description="The original frontend redirect URI"),
    db: Session = Depends(get_db)
):
    """
    Handles the callback from the OIDC provider.

    The authorization ``code`` is handed to the auth service, then the browser
    is redirected according to ``state``:

    * ``cli:<state_token>`` - CLI login. A still-valid state token is consumed
      and the browser is sent to the CLI's registered local callback with a
      session token, email and user id. Unknown or expired tokens are sent to
      ``/dashboard?error=cli_auth_expired``.
    * anything else - treated as the frontend URL to return to. It is honoured
      only when it is a relative reference or an http(s) URL whose netloc is
      whitelisted; otherwise ``/dashboard`` is used. ``user_id`` (plus
      ``linked`` and ``token`` when available) are appended as query
      parameters.
    """
    result = await services.auth_service.handle_callback(code, db)
    user_id = result["user_id"]
    linked = result.get("linked", False)

    # CLI auth flow: state is "cli:<state_token>" — redirect to the CLI's local server
    if state.startswith("cli:"):
        state_token = state[4:]
        # pop() makes the token single-use even if the expiry check fails below.
        entry = _cli_state_store.pop(state_token, None)
        if entry and time.time() < entry["expires_at"]:
            session_token = services.auth_service.create_session_token(user_id)
            user = db.query(models.User).filter(models.User.id == user_id).first()
            email = user.email if user else ""
            params = urllib.parse.urlencode({"token": session_token, "email": email, "user_id": user_id})
            return redirect(url=f"{entry['redirect_uri']}?{params}")
        # Expired or unknown state — fall through to dashboard
        return redirect(url="/dashboard?error=cli_auth_expired")

    # SECURITY: Prevent Open Redirect - Validate 'state' is a safe URL
    # Ideally this matches settings.FRONTEND_URL or a whitelist.
    allowed_domains = ["ai.jerxie.com", "localhost", "127.0.0.1"]
    allowed_domains.append(urllib.parse.urlparse(str(request.base_url)).netloc)

    # Browsers treat "\" like "/" and strip control characters / surrounding
    # whitespace, so a value containing them may resolve to a different host
    # than the one urlparse() reports. Reject such values outright.
    has_tricky_chars = (
        "\\" in state
        or state != state.strip()
        or any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in state)
    )
    try:
        parsed_url = urllib.parse.urlparse(state)
    except ValueError:
        # e.g. a malformed IPv6 netloc; treat as unsafe instead of failing with a 500.
        parsed_url = None

    if parsed_url is None or has_tricky_chars:
        is_safe = False
    elif parsed_url.scheme or parsed_url.netloc:
        # Absolute or scheme-relative URL: only http(s) to a whitelisted netloc.
        # Requiring a netloc also blocks "https:/evil.com" and "javascript:..."
        # which carry a scheme but no netloc.
        is_safe = (
            parsed_url.scheme in ("", "http", "https")
            and parsed_url.netloc in allowed_domains
        )
    else:
        # Relative reference. "///host" parses with an empty netloc but
        # browsers resolve it to an external host.
        is_safe = not state.startswith("//")

    safe_url = state
    if not is_safe:
        # %r keeps attacker-controlled characters from forging log lines.
        logger.warning("Prevented potentially malicious open redirect to: %r", state)
        safe_url = "/dashboard"

    extra_params = {"user_id": user_id}
    if linked:
        extra_params["linked"] = "true"
    # Include the ID token if available (to allow the frontend to switch to JWT auth)
    id_token = result.get("id_token")
    if id_token:
        extra_params["token"] = id_token

    # Merge into the target URL properly: keep any query string the target
    # already has, keep the fragment last, and URL-encode the added values.
    parts = urllib.parse.urlsplit(safe_url)
    appended = urllib.parse.urlencode(extra_params)
    merged_query = f"{parts.query}&{appended}" if parts.query else appended
    frontend_redirect_url = urllib.parse.urlunsplit(parts._replace(query=merged_query))
    return redirect(url=frontend_redirect_url)
@router.get("/auth/cli-init", summary="Initialize CLI Authentication Flow")
async def cli_auth_init(
    port: int = Query(..., ge=1024, le=65535, description="Local CLI server port for the callback"),
):
    """
    Begin a browser-based (gcloud-style) login for CLI/MCP config generation.

    A one-time state token is registered together with the CLI's local
    callback address (``http://localhost:<port>/callback``) and a 300-second
    expiry. When OIDC is enabled the response carries the ``auth_url`` the CLI
    should open in a browser; otherwise it carries ``oidc: False``, the raw
    ``state_token`` and the local-login URL so the CLI can fall back to a
    password prompt.
    """
    ttl_seconds = 300
    issued_at = time.time()

    # Drop stale records first so abandoned logins cannot accumulate forever.
    stale_tokens = [
        token
        for token, record in _cli_state_store.items()
        if issued_at >= record["expires_at"]
    ]
    for token in stale_tokens:
        _cli_state_store.pop(token, None)

    state_token = secrets.token_urlsafe(32)
    _cli_state_store[state_token] = {
        "redirect_uri": f"http://localhost:{port}/callback",
        "expires_at": issued_at + ttl_seconds,
    }

    if not settings.OIDC_ENABLED:
        # Local auth mode: the CLI posts credentials itself and later trades
        # the state token for a session token.
        return {
            "auth_url": None,
            "oidc": False,
            "state_token": state_token,
            "login_url": f"{settings.HUB_PUBLIC_URL}/api/v1/users/login/local",
            "expires_in": ttl_seconds,
        }

    auth_url = await services.auth_service.generate_login_url(f"cli:{state_token}")
    return {"auth_url": auth_url, "oidc": True, "expires_in": ttl_seconds}
@router.post("/auth/cli-token", summary="Exchange CLI State Token for Session Token")
async def cli_token_exchange(
    state_token: str = Query(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
):
    """
    Trade a CLI state token for a session token bound to the authenticated user.

    The state token is removed from the store on lookup, so it can be used
    only once, and it is rejected once its expiry time has passed.

    Raises:
        HTTPException 401: no authenticated user.
        HTTPException 400: the state token is unknown or expired.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Authentication required.")

    record = _cli_state_store.pop(state_token, None)
    still_valid = bool(record) and time.time() < record["expires_at"]
    if not still_valid:
        raise HTTPException(status_code=400, detail="Invalid or expired state token.")

    return {
        "token": services.auth_service.create_session_token(current_user.id),
        "email": current_user.email,
        "user_id": current_user.id,
    }
@router.get("/config", summary="Public Auth Configuration")
async def get_auth_config():
    """Report which authentication methods are enabled; no login required."""
    auth_methods = {}
    auth_methods["oidc_configured"] = settings.OIDC_ENABLED
    auth_methods["allow_password_login"] = settings.ALLOW_PASSWORD_LOGIN
    return auth_methods
@router.get("/me", response_model=schemas.UserStatus, summary="Get Current User Status")
async def get_current_status(
    db: Session = Depends(get_db),
    current_user: Optional[models.User] = Depends(get_optional_user)
):
    """
    Report the login status of the caller.

    Without an authenticated user an "anonymous" status is returned. For an
    authenticated user the account must be usable: if it has no password hash
    and OIDC is disabled, the request is rejected with 403.
    """
    if not current_user:
        return schemas.UserStatus(
            id="anonymous",
            email="anonymous",
            is_logged_in=False,
            oidc_configured=settings.OIDC_ENABLED,
            allow_password_login=settings.ALLOW_PASSWORD_LOGIN
        )

    try:
        # An account is usable if it has a password or OIDC can authenticate it.
        account_usable = bool(current_user.password_hash) or settings.OIDC_ENABLED
        if not account_usable:
            raise HTTPException(status_code=403, detail="Account disabled: OIDC is inactive and no password is set.")

        status_fields = dict(
            id=current_user.id,
            email=current_user.email,
            is_logged_in=True,
            is_anonymous=False,
            oidc_configured=settings.OIDC_ENABLED,
            allow_password_login=settings.ALLOW_PASSWORD_LOGIN,
        )
        return schemas.UserStatus(**status_fields)
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}")
@router.post("/login/local", summary="Local Authentication Fallback")
async def login_local(
    request: schemas.LocalLoginRequest,
    db: Session = Depends(get_db)
):
    """
    Authenticate with email and password and issue a session token.

    Rejected with 403 when password login is disabled (unless the
    DEVELOPER_MODE environment variable is "true"), and with 401 when the
    email is unknown, the account has no password, or the password is wrong.
    """
    # Policy gate: password login must be allowed, or developer mode enabled.
    password_login_permitted = (
        settings.ALLOW_PASSWORD_LOGIN or os.getenv("DEVELOPER_MODE") == "true"
    )
    if not password_login_permitted:
        raise HTTPException(status_code=403, detail="Password-based login is disabled. Please use OIDC/SSO.")

    account = db.query(models.User).filter(models.User.email == request.email).first()
    # One generic message for every failure so the response does not reveal
    # which part of the credentials was wrong.
    if (
        not account
        or not account.password_hash
        or not verify_password(request.password, account.password_hash)
    ):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    account.last_login_at = datetime.utcnow()
    db.commit()

    # Signed session token for the freshly authenticated local user.
    session_token = services.auth_service.create_session_token(account.id)
    return {
        "user_id": account.id,
        "email": account.email,
        "role": account.role,
        "token": session_token
    }
@router.put("/password", summary="Update User Password")
async def update_password(
    request: schemas.PasswordUpdateRequest,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """
    Set a new password for the authenticated user.

    When the account already has a password, the supplied current password
    must verify against it (400 otherwise). Accounts without a password hash
    skip that check.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Unauthorized")

    existing_hash = current_user.password_hash
    if existing_hash:
        if not verify_password(request.current_password, existing_hash):
            raise HTTPException(status_code=400, detail="Invalid current password")

    current_user.password_hash = hash_password(request.new_password)
    db.commit()
    return {"status": "success", "message": "Password updated successfully"}
@router.get("/me/profile", response_model=schemas.UserProfile, summary="Get Current User Profile")
async def get_user_profile(
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Return the profile of the current user, including the group name if any."""
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    profile = schemas.UserProfile.model_validate(account)
    # group_name is filled in by hand from the related group record.
    if account.group:
        profile.group_name = account.group.name
    return profile
@router.put("/me/profile", response_model=schemas.UserProfile, summary="Update User Profile")
async def update_user_profile(
    profile_data: schemas.UserProfileUpdate,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """
    Update username, full name and/or avatar URL of the current user.

    Only fields with a truthy value in the request are applied; empty or
    missing fields leave the stored value unchanged.
    """
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    for field_name in ("username", "full_name", "avatar_url"):
        new_value = getattr(profile_data, field_name)
        if new_value:
            setattr(account, field_name, new_value)

    db.add(account)
    db.commit()
    db.refresh(account)

    profile = schemas.UserProfile.model_validate(account)
    if account.group:
        profile.group_name = account.group.name
    return profile
@router.get("/me/config", response_model=schemas.ConfigResponse, summary="Get Current User Preferences")
async def get_user_config(
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Return the current user's preferences (LLM, TTS, STT config overrides) as merged by the preference service."""
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    merged_config = services.preference_service.merge_user_config(account, db)
    return merged_config
@router.put("/me/config", response_model=schemas.UserPreferences, summary="Update Current User Preferences")
async def update_user_config(
    prefs: schemas.UserPreferences,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Store new preferences for the current user via the preference service."""
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    saved_prefs = services.preference_service.update_user_config(account, prefs, db)
    return saved_prefs
@router.get("/me/config/models", response_model=list[schemas.ModelInfoResponse], summary="Get Models for Provider")
async def get_provider_models(provider_name: str, section: str = "llm"):
    """Return the models the preference service reports for a provider in the given section."""
    provider_models = await services.preference_service.get_provider_models(provider_name, section)
    return provider_models
@router.get("/me/config/providers", response_model=list[str], summary="Get All Valid Providers per Section")
async def get_all_providers(
    section: str = "llm",
    configured_only: bool = Query(False, description="If true, only returns providers currently configured in preferences or system defaults"),
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Return the provider names available for a section, optionally only the configured ones."""
    # No 404 here: whatever the lookup returns (possibly None) is handed on
    # to the preference service unchanged.
    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    provider_names = services.preference_service.get_all_providers(db, account, section, configured_only)
    return provider_names
@router.post("/me/config/verify_llm", response_model=schemas.VerifyProviderResponse)
async def verify_llm(
    req: schemas.VerifyProviderRequest,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Verify a provider configuration for the "llm" section on behalf of the current user."""
    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")
    verdict = await services.preference_service.verify_provider(db, account, req, "llm")
    return verdict
@router.post("/me/config/verify_tts", response_model=schemas.VerifyProviderResponse)
async def verify_tts(
    req: schemas.VerifyProviderRequest,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Verify a provider configuration for the "tts" section on behalf of the current user."""
    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")
    verdict = await services.preference_service.verify_provider(db, account, req, "tts")
    return verdict
@router.post("/me/config/verify_stt", response_model=schemas.VerifyProviderResponse)
async def verify_stt(
    req: schemas.VerifyProviderRequest,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Verify a provider configuration for the "stt" section on behalf of the current user."""
    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")
    verdict = await services.preference_service.verify_provider(db, account, req, "stt")
    return verdict
@router.post("/logout", summary="Log Out the Current User")
async def logout():
    """
    Acknowledge a logout request.

    This handler only returns a confirmation message; it does not touch any
    session token or cookie.
    """
    confirmation = {"message": "Logged out successfully"}
    return confirmation
@router.get("/me/config/export", summary="Export Configurations to YAML")
async def export_user_config_yaml(
    reveal_secrets: bool = Query(False, description="Reveal secrets in export"),
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """
    Export the effective configuration as a downloadable YAML file (admin only).

    Responds with 403 unless the resolved user exists and has the "admin" role.
    """
    from fastapi.responses import PlainTextResponse

    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    is_admin = bool(account) and account.role == "admin"
    if not is_admin:
        raise HTTPException(status_code=403, detail="Forbidden: Admin only")

    yaml_str = services.preference_service.export_config_yaml(account, reveal_secrets)
    # Content-Disposition makes browsers save the body as a file.
    download_headers = {"Content-Disposition": "attachment; filename=\"cortex_config.yaml\""}
    return PlainTextResponse(
        content=yaml_str,
        media_type="application/x-yaml",
        headers=download_headers,
    )
@router.post("/me/config/import", response_model=schemas.UserPreferences, summary="Import Configurations from YAML")
async def import_user_config_yaml(
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Read an uploaded YAML file and hand its content to the preference service for import."""
    account = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    # The whole upload is read into memory before being passed on.
    raw_yaml = await file.read()
    imported = await services.preference_service.import_config_yaml(db, account, raw_yaml)
    return imported
# --- NEW ADMIN ROUTES ---
@router.get("/admin/users", response_model=list[schemas.UserProfile], summary="List All Users (Admin Only)")
async def admin_list_users(
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Return the profiles of all users, each with its group name when the user has a group (admin only)."""
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    caller = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not caller or caller.role != "admin":
        raise HTTPException(status_code=403, detail="Forbidden: Admin only")

    def _as_profile(account):
        # Convert one ORM user into the response schema, adding the group name.
        profile = schemas.UserProfile.model_validate(account)
        if account.group:
            profile.group_name = account.group.name
        return profile

    return [_as_profile(account) for account in services.user_service.get_all_users(db)]
@router.put("/admin/users/{uid}/role", response_model=schemas.UserProfile, summary="Update User Role (Admin Only)")
async def admin_update_role(
    uid: str,
    role_req: schemas.UserRoleUpdate,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """
    Change the role of user ``uid`` and return the updated profile (admin only).

    Raises:
        HTTPException 401: no caller id could be resolved.
        HTTPException 403: the caller is unknown or not an admin.
        HTTPException 400: the user service refused the update.
        HTTPException 404: the target user cannot be loaded after the update.
    """
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")
    user = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not user or user.role != "admin":
        raise HTTPException(status_code=403, detail="Forbidden: Admin only")

    success = services.user_service.update_user_role(db, uid, role_req.role)
    if not success:
        # NOTE(review): the service only reports success/failure; per the
        # message below one presumed cause is demoting the last admin.
        raise HTTPException(status_code=400, detail="Failed to update role. Maybe this is the last admin?")

    updated = services.user_service.get_user_by_id(db, uid)
    if not updated:
        # Returning None here would surface as a response-validation error.
        raise HTTPException(status_code=404, detail="User not found")

    # Build the response the same way the other profile endpoints in this
    # router do, so group_name is populated instead of being left unset.
    response = schemas.UserProfile.model_validate(updated)
    if updated.group:
        response.group_name = updated.group.name
    return response
@router.put("/admin/users/{uid}/group", response_model=schemas.UserProfile, summary="Update User Group (Admin Only)")
async def admin_update_user_group(
    uid: str,
    group_req: schemas.UserGroupUpdate,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """
    Assign user ``uid`` to a group and return the updated profile (admin only).

    Raises:
        HTTPException 401: no caller id could be resolved.
        HTTPException 403: the caller is unknown or not an admin.
        HTTPException 404: the user service reported failure, or the user
            cannot be loaded after the assignment.
    """
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")
    user = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not user or user.role != "admin":
        raise HTTPException(status_code=403, detail="Forbidden: Admin only")

    success = services.user_service.assign_user_to_group(db, uid, group_req.group_id)
    if not success:
        raise HTTPException(status_code=404, detail="User or group not found")

    updated = services.user_service.get_user_by_id(db, uid)
    if not updated:
        # Returning None here would surface as a response-validation error.
        raise HTTPException(status_code=404, detail="User or group not found")

    # Build the response the same way the other profile endpoints in this
    # router do, so the newly assigned group's name appears in group_name.
    response = schemas.UserProfile.model_validate(updated)
    if updated.group:
        response.group_name = updated.group.name
    return response
@router.get("/admin/groups", response_model=list[schemas.GroupInfo], summary="List All Groups (Admin Only)")
async def admin_list_groups(
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """Return every group known to the user service (admin only)."""
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    caller = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not caller or caller.role != "admin":
        raise HTTPException(status_code=403, detail="Forbidden: Admin only")

    # Convert to Pydantic models here, while the session is in scope, to
    # avoid SQLAlchemy lazy-loading problems once the response is serialized.
    group_infos = []
    for group in services.user_service.get_all_groups(db):
        group_infos.append(schemas.GroupInfo.model_validate(group))
    return group_infos
@router.post("/admin/groups", response_model=schemas.GroupInfo, summary="Create Group (Admin Only)")
async def admin_create_group(
    group_req: schemas.GroupCreate,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """
    Create a group from the requested name, description and policy (admin only).

    Responds with 409 when the user service returns None, which this handler
    reports as a duplicate group name.
    """
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    caller = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not caller or caller.role != "admin":
        raise HTTPException(status_code=403, detail="Forbidden: Admin only")

    created = services.user_service.create_group(
        db, group_req.name, group_req.description, group_req.policy
    )
    if created is not None:
        return schemas.GroupInfo.model_validate(created)
    raise HTTPException(status_code=409, detail=f"A group named '{group_req.name}' already exists. Please choose a unique name.")
@router.put("/admin/groups/{gid}", response_model=schemas.GroupInfo, summary="Update Group (Admin Only)")
async def admin_update_group(
    gid: str,
    group_req: schemas.GroupUpdate,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """
    Update a group's name, description or policy (admin only).

    The group with id "ungrouped" may not be renamed. A None result from the
    user service is reported as 404, a False result as a 409 name conflict.
    """
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    caller = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not caller or caller.role != "admin":
        raise HTTPException(status_code=403, detail="Forbidden: Admin only")

    # Renaming the default group is blocked; a name that still normalizes to
    # "ungrouped" is let through so policy-only updates keep working.
    renames_default_group = (
        gid == "ungrouped"
        and group_req.name
        and group_req.name.strip().lower() != "ungrouped"
    )
    if renames_default_group:
        raise HTTPException(status_code=403, detail="The default 'Ungrouped' group cannot be renamed.")

    outcome = services.user_service.update_group(
        db, gid, group_req.name, group_req.description, group_req.policy
    )
    if outcome is None:
        raise HTTPException(status_code=404, detail="Group not found")
    if outcome is False:
        raise HTTPException(status_code=409, detail=f"A group named '{group_req.name}' already exists. Please choose a unique name.")
    return schemas.GroupInfo.model_validate(outcome)
@router.delete("/admin/groups/{gid}", summary="Delete Group (Admin Only)")
async def admin_delete_group(
    gid: str,
    db: Session = Depends(get_db),
    user_id: str = Depends(get_current_user_id)
):
    """
    Delete a group (admin only).

    The group with id "ungrouped" is protected and cannot be deleted.
    """
    # NOTE(review): the original docstring says members are moved back to
    # 'ungrouped'; that happens (if at all) inside the user service — confirm.
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    caller = services.user_service.get_user_by_id(db=db, user_id=user_id)
    if not caller or caller.role != "admin":
        raise HTTPException(status_code=403, detail="Forbidden: Admin only")

    # The system default group must always exist.
    if gid == "ungrouped":
        raise HTTPException(status_code=403, detail="The default 'Ungrouped' group cannot be deleted.")

    deleted = services.user_service.delete_group(db, gid)
    if deleted:
        return {"message": "Group deleted successfully"}
    raise HTTPException(status_code=400, detail="Failed to delete group.")
return router