Files
blog.pyaqa.ru/app/infrastructure/database/connection.py
Sergey Vanyushkin 3956098d4b
Some checks failed
ci/woodpecker/pr/pipeline Pipeline failed
feat: add alembic migrations and PostgreSQL CI support
- Add alembic dependency and initialize migration framework
- Configure async alembic env.py for SQLAlchemy 2.0
- Create initial migration for PostORM table
- Gate init_db() with SKIP_INIT_DB env var for CI/production
- Add PostgreSQL service to Woodpecker CI pipeline
- Create integration tests for migrations (TC-INT-001..002)
- Create integration tests for SQLAlchemyPostRepository (TC-INT-003..009)
- Add unit test for init_db skip behavior (TC-UNIT-901)
- All 176 tests pass, coverage 72.59%
2026-05-10 07:28:02 +03:00

101 lines
2.4 KiB
Python

"""Database connection and session management.
This module handles database engine creation, session management,
and connection lifecycle for the application.
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from app.infrastructure.config import settings
def _get_database_url() -> str:
"""Get database URL with SQLite async compatibility.
Converts SQLite URL to async format if needed.
Returns:
Database URL string ready for async engine.
"""
url = settings.database_url
if url.startswith("sqlite:///") and not url.startswith("sqlite+aiosqlite:///"):
return url.replace("sqlite:///", "sqlite+aiosqlite:///")
return url
engine: AsyncEngine = create_async_engine(
_get_database_url(),
echo=settings.db.echo,
future=True,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
async def get_session() -> AsyncGenerator[AsyncSession]:
"""Get database session.
Yields:
AsyncSession instance for database operations.
"""
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
@asynccontextmanager
async def get_session_context() -> AsyncGenerator[AsyncSession]:
"""Get database session as context manager.
Yields:
AsyncSession instance for database operations.
"""
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
async def init_db() -> None:
"""Initialize database tables.
Creates all tables defined in the metadata.
Should be called on application startup.
Skipped in CI/production where alembic manages schema.
"""
import os
if os.environ.get("SKIP_INIT_DB", "").lower() in ("1", "true", "yes"):
return
from app.infrastructure.database.models import Base
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def close_db() -> None:
"""Close database connections.
Disposes of the engine and all connections.
Should be called on application shutdown.
"""
await engine.dispose()