"""Tests for Keycloak authentication client.""" from unittest.mock import AsyncMock, Mock, patch import pytest from app.infrastructure.auth import KeycloakAuthClient, KeycloakUser, TokenInfo from app.infrastructure.config.settings import Settings class TestTokenInfo: """Test TokenInfo dataclass.""" def test_token_info_valid(self) -> None: """Test valid token info.""" token_info = TokenInfo( active=True, user_id="user-123", username="testuser", email="test@example.com", roles=["user"], ) assert token_info.is_valid is True assert token_info.user_id == "user-123" assert token_info.username == "testuser" assert token_info.email == "test@example.com" assert token_info.roles == ["user"] def test_token_info_invalid_not_active(self) -> None: """Test invalid token when not active.""" token_info = TokenInfo( active=False, user_id="user-123", username="testuser", email="test@example.com", roles=["user"], ) assert token_info.is_valid is False def test_token_info_invalid_no_user_id(self) -> None: """Test invalid token when no user_id.""" token_info = TokenInfo( active=True, user_id="", username="testuser", email="test@example.com", roles=["user"], ) assert token_info.is_valid is False def test_token_info_empty_roles(self) -> None: """Test token info with empty roles.""" token_info = TokenInfo( active=True, user_id="user-123", username="testuser", email="test@example.com", roles=[], ) assert token_info.is_valid is True assert token_info.roles == [] class TestKeycloakUser: """Test KeycloakUser dataclass.""" def test_keycloak_user_creation(self) -> None: """Test KeycloakUser creation.""" user = KeycloakUser( id="user-123", username="testuser", email="test@example.com", first_name="Test", last_name="User", roles=["user", "admin"], is_active=True, ) assert user.id == "user-123" assert user.username == "testuser" assert user.email == "test@example.com" assert user.first_name == "Test" assert user.last_name == "User" assert user.roles == ["user", "admin"] assert user.is_active is True def test_keycloak_user_defaults(self) -> None: """Test KeycloakUser with default values.""" user = KeycloakUser( id="user-123", username="testuser", email="test@example.com", ) assert user.first_name == "" assert user.last_name == "" assert user.roles == [] assert user.is_active is True class TestKeycloakAuthClient: """Test KeycloakAuthClient.""" @pytest.fixture def settings(self) -> Settings: """Create test settings.""" from app.infrastructure.config import KCConfig, SecurityConfig return Settings( environment="dev", kc=KCConfig( server_url="http://localhost:8080", realm="test-realm", client_id="test-client", client_secret="test-secret", token_cache_ttl=60, ), security=SecurityConfig( secret_key="test-secret-key-for-jwt-tokens", ), ) @pytest.fixture def client(self, settings: Settings) -> KeycloakAuthClient: """Create Keycloak client.""" return KeycloakAuthClient(settings) def test_client_initialization(self, client: KeycloakAuthClient, settings: Settings) -> None: """Test client initialization.""" assert client._settings == settings assert client._base_url == "http://localhost:8080/realms/test-realm" assert client._client_id == "test-client" assert client._client_secret == "test-secret" assert client._cache_ttl == 60 def test_get_introspection_url(self, client: KeycloakAuthClient) -> None: """Test introspection URL generation.""" url = client._get_introspection_url() assert ( url == "http://localhost:8080/realms/test-realm/protocol/openid-connect/token/introspection" ) def test_get_userinfo_url(self, client: KeycloakAuthClient) -> None: """Test userinfo URL generation.""" url = client._get_userinfo_url() assert url == "http://localhost:8080/realms/test-realm/protocol/openid-connect/userinfo" @pytest.mark.asyncio async def test_introspect_token_success(self, client: KeycloakAuthClient) -> None: """Test successful token introspection.""" mock_response = Mock() mock_response.json.return_value = { "active": True, "sub": "user-123", "preferred_username": "testuser", "email": "test@example.com", "realm_access": {"roles": ["user", "admin"]}, } mock_response.raise_for_status = Mock() mock_async_client = AsyncMock() mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) mock_async_client.__aexit__ = AsyncMock(return_value=None) mock_async_client.post = AsyncMock(return_value=mock_response) with patch("httpx.AsyncClient", return_value=mock_async_client): result = await client.introspect_token("test-token") assert result.active is True assert result.user_id == "user-123" assert result.username == "testuser" assert result.email == "test@example.com" assert result.roles == ["user", "admin"] assert result.is_valid is True @pytest.mark.asyncio async def test_introspect_token_inactive(self, client: KeycloakAuthClient) -> None: """Test introspection with inactive token.""" mock_response = Mock() mock_response.json.return_value = {"active": False} mock_response.raise_for_status = Mock() mock_async_client = AsyncMock() mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) mock_async_client.__aexit__ = AsyncMock(return_value=None) mock_async_client.post = AsyncMock(return_value=mock_response) with patch("httpx.AsyncClient", return_value=mock_async_client): result = await client.introspect_token("test-token") assert result.active is False assert result.is_valid is False @pytest.mark.asyncio async def test_introspect_token_http_error(self, client: KeycloakAuthClient) -> None: """Test introspection with HTTP error.""" import httpx mock_async_client = AsyncMock() mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) mock_async_client.__aexit__ = AsyncMock(return_value=None) mock_async_client.post = AsyncMock(side_effect=httpx.HTTPError("Connection error")) with patch("httpx.AsyncClient", return_value=mock_async_client): result = await client.introspect_token("test-token") assert result.active is False assert result.is_valid is False @pytest.mark.asyncio async def test_introspect_token_uses_cache(self, client: KeycloakAuthClient) -> None: """Test that token introspection uses cache.""" mock_response = Mock() mock_response.json.return_value = { "active": True, "sub": "user-123", "preferred_username": "testuser", "email": "test@example.com", "realm_access": {"roles": ["user"]}, } mock_response.raise_for_status = Mock() mock_async_client = AsyncMock() mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) mock_async_client.__aexit__ = AsyncMock(return_value=None) mock_async_client.post = AsyncMock(return_value=mock_response) with patch("httpx.AsyncClient", return_value=mock_async_client): # First call result1 = await client.introspect_token("test-token") # Second call should use cache result2 = await client.introspect_token("test-token") # HTTP client should only be called once assert mock_async_client.post.call_count == 1 assert result1.user_id == result2.user_id @pytest.mark.asyncio async def test_get_userinfo_success(self, client: KeycloakAuthClient) -> None: """Test successful userinfo retrieval.""" mock_response = Mock() mock_response.json.return_value = { "sub": "user-123", "preferred_username": "testuser", "email": "test@example.com", "given_name": "Test", "family_name": "User", "realm_access": {"roles": ["user"]}, } mock_response.raise_for_status = Mock() mock_async_client = AsyncMock() mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) mock_async_client.__aexit__ = AsyncMock(return_value=None) mock_async_client.get = AsyncMock(return_value=mock_response) with patch("httpx.AsyncClient", return_value=mock_async_client): result = await client.get_userinfo("test-token") assert result is not None assert result.id == "user-123" assert result.username == "testuser" assert result.email == "test@example.com" assert result.first_name == "Test" assert result.last_name == "User" assert result.roles == ["user"] @pytest.mark.asyncio async def test_get_userinfo_error(self, client: KeycloakAuthClient) -> None: """Test userinfo retrieval with error.""" import httpx mock_async_client = AsyncMock() mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) mock_async_client.__aexit__ = AsyncMock(return_value=None) mock_async_client.get = AsyncMock(side_effect=httpx.HTTPError("Connection error")) with patch("httpx.AsyncClient", return_value=mock_async_client): result = await client.get_userinfo("test-token") assert result is None @pytest.mark.asyncio async def test_introspect_token_no_realm_roles(self, client: KeycloakAuthClient) -> None: """Test introspection without realm_access roles.""" mock_response = Mock() mock_response.json.return_value = { "active": True, "sub": "user-123", "preferred_username": "testuser", "email": "test@example.com", } mock_response.raise_for_status = Mock() mock_async_client = AsyncMock() mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client) mock_async_client.__aexit__ = AsyncMock(return_value=None) mock_async_client.post = AsyncMock(return_value=mock_response) with patch("httpx.AsyncClient", return_value=mock_async_client): result = await client.introspect_token("test-token") assert result.active is True assert result.roles == []