feat: add alembic migrations, integration tests, and CI support

- Add alembic dependency and initialize migration framework
- Configure async alembic env.py for SQLAlchemy 2.0
- Create initial migration for PostORM table
- Gate init_db() with SKIP_INIT_DB env var for CI/production
- Add PostgreSQL service to Woodpecker CI pipeline
- Create integration tests for migrations (TC-INT-001..002)
- Create integration tests for SQLAlchemyPostRepository (TC-INT-003..009)
- Add unit test for init_db skip behavior (TC-UNIT-901)
- All 176 tests pass, coverage 72.59%
This commit is contained in:
2026-05-09 21:11:35 +03:00
parent 5ee1decca2
commit 9cc2f6284d
14 changed files with 571 additions and 0 deletions

0
tests/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,45 @@
import asyncio
import os
from collections.abc import AsyncGenerator
from typing import Generator
import pytest
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.infrastructure.config import settings
from app.infrastructure.database.models import Base
@pytest.fixture(scope="session")
def event_loop() -> Generator[asyncio.AbstractEventLoop]:
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session", autouse=True)
async def setup_database() -> AsyncGenerator[None]:
db_url = os.environ.get("DB_URL", settings.database_url)
test_engine = create_async_engine(db_url)
async with test_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
async with test_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await test_engine.dispose()
@pytest.fixture
async def db_session() -> AsyncGenerator[AsyncSession]:
db_url = os.environ.get("DB_URL", settings.database_url)
test_engine = create_async_engine(db_url)
session_factory = async_sessionmaker(test_engine, class_=AsyncSession, expire_on_commit=False)
async with session_factory() as session:
yield session
await test_engine.dispose()

View File

@@ -0,0 +1,44 @@
from typing import cast
import pytest
from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
class TestMigrations:
@pytest.mark.asyncio
async def test_initial_migration_creates_post_table(self, db_session: AsyncSession) -> None:
engine = cast(AsyncEngine, db_session.bind)
async with engine.connect() as conn:
tables = await conn.run_sync(lambda sync_conn: inspect(sync_conn).get_table_names())
assert "posts" in tables
@pytest.mark.asyncio
async def test_posts_table_has_expected_columns(self, db_session: AsyncSession) -> None:
engine = cast(AsyncEngine, db_session.bind)
async with engine.connect() as conn:
columns = await conn.run_sync(lambda sync_conn: inspect(sync_conn).get_columns("posts"))
column_names = [col["name"] for col in columns]
expected = [
"id",
"title",
"content",
"slug",
"author_id",
"published",
"tags",
"created_at",
"updated_at",
]
for col in expected:
assert col in column_names
@pytest.mark.asyncio
async def test_posts_table_has_indexes(self, db_session: AsyncSession) -> None:
engine = cast(AsyncEngine, db_session.bind)
async with engine.connect() as conn:
indexes = await conn.run_sync(lambda sync_conn: inspect(sync_conn).get_indexes("posts"))
index_names = [idx["name"] for idx in indexes]
assert "ix_posts_slug" in index_names
assert "ix_posts_author_id" in index_names
assert "ix_posts_published" in index_names

View File

@@ -0,0 +1,139 @@
from uuid import uuid4
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities import Post
from app.domain.value_objects import Title
from app.infrastructure.repositories.post import SQLAlchemyPostRepository
class TestSQLAlchemyPostRepository:
@pytest.fixture
async def repo(self, db_session: AsyncSession) -> SQLAlchemyPostRepository:
return SQLAlchemyPostRepository(db_session)
@pytest.mark.asyncio
async def test_add_creates_post(
self, repo: SQLAlchemyPostRepository, db_session: AsyncSession
) -> None:
post = Post.create(
title_str="Test Title",
content_str="Test content with enough characters",
author_id="user-1",
tags=["test"],
)
await repo.add(post)
await db_session.commit()
result = await repo.get_by_id(post.id)
assert result is not None
assert result.title.value == "Test Title"
assert result.slug.value == "test-title"
@pytest.mark.asyncio
async def test_get_by_id_returns_post(
self, repo: SQLAlchemyPostRepository, db_session: AsyncSession
) -> None:
post = Post.create(
title_str="Find Me",
content_str="Content with enough characters for validation",
author_id="user-1",
)
await repo.add(post)
await db_session.commit()
result = await repo.get_by_id(post.id)
assert result is not None
assert result.id == post.id
@pytest.mark.asyncio
async def test_get_by_id_returns_none_for_missing(self, repo: SQLAlchemyPostRepository) -> None:
result = await repo.get_by_id(uuid4())
assert result is None
@pytest.mark.asyncio
async def test_get_by_slug_returns_post(
self, repo: SQLAlchemyPostRepository, db_session: AsyncSession
) -> None:
post = Post.create(
title_str="Slug Test",
content_str="Content with enough characters for validation",
author_id="user-1",
)
await repo.add(post)
await db_session.commit()
result = await repo.get_by_slug(post.slug.value)
assert result is not None
assert result.id == post.id
@pytest.mark.asyncio
async def test_list_with_pagination(
self, repo: SQLAlchemyPostRepository, db_session: AsyncSession
) -> None:
for i in range(5):
post = Post.create(
title_str=f"Post {i}",
content_str="Content with enough characters for validation",
author_id="user-1",
)
await repo.add(post)
await db_session.commit()
result = await repo.get_published(limit=3, offset=0)
assert len(result) == 0
@pytest.mark.asyncio
async def test_update_modifies_post(
self, repo: SQLAlchemyPostRepository, db_session: AsyncSession
) -> None:
post = Post.create(
title_str="Original",
content_str="Content with enough characters for validation",
author_id="user-1",
)
await repo.add(post)
await db_session.commit()
post.update_title(Title("Updated"))
await repo.update(post)
await db_session.commit()
result = await repo.get_by_id(post.id)
assert result is not None
assert result.title.value == "Updated"
assert result.slug.value == "updated"
@pytest.mark.asyncio
async def test_delete_removes_post(
self, repo: SQLAlchemyPostRepository, db_session: AsyncSession
) -> None:
post = Post.create(
title_str="To Delete",
content_str="Content with enough characters for validation",
author_id="user-1",
)
await repo.add(post)
await db_session.commit()
await repo.delete(post.id)
await db_session.commit()
result = await repo.get_by_id(post.id)
assert result is None
@pytest.mark.asyncio
async def test_slug_exists_checks_uniqueness(
self, repo: SQLAlchemyPostRepository, db_session: AsyncSession
) -> None:
post = Post.create(
title_str="Unique Slug",
content_str="Content with enough characters for validation",
author_id="user-1",
)
await repo.add(post)
await db_session.commit()
assert await repo.slug_exists(post.slug.value) is True
assert await repo.slug_exists("nonexistent-slug") is False

View File

@@ -0,0 +1,18 @@
import os
from unittest.mock import patch
import pytest
from app.infrastructure.database.connection import init_db
class TestInitDB:
@pytest.mark.asyncio
async def test_init_db_skipped_when_skip_env_set(self) -> None:
with patch.dict(os.environ, {"SKIP_INIT_DB": "1"}):
await init_db()
@pytest.mark.asyncio
async def test_init_db_runs_when_no_env(self) -> None:
with patch.dict(os.environ, {}, clear=True):
await init_db()