Files
blog.pyaqa.ru/tests/unit/infrastructure/test_auth.py
Sergey Vanyushkin 14adcaa3e6
All checks were successful
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/type Pipeline was successful
ci/woodpecker/pr/lint Pipeline was successful
style: apply ruff formatting to source and test files
2026-05-02 12:05:14 +03:00

304 lines
11 KiB
Python

"""Tests for Keycloak authentication client."""
from unittest.mock import AsyncMock, Mock, patch
import pytest
from app.infrastructure.auth import KeycloakAuthClient, KeycloakUser, TokenInfo
from app.infrastructure.config.settings import Settings
class TestTokenInfo:
    """Checks for the ``TokenInfo`` dataclass and its ``is_valid`` flag."""

    def test_token_info_valid(self) -> None:
        """An active token with a user id is valid and exposes its fields."""
        info = TokenInfo(
            active=True,
            user_id="user-123",
            username="testuser",
            email="test@example.com",
            roles=["user"],
        )

        assert info.is_valid is True
        # Identity fields are compared together; each must match exactly.
        assert (info.user_id, info.username, info.email) == (
            "user-123",
            "testuser",
            "test@example.com",
        )
        assert info.roles == ["user"]

    def test_token_info_invalid_not_active(self) -> None:
        """``active=False`` makes the token invalid even with a user id set."""
        inactive = TokenInfo(
            active=False,
            user_id="user-123",
            username="testuser",
            email="test@example.com",
            roles=["user"],
        )

        assert inactive.is_valid is False

    def test_token_info_invalid_no_user_id(self) -> None:
        """An empty ``user_id`` makes an otherwise active token invalid."""
        anonymous = TokenInfo(
            active=True,
            user_id="",
            username="testuser",
            email="test@example.com",
            roles=["user"],
        )

        assert anonymous.is_valid is False

    def test_token_info_empty_roles(self) -> None:
        """A token with an empty role list is still valid."""
        roleless = TokenInfo(
            active=True,
            user_id="user-123",
            username="testuser",
            email="test@example.com",
            roles=[],
        )

        assert roleless.roles == []
        assert roleless.is_valid is True
class TestKeycloakUser:
    """Checks for the ``KeycloakUser`` dataclass."""

    def test_keycloak_user_creation(self) -> None:
        """Every explicitly supplied field is readable back unchanged."""
        created = KeycloakUser(
            id="user-123",
            username="testuser",
            email="test@example.com",
            first_name="Test",
            last_name="User",
            roles=["user", "admin"],
            is_active=True,
        )

        # Account identity.
        assert (created.id, created.username, created.email) == (
            "user-123",
            "testuser",
            "test@example.com",
        )
        # Personal name parts.
        assert (created.first_name, created.last_name) == ("Test", "User")
        # Authorization state.
        assert created.roles == ["user", "admin"]
        assert created.is_active is True

    def test_keycloak_user_defaults(self) -> None:
        """Omitted optional fields default to empty names, no roles, active."""
        minimal = KeycloakUser(
            id="user-123",
            username="testuser",
            email="test@example.com",
        )

        assert (minimal.first_name, minimal.last_name) == ("", "")
        assert minimal.roles == []
        assert minimal.is_active is True
