Files
blog.pyaqa.ru/app/presentation/web/deps.py
Sergey Vanyushkin 0cb706e54b feat(auth): implement web authentication with Keycloak OAuth2
- Add auth routes: /auth/login, /auth/callback, /auth/logout
- Add OAuth2 flow with Keycloak using HTTP-only cookies
- Add web auth dependencies with role checking
- Add profile page (read-only) at /web/profile
- Update header with user menu (sign in/out, profile)
- Filter posts based on user permissions (hide drafts from guests)
- Conditionally show/hide create/edit/delete buttons
- Add authorization rules documentation to AGENTS.md
- Secure post editing/deletion endpoints with auth checks
- Add can_edit, can_delete flags to templates
2026-05-02 15:39:49 +03:00

214 lines
5.3 KiB
Python

"""Web dependencies for authentication and authorization.
This module provides FastAPI dependencies for web UI authentication
including user extraction from cookies and role checking.
"""
import logging
from collections.abc import Awaitable, Callable
from typing import Annotated

from fastapi import Cookie, Depends, HTTPException, Request

from app.domain.roles import Role, get_effective_role
from app.infrastructure.auth import KeycloakAuthClient, TokenInfo
def get_keycloak_client(request: Request) -> KeycloakAuthClient:
    """Resolve the Keycloak auth client for the current request.

    The client is looked up in the container stored on
    ``request.state.dishka_container``.

    Args:
        request: FastAPI request whose state carries the DI container.

    Returns:
        The ``KeycloakAuthClient`` instance supplied by the container.
    """
    # NOTE(review): this assumes ``container.get`` is synchronous; if the
    # request container is an async one the result would need awaiting —
    # confirm against the DI setup.
    container = request.state.dishka_container
    resolved: KeycloakAuthClient = container.get(KeycloakAuthClient)
    return resolved
async def get_optional_user(
    request: Request,
    access_token: Annotated[str | None, Cookie()] = None,
) -> TokenInfo | None:
    """Return the authenticated user for this request, or None for a guest.

    The token from the ``access_token`` cookie is introspected through the
    Keycloak client. Every failure is treated as "not authenticated" so that
    pages available to guests keep rendering, but the failure is logged
    instead of being dropped silently.

    Args:
        request: FastAPI request object, used to resolve the Keycloak client.
        access_token: Access token read from the ``access_token`` cookie.

    Returns:
        TokenInfo when the token introspects as valid, None otherwise
        (missing cookie, invalid token, or any error during introspection).
    """
    if not access_token:
        return None
    try:
        keycloak_client = get_keycloak_client(request)
        token_info = await keycloak_client.introspect_token(access_token)
        return token_info if token_info.is_valid else None
    except Exception:
        # Fail closed: the caller sees a guest. Log with the traceback so an
        # identity-provider outage or a wiring bug does not silently demote
        # every user. The token itself is deliberately not logged.
        logging.getLogger(__name__).warning(
            "Access token introspection failed; treating request as unauthenticated",
            exc_info=True,
        )
        return None
async def get_current_user(
    request: Request,
    access_token: Annotated[str | None, Cookie()] = None,
) -> TokenInfo:
    """Return the authenticated user, redirecting to login when there is none.

    Args:
        request: HTTP request object.
        access_token: Access token read from the ``access_token`` cookie.

    Returns:
        Validated TokenInfo for the current user.

    Raises:
        HTTPException: Status 307 with a ``Location: /auth/login`` header
            when no authenticated user could be resolved.
    """
    authenticated = await get_optional_user(request, access_token)
    if authenticated:
        return authenticated
    # No usable session: send the browser to the login flow.
    raise HTTPException(
        status_code=307,
        headers={"Location": "/auth/login"},
    )
# Injects the result of get_optional_user: TokenInfo, or None for a guest.
OptionalUserDep = Annotated[
    TokenInfo | None,
    Depends(get_optional_user),
]
# Injects the result of get_current_user, which raises instead of returning None.
CurrentUserDep = Annotated[
    TokenInfo,
    Depends(get_current_user),
]
def get_user_role(user: TokenInfo | None) -> Role:
    """Map a user (or the absence of one) to its effective role.

    Args:
        user: User token info, or None for an unauthenticated visitor.

    Returns:
        ``Role.GUEST`` when there is no user; otherwise the result of
        ``get_effective_role`` applied to the user's roles.
    """
    if user:
        return get_effective_role(user.roles)
    return Role.GUEST
