from fastapi import APIRouter, HTTPException, Depends, Header, Query, Request from fastapi.responses import RedirectResponse as redirect from sqlalchemy.orm import Session from app.db import models from typing import Optional, Annotated import logging import os import requests import jwt # Correctly import from your application's schemas and dependencies from app.api.dependencies import ServiceContainer, get_db from app.api import schemas from app.core.services.user import login_required # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Minimum OIDC configuration from environment variables OIDC_CLIENT_ID = os.getenv("OIDC_CLIENT_ID", "") OIDC_CLIENT_SECRET = os.getenv("OIDC_CLIENT_SECRET", "") OIDC_SERVER_URL = os.getenv("OIDC_SERVER_URL", "") OIDC_REDIRECT_URI = os.getenv("OIDC_REDIRECT_URI", "") # --- Derived OIDC Configuration --- OIDC_AUTHORIZATION_URL = f"{OIDC_SERVER_URL}/auth" OIDC_TOKEN_URL = f"{OIDC_SERVER_URL}/token" OIDC_USERINFO_URL = f"{OIDC_SERVER_URL}/userinfo" # A dependency to simulate getting the current user ID from a request header def get_current_user_id(x_user_id: Annotated[Optional[str], Header()] = None) -> Optional[str]: """ Retrieves the user ID from the X-User-ID header. This simulates an authentication system and is used by the login_required decorator. """ return x_user_id def create_users_router(services: ServiceContainer) -> APIRouter: router = APIRouter(prefix="/users", tags=["Users"]) def create_users_router(services: ServiceContainer) -> APIRouter: router = APIRouter(prefix="/users", tags=["Users"]) @router.get("/login", summary="Initiate OIDC Login Flow") async def login_redirect( request: Request, # Allow the frontend to provide its callback URL frontend_callback_uri: Optional[str] = Query(None, description="The frontend URI to redirect back to after OIDC provider.") ): """ Initiates the OIDC authentication flow. The `frontend_callback_uri` specifies where the user should be redirected after successful authentication with the OIDC provider. """ # Store the frontend_callback_uri in a session or a cache, # linked to the state parameter for security. # For simplicity, we will pass it as a query parameter in the callback. # A more robust solution would use a state parameter. # The OIDC provider must redirect to a URL known to the backend. # So we redirect to a backend endpoint, which in turn redirects to the frontend. auth_url = ( f"{OIDC_AUTHORIZATION_URL}?" f"response_type=code&" f"scope=openid%20profile%20email&" f"client_id={OIDC_CLIENT_ID}&" f"redirect_uri={OIDC_REDIRECT_URI}&" f"state={frontend_callback_uri}" # Pass the frontend URI in the state parameter ) logger.debug(f"Redirecting to OIDC authorization URL: {auth_url}") return redirect(url=auth_url) @router.get("/login/callback", summary="Handle OIDC Login Callback") async def login_callback( request: Request, code: str = Query(..., description="Authorization code from OIDC provider"), state: str = Query(..., description="The original frontend redirect URI"), db: Session = Depends(get_db) ): """ Handles the callback from the OIDC provider, exchanges the code for tokens, and then redirects the user back to the frontend with the user data or a session token. """ logger.debug(f"Received callback with authorization code: {code}") try: # Step 1: Exchange the authorization code for an access token and an ID token token_data = { "grant_type": "authorization_code", "code": code, "redirect_uri": OIDC_REDIRECT_URI, "client_id": OIDC_CLIENT_ID, "client_secret": OIDC_CLIENT_SECRET, } token_response = requests.post(OIDC_TOKEN_URL, data=token_data) token_response.raise_for_status() response_json = token_response.json() id_token = response_json.get("id_token") if not id_token: logger.error("Error: ID token not found.") raise HTTPException(status_code=400, detail="Failed to get ID token.") # Step 2: Decode the ID token to get user information decoded_id_token = jwt.decode(id_token, options={"verify_signature": False}) oidc_id = decoded_id_token.get("sub") email = decoded_id_token.get("email") username = decoded_id_token.get("name") if not all([oidc_id, email, username]): logger.error("Error: Essential user data missing.") raise HTTPException(status_code=400, detail="Essential user data missing.") # Step 3: Save the user and get their unique ID user_id = services.user_service.save_user( db=db, oidc_id=oidc_id, email=email, username=username ) # Step 4: Redirect back to the frontend, passing the user_id or a session token # Note: This is a simplification. A real app would set a secure HTTP-only cookie. # We are passing the user_id as a query parameter for demonstration. frontend_redirect_url = f"{state}?user_id={user_id}" return redirect(url=frontend_redirect_url) except requests.exceptions.RequestException as e: logger.error(f"Token exchange error: {e}") raise HTTPException(status_code=500, detail=f"Failed to communicate with OIDC provider: {e}") except jwt.JWTDecodeError as e: logger.error(f"ID token decode error: {e}") raise HTTPException(status_code=400, detail="Failed to decode ID token.") except Exception as e: logger.error(f"An unexpected error occurred: {e}") raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}") @router.get("/me", response_model=schemas.UserStatus, summary="Get Current User Status") async def get_current_status( db: Session = Depends(get_db), user_id: str = Depends(get_current_user_id) ): """ Checks the login status of the current user. Requires a valid user_id to be present in the request header. """ try: # In a real-world scenario, you would fetch user details from the DB using user_id # For this example, we return a mock response based on the presence of user_id user : Optional[models.User] = services.user_service.get_user_by_id(db=db, user_id=user_id) # Ensure user exists email = user.email if user else None is_anonymous = user is None is_logged_in = user is not None return schemas.UserStatus( id=user_id, email=email, is_logged_in=is_logged_in, is_anonymous=is_anonymous ) except Exception as e: raise HTTPException(status_code=500, detail=f"An error occurred: {e}") @router.post("/logout", summary="Log Out the Current User") async def logout(): """ Simulates a user logout. In a real application, this would clear the session token or cookie. """ return {"message": "Logged out successfully"} return router