class TestKeycloakAuthClient:
    """Test KeycloakAuthClient with ``httpx.AsyncClient`` replaced by mocks.

    The HTTP layer is never hit: every network-facing test patches
    ``httpx.AsyncClient`` so that constructing it returns a mock built by
    ``_http_client``.
    """

    @staticmethod
    def _json_response(payload: dict[str, object]) -> Mock:
        """Build a mocked HTTP response.

        Args:
            payload: Value returned by the mock's ``json()`` method.

        Returns:
            A ``Mock`` whose ``json()`` returns ``payload`` and whose
            ``raise_for_status()`` is a no-op mock (i.e. never raises).
        """
        response = Mock()
        response.json.return_value = payload
        response.raise_for_status = Mock()
        return response

    @staticmethod
    def _http_client(**handlers: AsyncMock) -> AsyncMock:
        """Build a stand-in for an ``httpx.AsyncClient`` instance.

        Args:
            **handlers: Maps an HTTP verb attribute name (``post``, ``get``)
                to the ``AsyncMock`` that should handle that call.

        Returns:
            An ``AsyncMock`` usable as an async context manager, with the
            given handlers attached as attributes.
        """
        http_client = AsyncMock()
        # Entering the context manager yields the very same mock, so code that
        # uses ``async with`` ends up calling the handlers attached below.
        http_client.__aenter__ = AsyncMock(return_value=http_client)
        # Returning None from __aexit__ lets exceptions propagate unchanged.
        http_client.__aexit__ = AsyncMock(return_value=None)
        for verb, handler in handlers.items():
            setattr(http_client, verb, handler)
        return http_client

    @pytest.fixture
    def settings(self) -> Settings:
        """Create test settings."""
        from app.infrastructure.config import KCConfig, SecurityConfig

        return Settings(
            environment="dev",
            kc=KCConfig(
                server_url="http://localhost:8080",
                realm="test-realm",
                client_id="test-client",
                client_secret="test-secret",
                token_cache_ttl=60,
            ),
            security=SecurityConfig(
                secret_key="test-secret-key-for-jwt-tokens",
            ),
        )

    @pytest.fixture
    def client(self, settings: Settings) -> KeycloakAuthClient:
        """Create Keycloak client."""
        return KeycloakAuthClient(settings)

    def test_client_initialization(self, client: KeycloakAuthClient, settings: Settings) -> None:
        """Test that the client stores the values taken from the settings."""
        assert client._settings == settings
        assert client._base_url == "http://localhost:8080/realms/test-realm"
        assert client._client_id == "test-client"
        assert client._client_secret == "test-secret"
        assert client._cache_ttl == 60

    def test_get_introspection_url(self, client: KeycloakAuthClient) -> None:
        """Test introspection URL generation."""
        url = client._get_introspection_url()
        assert (
            url
            == "http://localhost:8080/realms/test-realm/protocol/openid-connect/token/introspection"
        )

    def test_get_userinfo_url(self, client: KeycloakAuthClient) -> None:
        """Test userinfo URL generation."""
        url = client._get_userinfo_url()
        assert url == "http://localhost:8080/realms/test-realm/protocol/openid-connect/userinfo"

    @pytest.mark.asyncio
    async def test_introspect_token_success(self, client: KeycloakAuthClient) -> None:
        """Test successful token introspection."""
        response = self._json_response(
            {
                "active": True,
                "sub": "user-123",
                "preferred_username": "testuser",
                "email": "test@example.com",
                "realm_access": {"roles": ["user", "admin"]},
            }
        )
        http_client = self._http_client(post=AsyncMock(return_value=response))

        with patch("httpx.AsyncClient", return_value=http_client):
            result = await client.introspect_token("test-token")

        assert result.active is True
        assert result.user_id == "user-123"
        assert result.username == "testuser"
        assert result.email == "test@example.com"
        assert result.roles == ["user", "admin"]
        assert result.is_valid is True

    @pytest.mark.asyncio
    async def test_introspect_token_inactive(self, client: KeycloakAuthClient) -> None:
        """Test introspection with inactive token."""
        response = self._json_response({"active": False})
        http_client = self._http_client(post=AsyncMock(return_value=response))

        with patch("httpx.AsyncClient", return_value=http_client):
            result = await client.introspect_token("test-token")

        assert result.active is False
        assert result.is_valid is False

    @pytest.mark.asyncio
    async def test_introspect_token_http_error(self, client: KeycloakAuthClient) -> None:
        """Test that an HTTP error during introspection yields an inactive token."""
        import httpx

        http_client = self._http_client(
            post=AsyncMock(side_effect=httpx.HTTPError("Connection error"))
        )

        with patch("httpx.AsyncClient", return_value=http_client):
            result = await client.introspect_token("test-token")

        assert result.active is False
        assert result.is_valid is False

    @pytest.mark.asyncio
    async def test_introspect_token_uses_cache(self, client: KeycloakAuthClient) -> None:
        """Test that token introspection uses cache."""
        response = self._json_response(
            {
                "active": True,
                "sub": "user-123",
                "preferred_username": "testuser",
                "email": "test@example.com",
                "realm_access": {"roles": ["user"]},
            }
        )
        http_client = self._http_client(post=AsyncMock(return_value=response))

        with patch("httpx.AsyncClient", return_value=http_client):
            # First call
            result1 = await client.introspect_token("test-token")
            # Second call should use cache
            result2 = await client.introspect_token("test-token")

        # HTTP client should only be called once
        assert http_client.post.call_count == 1
        assert result1.user_id == result2.user_id

    @pytest.mark.asyncio
    async def test_get_userinfo_success(self, client: KeycloakAuthClient) -> None:
        """Test successful userinfo retrieval."""
        response = self._json_response(
            {
                "sub": "user-123",
                "preferred_username": "testuser",
                "email": "test@example.com",
                "given_name": "Test",
                "family_name": "User",
                "realm_access": {"roles": ["user"]},
            }
        )
        http_client = self._http_client(get=AsyncMock(return_value=response))

        with patch("httpx.AsyncClient", return_value=http_client):
            result = await client.get_userinfo("test-token")

        assert result is not None
        assert result.id == "user-123"
        assert result.username == "testuser"
        assert result.email == "test@example.com"
        assert result.first_name == "Test"
        assert result.last_name == "User"
        assert result.roles == ["user"]

    @pytest.mark.asyncio
    async def test_get_userinfo_error(self, client: KeycloakAuthClient) -> None:
        """Test that an HTTP error during userinfo retrieval yields ``None``."""
        import httpx

        http_client = self._http_client(
            get=AsyncMock(side_effect=httpx.HTTPError("Connection error"))
        )

        with patch("httpx.AsyncClient", return_value=http_client):
            result = await client.get_userinfo("test-token")

        assert result is None

    @pytest.mark.asyncio
    async def test_introspect_token_no_realm_roles(self, client: KeycloakAuthClient) -> None:
        """Test introspection without realm_access roles."""
        # The payload deliberately omits "realm_access".
        response = self._json_response(
            {
                "active": True,
                "sub": "user-123",
                "preferred_username": "testuser",
                "email": "test@example.com",
            }
        )
        http_client = self._http_client(post=AsyncMock(return_value=response))

        with patch("httpx.AsyncClient", return_value=http_client):
            result = await client.introspect_token("test-token")

        assert result.active is True
        assert result.roles == []