Files
blog.pyaqa.ru/app/presentation/web/auth.py
Sergey Vanyushkin 0cb706e54b feat(auth): implement web authentication with Keycloak OAuth2
- Add auth routes: /auth/login, /auth/callback, /auth/logout
- Add OAuth2 flow with Keycloak using HTTP-only cookies
- Add web auth dependencies with role checking
- Add profile page (read-only) at /web/profile
- Update header with user menu (sign in/out, profile)
- Filter posts based on user permissions (hide drafts from guests)
- Conditionally show/hide create/edit/delete buttons
- Add authorization rules documentation to AGENTS.md
- Secure post editing/deletion endpoints with auth checks
- Add can_edit, can_delete flags to templates
2026-05-02 15:39:49 +03:00

158 lines
4.4 KiB
Python

"""Web authentication routes for blog application.
This module provides OAuth2/OIDC authentication flow with Keycloak
for the web UI. Uses HTTP-only cookies for token storage.
"""
from typing import Any
from urllib.parse import urlencode

import httpx
from fastapi import APIRouter, HTTPException, Request, Response
from fastapi.responses import RedirectResponse

from app.infrastructure.config.settings import settings
# Router collecting the authentication endpoints; every route declared on it
# is served under the "/auth" prefix and tagged "auth".
router = APIRouter(prefix="/auth", tags=["auth"])
def get_keycloak_login_url(redirect_uri: str) -> str:
    """Build the Keycloak authorization URL.

    The query string is produced with ``urlencode`` so that every value
    (in particular ``redirect_uri``, which itself is a URL) is
    percent-encoded and cannot break or inject query parameters.

    Args:
        redirect_uri: Callback URL after Keycloak authentication.

    Returns:
        Full Keycloak authorization endpoint URL.
    """
    base_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}"
    # NOTE(review): no ``state`` parameter is sent with the request; consider
    # adding one (validated in the callback) to protect against login CSRF.
    query = urlencode(
        {
            "client_id": settings.kc.client_id,
            "response_type": "code",
            "redirect_uri": redirect_uri,
            "scope": "openid",
        }
    )
    return f"{base_url}/protocol/openid-connect/auth?{query}"
def get_keycloak_logout_url(redirect_uri: str) -> str:
    """Build the Keycloak logout URL.

    The query string is produced with ``urlencode`` so that the client id
    and the post-logout redirect URL are percent-encoded and cannot break
    or inject query parameters.

    Args:
        redirect_uri: URL to redirect to after logout.

    Returns:
        Full Keycloak logout endpoint URL.
    """
    base_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}"
    query = urlencode(
        {
            "client_id": settings.kc.client_id,
            "post_logout_redirect_uri": redirect_uri,
        }
    )
    return f"{base_url}/protocol/openid-connect/logout?{query}"
async def exchange_code_for_token(code: str, redirect_uri: str) -> dict[str, Any]:
    """Exchange an authorization code for tokens.

    Sends a form-encoded POST with the ``authorization_code`` grant to the
    realm's token endpoint, authenticating with the configured client id
    and client secret.

    Args:
        code: Authorization code from Keycloak.
        redirect_uri: Callback URL used during login.

    Returns:
        Token response containing access_token, refresh_token, etc.

    Raises:
        HTTPException: With status 400 if Keycloak answers with a non-200
            status; with status 502 if the token endpoint cannot be
            reached or its response body is not a JSON object.
    """
    token_url = (
        f"{settings.kc.server_url}/realms/{settings.kc.realm}"
        "/protocol/openid-connect/token"
    )
    data = {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": settings.kc.client_id,
        "client_secret": settings.kc.client_secret,
        "redirect_uri": redirect_uri,
    }

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(token_url, data=data)
    except httpx.RequestError as exc:
        # Connection errors and timeouts would otherwise surface as an
        # unhandled exception instead of the documented HTTPException.
        raise HTTPException(
            status_code=502,
            detail="Authentication server is unavailable",
        ) from exc

    if response.status_code != 200:
        raise HTTPException(status_code=400, detail="Failed to exchange code for token")

    try:
        result = response.json()
    except ValueError as exc:
        # A 200 answer with a non-JSON body is an upstream fault.
        raise HTTPException(
            status_code=502,
            detail="Invalid token response from authentication server",
        ) from exc

    if not isinstance(result, dict):
        raise HTTPException(
            status_code=502,
            detail="Invalid token response from authentication server",
        )
    return result
@router.get("/login")
async def login(request: Request) -> RedirectResponse:
    """Send the browser to the Keycloak login page.

    The callback URL handed to Keycloak is the request's base URL (without
    a trailing slash) followed by ``/auth/callback``.

    Args:
        request: HTTP request object.

    Returns:
        RedirectResponse pointing at the Keycloak authorization endpoint.
    """
    origin = str(request.base_url).rstrip("/")
    return RedirectResponse(url=get_keycloak_login_url(f"{origin}/auth/callback"))
@router.get("/callback")
async def callback(request: Request, code: str | None = None) -> Response:
    """Handle the OAuth callback from Keycloak.

    Trades the authorization code for tokens and stores the access token
    in an HTTP-only cookie before redirecting to ``/web/``.

    Args:
        request: HTTP request object.
        code: Authorization code from Keycloak.

    Returns:
        RedirectResponse to the home page with the token cookie set.

    Raises:
        HTTPException: If the code is missing, the token exchange fails,
            or the token response carries no access token.
    """
    if not code:
        raise HTTPException(status_code=400, detail="Authorization code not provided")

    # Must be the same callback URL that was sent in the login redirect.
    origin = str(request.base_url).rstrip("/")
    tokens = await exchange_code_for_token(code, f"{origin}/auth/callback")

    token = tokens.get("access_token")
    if not token:
        raise HTTPException(status_code=400, detail="No access token received")

    # The Secure flag is only dropped in development.
    secure_cookie = not settings.is_dev
    # Cookie lifetime follows the token's "expires_in", defaulting to 3600.
    lifetime = tokens.get("expires_in", 3600)

    redirect = RedirectResponse(url="/web/", status_code=302)
    redirect.set_cookie(
        key="access_token",
        value=token,
        httponly=True,
        secure=secure_cookie,
        samesite="lax",
        max_age=lifetime,
    )
    return redirect
@router.get("/logout")
async def logout(request: Request) -> Response:
    """Log the user out and drop the token cookie.

    The post-logout target handed to Keycloak is the request's base URL
    (without a trailing slash) followed by ``/web/``.

    Args:
        request: HTTP request object.

    Returns:
        RedirectResponse to the Keycloak logout endpoint with the
        ``access_token`` cookie cleared.
    """
    origin = str(request.base_url).rstrip("/")
    redirect = RedirectResponse(url=get_keycloak_logout_url(f"{origin}/web/"))
    redirect.delete_cookie(key="access_token")
    return redirect