Files
blog.pyaqa.ru/app/presentation/api/deps.py
Sergey Vanyushkin c9b380c601 test(api): add full API test suite with get_keycloak_client async fix
Add 45 API tests covering all 12 post endpoints (CRUD, publish/unpublish) with RBAC policy coverage across guest, user, admin roles.

Fix get_keycloak_client() in deps.py to be async: Dishka's async container requires awaiting get(); without the await, a coroutine object was returned instead of the actual client.
2026-05-10 11:21:58 +00:00

190 lines
5.0 KiB
Python

"""API dependencies using Dishka.
This module defines FastAPI dependencies for authentication, authorization,
and use case injection using Dishka DI container.
"""
from typing import Annotated, Any
from dishka.integrations.fastapi import FromDishka
from fastapi import Depends, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.application import (
CreatePostUseCase,
DeletePostUseCase,
GetPostUseCase,
ListPostsUseCase,
PublishPostUseCase,
UpdatePostUseCase,
)
from app.domain.exceptions import ForbiddenException, UnauthorizedException
from app.domain.roles import Role, get_effective_role
from app.infrastructure.auth import KeycloakAuthClient, TokenInfo
# Dishka-injected use cases, one alias per post use case. Intended to be used
# as parameter annotations so the DI container supplies the instance.
CreatePostDep = FromDishka[CreatePostUseCase]
GetPostDep = FromDishka[GetPostUseCase]
UpdatePostDep = FromDishka[UpdatePostUseCase]
DeletePostDep = FromDishka[DeletePostUseCase]
ListPostsDep = FromDishka[ListPostsUseCase]
PublishPostDep = FromDishka[PublishPostUseCase]
# Bearer-token scheme. With auto_error=False, absent credentials are delivered
# to dependencies as None rather than triggering an automatic error response,
# which lets the dependencies below decide between "guest" and "unauthorized".
security = HTTPBearer(auto_error=False)
async def get_keycloak_client(request: Request) -> KeycloakAuthClient:
    """Resolve the Keycloak auth client for the current request.

    The Dishka container is read from ``request.state.dishka_container`` and
    asked for a ``KeycloakAuthClient``. The container's ``get`` is awaited,
    so this function is a coroutine and must be awaited by its callers.

    Args:
        request: FastAPI request whose state carries the Dishka container.

    Returns:
        The ``KeycloakAuthClient`` instance supplied by the container.
    """
    container = request.state.dishka_container
    keycloak: KeycloakAuthClient = await container.get(KeycloakAuthClient)
    return keycloak
async def get_current_token_info(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)],
    request: Request,
) -> TokenInfo:
    """Require a bearer token and return its Keycloak introspection result.

    Args:
        credentials: Bearer credentials produced by the ``security`` scheme,
            or ``None`` when none were supplied.
        request: FastAPI request, used to obtain the Keycloak client.

    Returns:
        The ``TokenInfo`` returned by introspection; only returned when its
        ``is_valid`` flag is truthy.

    Raises:
        UnauthorizedException: If no credentials were supplied, or if
            introspection reports the token as not valid.
    """
    if not credentials:
        raise UnauthorizedException("Authentication required")

    keycloak = await get_keycloak_client(request)
    info = await keycloak.introspect_token(credentials.credentials)
    if info.is_valid:
        return info

    # Introspection succeeded but the token itself was rejected.
    raise UnauthorizedException("Invalid or expired token")
async def get_current_user_id(
    token_info: Annotated[TokenInfo, Depends(get_current_token_info)],
) -> str:
    """Return the user ID of the authenticated caller.

    Authentication is enforced by the ``get_current_token_info`` dependency,
    so this function only reads the ID off the already validated token info.

    Args:
        token_info: Token info that passed validation.

    Returns:
        The ``user_id`` attribute of ``token_info``.
    """
    user_id = token_info.user_id
    return user_id
# Annotated aliases for endpoints that require authentication: the caller's
# user ID alone, or the full validated token info.
CurrentUserDep = Annotated[str, Depends(get_current_user_id)]
TokenInfoDep = Annotated[TokenInfo, Depends(get_current_token_info)]
async def get_optional_token_info(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)],
    request: Request,
) -> TokenInfo | None:
    """Return token info for a valid bearer token, or ``None`` for guests.

    Meant for endpoints that serve both authenticated and anonymous callers:
    missing credentials and tokens that fail introspection are both mapped to
    ``None`` instead of raising.

    Args:
        credentials: Bearer credentials produced by the ``security`` scheme,
            or ``None`` when none were supplied.
        request: FastAPI request, used to obtain the Keycloak client.

    Returns:
        The introspected ``TokenInfo`` when its ``is_valid`` flag is truthy,
        otherwise ``None``.
    """
    if not credentials:
        return None

    # NOTE(review): this repeats the introspection done by
    # get_current_token_info; a route depending on both would presumably
    # introspect the same token twice per request - confirm whether that matters.
    keycloak = await get_keycloak_client(request)
    info = await keycloak.introspect_token(credentials.credentials)
    return info if info.is_valid else None
# Annotated alias for guest-capable endpoints: TokenInfo for a valid bearer
# token, None otherwise.
OptionalTokenInfoDep = Annotated[TokenInfo | None, Depends(get_optional_token_info)]
async def get_optional_user_id(
    token_info: OptionalTokenInfoDep,
) -> str | None:
    """Return the caller's user ID, or ``None`` when there is no token info.

    Args:
        token_info: Token info for an authenticated caller, or ``None``.

    Returns:
        ``token_info.user_id`` when token info is present, ``None`` otherwise.
    """
    return token_info.user_id if token_info else None
# Annotated alias: the caller's user ID when authenticated, None for guests.
OptionalUserDep = Annotated[str | None, Depends(get_optional_user_id)]
def get_current_role(token_info: OptionalTokenInfoDep) -> Role:
    """Determine the caller's effective role.

    Callers without token info, and callers whose token info carries no
    roles, are treated as ``Role.GUEST``. Otherwise the decision is delegated
    to ``get_effective_role``.

    Args:
        token_info: Token info for an authenticated caller, or ``None``.

    Returns:
        The effective ``Role`` for this request.
    """
    # Guard clause: nothing to derive a role from, so fall back to guest.
    if not token_info or not token_info.roles:
        return Role.GUEST
    return get_effective_role(token_info.roles)
# Annotated alias: the effective Role of the caller (Role.GUEST when there is
# no valid token or the token has no roles).
CurrentRoleDep = Annotated[Role, Depends(get_current_role)]
def require_roles(allowed_roles: list[Role]) -> Any:
    """Build a FastAPI dependency that restricts access to the given roles.

    Args:
        allowed_roles: Roles that are permitted to pass the check.

    Returns:
        A ``Depends(...)`` object wrapping the role check. When resolved, the
        check returns the caller's role if it is in ``allowed_roles`` and
        raises ``ForbiddenException`` otherwise. ``require_roles`` itself
        never raises that exception; only the returned dependency does.
    """

    async def check_role(role: CurrentRoleDep) -> Role:
        # Happy path first: a permitted role is passed through unchanged.
        if role in allowed_roles:
            return role
        raise ForbiddenException(
            f"Access denied. Required roles: {[r.value for r in allowed_roles]}"
        )

    return Depends(check_role)
# Ready-made role guards built with require_roles.
RequireAdmin = require_roles([Role.ADMIN])  # admin only
RequireUser = require_roles([Role.USER, Role.ADMIN])  # user or admin
RequireAny = require_roles([Role.GUEST, Role.USER, Role.ADMIN])  # guest, user or admin