"""API dependencies using Dishka. This module defines FastAPI dependencies for authentication, authorization, and use case injection using Dishka DI container. """ from typing import Annotated, Any from dishka.integrations.fastapi import FromDishka from fastapi import Depends, Request from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from app.application import ( CreateCommentUseCase, CreatePostUseCase, DeleteCommentUseCase, DeletePostUseCase, GetPostUseCase, ListCommentsUseCase, ListPostsUseCase, PublishPostUseCase, ToggleCommentLikeUseCase, TogglePostLikeUseCase, UpdatePostUseCase, ) from app.domain.exceptions import ForbiddenException, UnauthorizedException from app.domain.roles import Role, get_effective_role from app.infrastructure.auth import KeycloakAuthClient, TokenInfo CreatePostDep = FromDishka[CreatePostUseCase] GetPostDep = FromDishka[GetPostUseCase] UpdatePostDep = FromDishka[UpdatePostUseCase] DeletePostDep = FromDishka[DeletePostUseCase] ListPostsDep = FromDishka[ListPostsUseCase] PublishPostDep = FromDishka[PublishPostUseCase] ToggleLikeDep = FromDishka[TogglePostLikeUseCase] CreateCommentDep = FromDishka[CreateCommentUseCase] DeleteCommentDep = FromDishka[DeleteCommentUseCase] ListCommentsDep = FromDishka[ListCommentsUseCase] ToggleCommentLikeDep = FromDishka[ToggleCommentLikeUseCase] security = HTTPBearer(auto_error=False) async def get_keycloak_client(request: Request) -> KeycloakAuthClient: """Get Keycloak client from DI container via request state. Args: request: FastAPI request object. Returns: KeycloakAuthClient instance from container. """ client: KeycloakAuthClient = await request.state.dishka_container.get(KeycloakAuthClient) return client async def get_current_token_info( credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)], request: Request, ) -> TokenInfo: """Validate token and return token info from Keycloak. Args: credentials: HTTP authorization credentials. request: FastAPI request object. Returns: Validated TokenInfo instance. Raises: UnauthorizedException: If no credentials or invalid token. """ if not credentials: raise UnauthorizedException("Authentication required") keycloak_client = await get_keycloak_client(request) token = credentials.credentials token_info = await keycloak_client.introspect_token(token) if not token_info.is_valid: raise UnauthorizedException("Invalid or expired token") return token_info async def get_current_user_id( token_info: Annotated[TokenInfo, Depends(get_current_token_info)], ) -> str: """Get current user ID from validated token. Args: token_info: Validated token info. Returns: User ID string from token. """ return token_info.user_id CurrentUserDep = Annotated[str, Depends(get_current_user_id)] TokenInfoDep = Annotated[TokenInfo, Depends(get_current_token_info)] async def get_optional_token_info( credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)], request: Request, ) -> TokenInfo | None: """Get token info if valid token provided, otherwise None. For endpoints that support both authenticated and guest access. Args: credentials: HTTP authorization credentials. request: FastAPI request object. Returns: TokenInfo if valid, None otherwise. """ if not credentials: return None keycloak_client = await get_keycloak_client(request) token = credentials.credentials token_info = await keycloak_client.introspect_token(token) if token_info.is_valid: return token_info return None OptionalTokenInfoDep = Annotated[TokenInfo | None, Depends(get_optional_token_info)] async def get_optional_user_id( token_info: OptionalTokenInfoDep, ) -> str | None: """Get current user ID if token is valid, otherwise None. Args: token_info: Optional token info. Returns: User ID if authenticated, None for guests. """ if token_info: return token_info.user_id return None OptionalUserDep = Annotated[str | None, Depends(get_optional_user_id)] def get_current_role(token_info: OptionalTokenInfoDep) -> Role: """Get effective role from token info. Returns GUEST if no valid token provided. Args: token_info: Optional token info. Returns: Effective Role enum value. """ if token_info and token_info.roles: return get_effective_role(token_info.roles) return Role.GUEST CurrentRoleDep = Annotated[Role, Depends(get_current_role)] def require_roles(allowed_roles: list[Role]) -> Any: """Create dependency that checks if user has one of the allowed roles. Args: allowed_roles: List of roles allowed to access. Returns: FastAPI Depends for role checking. Raises: ForbiddenException: If user role is not in allowed list. """ async def check_role(role: CurrentRoleDep) -> Role: if role not in allowed_roles: raise ForbiddenException( f"Access denied. Required roles: {[r.value for r in allowed_roles]}" ) return role return Depends(check_role) RequireAdmin = require_roles([Role.ADMIN]) RequireUser = require_roles([Role.USER, Role.ADMIN]) RequireAny = require_roles([Role.GUEST, Role.USER, Role.ADMIN])