base ui #11

Merged
pi3c merged 10 commits from feature/base_ui into dev 2026-05-02 16:10:18 +00:00
52 changed files with 2043 additions and 304 deletions
Showing only changes of commit ca4e8877a5 - Show all commits

View File

@@ -110,6 +110,82 @@ tests/
- Use **Repository** pattern for data access
- Use **Dependency Injection** via FastAPI's Depends()
## AI Code Generation Requirements
### Documentation Standards
- **Write self-documenting code** - use clear, descriptive variable names and function names
- **NO inline comments** - code should be readable without explanatory comments
- **Google-style docstrings** are REQUIRED for all modules, classes, and functions
### Docstring Requirements
#### Modules
Every module must have a module-level docstring:
```python
"""Module for managing blog posts.
This module provides use cases for creating, updating, and deleting
blog posts in the application layer.
"""
```
#### Classes
Every class must have a detailed docstring:
```python
class CreatePostUseCase:
"""Use case for creating a new blog post.
This class encapsulates the business logic for creating posts,
including validation and slug generation.
Attributes:
uow: Unit of Work for database transactions.
slug_service: Service for generating URL-friendly slugs.
Example:
>>> use_case = CreatePostUseCase(uow, slug_service)
>>> result = await use_case.execute(dto)
"""
```
#### Functions/Methods
Every function must have a detailed docstring with Args, Returns, Raises:
```python
async def execute(self, dto: CreatePostDTO) -> PostDTO:
"""Execute the use case to create a new post.
Args:
dto: Data transfer object containing post creation data
including title, content, and author information.
Returns:
PostDTO: The created post data transfer object with
generated ID and slug.
Raises:
TitleValidationError: If the title is empty or too long.
ContentValidationError: If the content is empty.
DuplicateSlugError: If a post with the same slug exists.
Note:
This method is idempotent - calling it multiple times with
the same data will create separate posts with unique slugs.
"""
```
### Google-Style Docstring Format
Use the following sections as appropriate:
- `Args` - Parameter descriptions with types
- `Returns` - Return value description with type
- `Raises` - Exceptions that may be raised
- `Yields` - For generator functions
- `Example` - Usage examples
- `Note` - Additional important information
- `Warning` - Critical warnings
- `Attributes` - For class attributes
- `See Also` - References to related code
## DDD Concepts Used
### Entities

View File

@@ -1 +1,5 @@
"""Application package."""
"""Blog API Application.
This package provides a complete blog API implementation following
Domain-Driven Design principles with FastAPI and SQLAlchemy.
"""

View File

