326 lines
12 KiB
Python
326 lines
12 KiB
Python
"""Tests for Keycloak authentication client."""
|
|
|
|
from unittest.mock import AsyncMock, Mock, patch
|
|
|
|
import pytest
|
|
|
|
from app.infrastructure.auth import KeycloakAuthClient, KeycloakUser, TokenInfo
|
|
from app.infrastructure.config.settings import Settings
|
|
|
|
|
|
class TestTokenInfo:
|
|
"""Test TokenInfo dataclass."""
|
|
|
|
def test_token_info_valid(self) -> None:
|
|
"""Test valid token info."""
|
|
token_info = TokenInfo(
|
|
active=True,
|
|
user_id="user-123",
|
|
username="testuser",
|
|
email="test@example.com",
|
|
roles=["user"],
|
|
)
|
|
assert token_info.is_valid is True
|
|
assert token_info.user_id == "user-123"
|
|
assert token_info.username == "testuser"
|
|
assert token_info.email == "test@example.com"
|
|
assert token_info.roles == ["user"]
|
|
|
|
def test_token_info_invalid_not_active(self) -> None:
|
|
"""Test invalid token when not active."""
|
|
token_info = TokenInfo(
|
|
active=False,
|
|
user_id="user-123",
|
|
username="testuser",
|
|
email="test@example.com",
|
|
roles=["user"],
|
|
)
|
|
assert token_info.is_valid is False
|
|
|
|
def test_token_info_invalid_no_user_id(self) -> None:
|
|
"""Test invalid token when no user_id."""
|
|
token_info = TokenInfo(
|
|
active=True,
|
|
user_id="",
|
|
username="testuser",
|
|
email="test@example.com",
|
|
roles=["user"],
|
|
)
|
|
assert token_info.is_valid is False
|
|
|
|
def test_token_info_empty_roles(self) -> None:
|
|
"""Test token info with empty roles."""
|
|
token_info = TokenInfo(
|
|
active=True,
|
|
user_id="user-123",
|
|
username="testuser",
|
|
email="test@example.com",
|
|
roles=[],
|
|
)
|
|
assert token_info.is_valid is True
|
|
assert token_info.roles == []
|
|
|
|
|
|
class TestKeycloakUser:
|
|
"""Test KeycloakUser dataclass."""
|
|
|
|
def test_keycloak_user_creation(self) -> None:
|
|
"""Test KeycloakUser creation."""
|
|
user = KeycloakUser(
|
|
id="user-123",
|
|
username="testuser",
|
|
email="test@example.com",
|
|
first_name="Test",
|
|
last_name="User",
|
|
roles=["user", "admin"],
|
|
is_active=True,
|
|
)
|
|
assert user.id == "user-123"
|
|
assert user.username == "testuser"
|
|
assert user.email == "test@example.com"
|
|
assert user.first_name == "Test"
|
|
assert user.last_name == "User"
|
|
assert user.roles == ["user", "admin"]
|
|
assert user.is_active is True
|
|
|
|
def test_keycloak_user_defaults(self) -> None:
|
|
"""Test KeycloakUser with default values."""
|
|
user = KeycloakUser(
|
|
id="user-123",
|
|
username="testuser",
|
|
email="test@example.com",
|
|
)
|
|
assert user.first_name == ""
|
|
assert user.last_name == ""
|
|
assert user.roles == []
|
|
assert user.is_active is True
|
|
|
|
|
|
class TestKeycloakAuthClient:
|
|
"""Test KeycloakAuthClient."""
|
|
|
|
@pytest.fixture
|
|
def settings(self) -> Settings:
|
|
"""Create test settings."""
|
|
from app.infrastructure.config import KCConfig, SecurityConfig
|
|
|
|
return Settings(
|
|
environment="dev",
|
|
kc=KCConfig(
|
|
server_url="http://localhost:8080",
|
|
realm="test-realm",
|
|
client_id="test-client",
|
|
client_secret="test-secret",
|
|
token_cache_ttl=60,
|
|
),
|
|
security=SecurityConfig(
|
|
secret_key="test-secret-key-for-jwt-tokens",
|
|
),
|
|
)
|
|
|
|
@pytest.fixture
|
|
def client(self, settings: Settings) -> KeycloakAuthClient:
|
|
"""Create Keycloak client."""
|
|
return KeycloakAuthClient(settings)
|
|
|
|
def test_client_initialization(self, client: KeycloakAuthClient, settings: Settings) -> None:
|
|
"""Test client initialization."""
|
|
assert client._settings == settings
|
|
assert client._base_url == "http://localhost:8080/realms/test-realm"
|
|
assert client._client_id == "test-client"
|
|
assert client._client_secret == "test-secret"
|
|
assert client._cache_ttl == 60
|
|
|
|
def test_get_introspection_url(self, client: KeycloakAuthClient) -> None:
|
|
"""Test introspection URL generation."""
|
|
url = client._get_introspection_url()
|
|
assert (
|
|
url
|
|
== "http://localhost:8080/realms/test-realm/protocol/openid-connect/token/introspection"
|
|
)
|
|
|
|
def test_get_userinfo_url(self, client: KeycloakAuthClient) -> None:
|
|
"""Test userinfo URL generation."""
|
|
url = client._get_userinfo_url()
|
|
assert url == "http://localhost:8080/realms/test-realm/protocol/openid-connect/userinfo"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_introspect_token_success(self, client: KeycloakAuthClient) -> None:
|
|
"""Test successful token introspection."""
|
|
mock_response = Mock()
|
|
mock_response.json.return_value = {
|
|
"active": True,
|
|
"sub": "user-123",
|
|
"preferred_username": "testuser",
|
|
"email": "test@example.com",
|
|
"realm_access": {"roles": ["user", "admin"]},
|
|
}
|
|
mock_response.raise_for_status = Mock()
|
|
|
|
mock_async_client = AsyncMock()
|
|
mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
|
|
mock_async_client.__aexit__ = AsyncMock(return_value=None)
|
|
mock_async_client.post = AsyncMock(return_value=mock_response)
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_async_client):
|
|
result = await client.introspect_token("test-token")
|
|
|
|
assert result.active is True
|
|
assert result.user_id == "user-123"
|
|
assert result.username == "testuser"
|
|
assert result.email == "test@example.com"
|
|
assert result.roles == ["user", "admin"]
|
|
assert result.is_valid is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_introspect_token_inactive(self, client: KeycloakAuthClient) -> None:
|
|
"""Test introspection with inactive token."""
|
|
mock_response = Mock()
|
|
mock_response.json.return_value = {"active": False}
|
|
mock_response.raise_for_status = Mock()
|
|
|
|
mock_async_client = AsyncMock()
|
|
mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
|
|
mock_async_client.__aexit__ = AsyncMock(return_value=None)
|
|
mock_async_client.post = AsyncMock(return_value=mock_response)
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_async_client):
|
|
result = await client.introspect_token("test-token")
|
|
|
|
assert result.active is False
|
|
assert result.is_valid is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_introspect_token_http_error(self, client: KeycloakAuthClient) -> None:
|
|
"""Test introspection with HTTP error."""
|
|
import httpx
|
|
|
|
mock_async_client = AsyncMock()
|
|
mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
|
|
mock_async_client.__aexit__ = AsyncMock(return_value=None)
|
|
mock_async_client.post = AsyncMock(side_effect=httpx.HTTPError("Connection error"))
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_async_client):
|
|
result = await client.introspect_token("test-token")
|
|
|
|
assert result.active is False
|
|
assert result.is_valid is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_introspect_token_uses_cache(self, client: KeycloakAuthClient) -> None:
|
|
"""Test that token introspection uses cache."""
|
|
mock_response = Mock()
|
|
mock_response.json.return_value = {
|
|
"active": True,
|
|
"sub": "user-123",
|
|
"preferred_username": "testuser",
|
|
"email": "test@example.com",
|
|
"realm_access": {"roles": ["user"]},
|
|
}
|
|
mock_response.raise_for_status = Mock()
|
|
|
|
mock_async_client = AsyncMock()
|
|
mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
|
|
mock_async_client.__aexit__ = AsyncMock(return_value=None)
|
|
mock_async_client.post = AsyncMock(return_value=mock_response)
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_async_client):
|
|
# First call
|
|
result1 = await client.introspect_token("test-token")
|
|
# Second call should use cache
|
|
result2 = await client.introspect_token("test-token")
|
|
|
|
# HTTP client should only be called once
|
|
assert mock_async_client.post.call_count == 1
|
|
assert result1.user_id == result2.user_id
|
|
|
|
def test_get_cached_token_expired(self, client: KeycloakAuthClient) -> None:
|
|
"""Test expired cache entry returns None and is removed."""
|
|
from app.infrastructure.auth.models import TokenInfo
|
|
|
|
client._cache["expired-token"] = (TokenInfo(active=True), 0)
|
|
with patch("time.time", return_value=1000):
|
|
result = client._get_cached_token("expired-token")
|
|
assert result is None
|
|
assert "expired-token" not in client._cache
|
|
|
|
def test_cache_token_removes_expired_entries(self, client: KeycloakAuthClient) -> None:
|
|
"""Test caching new token removes expired existing entries."""
|
|
from app.infrastructure.auth.models import TokenInfo
|
|
|
|
old_token = TokenInfo(active=True)
|
|
new_token = TokenInfo(active=True)
|
|
client._cache["old"] = (old_token, 0)
|
|
with patch("time.time", return_value=1000):
|
|
client._cache_token("new", new_token)
|
|
assert "old" not in client._cache
|
|
assert "new" in client._cache
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_userinfo_success(self, client: KeycloakAuthClient) -> None:
|
|
"""Test successful userinfo retrieval."""
|
|
mock_response = Mock()
|
|
mock_response.json.return_value = {
|
|
"sub": "user-123",
|
|
"preferred_username": "testuser",
|
|
"email": "test@example.com",
|
|
"given_name": "Test",
|
|
"family_name": "User",
|
|
"realm_access": {"roles": ["user"]},
|
|
}
|
|
mock_response.raise_for_status = Mock()
|
|
|
|
mock_async_client = AsyncMock()
|
|
mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
|
|
mock_async_client.__aexit__ = AsyncMock(return_value=None)
|
|
mock_async_client.get = AsyncMock(return_value=mock_response)
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_async_client):
|
|
result = await client.get_userinfo("test-token")
|
|
|
|
assert result is not None
|
|
assert result.id == "user-123"
|
|
assert result.username == "testuser"
|
|
assert result.email == "test@example.com"
|
|
assert result.first_name == "Test"
|
|
assert result.last_name == "User"
|
|
assert result.roles == ["user"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_userinfo_error(self, client: KeycloakAuthClient) -> None:
|
|
"""Test userinfo retrieval with error."""
|
|
import httpx
|
|
|
|
mock_async_client = AsyncMock()
|
|
mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
|
|
mock_async_client.__aexit__ = AsyncMock(return_value=None)
|
|
mock_async_client.get = AsyncMock(side_effect=httpx.HTTPError("Connection error"))
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_async_client):
|
|
result = await client.get_userinfo("test-token")
|
|
|
|
assert result is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_introspect_token_no_realm_roles(self, client: KeycloakAuthClient) -> None:
|
|
"""Test introspection without realm_access roles."""
|
|
mock_response = Mock()
|
|
mock_response.json.return_value = {
|
|
"active": True,
|
|
"sub": "user-123",
|
|
"preferred_username": "testuser",
|
|
"email": "test@example.com",
|
|
}
|
|
mock_response.raise_for_status = Mock()
|
|
|
|
mock_async_client = AsyncMock()
|
|
mock_async_client.__aenter__ = AsyncMock(return_value=mock_async_client)
|
|
mock_async_client.__aexit__ = AsyncMock(return_value=None)
|
|
mock_async_client.post = AsyncMock(return_value=mock_response)
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_async_client):
|
|
result = await client.introspect_token("test-token")
|
|
|
|
assert result.active is True
|
|
assert result.roles == []
|