Files
blog.pyaqa.ru/app/presentation/web/deps.py
Sergey Vanyushkin 41f2a3d98e Add comprehensive API authorization tests and E2E test infrastructure
API Tests:

- Add test_authorization.py with 21 tests covering:

  - Authenticated POST/PUT/DELETE operations

  - Role-based access control (USER vs ADMIN)

  - Token validation (expired, invalid format, missing)

  - Permission checks (view unpublished posts)

  - Error response format verification

- Add auth_client and admin_client fixtures

E2E Test Infrastructure:

- Create FakeKeycloakClient for isolated testing

- Add test fixtures for authenticated browser contexts

- Implement fake auth routes (/auth/login, /auth/callback)

- Fix pytest_plugins location for pytest-playwright

- Add E2E test files for create, edit, view posts

Fixes:

- Make FakeKeycloakClient methods async (introspect_token, get_userinfo)

- Move pytest_playwright to root conftest.py

- Skip failing E2E tests pending further debugging
2026-05-03 22:34:32 +03:00

217 lines
5.4 KiB
Python

"""Web dependencies for authentication and authorization.
This module provides FastAPI dependencies for web UI authentication
including user extraction from cookies and role checking.
"""
from typing import Annotated
from fastapi import Cookie, Depends, HTTPException, Request
from app.domain.roles import Role, get_effective_role
from app.infrastructure.auth import KeycloakAuthClient, TokenInfo
async def get_keycloak_client(request: Request) -> KeycloakAuthClient:
"""Get Keycloak client from DI container via request state.
Args:
request: FastAPI request object.
Returns:
KeycloakAuthClient instance from container.
"""
client: KeycloakAuthClient = await request.state.dishka_container.get(KeycloakAuthClient)
return client
async def get_optional_user(
request: Request,
access_token: Annotated[str | None, Cookie()] = None,
) -> TokenInfo | None:
"""Get current user from cookie if authenticated.
Args:
request: FastAPI request object.
access_token: Access token from HTTP-only cookie.
Returns:
TokenInfo if user is authenticated, None otherwise.
"""
if not access_token:
return None
try:
keycloak_client = await get_keycloak_client(request)
token_info = await keycloak_client.introspect_token(access_token)
if not token_info.is_valid:
return None
return token_info
except Exception as e:
import logging
logging.getLogger(__name__).warning(f"Token validation error: {e}")
return None
async def get_current_user(
request: Request,
access_token: Annotated[str | None, Cookie()] = None,
) -> TokenInfo:
"""Get current user or raise HTTPException.
Args:
request: HTTP request object.
access_token: Access token from HTTP-only cookie.
Returns:
Validated TokenInfo for current user.
Raises:
HTTPException: If user is not authenticated.
"""
user = await get_optional_user(request, access_token)
if not user:
raise HTTPException(
status_code=307,
headers={"Location": "/auth/login"},
)
return user
OptionalUserDep = Annotated[TokenInfo | None, Depends(get_optional_user)]
CurrentUserDep = Annotated[TokenInfo, Depends(get_current_user)]
def get_user_role(user: TokenInfo | None) -> Role:
"""Get effective role from user token.
Args:
user: User token info or None for guest.
Returns:
Effective role for the user.
"""
if not user:
return Role.GUEST
return get_effective_role(user.roles)
def require_role(required_role: Role): # type: ignore[no-untyped-def]
"""Create dependency that requires specific role or higher.
Args:
required_role: Minimum required role.
Returns:
Dependency function for role checking.
"""
async def role_checker(user: OptionalUserDep) -> TokenInfo:
"""Check if user has required role.
Args:
user: Current user from dependency.
Returns:
User token info if authorized.
Raises:
HTTPException: If user lacks required role.
"""
if not user:
raise HTTPException(
status_code=307,
headers={"Location": "/auth/login"},
)
user_role = get_user_role(user)
role_hierarchy = [Role.GUEST, Role.USER, Role.ADMIN]
user_level = role_hierarchy.index(user_role)
required_level = role_hierarchy.index(required_role)
if user_level < required_level:
raise HTTPException(
status_code=403,
detail=f"Role '{required_role.value}' or higher required",
)
return user
return role_checker
RequireUserDep = Annotated[TokenInfo, Depends(require_role(Role.USER))]
RequireAdminDep = Annotated[TokenInfo, Depends(require_role(Role.ADMIN))]
def can_edit_post(user: TokenInfo | None, post_author_id: str) -> bool:
"""Check if user can edit a post.
Args:
user: Current user or None.
post_author_id: ID of the post author.
Returns:
True if user can edit the post.
"""
if not user:
return False
user_role = get_user_role(user)
return user_role == Role.ADMIN or (user_role == Role.USER and user.user_id == post_author_id)
def can_delete_post(user: TokenInfo | None, post_author_id: str) -> bool:
"""Check if user can delete a post.
Args:
user: Current user or None.
post_author_id: ID of the post author.
Returns:
True if user can delete the post.
"""
return can_edit_post(user, post_author_id)
def can_see_draft(user: TokenInfo | None, post_author_id: str) -> bool:
"""Check if user can see a draft post.
Args:
user: Current user or None.
post_author_id: ID of the post author.
Returns:
True if user can see the draft.
"""
if not user:
return False
user_role = get_user_role(user)
return user_role == Role.ADMIN or (user_role == Role.USER and user.user_id == post_author_id)
def can_create_post(user: TokenInfo | None) -> bool:
"""Check if user can create a post.
Args:
user: Current user or None.
Returns:
True if user can create posts.
"""
if not user:
return False
user_role = get_user_role(user)
return user_role in (Role.USER, Role.ADMIN)