@@ -1,4 +1,8 @@
"""Application layer exports."""
"""Application layer exports.
This module re-exports all application layer components including
DTOs, interfaces, and use cases for convenient importing.
"""
from app.application.dtos import CreatePostDTO, PostResponseDTO, UpdatePostDTO
from app.application.interfaces import TransactionManager
@@ -12,13 +16,10 @@ from app.application.use_cases import (
)
__all__ = [
# DTOs
"CreatePostDTO",
"UpdatePostDTO",
"PostResponseDTO",
# Interfaces
"TransactionManager",
# Use Cases
"CreatePostUseCase",
"GetPostUseCase",
"UpdatePostUseCase",

View File

@@ -1,4 +1,8 @@
"""Application DTOs."""
"""Application DTOs.
This module re-exports all Data Transfer Objects used in the
application layer for data communication.
"""
from app.application.dtos.post import CreatePostDTO, PostResponseDTO, UpdatePostDTO

View File

@@ -1,4 +1,9 @@
"""DTOs for post use cases."""
"""DTOs for post use cases.
This module defines Data Transfer Objects used for communication between
application layer use cases and presentation layer. DTOs are immutable
dataclasses that carry data across process boundaries.
"""
from dataclasses import dataclass
from datetime import datetime
@@ -7,7 +12,25 @@ from uuid import UUID
@dataclass(frozen=True)
class CreatePostDTO:
"""DTO for creating a post."""
"""DTO for creating a post.
Carries post creation data from API to use case.
Contains all required fields for post creation.
Attributes:
title: Post title string.
content: Post content string.
author_id: Identifier of the post author.
tags: Optional list of tags for categorization.
Example:
>>> dto = CreatePostDTO(
... title="My Post",
... content="Content here...",
... author_id="user-123",
... tags=["python"]
... )
"""
title: str
content: str
@@ -17,7 +40,19 @@ class CreatePostDTO:
@dataclass(frozen=True)
class UpdatePostDTO:
"""DTO for updating a post."""
"""DTO for updating a post.
Carries optional post update data. All fields are optional
allowing partial updates.
Attributes:
title: Optional new title.
content: Optional new content.
tags: Optional new tags list.
Example:
>>> dto = UpdatePostDTO(title="Updated Title")
"""
title: str | None = None
content: str | None = None
@@ -26,7 +61,35 @@ class UpdatePostDTO:
@dataclass(frozen=True)
class PostResponseDTO:
"""DTO for post response."""
"""DTO for post response.
Carries complete post data for API responses.
Includes all post attributes and metadata.
Attributes:
id: Unique post identifier.
title: Post title.
content: Post content.
slug: URL-friendly slug.
author_id: Author identifier.
published: Publication status.
tags: List of tags.
created_at: Creation timestamp.
updated_at: Last update timestamp.
Example:
>>> dto = PostResponseDTO(
... id=uuid,
... title="Post",
... content="...",
... slug="post",
... author_id="user-123",
... published=True,
... tags=[],
... created_at=datetime.now(),
... updated_at=datetime.now()
... )
"""
id: UUID
title: str

View File

@@ -1,4 +1,8 @@
"""Application interfaces."""
"""Application interfaces.
This module re-exports all abstract interfaces used in the
application layer for dependency inversion.
"""
from app.application.interfaces.transaction_manager import TransactionManager

View File

@@ -1,17 +1,38 @@
"""Transaction Manager interface for managing database transactions."""
"""Transaction Manager interface for managing database transactions.
This module defines the abstract interface for transaction management.
Implementations control transaction boundaries for use cases.
"""
from abc import ABC, abstractmethod
class TransactionManager(ABC):
"""Abstract Transaction Manager for controlling transaction boundaries."""
"""Abstract Transaction Manager for controlling transaction boundaries.
Provides interface for committing or rolling back database transactions.
Used by use cases to manage atomic operations.
Example:
>>> async with transaction_manager:
... await repository.add(entity)
... await transaction_manager.commit()
"""
@abstractmethod
async def commit(self) -> None:
"""Commit the current transaction."""
"""Commit the current transaction.
Persists all pending changes to the database.
Should be called after all operations succeed.
"""
...
@abstractmethod
async def rollback(self) -> None:
"""Rollback the current transaction."""
"""Rollback the current transaction.
Discards all pending changes.
Should be called when an error occurs.
"""
...

View File

@@ -1,4 +1,8 @@
"""Use cases."""
"""Use cases.
This module re-exports all application use cases that implement
business logic operations for the blog API.
"""
from app.application.use_cases.create_post import CreatePostUseCase
from app.application.use_cases.delete_post import DeletePostUseCase

View File

@@ -1,4 +1,8 @@
"""Create post use case."""
"""Create post use case.
This module implements the use case for creating new blog posts.
Handles slug generation, duplicate checking, and entity persistence.
"""
from app.application.dtos.post import CreatePostDTO, PostResponseDTO
from app.application.interfaces import TransactionManager
@@ -8,28 +12,57 @@ from app.domain.repositories import PostRepository
class CreatePostUseCase:
"""Use case for creating a new blog post."""
"""Use case for creating a new blog post.
Encapsulates the business logic for creating posts including
slug generation from title and duplicate slug detection.
Attributes:
_post_repo: Repository for post data access.
_tx_manager: Transaction manager for commit control.
Example:
>>> use_case = CreatePostUseCase(post_repo, tx_manager)
>>> result = await use_case.execute(dto)
"""
def __init__(
self,
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> None:
"""Initialize use case with dependencies.
Args:
post_repo: Repository for post operations.
tx_manager: Transaction manager instance.
"""
self._post_repo = post_repo
self._tx_manager = tx_manager
async def execute(self, dto: CreatePostDTO) -> PostResponseDTO:
"""Execute the use case."""
# Generate slug from title
"""Execute the use case to create a new post.
Args:
dto: Data transfer object containing post creation data
including title, content, author ID, and optional tags.
Returns:
PostResponseDTO with created post data including generated ID and slug.
Raises:
AlreadyExistsException: If a post with the same slug exists.
Note:
Slug is automatically generated from the title.
"""
from app.domain.value_objects import Slug
slug = Slug.from_title(dto.title)
# Check if slug already exists
if await self._post_repo.slug_exists(slug.value):
raise AlreadyExistsException(f"Post with slug '{slug.value}' already exists")
# Create domain entity
post = Post.create(
title_str=dto.title,
content_str=dto.content,
@@ -37,16 +70,20 @@ class CreatePostUseCase:
tags=dto.tags or [],
)
# Persist entity
await self._post_repo.add(post)
# Commit transaction
await self._tx_manager.commit()
return self._map_to_dto(post)
def _map_to_dto(self, post: Post) -> PostResponseDTO:
"""Map domain entity to response DTO."""
"""Map domain entity to response DTO.
Args:
post: Domain post entity.
Returns:
PostResponseDTO with all post attributes.
"""
return PostResponseDTO(
id=post.id,
title=post.title.value,

View File

@@ -1,4 +1,8 @@
"""Delete post use case."""
"""Delete post use case.
This module implements the use case for deleting blog posts.
Includes authorization checks to ensure users can only delete their own posts.
"""
from uuid import UUID
@@ -8,28 +12,51 @@ from app.domain.repositories import PostRepository
class DeletePostUseCase:
"""Use case for deleting a blog post."""
"""Use case for deleting a blog post.
Handles post deletion with authorization checks.
Users can only delete posts they authored.
Attributes:
_post_repo: Repository for post data access.
_tx_manager: Transaction manager for commit control.
Example:
>>> use_case = DeletePostUseCase(post_repo, tx_manager)
>>> await use_case.execute(post_id, user_id)
"""
def __init__(
self,
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> None:
"""Initialize use case with dependencies.
Args:
post_repo: Repository for post operations.
tx_manager: Transaction manager instance.
"""
self._post_repo = post_repo
self._tx_manager = tx_manager
async def execute(self, post_id: UUID, current_user_id: str) -> None:
"""Execute the use case."""
"""Execute the use case to delete a post.
Args:
post_id: Unique identifier of the post to delete.
current_user_id: ID of the user requesting deletion.
Raises:
NotFoundException: If post with given ID does not exist.
ForbiddenException: If user is not the post author.
"""
post = await self._post_repo.get_by_id(post_id)
if not post:
raise NotFoundException(f"Post with id '{post_id}' not found")
# Check authorization
if post.author_id != current_user_id:
raise ForbiddenException("You can only delete your own posts")
# Delete the post
await self._post_repo.delete(post_id)
# Commit transaction
await self._tx_manager.commit()

View File

@@ -1,4 +1,8 @@
"""Get post use case."""
"""Get post use case.
This module implements the use case for retrieving blog posts.
Supports lookup by both ID and slug.
"""
from uuid import UUID
@@ -10,32 +14,78 @@ from app.domain.repositories import PostRepository
class GetPostUseCase:
"""Use case for retrieving a post by ID or slug."""
"""Use case for retrieving a post by ID or slug.
Provides methods to fetch posts using different identifiers.
Handles not-found scenarios with appropriate exceptions.
Attributes:
_post_repo: Repository for post data access.
_tx_manager: Transaction manager for transaction control.
Example:
>>> use_case = GetPostUseCase(post_repo, tx_manager)
>>> post = await use_case.by_id(post_id)
>>> post = await use_case.by_slug("my-post")
"""
def __init__(
self,
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> None:
"""Initialize use case with dependencies.
Args:
post_repo: Repository for post operations.
tx_manager: Transaction manager instance.
"""
self._post_repo = post_repo
self._tx_manager = tx_manager
async def by_id(self, post_id: UUID) -> PostResponseDTO:
"""Get post by ID."""
"""Get post by ID.
Args:
post_id: Unique identifier of the post.
Returns:
PostResponseDTO with complete post data.
Raises:
NotFoundException: If post with given ID does not exist.
"""
post = await self._post_repo.get_by_id(post_id)
if not post:
raise NotFoundException(f"Post with id '{post_id}' not found")
return self._map_to_dto(post)
async def by_slug(self, slug: str) -> PostResponseDTO:
"""Get post by slug."""
"""Get post by slug.
Args:
slug: URL-friendly slug identifier.
Returns:
PostResponseDTO with complete post data.
Raises:
NotFoundException: If post with given slug does not exist.
"""
post = await self._post_repo.get_by_slug(slug)
if not post:
raise NotFoundException(f"Post with slug '{slug}' not found")
return self._map_to_dto(post)
def _map_to_dto(self, post: Post) -> PostResponseDTO:
"""Map domain entity to response DTO."""
"""Map domain entity to response DTO.
Args:
post: Domain post entity.
Returns:
PostResponseDTO with all post attributes.
"""
return PostResponseDTO(
id=post.id,
title=post.title.value,

View File

@@ -1,4 +1,8 @@
"""List posts use case."""
"""List posts use case.
This module implements the use case for listing blog posts.
Provides multiple query methods including filtering by author, tag, and search.
"""
from app.application.dtos.post import PostResponseDTO
from app.application.interfaces import TransactionManager
@@ -7,18 +11,40 @@ from app.domain.repositories import PostRepository
class ListPostsUseCase:
"""Use case for listing blog posts with filtering."""
"""Use case for listing blog posts with filtering.
Provides various methods to query posts with different criteria
including pagination support for large result sets.
Attributes:
_post_repo: Repository for post data access.
_tx_manager: Transaction manager for transaction control.
Example:
>>> use_case = ListPostsUseCase(post_repo, tx_manager)
>>> posts = await use_case.published_posts(limit=10, offset=0)
"""
def __init__(
self,
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> None:
"""Initialize use case with dependencies.
Args:
post_repo: Repository for post operations.
tx_manager: Transaction manager instance.
"""
self._post_repo = post_repo
self._tx_manager = tx_manager
async def all_posts(self) -> list[PostResponseDTO]:
"""Get all posts."""
"""Get all posts.
Returns:
List of PostResponseDTO for all posts.
"""
posts = await self._post_repo.get_all()
return [self._map_to_dto(post) for post in posts]
@@ -27,7 +53,15 @@ class ListPostsUseCase:
limit: int | None = None,
offset: int | None = None,
) -> list[PostResponseDTO]:
"""Get all published posts."""
"""Get all published posts.
Args:
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of PostResponseDTO for published posts.
"""
posts = await self._post_repo.get_published(limit=limit, offset=offset)
return [self._map_to_dto(post) for post in posts]
@@ -37,7 +71,16 @@ class ListPostsUseCase:
limit: int | None = None,
offset: int | None = None,
) -> list[PostResponseDTO]:
"""Get posts by author."""
"""Get posts by author.
Args:
author_id: Identifier of the author.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of PostResponseDTO for posts by the author.
"""
posts = await self._post_repo.get_by_author(author_id, limit=limit, offset=offset)
return [self._map_to_dto(post) for post in posts]
@@ -47,7 +90,16 @@ class ListPostsUseCase:
limit: int | None = None,
offset: int | None = None,
) -> list[PostResponseDTO]:
"""Get posts by tag."""
"""Get posts by tag.
Args:
tag: Tag to filter by.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of PostResponseDTO for posts with the tag.
"""
posts = await self._post_repo.get_by_tag(tag, limit=limit, offset=offset)
return [self._map_to_dto(post) for post in posts]
@@ -57,12 +109,28 @@ class ListPostsUseCase:
limit: int | None = None,
offset: int | None = None,
) -> list[PostResponseDTO]:
"""Search posts."""
"""Search posts.
Args:
query: Search query string.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of PostResponseDTO for matching posts.
"""
posts = await self._post_repo.search(query, limit=limit, offset=offset)
return [self._map_to_dto(post) for post in posts]
def _map_to_dto(self, post: Post) -> PostResponseDTO:
"""Map domain entity to response DTO."""
"""Map domain entity to response DTO.
Args:
post: Domain post entity.
Returns:
PostResponseDTO with all post attributes.
"""
return PostResponseDTO(
id=post.id,
title=post.title.value,

View File

@@ -1,4 +1,8 @@
"""Publish post use case."""
"""Publish post use case.
This module implements the use case for publishing and unpublishing blog posts.
Includes authorization checks to ensure users can only manage their own posts.
"""
from uuid import UUID
@@ -10,18 +14,48 @@ from app.domain.repositories import PostRepository
class PublishPostUseCase:
"""Use case for publishing/unpublishing a blog post."""
"""Use case for publishing/unpublishing a blog post.
Handles post publication state changes with authorization checks.
Users can only publish or unpublish posts they authored.
Attributes:
_post_repo: Repository for post data access.
_tx_manager: Transaction manager for commit control.
Example:
>>> use_case = PublishPostUseCase(post_repo, tx_manager)
>>> post = await use_case.publish(post_id, user_id)
"""
def __init__(
self,
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> None:
"""Initialize use case with dependencies.
Args:
post_repo: Repository for post operations.
tx_manager: Transaction manager instance.
"""
self._post_repo = post_repo
self._tx_manager = tx_manager
async def publish(self, post_id: UUID, current_user_id: str) -> PostResponseDTO:
"""Publish a post."""
"""Publish a post.
Args:
post_id: Unique identifier of the post.
current_user_id: ID of the user requesting publication.
Returns:
PostResponseDTO with updated post data.
Raises:
NotFoundException: If post with given ID does not exist.
ForbiddenException: If user is not the post author.
"""
post = await self._post_repo.get_by_id(post_id)
if not post:
raise NotFoundException(f"Post with id '{post_id}' not found")
@@ -36,7 +70,19 @@ class PublishPostUseCase:
return self._map_to_dto(post)
async def unpublish(self, post_id: UUID, current_user_id: str) -> PostResponseDTO:
"""Unpublish a post."""
"""Unpublish a post.
Args:
post_id: Unique identifier of the post.
current_user_id: ID of the user requesting unpublish.
Returns:
PostResponseDTO with updated post data.
Raises:
NotFoundException: If post with given ID does not exist.
ForbiddenException: If user is not the post author.
"""
post = await self._post_repo.get_by_id(post_id)
if not post:
raise NotFoundException(f"Post with id '{post_id}' not found")
@@ -51,7 +97,14 @@ class PublishPostUseCase:
return self._map_to_dto(post)
def _map_to_dto(self, post: Post) -> PostResponseDTO:
"""Map domain entity to response DTO."""
"""Map domain entity to response DTO.
Args:
post: Domain post entity.
Returns:
PostResponseDTO with all post attributes.
"""
return PostResponseDTO(
id=post.id,
title=post.title.value,

View File

@@ -1,4 +1,8 @@
"""Update post use case."""
"""Update post use case.
This module implements the use case for updating blog posts.
Supports partial updates and includes authorization checks.
"""
from uuid import UUID
@@ -11,13 +15,33 @@ from app.domain.value_objects import Content, Title
class UpdatePostUseCase:
"""Use case for updating a blog post."""
"""Use case for updating a blog post.
Handles post updates with authorization checks.
Supports partial updates - only provided fields are changed.
Users can only update posts they authored.
Attributes:
_post_repo: Repository for post data access.
_tx_manager: Transaction manager for commit control.
Example:
>>> use_case = UpdatePostUseCase(post_repo, tx_manager)
>>> dto = UpdatePostDTO(title="New Title")
>>> result = await use_case.execute(post_id, dto, user_id)
"""
def __init__(
self,
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> None:
"""Initialize use case with dependencies.
Args:
post_repo: Repository for post operations.
tx_manager: Transaction manager instance.
"""
self._post_repo = post_repo
self._tx_manager = tx_manager
@@ -27,16 +51,27 @@ class UpdatePostUseCase:
dto: UpdatePostDTO,
current_user_id: str,
) -> PostResponseDTO:
"""Execute the use case."""
"""Execute the use case to update a post.
Args:
post_id: Unique identifier of the post to update.
dto: Data transfer object with update data.
current_user_id: ID of the user requesting update.
Returns:
PostResponseDTO with updated post data.
Raises:
NotFoundException: If post with given ID does not exist.
ForbiddenException: If user is not the post author.
"""
post = await self._post_repo.get_by_id(post_id)
if not post:
raise NotFoundException(f"Post with id '{post_id}' not found")
# Check authorization
if post.author_id != current_user_id:
raise ForbiddenException("You can only update your own posts")
# Update fields
if dto.title is not None:
post.update_title(Title(dto.title))
@@ -44,22 +79,25 @@ class UpdatePostUseCase:
post.update_content(Content(dto.content))
if dto.tags is not None:
# Replace all tags
for tag in post.tags[:]:
post.remove_tag(tag)
for tag in dto.tags:
post.add_tag(tag)
# Persist changes
await self._post_repo.update(post)
# Commit transaction
await self._tx_manager.commit()
return self._map_to_dto(post)
def _map_to_dto(self, post: Post) -> PostResponseDTO:
"""Map domain entity to response DTO."""
"""Map domain entity to response DTO.
Args:
post: Domain post entity.
Returns:
PostResponseDTO with all post attributes.
"""
return PostResponseDTO(
id=post.id,
title=post.title.value,

View File

@@ -1,4 +1,8 @@
"""Domain layer exports."""
"""Domain layer exports.
This module re-exports all domain layer components including
entities, value objects, repositories, and exceptions.
"""
from app.domain.entities import BaseEntity, Post
from app.domain.exceptions import (
@@ -13,18 +17,14 @@ from app.domain.repositories import PostRepository, Repository
from app.domain.value_objects import Content, Slug, Title, ValueObject
__all__ = [
# Entities
"BaseEntity",
"Post",
# Value Objects
"ValueObject",
"Title",
"Content",
"Slug",
# Repositories
"Repository",
"PostRepository",
# Exceptions
"DomainException",
"ValidationException",
"NotFoundException",

View File

@@ -1,4 +1,8 @@
"""Domain entities."""
"""Domain entities.
This module re-exports all domain entities that represent
core business objects with identity.
"""
from app.domain.entities.base import BaseEntity
from app.domain.entities.post import Post

View File

@@ -1,4 +1,9 @@
"""Base entity for DDD domain layer."""
"""Base entity for DDD domain layer.
This module provides the foundational BaseEntity class that all domain
entities must inherit from. It implements common entity patterns including
identity management, equality comparison, and timestamp tracking.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
@@ -9,25 +14,62 @@ from uuid import UUID, uuid4
@dataclass(kw_only=True)
class BaseEntity(ABC):
"""Base class for all domain entities."""
"""Base class for all domain entities.
Provides common functionality for domain entities including unique
identification, creation/update timestamps, and equality comparison
based on identity.
Attributes:
id: Unique identifier for the entity, automatically generated.
created_at: Timestamp when the entity was created.
updated_at: Timestamp when the entity was last updated.
Example:
>>> class User(BaseEntity):
... name: str
... def to_dict(self) -> dict[str, Any]:
... return {"id": str(self.id), "name": self.name}
"""
id: UUID = field(default_factory=uuid4)
created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def __eq__(self, other: object) -> bool:
"""Compare entities by identity.
Args:
other: Another object to compare with.
Returns:
True if both objects are BaseEntity instances with same ID.
"""
if not isinstance(other, BaseEntity):
return NotImplemented
return self.id == other.id
def __hash__(self) -> int:
"""Get hash based on entity identity.
Returns:
Hash of the entity ID.
"""
return hash(self.id)
def touch(self) -> None:
"""Update the updated_at timestamp."""
"""Update the updated_at timestamp.
Should be called whenever the entity is modified to track
the last modification time.
"""
self.updated_at = datetime.now(UTC)
@abstractmethod
def to_dict(self) -> dict[str, Any]:
"""Convert entity to dictionary."""
"""Convert entity to dictionary representation.
Returns:
Dictionary containing all entity attributes.
"""
...

View File

@@ -1,4 +1,9 @@
"""Domain entity for Blog Post."""
"""Domain entity for Blog Post.
This module defines the Post aggregate root entity that encapsulates
all business logic related to blog posts including publishing, content
management, and tag operations.
"""
from dataclasses import dataclass, field
from typing import Any
@@ -11,7 +16,28 @@ from app.domain.value_objects.title import Title
@dataclass(kw_only=True)
class Post(BaseEntity):
"""Blog post domain entity."""
"""Blog post domain entity.
Represents a blog post with title, content, slug, and metadata.
Encapsulates business logic for post lifecycle management.
Attributes:
title: Post title value object with validation.
content: Post content value object with validation.
slug: URL-friendly identifier generated from title.
author_id: Identifier of the post author.
published: Publication status flag.
tags: List of tags associated with the post.
Example:
>>> post = Post.create(
... title_str="My First Post",
... content_str="This is the content...",
... author_id="user-123",
... tags=["python", "fastapi"]
... )
>>> post.publish()
"""
title: Title
content: Content
@@ -21,40 +47,66 @@ class Post(BaseEntity):
tags: list[str] = field(default_factory=list)
def publish(self) -> None:
"""Publish the post."""
"""Publish the post.
Sets the published flag to True and updates the timestamp.
"""
self.published = True
self.touch()
def unpublish(self) -> None:
"""Unpublish the post."""
"""Unpublish the post.
Sets the published flag to False and updates the timestamp.
"""
self.published = False
self.touch()
def update_content(self, content: Content) -> None:
"""Update post content."""
"""Update post content.
Args:
content: New content value object.
"""
self.content = content
self.touch()
def update_title(self, title: Title) -> None:
"""Update post title and regenerate slug."""
"""Update post title and regenerate slug.
Args:
title: New title value object.
"""
self.title = title
self.slug = Slug.from_title(title.value)
self.touch()
def add_tag(self, tag: str) -> None:
"""Add a tag to the post."""
"""Add a tag to the post.
Args:
tag: Tag string to add. Only adds if not already present.
"""
if tag not in self.tags:
self.tags.append(tag)
self.touch()
def remove_tag(self, tag: str) -> None:
"""Remove a tag from the post."""
"""Remove a tag from the post.
Args:
tag: Tag string to remove. Only removes if present.
"""
if tag in self.tags:
self.tags.remove(tag)
self.touch()
def to_dict(self) -> dict[str, Any]:
"""Convert entity to dictionary."""
"""Convert entity to dictionary.
Returns:
Dictionary representation with all post attributes.
"""
return {
"id": str(self.id),
"title": self.title.value,
@@ -75,7 +127,17 @@ class Post(BaseEntity):
author_id: str,
tags: list[str] | None = None,
) -> "Post":
"""Factory method to create a new post."""
"""Factory method to create a new post.
Args:
title_str: Title string for the post.
content_str: Content string for the post.
author_id: Identifier of the post author.
tags: Optional list of tags.
Returns:
New Post instance with validated value objects.
"""
title = Title(title_str)
content = Content(content_str)
slug = Slug.from_title(title_str)

View File

@@ -1,39 +1,78 @@
"""Domain exceptions."""
"""Domain exceptions for business logic errors.
This module defines the exception hierarchy for domain layer errors.
All domain exceptions inherit from DomainException base class.
"""
class DomainException(Exception):
"""Base exception for domain layer."""
"""Base exception for domain layer.
All domain-specific exceptions should inherit from this class.
Provides a consistent interface for error messages.
Attributes:
message: Human-readable error description.
Example:
>>> raise DomainException("Business rule violated")
"""
def __init__(self, message: str) -> None:
"""Initialize domain exception.
Args:
message: Error message describing the exception.
"""
self.message = message
super().__init__(self.message)
class ValidationException(DomainException):
"""Raised when validation fails."""
"""Raised when validation fails.
pass
Used when entity or value object validation does not pass.
Example:
>>> raise ValidationException("Title is too long")
"""
class NotFoundException(DomainException):
"""Raised when an entity is not found."""
"""Raised when an entity is not found.
pass
Used when requesting an entity that does not exist in the repository.
Example:
>>> raise NotFoundException("Post with id 123 not found")
"""
class AlreadyExistsException(DomainException):
"""Raised when trying to create an entity that already exists."""
"""Raised when trying to create an entity that already exists.
pass
Used when attempting to create a duplicate entity.
Example:
>>> raise AlreadyExistsException("Post with this slug already exists")
"""
class UnauthorizedException(DomainException):
"""Raised when user is not authorized."""
"""Raised when user is not authorized.
pass
Used when authentication is required but not provided or invalid.
Example:
>>> raise UnauthorizedException("Authentication required")
"""
class ForbiddenException(DomainException):
"""Raised when access is forbidden."""
"""Raised when access is forbidden.
pass
Used when authenticated user lacks required permissions.
Example:
>>> raise ForbiddenException("Only admins can delete posts")
"""

View File

@@ -1,4 +1,8 @@
"""Repository interfaces."""
"""Repository interfaces.
This module re-exports all repository interfaces that define
the contract for data access operations.
"""
from app.domain.repositories.base import Repository
from app.domain.repositories.post import PostRepository

View File

@@ -1,4 +1,8 @@
"""Base repository interface for DDD."""
"""Base repository interface for DDD.
This module defines the generic repository pattern interface that all
repository implementations must follow. Provides standard CRUD operations.
"""
from abc import ABC, abstractmethod
from typing import Generic, TypeVar
@@ -10,34 +14,76 @@ T = TypeVar("T", bound=BaseEntity)
class Repository(ABC, Generic[T]):
"""Generic repository interface."""
"""Generic repository interface.
Defines the contract for repository implementations. All repositories
must provide standard CRUD operations for their entity type.
Type Parameters:
T: Entity type that must inherit from BaseEntity.
Example:
>>> class PostRepository(Repository[Post]):
... async def get_by_id(self, entity_id: UUID) -> Post | None:
... ...
"""
@abstractmethod
async def get_by_id(self, entity_id: UUID) -> T | None:
"""Get entity by ID."""
"""Get entity by ID.
Args:
entity_id: Unique identifier of the entity.
Returns:
Entity instance if found, None otherwise.
"""
...
@abstractmethod
async def get_all(self) -> list[T]:
"""Get all entities."""
"""Get all entities.
Returns:
List of all entity instances.
"""
...
@abstractmethod
async def add(self, entity: T) -> None:
"""Add new entity."""
"""Add new entity.
Args:
entity: Entity instance to add.
"""
...
@abstractmethod
async def update(self, entity: T) -> None:
"""Update existing entity."""
"""Update existing entity.
Args:
entity: Entity instance with updated data.
"""
...
@abstractmethod
async def delete(self, entity_id: UUID) -> None:
"""Delete entity by ID."""
"""Delete entity by ID.
Args:
entity_id: Unique identifier of the entity to delete.
"""
...
@abstractmethod
async def exists(self, entity_id: UUID) -> bool:
"""Check if entity exists."""
"""Check if entity exists.
Args:
entity_id: Unique identifier of the entity.
Returns:
True if entity exists, False otherwise.
"""
...

View File

@@ -1,4 +1,8 @@
"""Post repository interface."""
"""Post repository interface.
This module extends the base repository interface with post-specific
query methods including slug lookup, author filtering, and search.
"""
from abc import abstractmethod
@@ -7,11 +11,27 @@ from app.domain.repositories.base import Repository
class PostRepository(Repository[Post]):
"""Repository interface for Blog Posts."""
"""Repository interface for Blog Posts.
Extends the generic repository with post-specific operations
including slug-based lookup, author filtering, tag filtering,
and full-text search capabilities.
Example:
>>> posts = await repo.get_by_author("user-123", limit=10)
>>> exists = await repo.slug_exists("my-first-post")
"""
@abstractmethod
async def get_by_slug(self, slug: str) -> Post | None:
"""Get post by slug."""
"""Get post by slug.
Args:
slug: URL-friendly slug identifier.
Returns:
Post instance if found, None otherwise.
"""
...
@abstractmethod
@@ -21,7 +41,16 @@ class PostRepository(Repository[Post]):
limit: int | None = None,
offset: int | None = None,
) -> list[Post]:
"""Get all posts by author."""
"""Get all posts by author.
Args:
author_id: Identifier of the author.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of posts by the specified author.
"""
...
@abstractmethod
@@ -30,7 +59,15 @@ class PostRepository(Repository[Post]):
limit: int | None = None,
offset: int | None = None,
) -> list[Post]:
"""Get all published posts."""
"""Get all published posts.
Args:
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of published posts.
"""
...
@abstractmethod
@@ -40,12 +77,28 @@ class PostRepository(Repository[Post]):
limit: int | None = None,
offset: int | None = None,
) -> list[Post]:
"""Get posts by tag."""
"""Get posts by tag.
Args:
tag: Tag to filter by.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of posts with the specified tag.
"""
...
@abstractmethod
async def slug_exists(self, slug: str) -> bool:
"""Check if slug already exists."""
"""Check if slug already exists.
Args:
slug: Slug to check for existence.
Returns:
True if slug exists, False otherwise.
"""
...
@abstractmethod
@@ -55,5 +108,14 @@ class PostRepository(Repository[Post]):
limit: int | None = None,
offset: int | None = None,
) -> list[Post]:
"""Search posts by query string."""
"""Search posts by query string.
Args:
query: Search query string.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of posts matching the search query.
"""
...

View File

@@ -1,4 +1,8 @@
"""Role-based access control definitions."""
"""Role-based access control definitions.
This module provides role and permission definitions for the application
along with utility functions and decorators for permission checking.
"""
from collections.abc import Callable
from enum import Enum
@@ -9,7 +13,20 @@ from app.domain.exceptions import ForbiddenException
class Role(str, Enum):
"""User roles in the system."""
"""User roles in the system.
Defines the available user roles with hierarchical permissions.
ADMIN has full access, USER has standard access, GUEST has read-only.
Attributes:
ADMIN: Administrator with full system access.
USER: Regular authenticated user.
GUEST: Unauthenticated or limited access user.
Example:
>>> if role == Role.ADMIN:
... grant_full_access()
"""
ADMIN = "admin"
USER = "user"
@@ -17,9 +34,16 @@ class Role(str, Enum):
class Permission:
"""Permission definitions."""
"""Permission definitions.
Contains string constants for all available permissions in the system.
Used for role-based access control checks.
Example:
>>> if has_permission(role, Permission.POST_CREATE):
... allow_post_creation()
"""
# Post permissions
POST_CREATE = "post:create"
POST_READ = "post:read"
POST_READ_UNPUBLISHED = "post:read_unpublished"
@@ -28,7 +52,6 @@ class Permission:
POST_PUBLISH = "post:publish"
# Role-based permission mapping
ROLE_PERMISSIONS: dict[Role, list[str]] = {
Role.ADMIN: [
Permission.POST_CREATE,
@@ -52,24 +75,52 @@ ROLE_PERMISSIONS: dict[Role, list[str]] = {
def has_permission(role: Role, permission: str) -> bool:
"""Check if role has specific permission."""
"""Check if role has specific permission.
Args:
role: User role to check.
permission: Permission string to verify.
Returns:
True if role has the permission, False otherwise.
Example:
>>> has_permission(Role.ADMIN, Permission.POST_DELETE)
True
"""
return permission in ROLE_PERMISSIONS.get(role, [])
def require_permission(
permission: str,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Decorator to require specific permission."""
"""Decorator to require specific permission.
Creates a decorator that checks if the user has the required permission
before executing the decorated function.
Args:
permission: Permission string required for execution.
Returns:
Decorator function for permission checking.
Raises:
ForbiddenException: If user lacks the required permission.
Example:
>>> @require_permission(Permission.POST_CREATE)
... async def create_post():
... ...
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
# Get token_info from kwargs
token_info = kwargs.get("token_info")
if not token_info:
raise ForbiddenException("Authentication required")
# Determine role from token or default to guest
roles = getattr(token_info, "roles", [])
if Role.ADMIN.value in roles:
role = Role.ADMIN
@@ -93,7 +144,18 @@ def require_permission(
def get_effective_role(roles: list[str]) -> Role:
"""Determine effective role from list of roles.
Priority: admin > user > guest
Evaluates multiple roles and returns the highest privilege role.
Priority order: admin > user > guest.
Args:
roles: List of role strings from token.
Returns:
Highest privilege Role enum value.
Example:
>>> get_effective_role(["user", "admin"])
<Role.ADMIN: 'admin'>
"""
if Role.ADMIN.value in roles:
return Role.ADMIN

View File

@@ -1,4 +1,8 @@
"""Value objects."""
"""Value objects.
This module re-exports all domain value objects that represent
immutable validated domain concepts.
"""
from app.domain.value_objects.base import ValueObject
from app.domain.value_objects.content import Content

View File

@@ -1,4 +1,9 @@
"""Base value object for DDD domain layer."""
"""Base value object for DDD domain layer.
This module provides the foundational ValueObject class that all domain
value objects must inherit from. Implements equality, hashing, and
validation patterns for immutable value objects.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -9,29 +14,78 @@ T = TypeVar("T")
@dataclass(frozen=True, slots=True)
class ValueObject(ABC, Generic[T]):
"""Base class for all value objects."""
"""Base class for all value objects.
Value objects are immutable objects defined by their attributes rather
than identity. They are validated on creation and provide type safety.
Attributes:
value: The underlying value wrapped by the value object.
Type Parameters:
T: Type of the wrapped value.
Example:
>>> class Email(ValueObject[str]):
... def _validate(self) -> None:
... if "@" not in self.value:
... raise ValueError("Invalid email")
"""
value: T
def __post_init__(self) -> None:
"""Validate value object after initialization.
Automatically called by dataclass after __init__. Triggers
the validation method to ensure value integrity.
"""
self._validate()
@abstractmethod
def _validate(self) -> None:
"""Validate the value object. Raise ValueError if invalid."""
"""Validate the value object.
Must be implemented by subclasses to enforce value constraints.
Raises:
ValueError: If the value does not meet validation criteria.
"""
...
def __eq__(self, other: object) -> bool:
"""Compare value objects by value.
Args:
other: Another object to compare with.
Returns:
True if both are ValueObjects with equal values.
"""
if not isinstance(other, ValueObject):
return False
return bool(self.value == other.value)
def __hash__(self) -> int:
"""Get hash based on wrapped value.
Returns:
Hash of the underlying value.
"""
return hash(self.value)
def __str__(self) -> str:
"""Convert value object to string.
Returns:
String representation of the wrapped value.
"""
return str(self.value)
def to_primitive(self) -> Any:
"""Convert value object to primitive type."""
"""Convert value object to primitive type.
Returns:
The underlying primitive value.
"""
return self.value

View File

@@ -1,4 +1,8 @@
"""Content value object."""
"""Content value object.
This module defines the Content value object for blog post content
with validation for minimum and maximum length constraints.
"""
from dataclasses import dataclass
@@ -7,12 +11,35 @@ from app.domain.value_objects.base import ValueObject
@dataclass(frozen=True, slots=True)
class Content(ValueObject[str]):
"""Blog post content value object."""
"""Blog post content value object.
Wraps and validates blog post content ensuring it meets length
requirements and is not empty.
Attributes:
value: The content string value.
MIN_LENGTH: Minimum allowed content length (10 characters).
MAX_LENGTH: Maximum allowed content length (50000 characters).
Raises:
ValueError: If content is empty, too short, or too long.
Example:
>>> content = Content("This is valid content...")
>>> print(content.value)
"""
MIN_LENGTH: int = 10
MAX_LENGTH: int = 50000
def _validate(self) -> None:
"""Validate content string.
Checks that content is a non-empty string within length bounds.
Raises:
ValueError: If content fails validation criteria.
"""
if not isinstance(self.value, str):
raise ValueError("Content must be a string")
if not self.value.strip():

View File

@@ -1,4 +1,9 @@
"""Slug value object for URL-friendly identifiers."""
"""Slug value object for URL-friendly identifiers.
This module defines the Slug value object for generating and validating
URL-friendly slugs from titles. Enforces lowercase, alphanumeric, and
hyphen-only format.
"""
import re
from dataclasses import dataclass
@@ -8,12 +13,36 @@ from app.domain.value_objects.base import ValueObject
@dataclass(frozen=True, slots=True)
class Slug(ValueObject[str]):
"""URL slug value object."""
"""URL slug value object.
Represents a URL-friendly identifier generated from titles.
Validates format and provides factory method for slug generation.
Attributes:
value: The slug string value.
MAX_LENGTH: Maximum allowed slug length (200 characters).
SLUG_PATTERN: Regex pattern for valid slug format.
Raises:
ValueError: If slug format is invalid.
Example:
>>> slug = Slug.from_title("My First Post!")
>>> print(slug.value)
'my-first-post'
"""
MAX_LENGTH: int = 200
SLUG_PATTERN: str = r"^[a-z0-9]+(?:-[a-z0-9]+)*$"
def _validate(self) -> None:
"""Validate slug format.
Ensures slug contains only lowercase letters, numbers, and hyphens.
Raises:
ValueError: If slug format is invalid.
"""
if not isinstance(self.value, str):
raise ValueError("Slug must be a string")
if len(self.value) > self.MAX_LENGTH:
@@ -23,17 +52,22 @@ class Slug(ValueObject[str]):
@classmethod
def from_title(cls, title: str) -> "Slug":
"""Generate slug from title."""
# Convert to lowercase, replace spaces with hyphens
"""Generate slug from title.
Converts title to URL-friendly format by lowercasing, removing
special characters, and replacing spaces with hyphens.
Args:
title: Source title string.
Returns:
New Slug instance with generated value.
"""
slug = title.lower().strip()
# Keep only alphanumeric, spaces, and hyphens
slug = re.sub(r"[^a-z0-9\s-]", "", slug)
# Replace spaces and multiple hyphens with single hyphen
slug = re.sub(r"[-\s]+", "-", slug)
# Limit length and strip hyphens
max_len = 200 # Same as MAX_LENGTH
max_len = 200
slug = slug[:max_len].strip("-")
# Ensure we have at least one character
if not slug:
slug = "post"
return cls(value=slug)

View File

@@ -1,4 +1,8 @@
"""Title value object."""
"""Title value object.
This module defines the Title value object for blog post titles
with validation for minimum and maximum length constraints.
"""
from dataclasses import dataclass
@@ -7,12 +11,35 @@ from app.domain.value_objects.base import ValueObject
@dataclass(frozen=True, slots=True)
class Title(ValueObject[str]):
"""Blog post title value object."""
"""Blog post title value object.
Wraps and validates blog post titles ensuring they meet length
requirements and are not empty.
Attributes:
value: The title string value.
MIN_LENGTH: Minimum allowed title length (3 characters).
MAX_LENGTH: Maximum allowed title length (200 characters).
Raises:
ValueError: If title is empty, too short, or too long.
Example:
>>> title = Title("My Blog Post")
>>> print(title.value)
"""
MIN_LENGTH: int = 3
MAX_LENGTH: int = 200
def _validate(self) -> None:
"""Validate title string.
Checks that title is a non-empty string within length bounds.
Raises:
ValueError: If title fails validation criteria.
"""
if not isinstance(self.value, str):
raise ValueError("Title must be a string")
if len(self.value) < self.MIN_LENGTH:

View File

@@ -1,4 +1,8 @@
"""Infrastructure layer exports."""
"""Infrastructure layer exports.
This module re-exports all infrastructure components including
config, database, repositories, DI, and middleware.
"""
from app.infrastructure.config import Settings, settings
from app.infrastructure.database import (
@@ -15,10 +19,8 @@ from app.infrastructure.middleware import register_exception_handlers
from app.infrastructure.repositories import SQLAlchemyPostRepository
__all__ = [
# Config
"Settings",
"settings",
# Database
"Base",
"PostORM",
"engine",
@@ -26,10 +28,7 @@ __all__ = [
"get_session",
"init_db",
"close_db",
# Repositories
"SQLAlchemyPostRepository",
# DI
"create_container",
# Middleware
"register_exception_handlers",
]

View File

@@ -1,4 +1,8 @@
"""Authentication infrastructure package."""
"""Authentication infrastructure package.
This module provides Keycloak authentication client and models
for token validation and user info retrieval.
"""
from app.infrastructure.auth.client import KeycloakAuthClient
from app.infrastructure.auth.models import KeycloakUser, TokenInfo

View File

@@ -1,4 +1,8 @@
"""Keycloak authentication client."""
"""Keycloak authentication client.
This module provides a client for Keycloak authentication operations
including token introspection and user info retrieval.
"""
import time
@@ -9,10 +13,30 @@ from app.infrastructure.config.settings import Settings
class KeycloakAuthClient:
"""Client for Keycloak authentication operations."""
"""Client for Keycloak authentication operations.
Handles token validation via introspection and user info retrieval.
Implements token caching to reduce Keycloak server load.
Attributes:
_settings: Application settings with Keycloak config.
_base_url: Keycloak realm base URL.
_client_id: OAuth client identifier.
_client_secret: OAuth client secret.
_cache: Token info cache for performance.
_cache_ttl: Cache time-to-live in seconds.
Example:
>>> client = KeycloakAuthClient(settings)
>>> token_info = await client.introspect_token(token)
"""
def __init__(self, settings: Settings) -> None:
"""Initialize Keycloak client with settings."""
"""Initialize Keycloak client with settings.
Args:
settings: Application settings with Keycloak configuration.
"""
self._settings = settings
self._base_url = f"{settings.kc.server_url}/realms/{settings.kc.realm}"
self._client_id = settings.kc.client_id
@@ -21,15 +45,30 @@ class KeycloakAuthClient:
self._cache_ttl = settings.kc.token_cache_ttl
def _get_introspection_url(self) -> str:
"""Get token introspection endpoint URL."""
"""Get token introspection endpoint URL.
Returns:
Full URL for token introspection endpoint.
"""
return f"{self._base_url}/protocol/openid-connect/token/introspection"
def _get_userinfo_url(self) -> str:
"""Get userinfo endpoint URL."""
"""Get userinfo endpoint URL.
Returns:
Full URL for userinfo endpoint.
"""
return f"{self._base_url}/protocol/openid-connect/userinfo"
def _get_cached_token(self, token: str) -> TokenInfo | None:
"""Get cached token info if valid."""
"""Get cached token info if valid.
Args:
token: Access token string.
Returns:
Cached TokenInfo if valid and not expired, None otherwise.
"""
if token not in self._cache:
return None
@@ -41,9 +80,13 @@ class KeycloakAuthClient:
return token_info
def _cache_token(self, token: str, token_info: TokenInfo) -> None:
"""Cache token info."""
"""Cache token info.
Args:
token: Access token string as cache key.
token_info: TokenInfo to cache.
"""
self._cache[token] = (token_info, time.time())
# Simple cleanup of old entries
current_time = time.time()
expired_keys = [
k for k, (_, t) in self._cache.items() if current_time - t > self._cache_ttl
@@ -52,13 +95,21 @@ class KeycloakAuthClient:
del self._cache[k]
async def introspect_token(self, token: str) -> TokenInfo:
"""Introspect access token using Keycloak."""
# Check cache first
"""Introspect access token using Keycloak.
Validates token with Keycloak server and extracts user information.
Uses cache to reduce server requests for recently validated tokens.
Args:
token: Access token to validate.
Returns:
TokenInfo with validation result and user claims.
"""
cached = self._get_cached_token(token)
if cached:
return cached
# Prepare introspection request
data = {
"token": token,
"client_id": self._client_id,
@@ -81,7 +132,6 @@ class KeycloakAuthClient:
if not result.get("active", False):
return TokenInfo(active=False, raw_claims=result)
# Extract roles from realm_access or resource_access
roles: list[str] = []
realm_access = result.get("realm_access", {})
if isinstance(realm_access, dict):
@@ -96,13 +146,21 @@ class KeycloakAuthClient:
raw_claims=result,
)
# Cache valid token
self._cache_token(token, token_info)
return token_info
async def get_userinfo(self, token: str) -> KeycloakUser | None:
"""Get user information from Keycloak using access token."""
"""Get user information from Keycloak using access token.
Fetches detailed user profile from Keycloak userinfo endpoint.
Args:
token: Valid access token.
Returns:
KeycloakUser with profile data, or None on error.
"""
try:
async with httpx.AsyncClient() as client:
response = await client.get(

View File

@@ -1,4 +1,8 @@
"""Keycloak authentication models."""
"""Keycloak authentication models.
This module defines data models for Keycloak authentication data
including token info and user profiles.
"""
from dataclasses import dataclass, field
from typing import Any
@@ -6,7 +10,24 @@ from typing import Any
@dataclass(frozen=True)
class TokenInfo:
"""Information about validated token from Keycloak."""
"""Information about validated token from Keycloak.
Contains the result of token introspection including user claims
and role information.
Attributes:
active: Whether the token is active and valid.
user_id: Subject identifier from token.
username: Username from token claims.
email: Email from token claims.
roles: List of roles from token.
raw_claims: Complete raw claims from token.
Example:
>>> token_info = TokenInfo(active=True, user_id="123", roles=["user"])
>>> if token_info.is_valid:
... grant_access()
"""
active: bool
user_id: str = ""
@@ -17,13 +38,32 @@ class TokenInfo:
@property
def is_valid(self) -> bool:
"""Check if token is valid and active."""
"""Check if token is valid and active.
Returns:
True if token is active and has user_id.
"""
return self.active and bool(self.user_id)
@dataclass(frozen=True)
class KeycloakUser:
"""User information from Keycloak."""
"""User information from Keycloak.
Contains user profile data from Keycloak userinfo endpoint.
Attributes:
id: User subject identifier.
username: Username.
email: Email address.
first_name: First name.
last_name: Last name.
roles: List of user roles.
is_active: Whether user account is active.
Example:
>>> user = KeycloakUser(id="123", username="john", email="john@example.com")
"""
id: str
username: str

View File

@@ -1,4 +1,8 @@
"""Infrastructure configuration."""
"""Infrastructure configuration.
This module re-exports all configuration classes and the global
settings instance for application configuration.
"""
from app.infrastructure.config.settings import (
AppConfig,

View File

@@ -1,4 +1,8 @@
"""Application settings with composition pattern."""
"""Application settings with composition pattern.
This module defines the application configuration using pydantic-settings.
Provides typed configuration for database, Keycloak, security, and app settings.
"""
from enum import Enum
from functools import cached_property
@@ -8,14 +12,38 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class Environment(str, Enum):
"""Application environment modes."""
"""Application environment modes.
Defines the available deployment environments.
Each environment may have different configuration defaults.
Attributes:
DEV: Development environment with debug features.
PROD: Production environment with strict security.
Example:
>>> if settings.environment == Environment.PROD:
... enable_strict_security()
"""
DEV = "dev"
PROD = "prod"
class AppConfig(BaseSettings):
"""Application configuration."""
"""Application configuration.
Contains general application settings like name, host, and port.
Attributes:
name: Application display name.
debug: Debug mode flag.
host: Server bind host.
port: Server bind port.
Example:
>>> config = AppConfig(name="My API", port=8000)
"""
name: str = "Blog API"
debug: bool = False
@@ -30,14 +58,27 @@ class AppConfig(BaseSettings):
class DBConfig(BaseSettings):
"""Database configuration."""
"""Database configuration.
Contains database connection settings. Supports both SQLite for
development and PostgreSQL for production.
Attributes:
url: Full database URL (optional, can build from components).
echo: Enable SQL query logging.
host: Database server host.
port: Database server port.
user: Database username.
password: Database password.
name: Database name.
Example:
>>> db_config = DBConfig(host="localhost", name="blog")
"""
# For dev: sqlite+aiosqlite:///./blog.db
# For prod: postgresql+asyncpg://user:pass@host:port/db
url: str | None = None
echo: bool = False
# PostgreSQL-specific settings (used in prod)
host: str = "localhost"
port: int = 5432
user: str = "postgres"
@@ -53,7 +94,17 @@ class DBConfig(BaseSettings):
@field_validator("url")
@classmethod
def validate_url(cls, v: str | None) -> str | None:
"""Validate database URL if provided."""
"""Validate database URL if provided.
Args:
v: Database URL string to validate.
Returns:
Validated URL string.
Raises:
ValueError: If URL does not start with supported prefix.
"""
if v is None:
return v
if not any(v.startswith(prefix) for prefix in ("sqlite+", "postgresql+")):
@@ -62,7 +113,20 @@ class DBConfig(BaseSettings):
class KCConfig(BaseSettings):
"""Keycloak configuration."""
"""Keycloak configuration.
Contains Keycloak authentication server settings.
Attributes:
server_url: Keycloak server base URL.
realm: Keycloak realm name.
client_id: OAuth client identifier.
client_secret: OAuth client secret.
token_cache_ttl: Token cache time-to-live in seconds.
Example:
>>> kc = KCConfig(server_url="http://localhost:8080", realm="blog")
"""
server_url: str = "http://localhost:8080"
realm: str = "blog"
@@ -71,7 +135,7 @@ class KCConfig(BaseSettings):
default="",
description="Keycloak client secret - must be set via env in production",
)
token_cache_ttl: int = 60 # seconds
token_cache_ttl: int = 60
model_config = SettingsConfigDict(
env_prefix="KC_",
@@ -81,12 +145,26 @@ class KCConfig(BaseSettings):
@property
def is_configured(self) -> bool:
"""Check if Keycloak is properly configured."""
"""Check if Keycloak is properly configured.
Returns:
True if client_secret is set.
"""
return bool(self.client_secret)
class SecurityConfig(BaseSettings):
"""Security configuration."""
"""Security configuration.
Contains security-related settings for JWT and authentication.
Attributes:
secret_key: Secret key for JWT signing.
access_token_expire_minutes: Token expiration time in minutes.
Example:
>>> security = SecurityConfig(secret_key="super-secret-key")
"""
secret_key: str = Field(
default="", description="Secret key for JWT - must be set via env in production"
@@ -101,17 +179,37 @@ class SecurityConfig(BaseSettings):
@property
def is_configured(self) -> bool:
"""Check if security is properly configured."""
"""Check if security is properly configured.
Returns:
True if secret_key is set.
"""
return bool(self.secret_key)
class Settings(BaseSettings):
"""Application configuration settings with composition."""
"""Application configuration settings with composition.
Main settings class that composes all sub-configurations.
Validates production settings and provides computed properties.
Attributes:
environment: Current deployment environment.
app: Application configuration.
db: Database configuration.
kc: Keycloak configuration.
security: Security configuration.
Raises:
ValueError: If required production settings are missing.
Example:
>>> settings = Settings()
>>> print(settings.database_url)
"""
# Environment mode
environment: Environment = Environment.DEV
# Sub-configurations
app: AppConfig = Field(default_factory=AppConfig)
db: DBConfig = Field(default_factory=DBConfig)
kc: KCConfig = Field(default_factory=KCConfig)
@@ -125,7 +223,13 @@ class Settings(BaseSettings):
)
def model_post_init(self, __context: object) -> None:
"""Validate settings after initialization."""
"""Validate settings after initialization.
Checks that required settings are configured for production mode.
Raises:
ValueError: If required production settings are missing.
"""
if self.is_prod:
if not self.security.is_configured:
raise ValueError("SECURITY_SECRET_KEY must be set in production mode")
@@ -136,14 +240,16 @@ class Settings(BaseSettings):
def database_url(self) -> str:
"""Get database URL based on environment.
- In dev: uses SQLite if no URL provided
- In prod: uses PostgreSQL if no URL provided
Returns configured URL or builds one from components.
Uses SQLite for development, PostgreSQL for production.
Returns:
Complete database URL string.
"""
if self.db.url:
return self.db.url
if self.environment == Environment.PROD:
# Build PostgreSQL URL from components
return str(
PostgresDsn.build(
scheme="postgresql+asyncpg",
@@ -155,19 +261,25 @@ class Settings(BaseSettings):
)
)
# Default dev SQLite URL
return "sqlite+aiosqlite:///./blog.db"
@property
def is_dev(self) -> bool:
"""Check if running in development mode."""
"""Check if running in development mode.
Returns:
True if environment is DEV.
"""
return self.environment == Environment.DEV
@property
def is_prod(self) -> bool:
"""Check if running in production mode."""
"""Check if running in production mode.
Returns:
True if environment is PROD.
"""
return self.environment == Environment.PROD
# Global settings instance
settings = Settings()

View File

@@ -1,4 +1,8 @@
"""Database infrastructure."""
"""Database infrastructure.
This module re-exports database connection utilities and ORM models
for data persistence.
"""
from app.infrastructure.database.connection import (
AsyncSessionLocal,

View File

@@ -1,4 +1,8 @@
"""Database connection and session management."""
"""Database connection and session management.
This module handles database engine creation, session management,
and connection lifecycle for the application.
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@@ -13,22 +17,26 @@ from sqlalchemy.ext.asyncio import (
from app.infrastructure.config import settings
# Convert SQLite URL to async format if needed
def _get_database_url() -> str:
"""Get database URL with SQLite async compatibility.
Converts SQLite URL to async format if needed.
Returns:
Database URL string ready for async engine.
"""
url = settings.database_url
if url.startswith("sqlite:///") and not url.startswith("sqlite+aiosqlite:///"):
return url.replace("sqlite:///", "sqlite+aiosqlite:///")
return url
# Create async engine
engine: AsyncEngine = create_async_engine(
_get_database_url(),
echo=settings.db.echo,
future=True,
)
# Create session factory
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
@@ -39,7 +47,11 @@ AsyncSessionLocal = async_sessionmaker(
async def get_session() -> AsyncGenerator[AsyncSession]:
"""Get database session."""
"""Get database session.
Yields:
AsyncSession instance for database operations.
"""
async with AsyncSessionLocal() as session:
try:
yield session
@@ -49,7 +61,11 @@ async def get_session() -> AsyncGenerator[AsyncSession]:
@asynccontextmanager
async def get_session_context() -> AsyncGenerator[AsyncSession]:
"""Get database session as context manager."""
"""Get database session as context manager.
Yields:
AsyncSession instance for database operations.
"""
async with AsyncSessionLocal() as session:
try:
yield session
@@ -58,7 +74,11 @@ async def get_session_context() -> AsyncGenerator[AsyncSession]:
async def init_db() -> None:
"""Initialize database tables."""
"""Initialize database tables.
Creates all tables defined in the metadata.
Should be called on application startup.
"""
from app.infrastructure.database.models import Base
async with engine.begin() as conn:
@@ -66,5 +86,9 @@ async def init_db() -> None:
async def close_db() -> None:
"""Close database connections."""
"""Close database connections.
Disposes of the engine and all connections.
Should be called on application shutdown.
"""
await engine.dispose()

View File

@@ -1,4 +1,8 @@
"""SQLAlchemy ORM models."""
"""SQLAlchemy ORM models.
This module defines the database ORM models that map to database tables.
Models are used by repositories for data persistence.
"""
from datetime import UTC, datetime
from uuid import uuid4
@@ -10,7 +14,25 @@ Base = declarative_base()
class PostORM(Base): # type: ignore[valid-type,misc]
"""SQLAlchemy model for Blog Post."""
"""SQLAlchemy model for Blog Post.
Database table representation of blog posts.
Maps to the 'posts' table with all post attributes.
Attributes:
id: Primary key as UUID string.
title: Post title (max 200 chars).
content: Post content (text).
slug: URL-friendly unique identifier.
author_id: Author reference.
published: Publication status flag.
tags: JSON array of tags.
created_at: Creation timestamp.
updated_at: Last update timestamp.
Example:
>>> post = PostORM(title="Post", content="...", slug="post", author_id="user-1")
"""
__tablename__ = "posts"

View File

@@ -1,4 +1,8 @@
"""Dependency Injection using Dishka."""
"""Dependency Injection using Dishka.
This module provides DI container setup and configuration
for the application using Dishka library.
"""
from app.infrastructure.di.container import create_container

View File

@@ -1,4 +1,8 @@
"""Dishka providers for dependency injection."""
"""Dishka providers for dependency injection.
This module defines Dishka providers for all application dependencies.
Providers configure how dependencies are created and scoped.
"""
from collections.abc import AsyncGenerator
@@ -22,16 +26,31 @@ from app.infrastructure.repositories.post import SQLAlchemyPostRepository
class DatabaseProvider(Provider):
"""Provider for database-related dependencies."""
"""Provider for database-related dependencies.
Provides database engine and session scoped appropriately.
Engine is application-scoped, sessions are request-scoped.
Example:
>>> provider = DatabaseProvider()
"""
@provide(scope=Scope.APP)
def get_engine(self) -> AsyncEngine:
"""Provide SQLAlchemy engine."""
"""Provide SQLAlchemy engine.
Returns:
AsyncEngine instance for database operations.
"""
return engine
@provide(scope=Scope.REQUEST)
async def get_session(self) -> AsyncGenerator[AsyncSession]:
"""Provide database session per request."""
"""Provide database session per request.
Yields:
AsyncSession instance for the request lifetime.
"""
async with AsyncSessionLocal() as session:
try:
yield session
@@ -40,27 +59,62 @@ class DatabaseProvider(Provider):
class RepositoryProvider(Provider):
"""Provider for repository implementations."""
"""Provider for repository implementations.
Provides concrete repository implementations for interfaces.
All repositories are request-scoped.
Example:
>>> provider = RepositoryProvider()
"""
@provide(scope=Scope.REQUEST)
def get_post_repository(self, session: AsyncSession) -> PostRepository:
"""Provide PostRepository implementation."""
"""Provide PostRepository implementation.
Args:
session: Database session from DI container.
Returns:
SQLAlchemyPostRepository instance.
"""
return SQLAlchemyPostRepository(session)
class TransactionManagerProvider(Provider):
"""Provider for transaction manager."""
"""Provider for transaction manager.
Provides transaction manager implementation for use cases.
Scoped per request for transaction isolation.
Example:
>>> provider = TransactionManagerProvider()
"""
@provide(scope=Scope.REQUEST)
def get_transaction_manager(self, session: AsyncSession) -> TransactionManager:
"""Provide TransactionManager implementation."""
"""Provide TransactionManager implementation.
Args:
session: Database session from DI container.
Returns:
SessionTransactionManager instance.
"""
from app.infrastructure.di.transaction_manager import SessionTransactionManager
return SessionTransactionManager(session)
class UseCaseProvider(Provider):
"""Provider for use cases."""
"""Provider for use cases.
Provides all application use cases with their dependencies.
All use cases are request-scoped for transaction isolation.
Example:
>>> provider = UseCaseProvider()
"""
@provide(scope=Scope.REQUEST)
def get_create_post_use_case(
@@ -68,7 +122,15 @@ class UseCaseProvider(Provider):
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> CreatePostUseCase:
"""Provide CreatePostUseCase."""
"""Provide CreatePostUseCase.
Args:
post_repo: Post repository dependency.
tx_manager: Transaction manager dependency.
Returns:
Configured CreatePostUseCase instance.
"""
return CreatePostUseCase(
post_repo=post_repo,
tx_manager=tx_manager,
@@ -80,7 +142,15 @@ class UseCaseProvider(Provider):
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> GetPostUseCase:
"""Provide GetPostUseCase."""
"""Provide GetPostUseCase.
Args:
post_repo: Post repository dependency.
tx_manager: Transaction manager dependency.
Returns:
Configured GetPostUseCase instance.
"""
return GetPostUseCase(
post_repo=post_repo,
tx_manager=tx_manager,
@@ -92,7 +162,15 @@ class UseCaseProvider(Provider):
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> UpdatePostUseCase:
"""Provide UpdatePostUseCase."""
"""Provide UpdatePostUseCase.
Args:
post_repo: Post repository dependency.
tx_manager: Transaction manager dependency.
Returns:
Configured UpdatePostUseCase instance.
"""
return UpdatePostUseCase(
post_repo=post_repo,
tx_manager=tx_manager,
@@ -104,7 +182,15 @@ class UseCaseProvider(Provider):
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> DeletePostUseCase:
"""Provide DeletePostUseCase."""
"""Provide DeletePostUseCase.
Args:
post_repo: Post repository dependency.
tx_manager: Transaction manager dependency.
Returns:
Configured DeletePostUseCase instance.
"""
return DeletePostUseCase(
post_repo=post_repo,
tx_manager=tx_manager,
@@ -116,7 +202,15 @@ class UseCaseProvider(Provider):
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> ListPostsUseCase:
"""Provide ListPostsUseCase."""
"""Provide ListPostsUseCase.
Args:
post_repo: Post repository dependency.
tx_manager: Transaction manager dependency.
Returns:
Configured ListPostsUseCase instance.
"""
return ListPostsUseCase(
post_repo=post_repo,
tx_manager=tx_manager,
@@ -128,7 +222,15 @@ class UseCaseProvider(Provider):
post_repo: PostRepository,
tx_manager: TransactionManager,
) -> PublishPostUseCase:
"""Provide PublishPostUseCase."""
"""Provide PublishPostUseCase.
Args:
post_repo: Post repository dependency.
tx_manager: Transaction manager dependency.
Returns:
Configured PublishPostUseCase instance.
"""
return PublishPostUseCase(
post_repo=post_repo,
tx_manager=tx_manager,
@@ -136,9 +238,20 @@ class UseCaseProvider(Provider):
class KeycloakProvider(Provider):
"""Provider for Keycloak authentication client."""
"""Provider for Keycloak authentication client.
Provides Keycloak client as application-scoped singleton.
Client is stateless and can be shared across requests.
Example:
>>> provider = KeycloakProvider()
"""
@provide(scope=Scope.APP)
def get_keycloak_client(self) -> KeycloakAuthClient:
"""Provide KeycloakAuthClient singleton."""
"""Provide KeycloakAuthClient singleton.
Returns:
KeycloakAuthClient instance.
"""
return KeycloakAuthClient(settings)

View File

@@ -1,4 +1,8 @@
"""SQLAlchemy implementation of Transaction Manager."""
"""SQLAlchemy implementation of Transaction Manager.
This module provides the concrete implementation of TransactionManager
using SQLAlchemy async sessions for transaction control.
"""
from sqlalchemy.ext.asyncio import AsyncSession
@@ -6,19 +10,44 @@ from app.application.interfaces import TransactionManager
class SessionTransactionManager(TransactionManager):
"""SQLAlchemy Session-based Transaction Manager."""
"""SQLAlchemy Session-based Transaction Manager.
Implements transaction control using SQLAlchemy async session.
Tracks commit state to prevent duplicate commits.
Attributes:
_session: SQLAlchemy async session for transactions.
_committed: Flag indicating if transaction was committed.
Example:
>>> tx_manager = SessionTransactionManager(session)
>>> await tx_manager.commit()
"""
def __init__(self, session: AsyncSession) -> None:
"""Initialize transaction manager.
Args:
session: SQLAlchemy async session instance.
"""
self._session = session
self._committed: bool = False
async def commit(self) -> None:
"""Commit the current transaction."""
"""Commit the current transaction.
Persists all pending changes to the database.
Only commits once - subsequent calls are no-ops.
"""
if not self._committed:
await self._session.commit()
self._committed = True
async def rollback(self) -> None:
"""Rollback the current transaction."""
"""Rollback the current transaction.
Discards all pending changes.
Only rolls back if not already committed.
"""
if not self._committed:
await self._session.rollback()

View File

@@ -1,4 +1,8 @@
"""Infrastructure middleware."""
"""Infrastructure middleware.
This module re-exports exception handling middleware for
centralized error management in the application.
"""
from app.infrastructure.middleware.error_handler import (
domain_exception_handler,

View File

@@ -1,4 +1,8 @@
"""Exception handling middleware."""
"""Exception handling middleware.
This module provides exception handlers for FastAPI application.
Maps domain exceptions to appropriate HTTP status codes.
"""
from datetime import UTC, datetime
@@ -17,7 +21,14 @@ from app.domain.exceptions import (
def get_status_code(exc: DomainException) -> int:
"""Map domain exceptions to HTTP status codes."""
"""Map domain exceptions to HTTP status codes.
Args:
exc: Domain exception instance.
Returns:
HTTP status code for the exception type.
"""
match exc:
case ValidationException():
return 400
@@ -34,7 +45,17 @@ def get_status_code(exc: DomainException) -> int:
async def domain_exception_handler(request: Request, exc: DomainException) -> JSONResponse:
"""Handle domain exceptions."""
"""Handle domain exceptions.
Converts domain exceptions to JSON error responses.
Args:
request: FastAPI request object.
exc: Domain exception instance.
Returns:
JSONResponse with error details.
"""
status_code = get_status_code(exc)
return JSONResponse(
status_code=status_code,
@@ -48,7 +69,17 @@ async def domain_exception_handler(request: Request, exc: DomainException) -> JS
async def http_exception_handler(request: Request, exc: StarletteHTTPException) -> JSONResponse:
"""Handle HTTP exceptions."""
"""Handle HTTP exceptions.
Converts Starlette HTTP exceptions to JSON error responses.
Args:
request: FastAPI request object.
exc: Starlette HTTP exception instance.
Returns:
JSONResponse with error details.
"""
return JSONResponse(
status_code=exc.status_code,
content={
@@ -61,7 +92,18 @@ async def http_exception_handler(request: Request, exc: StarletteHTTPException)
async def generic_exception_handler(request: Request, exc: Exception) -> JSONResponse:
"""Handle generic exceptions."""
"""Handle generic exceptions.
Converts unhandled exceptions to generic error responses.
Hides internal details for security.
Args:
request: FastAPI request object.
exc: Generic exception instance.
Returns:
JSONResponse with generic error message.
"""
return JSONResponse(
status_code=500,
content={
@@ -74,16 +116,16 @@ async def generic_exception_handler(request: Request, exc: Exception) -> JSONRes
def register_exception_handlers(app: FastAPI) -> None:
"""Register all exception handlers with FastAPI app."""
"""Register all exception handlers with FastAPI app.
Args:
app: FastAPI application instance.
Raises:
TypeError: If app is not a FastAPI instance.
"""
if not isinstance(app, FastAPI):
raise TypeError("app must be a FastAPI instance")
# Domain exceptions
app.add_exception_handler(DomainException, domain_exception_handler) # type: ignore[arg-type]
# HTTP exceptions
app.add_exception_handler(StarletteHTTPException, http_exception_handler) # type: ignore[arg-type]
# Generic exceptions (only in production)
# In development, let FastAPI show detailed traceback
# app.add_exception_handler(Exception, generic_exception_handler)

View File

@@ -1,4 +1,8 @@
"""Repository implementations."""
"""Repository implementations.
This module re-exports concrete repository implementations
for data access using SQLAlchemy ORM.
"""
from app.infrastructure.repositories.post import SQLAlchemyPostRepository

View File

@@ -1,4 +1,8 @@
"""SQLAlchemy implementation of PostRepository."""
"""SQLAlchemy implementation of PostRepository.
This module provides the concrete implementation of PostRepository
using SQLAlchemy ORM for data persistence.
"""
from uuid import UUID
@@ -12,13 +16,36 @@ from app.infrastructure.database.models import PostORM
class SQLAlchemyPostRepository(PostRepository):
"""SQLAlchemy implementation of Post repository."""
"""SQLAlchemy implementation of Post repository.
Provides data access methods for Post entities using SQLAlchemy ORM.
Handles conversion between domain entities and ORM models.
Attributes:
_session: SQLAlchemy async session for database operations.
Example:
>>> repo = SQLAlchemyPostRepository(session)
>>> post = await repo.get_by_id(post_id)
"""
def __init__(self, session: AsyncSession) -> None:
"""Initialize repository with session.
Args:
session: SQLAlchemy async session instance.
"""
self._session = session
def _to_domain(self, orm: PostORM) -> Post:
"""Convert ORM model to domain entity."""
"""Convert ORM model to domain entity.
Args:
orm: SQLAlchemy ORM model instance.
Returns:
Domain Post entity with validated value objects.
"""
return Post(
id=UUID(orm.id),
title=Title(orm.title),
@@ -32,7 +59,14 @@ class SQLAlchemyPostRepository(PostRepository):
)
def _to_orm(self, post: Post) -> PostORM:
"""Convert domain entity to ORM model."""
"""Convert domain entity to ORM model.
Args:
post: Domain Post entity.
Returns:
SQLAlchemy ORM model instance.
"""
return PostORM(
id=str(post.id),
title=post.title.value,
@@ -46,25 +80,43 @@ class SQLAlchemyPostRepository(PostRepository):
)
async def get_by_id(self, entity_id: UUID) -> Post | None:
"""Get post by ID."""
"""Get post by ID.
Args:
entity_id: Unique identifier of the post.
Returns:
Post entity if found, None otherwise.
"""
result = await self._session.execute(select(PostORM).where(PostORM.id == str(entity_id)))
orm = result.scalar_one_or_none()
return self._to_domain(orm) if orm else None
async def get_all(self) -> list[Post]:
"""Get all posts."""
"""Get all posts.
Returns:
List of all Post entities.
"""
result = await self._session.execute(select(PostORM))
orms = result.scalars().all()
return [self._to_domain(orm) for orm in orms]
async def add(self, entity: Post) -> None:
"""Add new post."""
"""Add new post.
Args:
entity: Post entity to add.
"""
orm = self._to_orm(entity)
self._session.add(orm)
# Commit делает TransactionManager
async def update(self, entity: Post) -> None:
"""Update existing post."""
"""Update existing post.
Args:
entity: Post entity with updated data.
"""
result = await self._session.execute(select(PostORM).where(PostORM.id == str(entity.id)))
orm = result.scalar_one()
@@ -75,22 +127,38 @@ class SQLAlchemyPostRepository(PostRepository):
orm.tags = entity.tags
orm.updated_at = entity.updated_at
# Commit делает TransactionManager
async def delete(self, entity_id: UUID) -> None:
"""Delete post by ID."""
"""Delete post by ID.
Args:
entity_id: Unique identifier of the post to delete.
"""
result = await self._session.execute(select(PostORM).where(PostORM.id == str(entity_id)))
orm = result.scalar_one_or_none()
if orm:
await self._session.delete(orm)
async def exists(self, entity_id: UUID) -> bool:
"""Check if post exists."""
"""Check if post exists.
Args:
entity_id: Unique identifier of the post.
Returns:
True if post exists, False otherwise.
"""
result = await self._session.execute(select(PostORM).where(PostORM.id == str(entity_id)))
return result.scalar_one_or_none() is not None
async def get_by_slug(self, slug: str) -> Post | None:
"""Get post by slug."""
"""Get post by slug.
Args:
slug: URL-friendly slug identifier.
Returns:
Post entity if found, None otherwise.
"""
result = await self._session.execute(select(PostORM).where(PostORM.slug == slug))
orm = result.scalar_one_or_none()
return self._to_domain(orm) if orm else None
@@ -101,7 +169,16 @@ class SQLAlchemyPostRepository(PostRepository):
limit: int | None = None,
offset: int | None = None,
) -> list[Post]:
"""Get posts by author."""
"""Get posts by author.
Args:
author_id: Identifier of the author.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of Post entities by the author.
"""
query = select(PostORM).where(PostORM.author_id == author_id)
if limit is not None:
query = query.limit(limit)
@@ -116,7 +193,15 @@ class SQLAlchemyPostRepository(PostRepository):
limit: int | None = None,
offset: int | None = None,
) -> list[Post]:
"""Get published posts."""
"""Get published posts.
Args:
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of published Post entities.
"""
query = select(PostORM).where(PostORM.published.is_(True))
if limit is not None:
query = query.limit(limit)
@@ -132,7 +217,16 @@ class SQLAlchemyPostRepository(PostRepository):
limit: int | None = None,
offset: int | None = None,
) -> list[Post]:
"""Get posts by tag."""
"""Get posts by tag.
Args:
tag: Tag to filter by.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of Post entities with the tag.
"""
query = select(PostORM).where(PostORM.tags.contains([tag]))
if limit is not None:
query = query.limit(limit)
@@ -143,7 +237,14 @@ class SQLAlchemyPostRepository(PostRepository):
return [self._to_domain(orm) for orm in orms]
async def slug_exists(self, slug: str) -> bool:
"""Check if slug exists."""
"""Check if slug exists.
Args:
slug: Slug to check for existence.
Returns:
True if slug exists, False otherwise.
"""
result = await self._session.execute(select(PostORM).where(PostORM.slug == slug))
return result.scalar_one_or_none() is not None
@@ -153,7 +254,16 @@ class SQLAlchemyPostRepository(PostRepository):
limit: int | None = None,
offset: int | None = None,
) -> list[Post]:
"""Search posts."""
"""Search posts.
Args:
query: Search query string.
limit: Maximum number of posts to return.
offset: Number of posts to skip.
Returns:
List of Post entities matching the query.
"""
search_pattern = f"%{query}%"
stmt = select(PostORM).where(
or_(

View File

@@ -1,4 +1,8 @@
"""Application entry point with DDD architecture."""
"""Application entry point with DDD architecture.
This module is the main entry point for the FastAPI application.
Configures DI container, middleware, and routes following DDD principles.
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@@ -22,16 +26,29 @@ from app.presentation import router
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
"""Application lifespan manager."""
# Startup
"""Application lifespan manager.
Handles startup and shutdown tasks for the application.
Args:
app: FastAPI application instance.
Yields:
None during application runtime.
"""
await init_db()
yield
# Shutdown
await close_db()
def app_factory() -> FastAPI:
"""Create and configure FastAPI application."""
"""Create and configure FastAPI application.
Sets up DI container, exception handlers, middleware, and routes.
Returns:
Configured FastAPI application instance.
"""
app = FastAPI(
title=settings.app.name,
debug=settings.app.debug,
@@ -40,7 +57,6 @@ def app_factory() -> FastAPI:
redoc_url="/redoc" if settings.is_dev else None,
)
# Setup Dishka DI container
container = make_async_container(
DatabaseProvider(),
RepositoryProvider(),
@@ -50,10 +66,8 @@ def app_factory() -> FastAPI:
)
setup_dishka(container, app)
# Register exception handlers
register_exception_handlers(app)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
@@ -62,12 +76,15 @@ def app_factory() -> FastAPI:
allow_headers=["*"],
)
# Include API routes
app.include_router(router, prefix="/api")
# Health check endpoint
@app.get("/health", tags=["health"])
async def health_check() -> dict[str, str]:
"""Health check endpoint.
Returns:
Status information dictionary.
"""
return {
"status": "ok",
"app": settings.app.name,
@@ -78,7 +95,10 @@ def app_factory() -> FastAPI:
def main() -> None:
"""Run the application."""
"""Run the application.
Starts uvicorn server with application factory.
"""
uvicorn.run(
app_factory,
factory=True,

View File

@@ -1,4 +1,8 @@
"""Presentation layer exports."""
"""Presentation layer exports.
This module re-exports presentation layer components including
API router and Pydantic schemas.
"""
from app.presentation.api import router
from app.presentation.schemas import (

View File

@@ -1,4 +1,8 @@
"""API router configuration."""
"""API router configuration.
This module sets up the main API router and includes versioned
sub-routers for API organization.
"""
from fastapi import APIRouter

View File

@@ -1,4 +1,8 @@
"""API dependencies using Dishka."""
"""API dependencies using Dishka.
This module defines FastAPI dependencies for authentication, authorization,
and use case injection using Dishka DI container.
"""
from typing import Annotated, Any
@@ -18,7 +22,6 @@ from app.domain.exceptions import ForbiddenException, UnauthorizedException
from app.domain.roles import Role, get_effective_role
from app.infrastructure.auth import KeycloakAuthClient, TokenInfo
# Use case dependencies - injected via Dishka
CreatePostDep = FromDishka[CreatePostUseCase]
GetPostDep = FromDishka[GetPostUseCase]
UpdatePostDep = FromDishka[UpdatePostUseCase]
@@ -26,12 +29,18 @@ DeletePostDep = FromDishka[DeletePostUseCase]
ListPostsDep = FromDishka[ListPostsUseCase]
PublishPostDep = FromDishka[PublishPostUseCase]
# Security scheme
security = HTTPBearer(auto_error=False)
def get_keycloak_client(request: Request) -> KeycloakAuthClient:
"""Get Keycloak client from DI container via request state."""
"""Get Keycloak client from DI container via request state.
Args:
request: FastAPI request object.
Returns:
KeycloakAuthClient instance from container.
"""
client: KeycloakAuthClient = request.state.dishka_container.get(KeycloakAuthClient)
return client
@@ -40,7 +49,18 @@ async def get_current_token_info(
credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)],
request: Request,
) -> TokenInfo:
"""Validate token and return token info from Keycloak."""
"""Validate token and return token info from Keycloak.
Args:
credentials: HTTP authorization credentials.
request: FastAPI request object.
Returns:
Validated TokenInfo instance.
Raises:
UnauthorizedException: If no credentials or invalid token.
"""
if not credentials:
raise UnauthorizedException("Authentication required")
@@ -57,7 +77,14 @@ async def get_current_token_info(
async def get_current_user_id(
token_info: Annotated[TokenInfo, Depends(get_current_token_info)],
) -> str:
"""Get current user ID from validated token."""
"""Get current user ID from validated token.
Args:
token_info: Validated token info.
Returns:
User ID string from token.
"""
return token_info.user_id
@@ -65,12 +92,21 @@ CurrentUserDep = Annotated[str, Depends(get_current_user_id)]
TokenInfoDep = Annotated[TokenInfo, Depends(get_current_token_info)]
# Optional auth - doesn't require authentication but provides user info if available
async def get_optional_token_info(
credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(security)],
request: Request,
) -> TokenInfo | None:
"""Get token info if valid token provided, otherwise None (guest)."""
"""Get token info if valid token provided, otherwise None.
For endpoints that support both authenticated and guest access.
Args:
credentials: HTTP authorization credentials.
request: FastAPI request object.
Returns:
TokenInfo if valid, None otherwise.
"""
if not credentials:
return None
@@ -90,7 +126,14 @@ OptionalTokenInfoDep = Annotated[TokenInfo | None, Depends(get_optional_token_in
async def get_optional_user_id(
token_info: OptionalTokenInfoDep,
) -> str | None:
"""Get current user ID if token is valid, otherwise None."""
"""Get current user ID if token is valid, otherwise None.
Args:
token_info: Optional token info.
Returns:
User ID if authenticated, None for guests.
"""
if token_info:
return token_info.user_id
return None
@@ -103,6 +146,12 @@ def get_current_role(token_info: OptionalTokenInfoDep) -> Role:
"""Get effective role from token info.
Returns GUEST if no valid token provided.
Args:
token_info: Optional token info.
Returns:
Effective Role enum value.
"""
if token_info and token_info.roles:
return get_effective_role(token_info.roles)
@@ -113,7 +162,17 @@ CurrentRoleDep = Annotated[Role, Depends(get_current_role)]
def require_roles(allowed_roles: list[Role]) -> Any:
"""Create dependency that checks if user has one of the allowed roles."""
"""Create dependency that checks if user has one of the allowed roles.
Args:
allowed_roles: List of roles allowed to access.
Returns:
FastAPI Depends for role checking.
Raises:
ForbiddenException: If user role is not in allowed list.
"""
async def check_role(role: CurrentRoleDep) -> Role:
if role not in allowed_roles:
@@ -125,7 +184,6 @@ def require_roles(allowed_roles: list[Role]) -> Any:
return Depends(check_role)
# Predefined role requirements
RequireAdmin = require_roles([Role.ADMIN])
RequireUser = require_roles([Role.USER, Role.ADMIN])
RequireAny = require_roles([Role.GUEST, Role.USER, Role.ADMIN])

View File

@@ -1,4 +1,8 @@
"""API v1 router."""
"""API v1 router.
This module sets up the version 1 API router and includes
all v1 endpoint routers.
"""
from fastapi import APIRouter

View File

@@ -1,4 +1,8 @@
"""Posts API routes."""
"""Posts API routes.
This module defines FastAPI routes for blog post operations.
Implements CRUD endpoints with authentication and authorization.
"""
from uuid import UUID
@@ -39,7 +43,16 @@ async def create_post(
use_case: CreatePostDep,
current_user_id: CurrentUserDep,
) -> PostResponseSchema:
"""Create a new blog post."""
"""Create a new blog post.
Args:
schema: Post creation data.
use_case: CreatePostUseCase dependency.
current_user_id: Authenticated user ID.
Returns:
PostResponseSchema with created post data.
"""
dto = CreatePostDTO(
title=schema.title,
content=schema.content,
@@ -65,19 +78,22 @@ async def list_posts(
"""Get blog posts with optional filtering and pagination.
Args:
use_case: ListPostsUseCase dependency.
role: Current user role.
include_unpublished: If True, returns all posts including drafts.
Only admins can use this parameter.
Only admins can use this parameter.
limit: Maximum number of posts to return (default: 10, max: 100).
offset: Number of posts to skip (default: 0).
Returns:
PostListResponseSchema with paginated posts.
Raises:
ForbiddenException: If non-admin tries to include unpublished posts.
"""
# Clamp limit to reasonable range
limit = max(1, min(limit, 100))
offset = max(0, offset)
# Check permissions for unpublished posts
if include_unpublished:
if not has_permission(role, Permission.POST_READ_UNPUBLISHED):
raise ForbiddenException("Only admins can view unpublished posts")
@@ -97,7 +113,14 @@ async def list_posts(
async def list_published_posts(
use_case: ListPostsDep,
) -> PostListResponseSchema:
"""Get all published blog posts."""
"""Get all published blog posts.
Args:
use_case: ListPostsUseCase dependency.
Returns:
PostListResponseSchema with published posts.
"""
results = await use_case.published_posts()
items = [PostResponseSchema(**r.__dict__) for r in results]
return PostListResponseSchema(items=items, total=len(items))
@@ -112,7 +135,15 @@ async def search_posts(
query: str,
use_case: ListPostsDep,
) -> PostListResponseSchema:
"""Search posts by query."""
"""Search posts by query.
Args:
query: Search query string.
use_case: ListPostsUseCase dependency.
Returns:
PostListResponseSchema with matching posts.
"""
results = await use_case.search(query)
items = [PostResponseSchema(**r.__dict__) for r in results]
return PostListResponseSchema(items=items, total=len(items))
@@ -127,7 +158,15 @@ async def get_posts_by_tag(
tag: str,
use_case: ListPostsDep,
) -> PostListResponseSchema:
"""Get posts by tag."""
"""Get posts by tag.
Args:
tag: Tag to filter by.
use_case: ListPostsUseCase dependency.
Returns:
PostListResponseSchema with tagged posts.
"""
results = await use_case.by_tag(tag)
items = [PostResponseSchema(**r.__dict__) for r in results]
return PostListResponseSchema(items=items, total=len(items))
@@ -142,7 +181,15 @@ async def get_posts_by_author(
author_id: str,
use_case: ListPostsDep,
) -> PostListResponseSchema:
"""Get posts by author."""
"""Get posts by author.
Args:
author_id: Author identifier.
use_case: ListPostsUseCase dependency.
Returns:
PostListResponseSchema with author's posts.
"""
results = await use_case.by_author(author_id)
items = [PostResponseSchema(**r.__dict__) for r in results]
return PostListResponseSchema(items=items, total=len(items))
@@ -157,7 +204,15 @@ async def get_post(
post_id: UUID,
use_case: GetPostDep,
) -> PostResponseSchema:
"""Get a post by its ID."""
"""Get a post by its ID.
Args:
post_id: Unique post identifier.
use_case: GetPostUseCase dependency.
Returns:
PostResponseSchema with post data.
"""
result = await use_case.by_id(post_id)
return PostResponseSchema(**result.__dict__)
@@ -171,7 +226,15 @@ async def get_post_by_slug(
slug: str,
use_case: GetPostDep,
) -> PostResponseSchema:
"""Get a post by its slug."""
"""Get a post by its slug.
Args:
slug: URL-friendly slug identifier.
use_case: GetPostUseCase dependency.
Returns:
PostResponseSchema with post data.
"""
result = await use_case.by_slug(slug)
return PostResponseSchema(**result.__dict__)
@@ -187,7 +250,17 @@ async def update_post(
use_case: UpdatePostDep,
current_user_id: CurrentUserDep,
) -> PostResponseSchema:
"""Update a post."""
"""Update a post.
Args:
post_id: Unique post identifier.
schema: Update data.
use_case: UpdatePostUseCase dependency.
current_user_id: Authenticated user ID.
Returns:
PostResponseSchema with updated post data.
"""
dto = UpdatePostDTO(
title=schema.title,
content=schema.content,
@@ -207,7 +280,13 @@ async def delete_post(
use_case: DeletePostDep,
current_user_id: CurrentUserDep,
) -> None:
"""Delete a post."""
"""Delete a post.
Args:
post_id: Unique post identifier.
use_case: DeletePostUseCase dependency.
current_user_id: Authenticated user ID.
"""
await use_case.execute(post_id, current_user_id)
@@ -221,7 +300,16 @@ async def publish_post(
use_case: PublishPostDep,
current_user_id: CurrentUserDep,
) -> PostResponseSchema:
"""Publish a post."""
"""Publish a post.
Args:
post_id: Unique post identifier.
use_case: PublishPostUseCase dependency.
current_user_id: Authenticated user ID.
Returns:
PostResponseSchema with published post data.
"""
result = await use_case.publish(post_id, current_user_id)
return PostResponseSchema(**result.__dict__)
@@ -236,6 +324,15 @@ async def unpublish_post(
use_case: PublishPostDep,
current_user_id: CurrentUserDep,
) -> PostResponseSchema:
"""Unpublish a post."""
"""Unpublish a post.
Args:
post_id: Unique post identifier.
use_case: PublishPostUseCase dependency.
current_user_id: Authenticated user ID.
Returns:
PostResponseSchema with unpublished post data.
"""
result = await use_case.unpublish(post_id, current_user_id)
return PostResponseSchema(**result.__dict__)

View File

@@ -1,4 +1,8 @@
"""Presentation schemas."""
"""Presentation schemas.
This module re-exports all Pydantic schemas used for
request/response validation in the API layer.
"""
from app.presentation.schemas.post import (
PostBaseSchema,

View File

@@ -1,4 +1,8 @@
"""API schemas for posts."""
"""API schemas for posts.
This module defines Pydantic schemas for request/response validation
in the posts API endpoints.
"""
from datetime import datetime
from uuid import UUID
@@ -7,7 +11,14 @@ from pydantic import BaseModel, ConfigDict, Field
class PostBaseSchema(BaseModel):
"""Base schema for posts."""
"""Base schema for posts.
Contains common fields shared across post schemas.
Attributes:
title: Post title (3-200 characters).
content: Post content (10-50000 characters).
"""
model_config = ConfigDict(from_attributes=True)
@@ -16,13 +27,27 @@ class PostBaseSchema(BaseModel):
class PostCreateSchema(PostBaseSchema):
"""Schema for creating a post."""
"""Schema for creating a post.
Extends base schema with creation-specific fields.
Attributes:
tags: List of tags for categorization.
"""
tags: list[str] = Field(default_factory=list)
class PostUpdateSchema(BaseModel):
"""Schema for updating a post."""
"""Schema for updating a post.
All fields are optional for partial updates.
Attributes:
title: Optional new title.
content: Optional new content.
tags: Optional new tags list.
"""
model_config = ConfigDict(from_attributes=True)
@@ -32,7 +57,21 @@ class PostUpdateSchema(BaseModel):
class PostResponseSchema(BaseModel):
"""Schema for post response."""
"""Schema for post response.
Complete post data for API responses.
Attributes:
id: Unique post identifier.
title: Post title.
content: Post content.
slug: URL-friendly slug.
author_id: Author identifier.
published: Publication status.
tags: List of tags.
created_at: Creation timestamp.
updated_at: Last update timestamp.
"""
model_config = ConfigDict(from_attributes=True)
@@ -48,19 +87,38 @@ class PostResponseSchema(BaseModel):
class PostListResponseSchema(BaseModel):
"""Schema for list of posts response."""
"""Schema for list of posts response.
Paginated response for list endpoints.
Attributes:
items: List of post items.
total: Total number of items.
"""
items: list[PostResponseSchema]
total: int
class PostSearchSchema(BaseModel):
"""Schema for searching posts."""
"""Schema for searching posts.
Search query parameters.
Attributes:
query: Search query string (1-100 characters).
"""
query: str = Field(..., min_length=1, max_length=100)
class PostPublishSchema(BaseModel):
"""Schema for publishing/unpublishing a post."""
"""Schema for publishing/unpublishing a post.
Publication status toggle.
Attributes:
published: Desired publication status.
"""
published: bool