def require_role(required_role: Role) -> Callable[..., Awaitable[TokenInfo]]:
    """Create a dependency that requires a specific role or higher.

    Roles are ranked GUEST < USER < ADMIN. The ranking and the level of
    ``required_role`` are resolved once, when the dependency is created,
    rather than on every request.

    Args:
        required_role: Minimum required role.

    Returns:
        Async dependency function that returns the user's token info when
        the user is authorized.

    Raises:
        ValueError: If ``required_role`` is not one of the ranked roles.
            Raised when the dependency is created, not at request time.
    """
    role_hierarchy = (Role.GUEST, Role.USER, Role.ADMIN)
    required_level = role_hierarchy.index(required_role)

    async def role_checker(user: OptionalUserDep) -> TokenInfo:
        """Check that the current user holds the required role.

        Args:
            user: Current user from the optional-user dependency.

        Returns:
            User token info if authorized.

        Raises:
            HTTPException: Status 307 redirecting to ``/auth/login`` when
                there is no user; status 403 when the user's role ranks
                below the required one.
        """
        if not user:
            raise HTTPException(
                status_code=307,
                headers={"Location": "/auth/login"},
            )
        user_role = get_user_role(user)
        # A role that is missing from the hierarchy is denied with 403
        # instead of surfacing as an unhandled ValueError (HTTP 500).
        user_level = (
            role_hierarchy.index(user_role) if user_role in role_hierarchy else -1
        )
        if user_level < required_level:
            raise HTTPException(
                status_code=403,
                detail=f"Role '{required_role.value}' or higher required",
            )
        return user

    return role_checker
# Injects the user's TokenInfo once require_role(Role.USER) has accepted it.
RequireUserDep = Annotated[
    TokenInfo,
    Depends(require_role(Role.USER)),
]
# Injects the user's TokenInfo once require_role(Role.ADMIN) has accepted it.
RequireAdminDep = Annotated[
    TokenInfo,
    Depends(require_role(Role.ADMIN)),
]
def can_edit_post(user: TokenInfo | None, post_author_id: str) -> bool:
    """Decide whether the user may edit a post.

    Admins may edit any post; a user with the USER role may edit only posts
    whose author ID equals their own user ID. Guests may not edit.

    Args:
        user: Current user, or None for a guest.
        post_author_id: ID of the post author.

    Returns:
        True if the user can edit the post.
    """
    if not user:
        return False
    role = get_user_role(user)
    if role == Role.ADMIN:
        return True
    return role == Role.USER and user.user_id == post_author_id
def can_delete_post(user: TokenInfo | None, post_author_id: str) -> bool:
    """Decide whether the user may delete a post.

    Deletion uses the same rule as editing and delegates to
    ``can_edit_post``.

    Args:
        user: Current user, or None for a guest.
        post_author_id: ID of the post author.

    Returns:
        True if the user can delete the post.
    """
    allowed = can_edit_post(user, post_author_id)
    return allowed
def can_see_draft(user: TokenInfo | None, post_author_id: str) -> bool:
    """Decide whether the user may view a draft post.

    Admins see every draft; a user with the USER role sees only drafts whose
    author ID equals their own user ID. Guests see none.

    Args:
        user: Current user, or None for a guest.
        post_author_id: ID of the post author.

    Returns:
        True if the user can see the draft.
    """
    if not user:
        return False
    role = get_user_role(user)
    is_admin = role == Role.ADMIN
    # The user ID is only compared for the USER role.
    is_author = role == Role.USER and user.user_id == post_author_id
    return is_admin or is_author
def can_create_post(user: TokenInfo | None) -> bool:
    """Decide whether the user may create a post.

    Args:
        user: Current user, or None for a guest.

    Returns:
        True when the user's effective role is USER or ADMIN; False for
        guests and for any other role.
    """
    if user:
        return get_user_role(user) in (Role.USER, Role.ADMIN)
    return False