"""Web authentication routes for blog application. This module provides OAuth2/OIDC authentication flow with Keycloak for the web UI. Uses HTTP-only cookies for token storage. """ from typing import Any import httpx from fastapi import APIRouter, HTTPException, Request, Response from fastapi.responses import RedirectResponse from app.infrastructure.config.settings import settings router = APIRouter(prefix="/auth", tags=["auth"]) def get_keycloak_login_url(redirect_uri: str) -> str: """Build Keycloak authorization URL. Args: redirect_uri: Callback URL after Keycloak authentication. Returns: Full Keycloak authorization endpoint URL. """ base_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}" return ( f"{base_url}/protocol/openid-connect/auth" f"?client_id={settings.kc.client_id}" f"&response_type=code" f"&redirect_uri={redirect_uri}" f"&scope=openid" ) def get_keycloak_logout_url(redirect_uri: str) -> str: """Build Keycloak logout URL. Args: redirect_uri: URL to redirect after logout. Returns: Full Keycloak logout endpoint URL. """ base_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}" return ( f"{base_url}/protocol/openid-connect/logout" f"?client_id={settings.kc.client_id}" f"&post_logout_redirect_uri={redirect_uri}" ) async def exchange_code_for_token(code: str, redirect_uri: str) -> dict[str, Any]: """Exchange authorization code for access token. Args: code: Authorization code from Keycloak. redirect_uri: Callback URL used during login. Returns: Token response containing access_token, refresh_token, etc. Raises: HTTPException: If token exchange fails. """ token_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}/protocol/openid-connect/token" data = { "grant_type": "authorization_code", "code": code, "client_id": settings.kc.client_id, "client_secret": settings.kc.client_secret, "redirect_uri": redirect_uri, } async with httpx.AsyncClient() as client: response = await client.post(token_url, data=data) if response.status_code != 200: raise HTTPException(status_code=400, detail="Failed to exchange code for token") result: dict[str, Any] = response.json() return result @router.get("/login") async def login(request: Request) -> RedirectResponse: """Redirect to Keycloak login page. Args: request: HTTP request object. Returns: RedirectResponse to Keycloak authorization endpoint. """ callback_url = str(request.base_url).rstrip("/") + "/auth/callback" login_url = get_keycloak_login_url(callback_url) return RedirectResponse(url=login_url) @router.get("/callback") async def callback(request: Request, code: str | None = None) -> Response: """Handle OAuth callback from Keycloak. Exchanges authorization code for tokens and sets HTTP-only cookie. Args: request: HTTP request object. code: Authorization code from Keycloak. Returns: RedirectResponse to home page with token cookie set. Raises: HTTPException: If code is missing or token exchange fails. """ if not code: raise HTTPException(status_code=400, detail="Authorization code not provided") callback_url = str(request.base_url).rstrip("/") + "/auth/callback" token_data = await exchange_code_for_token(code, callback_url) access_token = token_data.get("access_token") if not access_token: raise HTTPException(status_code=400, detail="No access token received") response = RedirectResponse(url="/web/", status_code=302) response.set_cookie( key="access_token", value=access_token, httponly=True, secure=not settings.is_dev, # Secure in production samesite="lax", max_age=token_data.get("expires_in", 3600), ) return response @router.get("/logout") async def logout(request: Request) -> Response: """Logout user and clear token cookie. Args: request: HTTP request object. Returns: RedirectResponse to Keycloak logout with cookie cleared. """ home_url = str(request.base_url).rstrip("/") + "/web/" logout_url = get_keycloak_logout_url(home_url) response = RedirectResponse(url=logout_url) response.delete_cookie(key="access_token") return response