Files
blog.pyaqa.ru/app/presentation/api/deps.py
Sergey Vanyushkin 7ff3fa0992
All checks were successful
ci/woodpecker/pr/pipeline Pipeline was successful
feat: add comments feature with nested replies and recursive rendering
Implement full comments system: domain entities (Comment, CommentLike),
value objects (CommentContent), use cases (CRUD, like toggle), SQLAlchemy
repository, API v1 endpoints, web UI with comment form and nested replies,
i18n translations (EN/RU/FR/DE), and E2E tests.

Fix nested reply (reply-to-reply) not displaying — the flat reply_comments
dict was only queried for top-level comment IDs, so deeply nested replies
were saved to DB (incrementing comment count) but never rendered. Switch
to a recursive Jinja2 macro that renders any nesting depth.
2026-05-11 15:34:20 +03:00

201 lines
5.4 KiB
Python

"""API dependencies using Dishka.
This module defines FastAPI dependencies for authentication, authorization,
and use case injection using Dishka DI container.
"""
from typing import Annotated, Any
from dishka.integrations.fastapi import FromDishka
from fastapi import Depends, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.application import (
CreateCommentUseCase,
CreatePostUseCase,
DeleteCommentUseCase,
DeletePostUseCase,
GetPostUseCase,
ListCommentsUseCase,
ListPostsUseCase,
PublishPostUseCase,
ToggleCommentLikeUseCase,
TogglePostLikeUseCase,
UpdatePostUseCase,
)
from app.domain.exceptions import ForbiddenException, UnauthorizedException
from app.domain.roles import Role, get_effective_role
from app.infrastructure.auth import KeycloakAuthClient, TokenInfo
CreatePostDep = FromDishka[CreatePostUseCase]
GetPostDep = FromDishka[GetPostUseCase]
UpdatePostDep = FromDishka[UpdatePostUseCase]
DeletePostDep = FromDishka[DeletePostUseCase]
ListPostsDep = FromDishka[ListPostsUseCase]
PublishPostDep = FromDishka[PublishPostUseCase]
ToggleLikeDep = FromDishka[TogglePostLikeUseCase]
CreateCommentDep = FromDishka[CreateCommentUseCase]
DeleteCommentDep = FromDishka[DeleteCommentUseCase]
ListCommentsDep = FromDishka[ListCommentsUseCase]
ToggleCommentLikeDep = FromDishka[ToggleCommentLikeUseCase]
security = HTTPBearer(auto_error=False)
async def get_keycloak_client(request: Request) -> KeycloakAuthClient:
"""Get Keycloak client from DI container via request state.
Args:
request: FastAPI request object.
Returns:
KeycloakAuthClient instance from container.
"""
client: KeycloakAuthClient = await request.state.dishka_container.get(KeycloakAuthClient)
return client
async def get_current_token_info(
credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)],
request: Request,
) -> TokenInfo:
"""Validate token and return token info from Keycloak.
Args:
credentials: HTTP authorization credentials.
request: FastAPI request object.
Returns:
Validated TokenInfo instance.
Raises:
UnauthorizedException: If no credentials or invalid token.
"""
if not credentials:
raise UnauthorizedException("Authentication required")
keycloak_client = await get_keycloak_client(request)
token = credentials.credentials
token_info = await keycloak_client.introspect_token(token)
if not token_info.is_valid:
raise UnauthorizedException("Invalid or expired token")
return token_info
async def get_current_user_id(
token_info: Annotated[TokenInfo, Depends(get_current_token_info)],
) -> str:
"""Get current user ID from validated token.
Args:
token_info: Validated token info.
Returns:
User ID string from token.
"""
return token_info.user_id
CurrentUserDep = Annotated[str, Depends(get_current_user_id)]
TokenInfoDep = Annotated[TokenInfo, Depends(get_current_token_info)]
async def get_optional_token_info(
credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)],
request: Request,
) -> TokenInfo | None:
"""Get token info if valid token provided, otherwise None.
For endpoints that support both authenticated and guest access.
Args:
credentials: HTTP authorization credentials.
request: FastAPI request object.
Returns:
TokenInfo if valid, None otherwise.
"""
if not credentials:
return None
keycloak_client = await get_keycloak_client(request)
token = credentials.credentials
token_info = await keycloak_client.introspect_token(token)
if token_info.is_valid:
return token_info
return None
OptionalTokenInfoDep = Annotated[TokenInfo | None, Depends(get_optional_token_info)]
async def get_optional_user_id(
token_info: OptionalTokenInfoDep,
) -> str | None:
"""Get current user ID if token is valid, otherwise None.
Args:
token_info: Optional token info.
Returns:
User ID if authenticated, None for guests.
"""
if token_info:
return token_info.user_id
return None
OptionalUserDep = Annotated[str | None, Depends(get_optional_user_id)]
def get_current_role(token_info: OptionalTokenInfoDep) -> Role:
"""Get effective role from token info.
Returns GUEST if no valid token provided.
Args:
token_info: Optional token info.
Returns:
Effective Role enum value.
"""
if token_info and token_info.roles:
return get_effective_role(token_info.roles)
return Role.GUEST
CurrentRoleDep = Annotated[Role, Depends(get_current_role)]
def require_roles(allowed_roles: list[Role]) -> Any:
"""Create dependency that checks if user has one of the allowed roles.
Args:
allowed_roles: List of roles allowed to access.
Returns:
FastAPI Depends for role checking.
Raises:
ForbiddenException: If user role is not in allowed list.
"""
async def check_role(role: CurrentRoleDep) -> Role:
if role not in allowed_roles:
raise ForbiddenException(
f"Access denied. Required roles: {[r.value for r in allowed_roles]}"
)
return role
return Depends(check_role)
RequireAdmin = require_roles([Role.ADMIN])
RequireUser = require_roles([Role.USER, Role.ADMIN])
RequireAny = require_roles([Role.GUEST, Role.USER, Role.ADMIN])