Основные изменения: - Добавлены E2E тесты для проверки ownership (TC-E2E-102/103): * test_admin_can_edit_any_post — admin может редактировать любой пост * test_user_cannot_edit_other_users_post — user не может редактировать чужой пост - Исправлены use cases (UpdatePost, DeletePost, PublishPost) — добавлена проверка роли admin - Обновлены web routes и API routes для передачи роли в use cases - Добавлены unit тесты для admin-сценариев Реструктуризация тестов: - Удалены старые API тесты (tests/api/) — требуют переработки - Удалены старые integration тесты (tests/integration/) - Переработаны E2E тесты: удалены старые, добавлены новые с POM - Добавлена документация тестов: FEATURE_*.md, TEST_MODEL.md, AGENTS.md Инфраструктура: - Добавлен MockKeycloakClient для dev-режима - Добавлены статические файлы: EasyMDE, Highlight.js, стили markdown - Обновлены шаблоны: base.html, post_form.html, post_detail.html - Обновлена DI конфигурация и провайдеры Документация: - tests/FEATURE_RBAC.md — матрица тестов RBAC - tests/FEATURE_POST_LIFECYCLE.md — тесты жизненного цикла поста - tests/FEATURE_DOMAIN_FOUNDATION.md — тесты доменного слоя - tests/FEATURE_INFRASTRUCTURE.md — тесты инфраструктуры - tests/TEST_MODEL.md — глобальная матрица покрытия - app/presentation/web/AGENTS.md — гайд по Web UI - tests/AGENTS.md — гайд по тестированию
226 lines
5.8 KiB
Python
226 lines
5.8 KiB
Python
"""Web dependencies for authentication and authorization.
|
|
|
|
This module provides FastAPI dependencies for web UI authentication
|
|
including user extraction from cookies and role checking.
|
|
"""
|
|
|
|
from typing import Annotated
|
|
|
|
from fastapi import Cookie, Depends, HTTPException, Request
|
|
|
|
from app.domain.roles import Role, get_effective_role
|
|
from app.infrastructure.auth import KeycloakAuthClient, MockKeycloakClient, TokenInfo
|
|
from app.infrastructure.config.settings import settings
|
|
|
|
|
|
async def get_keycloak_client(
|
|
request: Request,
|
|
) -> KeycloakAuthClient | MockKeycloakClient:
|
|
"""Get Keycloak client from DI container via request state.
|
|
|
|
In development mode returns MockKeycloakClient for local testing.
|
|
|
|
Args:
|
|
request: FastAPI request object.
|
|
|
|
Returns:
|
|
KeycloakAuthClient or MockKeycloakClient instance from container.
|
|
"""
|
|
client: KeycloakAuthClient | MockKeycloakClient = await request.state.dishka_container.get(
|
|
KeycloakAuthClient
|
|
)
|
|
return client
|
|
|
|
|
|
async def get_optional_user(
|
|
request: Request,
|
|
access_token: Annotated[str | None, Cookie()] = None,
|
|
) -> TokenInfo | None:
|
|
"""Get current user from cookie if authenticated.
|
|
|
|
Args:
|
|
request: FastAPI request object.
|
|
access_token: Access token from HTTP-only cookie.
|
|
|
|
Returns:
|
|
TokenInfo if user is authenticated, None otherwise.
|
|
"""
|
|
if not access_token:
|
|
return None
|
|
|
|
try:
|
|
keycloak_client = await get_keycloak_client(request)
|
|
token_info = await keycloak_client.introspect_token(access_token)
|
|
|
|
if not token_info.is_valid:
|
|
return None
|
|
|
|
return token_info
|
|
except Exception as e:
|
|
import logging
|
|
|
|
logging.getLogger(__name__).warning(f"Token validation error: {e}")
|
|
return None
|
|
|
|
|
|
async def get_current_user(
|
|
request: Request,
|
|
access_token: Annotated[str | None, Cookie()] = None,
|
|
) -> TokenInfo:
|
|
"""Get current user or raise HTTPException.
|
|
|
|
Args:
|
|
request: HTTP request object.
|
|
access_token: Access token from HTTP-only cookie.
|
|
|
|
Returns:
|
|
Validated TokenInfo for current user.
|
|
|
|
Raises:
|
|
HTTPException: If user is not authenticated.
|
|
"""
|
|
user = await get_optional_user(request, access_token)
|
|
|
|
if not user:
|
|
login_url = "/auth/dev-login" if settings.is_dev else "/auth/login"
|
|
raise HTTPException(
|
|
status_code=307,
|
|
headers={"Location": login_url},
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
OptionalUserDep = Annotated[TokenInfo | None, Depends(get_optional_user)]
|
|
CurrentUserDep = Annotated[TokenInfo, Depends(get_current_user)]
|
|
|
|
|
|
def get_user_role(user: TokenInfo | None) -> Role:
|
|
"""Get effective role from user token.
|
|
|
|
Args:
|
|
user: User token info or None for guest.
|
|
|
|
Returns:
|
|
Effective role for the user.
|
|
"""
|
|
if not user:
|
|
return Role.GUEST
|
|
|
|
return get_effective_role(user.roles)
|
|
|
|
|
|
def require_role(required_role: Role): # type: ignore[no-untyped-def]
|
|
"""Create dependency that requires specific role or higher.
|
|
|
|
Args:
|
|
required_role: Minimum required role.
|
|
|
|
Returns:
|
|
Dependency function for role checking.
|
|
"""
|
|
|
|
async def role_checker(user: OptionalUserDep) -> TokenInfo:
|
|
"""Check if user has required role.
|
|
|
|
Args:
|
|
user: Current user from dependency.
|
|
|
|
Returns:
|
|
User token info if authorized.
|
|
|
|
Raises:
|
|
HTTPException: If user lacks required role.
|
|
"""
|
|
if not user:
|
|
login_url = "/auth/dev-login" if settings.is_dev else "/auth/login"
|
|
raise HTTPException(
|
|
status_code=307,
|
|
headers={"Location": login_url},
|
|
)
|
|
|
|
user_role = get_user_role(user)
|
|
role_hierarchy = [Role.GUEST, Role.USER, Role.ADMIN]
|
|
|
|
user_level = role_hierarchy.index(user_role)
|
|
required_level = role_hierarchy.index(required_role)
|
|
|
|
if user_level < required_level:
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail=f"Role '{required_role.value}' or higher required",
|
|
)
|
|
|
|
return user
|
|
|
|
return role_checker
|
|
|
|
|
|
RequireUserDep = Annotated[TokenInfo, Depends(require_role(Role.USER))]
|
|
RequireAdminDep = Annotated[TokenInfo, Depends(require_role(Role.ADMIN))]
|
|
|
|
|
|
def can_edit_post(user: TokenInfo | None, post_author_id: str) -> bool:
|
|
"""Check if user can edit a post.
|
|
|
|
Args:
|
|
user: Current user or None.
|
|
post_author_id: ID of the post author.
|
|
|
|
Returns:
|
|
True if user can edit the post.
|
|
"""
|
|
if not user:
|
|
return False
|
|
|
|
user_role = get_user_role(user)
|
|
|
|
return user_role == Role.ADMIN or (user_role == Role.USER and user.user_id == post_author_id)
|
|
|
|
|
|
def can_delete_post(user: TokenInfo | None, post_author_id: str) -> bool:
|
|
"""Check if user can delete a post.
|
|
|
|
Args:
|
|
user: Current user or None.
|
|
post_author_id: ID of the post author.
|
|
|
|
Returns:
|
|
True if user can delete the post.
|
|
"""
|
|
return can_edit_post(user, post_author_id)
|
|
|
|
|
|
def can_see_draft(user: TokenInfo | None, post_author_id: str) -> bool:
|
|
"""Check if user can see a draft post.
|
|
|
|
Args:
|
|
user: Current user or None.
|
|
post_author_id: ID of the post author.
|
|
|
|
Returns:
|
|
True if user can see the draft.
|
|
"""
|
|
if not user:
|
|
return False
|
|
|
|
user_role = get_user_role(user)
|
|
|
|
return user_role == Role.ADMIN or (user_role == Role.USER and user.user_id == post_author_id)
|
|
|
|
|
|
def can_create_post(user: TokenInfo | None) -> bool:
|
|
"""Check if user can create a post.
|
|
|
|
Args:
|
|
user: Current user or None.
|
|
|
|
Returns:
|
|
True if user can create posts.
|
|
"""
|
|
if not user:
|
|
return False
|
|
|
|
user_role = get_user_role(user)
|
|
return user_role in (Role.USER, Role.ADMIN)
|