"""Web authentication routes for blog application. This module provides OAuth2/OIDC authentication flow with Keycloak for the web UI. Uses HTTP-only cookies for token storage. """ from typing import Any import httpx from fastapi import APIRouter, HTTPException, Request, Response from fastapi.responses import RedirectResponse from app.infrastructure.config.settings import settings router = APIRouter(prefix="/auth", tags=["auth"]) def get_keycloak_login_url(redirect_uri: str) -> str: """Build Keycloak authorization URL. Args: redirect_uri: Callback URL after Keycloak authentication. Returns: Full Keycloak authorization endpoint URL. """ base_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}" return ( f"{base_url}/protocol/openid-connect/auth" f"?client_id={settings.kc.client_id}" f"&response_type=code" f"&redirect_uri={redirect_uri}" f"&scope=openid" ) def get_keycloak_logout_url(redirect_uri: str) -> str: """Build Keycloak logout URL. Args: redirect_uri: URL to redirect after logout. Returns: Full Keycloak logout endpoint URL. """ base_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}" return ( f"{base_url}/protocol/openid-connect/logout" f"?client_id={settings.kc.client_id}" f"&post_logout_redirect_uri={redirect_uri}" ) async def exchange_code_for_token(code: str, redirect_uri: str) -> dict[str, Any]: """Exchange authorization code for access token. Args: code: Authorization code from Keycloak. redirect_uri: Callback URL used during login. Returns: Token response containing access_token, refresh_token, etc. Raises: HTTPException: If token exchange fails. """ token_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}/protocol/openid-connect/token" data = { "grant_type": "authorization_code", "code": code, "client_id": settings.kc.client_id, "client_secret": settings.kc.client_secret, "redirect_uri": redirect_uri, } async with httpx.AsyncClient() as client: response = await client.post(token_url, data=data) if response.status_code != 200: raise HTTPException(status_code=400, detail="Failed to exchange code for token") result: dict[str, Any] = response.json() return result @router.get("/login") async def login(request: Request) -> RedirectResponse: """Redirect to Keycloak login page or dev login in development mode. In development mode redirects to the local dev login page instead of the external Keycloak server. Args: request: HTTP request object. Returns: RedirectResponse to Keycloak or dev login endpoint. """ if settings.is_dev: return RedirectResponse(url="/auth/dev-login") callback_url = str(request.base_url).rstrip("/") + "/auth/callback" login_url = get_keycloak_login_url(callback_url) return RedirectResponse(url=login_url) @router.get("/callback") async def callback(request: Request, code: str | None = None) -> Response: """Handle OAuth callback from Keycloak. Exchanges authorization code for tokens and sets HTTP-only cookie. Args: request: HTTP request object. code: Authorization code from Keycloak. Returns: RedirectResponse to home page with token cookie set. Raises: HTTPException: If code is missing or token exchange fails. """ if not code: raise HTTPException(status_code=400, detail="Authorization code not provided") callback_url = str(request.base_url).rstrip("/") + "/auth/callback" token_data = await exchange_code_for_token(code, callback_url) access_token = token_data.get("access_token") if not access_token: raise HTTPException(status_code=400, detail="No access token received") response = RedirectResponse(url="/web/", status_code=302) response.set_cookie( key="access_token", value=access_token, httponly=True, secure=not settings.is_dev, # Secure in production samesite="lax", max_age=token_data.get("expires_in", 3600), ) return response @router.get("/logout") async def logout(request: Request) -> Response: """Logout user and clear token cookie. In development mode redirects directly to home page. In production redirects to Keycloak logout endpoint. Args: request: HTTP request object. Returns: RedirectResponse with cookie cleared. """ home_url = str(request.base_url).rstrip("/") + "/web/" response = RedirectResponse(url=home_url) response.delete_cookie(key="access_token") if not settings.is_dev: logout_url = get_keycloak_logout_url(home_url) response = RedirectResponse(url=logout_url) response.delete_cookie(key="access_token") return response @router.get("/dev-login") async def dev_login(request: Request) -> Response: """Show dev login page for development mode. Only available in development mode. Provides a simple form to select role and log in without a real Keycloak server. Args: request: HTTP request object. Returns: HTMLResponse with dev login form. Raises: HTTPException: If accessed outside development mode. """ if not settings.is_dev: raise HTTPException(status_code=404, detail="Not found") from fastapi.responses import HTMLResponse return HTMLResponse( content=""" Dev Login - Blog

Development Login

DEV ONLY

This bypasses Keycloak for local development only.

""" ) @router.post("/dev-login") async def dev_login_submit(request: Request) -> Response: """Handle dev login form submission. Sets a dev-specific cookie that MockKeycloakClient recognizes. Args: request: HTTP request object with form data. Returns: RedirectResponse to home page with dev token cookie set. Raises: HTTPException: If accessed outside development mode. """ if not settings.is_dev: raise HTTPException(status_code=404, detail="Not found") form = await request.form() role = str(form.get("role", "user")).strip() token = f"dev-token-{role}" response = RedirectResponse(url="/web/", status_code=302) response.set_cookie( key="access_token", value=token, httponly=True, secure=False, samesite="lax", max_age=86400, ) return response