feat: расширение API клиента и добавление auth/web модулей

Основные изменения:
- API Client:
  * Переработан api_client.py — улучшена архитектура
  * Добавлены exceptions.py — кастомные исключения для API
  * Добавлены response.py — обертка для API ответов
  * Добавлены transport.py — HTTP транспорт с retry logic

- Auth модуль (новый):
  * Добавлен src/pytfm/auth/__init__.py
  * AuthProvider интерфейс для аутентификации в тестах
  * Поддержка cookie-based auth для E2E тестов

- Web модуль:
  * Добавлен web/locator.py — SmartLocator для Page Object Model
  * Обновлен web/pom.py — улучшена работа с locators
  * Обновлен web/__init__.py — экспорты для POM

- Generators:
  * Расширен generators/__init__.py — PostDataGenerator для блога
  * Добавлены методы для генерации тестовых постов

- Pytest plugin:
  * Добавлен pytest_plugin.py — фикстуры для playwright
  * Интеграция с AuthProvider для автоматической аутентификации

- Конфигурация:
  * Обновлен pyproject.toml — зависимости и entry points
This commit is contained in:
2026-05-07 19:58:12 +03:00
parent 6b9fb649c6
commit e62b43b46c
13 changed files with 2174 additions and 263 deletions

View File

@@ -11,10 +11,15 @@ dependencies = [
"playwright>=1.42.0",
"httpx>=0.28.0",
"pytest>=9.0.0",
"pytest-playwright>=0.7.0",
"pytest-asyncio>=0.23.0",
"pydantic>=2.0.0",
"mimesis>=19.1.0",
]
[project.entry-points."pytest11"]
pytfm = "pytfm.pytest_plugin"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View File

@@ -3,19 +3,35 @@
This package provides tools for building automated tests including:
- Page Object Model (POM) for web UI testing with Playwright
- HTTP API client for REST API testing
- SmartLocator for enhanced element interactions
- Test fixtures and utilities
Example:
>>> from pytfm.web import BasePage
>>> from pytfm.web import BasePage, SmartLocator
>>> from pytfm.api import APIClient
>>>
>>> # Use in your tests
>>> async with APIClient("http://api.example.com") as client:
... response = await client.get("/posts")
>>>
>>> # SmartLocator usage
>>> submit_btn = SmartLocator.by_testid("btn-submit")
>>> await page.click(submit_btn)
"""
from pytfm.api import APIClient, APIResponse
from pytfm.web import BasePage
from pytfm.api import ApiClient, ApiResponse
from pytfm.generators import PostDataGenerator, TestDataGenerator
from pytfm.web import AsyncBasePage, AsyncSmartLocator, BasePage, LocatorConfig, SmartLocator
__version__ = "0.1.0"
__all__ = ["BasePage", "APIClient", "APIResponse"]
__all__ = [
"ApiClient",
"ApiResponse",
"AsyncBasePage",
"AsyncSmartLocator",
"BasePage",
"LocatorConfig",
"PostDataGenerator",
"SmartLocator",
"TestDataGenerator",
]

View File

@@ -1,9 +1,43 @@
"""API testing module with HTTP client.
"""API testing module with synchronous HTTP client.
This module provides a convenient HTTP client for testing REST APIs
with automatic response parsing and error handling.
This module provides a synchronous HTTP client for testing REST APIs
with authentication, Pydantic validation, and pluggable transports.
Example:
>>> from pytfm.api import ApiClient, ApiResponse
>>> from pytfm.auth import InMemoryAuthProvider
Anonymous request:
>>> with ApiClient(base_url="http://api.example.com") as client:
... response = client.get("/health")
... response.assert_response(status_code=200)
Authenticated request:
>>> provider = InMemoryAuthProvider()
>>> user = provider.create_user("test", "pass", "test@test.com")
>>> with ApiClient(
... base_url="http://api.example.com",
... user=user,
... auth_provider=provider,
... ) as client:
... response = client.get("/profile")
... response.assert_response(status_code=200)
"""
from pytfm.api.api_client import APIClient, APIResponse
from pytfm.api.api_client import ApiClient
from pytfm.api.exceptions import ApiStatusError, ApiTestError, ApiValidationError
from pytfm.api.response import ApiResponse, RawResponse
from pytfm.api.transport import HttpxSyncTransport, SyncTransport
__all__ = ["APIClient", "APIResponse"]
__all__ = [
"ApiClient",
"ApiResponse",
"ApiStatusError",
"ApiTestError",
"ApiValidationError",
"HttpxSyncTransport",
"RawResponse",
"SyncTransport",
]

View File

@@ -1,261 +1,416 @@
"""API client for HTTP testing.
"""Synchronous API client for HTTP testing.
This module provides a convenient HTTP client wrapper around httpx
for testing REST APIs with proper error handling and response parsing.
This module provides :class:`ApiClient` — a synchronous HTTP client
with built-in authentication, request serialization, and Pydantic
response validation.
Example:
Basic usage without authentication:
>>> with ApiClient(base_url="http://api.example.com") as client:
... response = client.get("/health")
... response.assert_response(status_code=200)
With authentication:
>>> user = auth_provider.create_user("test", "pass", "test@test.com")
>>> with ApiClient(
... base_url="http://api.example.com",
... user=user,
... auth_provider=auth_provider,
... ) as client:
... response = client.get("/profile")
... profile = response.assert_response(UserProfile)
With Pydantic models:
>>> class CreateUserRequest(BaseModel):
... name: str
... email: str
>>> class UserResponse(BaseModel):
... id: int
... name: str
>>> with ApiClient(base_url="http://api.example.com") as client:
... req = CreateUserRequest(name="Alice", email="alice@test.com")
... response = client.post("/users", json=req)
... user = response.assert_response(UserResponse, status_code=201)
... print(user.id)
"""
from __future__ import annotations
from typing import Any
import httpx
from pydantic import BaseModel
class APIResponse:
"""Wrapper for API responses with convenient accessors.
Provides easy access to response data, status code, and headers
with automatic JSON parsing.
Attributes:
status_code: HTTP status code of the response.
headers: Response headers dictionary.
text: Raw response body as string.
json_data: Parsed JSON response data.
Example:
>>> response = await client.get("/api/posts")
>>> assert response.status_code == 200
>>> posts = response.json()
"""
def __init__(self, response: httpx.Response) -> None:
"""Initialize the response wrapper.
Args:
response: Raw httpx Response object.
"""
self._response = response
self.status_code = response.status_code
self.headers = dict(response.headers)
self.text = response.text
self.json_data: dict[str, Any] | list[Any] | None = None
if "application/json" in response.headers.get("content-type", ""):
try:
self.json_data = response.json()
except ValueError:
self.json_data = None
def json(self) -> dict[str, Any] | list[Any] | None:
"""Get parsed JSON response data.
Returns:
Parsed JSON data or None if not valid JSON.
"""
return self.json_data
def raise_for_status(self) -> None:
"""Raise an exception for 4xx/5xx status codes.
Raises:
httpx.HTTPStatusError: If status code indicates an error.
"""
self._response.raise_for_status()
@property
def is_success(self) -> bool:
"""Check if response indicates success (2xx status).
Returns:
bool: True if status code is 2xx.
"""
return 200 <= self.status_code < 300
from pytfm.api.response import ApiResponse
from pytfm.api.transport import HttpxSyncTransport, SyncTransport
from pytfm.auth import AuthProvider, TestUser
class APIClient:
"""HTTP client for API testing.
class ApiClient:
"""Synchronous HTTP client for API testing with auth and validation.
Wrapper around httpx.AsyncClient with convenient methods for
common HTTP operations and automatic response wrapping.
Wraps a :class:`~pytfm.api.transport.SyncTransport` to provide
convenient methods for sending HTTP requests with automatic
authentication, request body serialization, and response wrapping.
The client **must** be used as a context manager to manage the
transport lifecycle.
Attributes:
base_url: Base URL for all API requests.
client: Underlying httpx AsyncClient instance.
Example:
>>> async with APIClient("http://api.example.com") as client:
... response = await client.get("/posts")
... assert response.status_code == 200
Anonymous requests:
>>> with ApiClient(base_url="http://api.example.com") as client:
... response = client.get("/public")
... response.assert_response(status_code=200)
Authenticated requests:
>>> user = TestUser(id="1", username="test", email="t@test.com",
... password="pass")
>>> with ApiClient(
... base_url="http://api.example.com",
... user=user,
... auth_provider=auth_provider,
... ) as client:
... response = client.get("/private")
... response.assert_response(status_code=200)
Custom default headers:
>>> with ApiClient(
... base_url="http://api.example.com",
... headers={"X-Custom": "value"},
... ) as client:
... response = client.get("/items")
"""
def __init__(
self,
base_url: str,
base_url: str = "",
*,
user: TestUser | None = None,
auth_provider: AuthProvider | None = None,
transport: SyncTransport | None = None,
headers: dict[str, str] | None = None,
timeout: float = 30.0,
) -> None:
"""Initialize the API client.
Args:
base_url: Base URL for all API requests.
headers: Default headers to include in all requests.
timeout: Request timeout in seconds.
base_url: Base URL for all API requests. Defaults to empty
string, allowing absolute URLs in request methods.
user: Optional test user for automatic authentication.
auth_provider: Optional auth provider for user login.
transport: Optional custom sync transport. Defaults to
:class:`~pytfm.api.transport.HttpxSyncTransport`.
headers: Default headers included in every request.
timeout: Default request timeout in seconds.
Note:
If both ``user`` and ``auth_provider`` are provided, the
client automatically authenticates the user when entering
the context manager via :meth:`login`.
Example:
>>> client = ApiClient(base_url="http://api.example.com")
>>> with client:
... pass
"""
self.base_url = base_url.rstrip("/")
self._headers = headers or {}
self._user = user
self._auth_provider = auth_provider
self._transport = transport if transport is not None else HttpxSyncTransport()
self._default_headers = headers or {}
self._timeout = timeout
self._client: httpx.AsyncClient | None = None
self._token: str | None = None
self._entered = False
async def __aenter__(self) -> APIClient:
"""Async context manager entry.
def __enter__(self) -> ApiClient:
"""Enter the runtime context and initialize the transport.
If a user and auth provider were provided at initialization,
authentication is performed at this stage.
Returns:
APIClient: Self for use in context.
"""
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=self._headers,
timeout=self._timeout,
)
return self
async def __aexit__(self, *args: Any) -> None:
"""Async context manager exit."""
if self._client:
await self._client.aclose()
self._client = None
def _get_client(self) -> httpx.AsyncClient:
"""Get the underlying client or raise if not initialized.
Returns:
httpx.AsyncClient: The initialized client.
Self for use in context manager.
Raises:
RuntimeError: If client is not initialized (not used as context manager).
RuntimeError: If the transport fails to initialize.
"""
if self._client is None:
raise RuntimeError("Client not initialized. Use 'async with' context manager.")
return self._client
self._transport.__enter__()
self._entered = True
if self._user is not None and self._auth_provider is not None:
self._authenticate()
return self
async def get(
def __exit__(self, *args: Any) -> None:
"""Exit the runtime context and close the transport."""
self._entered = False
self._transport.__exit__(*args)
def _authenticate(self) -> None:
"""Authenticate the user and update default headers.
Calls :meth:`AuthProvider.login` with the user's credentials
and merges the resulting auth headers into the default headers.
"""
if self._auth_provider is None or self._user is None:
return
self._token = self._auth_provider.login(self._user.username, self._user.password)
auth_headers = self._auth_provider.get_api_auth_headers(self._token)
self._default_headers.update(auth_headers)
def _ensure_initialized(self) -> None:
"""Raise an error if the client is not used as context manager."""
if not self._entered:
raise RuntimeError("ApiClient not initialized. Use 'with ApiClient(...) as client:'.")
@staticmethod
def _serialize_body(
data: dict[str, Any] | BaseModel | list[Any] | None,
) -> Any:
"""Serialize a request body to a JSON-compatible format.
Pydantic models are dumped via ``model_dump(mode='json')``.
Other types are returned as-is.
Args:
data: Request body data.
Returns:
JSON-serializable data.
Example:
>>> class User(BaseModel):
... name: str
>>> ApiClient._serialize_body(User(name="Alice"))
{'name': 'Alice'}
"""
if isinstance(data, BaseModel):
return data.model_dump(mode="json", by_alias=True)
return data
def _merge_headers(self, request_headers: dict[str, str] | None) -> dict[str, str]:
"""Merge default and request-specific headers.
Request-specific headers override defaults.
Args:
request_headers: Headers for a specific request.
Returns:
Merged headers dictionary.
"""
merged = self._default_headers.copy()
if request_headers:
merged.update(request_headers)
return merged
def _build_url(self, path: str) -> str:
"""Build a full URL from base_url and path.
If ``path`` is an absolute URL (starts with ``http://`` or
``https://``), it is returned unchanged.
Args:
path: URL path or absolute URL.
Returns:
Full request URL.
Example:
>>> client = ApiClient(base_url="http://api.example.com")
>>> client._build_url("/users")
'http://api.example.com/users'
>>> client._build_url("http://other.com/api")
'http://other.com/api'
"""
if path.startswith(("http://", "https://")):
return path
path = path.lstrip("/")
if not self.base_url:
return f"/{path}" if path else ""
return f"{self.base_url}/{path}"
def request(
self,
method: str,
path: str,
*,
json: dict[str, Any] | BaseModel | list[Any] | None = None,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> APIResponse:
timeout: float | None = None,
) -> ApiResponse:
"""Send an HTTP request.
Args:
method: HTTP method (GET, POST, PUT, PATCH, DELETE).
path: URL path relative to ``base_url``, or absolute URL.
json: JSON-serializable body (dict, Pydantic model, or list).
data: Form data.
params: Query parameters.
headers: Additional headers for this request.
timeout: Request timeout in seconds.
Returns:
Wrapped :class:`~pytfm.api.response.ApiResponse`.
Raises:
RuntimeError: If client is not used as context manager.
Example:
>>> with ApiClient(base_url="http://api.example.com") as client:
... response = client.request(
... "GET", "/users", params={"page": "1"}
... )
"""
self._ensure_initialized()
raw = self._transport.send(
method=method,
url=self._build_url(path),
headers=self._merge_headers(headers),
json=self._serialize_body(json),
data=data,
params=params,
timeout=timeout if timeout is not None else self._timeout,
)
return ApiResponse(raw)
def get(
self,
path: str,
*,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
timeout: float | None = None,
) -> ApiResponse:
"""Send a GET request.
Args:
path: URL path (relative to base_url).
path: URL path.
params: Query parameters.
headers: Additional headers for this request.
headers: Additional headers.
timeout: Request timeout in seconds.
Returns:
APIResponse: Wrapped response object.
"""
response = await self._get_client().get(path, params=params, headers=headers)
return APIResponse(response)
Wrapped ApiResponse.
async def post(
Example:
>>> with ApiClient(base_url="http://api.example.com") as client:
... response = client.get("/users", params={"page": "1"})
... users = response.assert_response(UserList)
"""
return self.request("GET", path, params=params, headers=headers, timeout=timeout)
def post(
self,
path: str,
json: dict[str, Any] | BaseModel | None = None,
*,
json: dict[str, Any] | BaseModel | list[Any] | None = None,
data: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> APIResponse:
timeout: float | None = None,
) -> ApiResponse:
"""Send a POST request.
Args:
path: URL path (relative to base_url).
json: JSON body (dict or Pydantic model).
path: URL path.
json: JSON body (dict, Pydantic model, or list).
data: Form data.
headers: Additional headers for this request.
headers: Additional headers.
timeout: Request timeout in seconds.
Returns:
APIResponse: Wrapped response object.
"""
json_data = json.model_dump(mode="json") if isinstance(json, BaseModel) else json
response = await self._get_client().post(
path, json=json_data, data=data, headers=headers
)
return APIResponse(response)
Wrapped ApiResponse.
async def put(
Example:
>>> payload = CreateUserRequest(name="Alice")
>>> with ApiClient(base_url="http://api.example.com") as client:
... response = client.post("/users", json=payload)
... user = response.assert_response(UserResponse, status_code=201)
"""
return self.request("POST", path, json=json, data=data, headers=headers, timeout=timeout)
def put(
self,
path: str,
json: dict[str, Any] | BaseModel | None = None,
*,
json: dict[str, Any] | BaseModel | list[Any] | None = None,
data: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> APIResponse:
timeout: float | None = None,
) -> ApiResponse:
"""Send a PUT request.
Args:
path: URL path (relative to base_url).
json: JSON body (dict or Pydantic model).
path: URL path.
json: JSON body.
data: Form data.
headers: Additional headers for this request.
headers: Additional headers.
timeout: Request timeout in seconds.
Returns:
APIResponse: Wrapped response object.
Wrapped ApiResponse.
"""
json_data = json.model_dump(mode="json") if isinstance(json, BaseModel) else json
response = await self._get_client().put(
path, json=json_data, data=data, headers=headers
)
return APIResponse(response)
return self.request("PUT", path, json=json, data=data, headers=headers, timeout=timeout)
async def patch(
def patch(
self,
path: str,
json: dict[str, Any] | BaseModel | None = None,
*,
json: dict[str, Any] | BaseModel | list[Any] | None = None,
data: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> APIResponse:
timeout: float | None = None,
) -> ApiResponse:
"""Send a PATCH request.
Args:
path: URL path (relative to base_url).
json: JSON body (dict or Pydantic model).
path: URL path.
json: JSON body.
data: Form data.
headers: Additional headers for this request.
headers: Additional headers.
timeout: Request timeout in seconds.
Returns:
APIResponse: Wrapped response object.
Wrapped ApiResponse.
"""
json_data = json.model_dump(mode="json") if isinstance(json, BaseModel) else json
response = await self._get_client().patch(
path, json=json_data, data=data, headers=headers
return self.request(
"PATCH",
path,
json=json,
data=data,
headers=headers,
timeout=timeout,
)
return APIResponse(response)
async def delete(
def delete(
self,
path: str,
*,
headers: dict[str, str] | None = None,
) -> APIResponse:
timeout: float | None = None,
) -> ApiResponse:
"""Send a DELETE request.
Args:
path: URL path (relative to base_url).
headers: Additional headers for this request.
path: URL path.
headers: Additional headers.
timeout: Request timeout in seconds.
Returns:
APIResponse: Wrapped response object.
Wrapped ApiResponse.
Example:
>>> with ApiClient(base_url="http://api.example.com") as client:
... response = client.delete("/users/123")
... response.assert_response(status_code=204)
"""
response = await self._get_client().delete(path, headers=headers)
return APIResponse(response)
def set_auth_token(self, token: str) -> None:
"""Set authorization token for subsequent requests.
Args:
token: Bearer token for authentication.
"""
self._headers["Authorization"] = f"Bearer {token}"
def clear_auth_token(self) -> None:
"""Remove authorization token from headers."""
self._headers.pop("Authorization", None)
return self.request("DELETE", path, headers=headers, timeout=timeout)

122
src/pytfm/api/exceptions.py Normal file
View File

@@ -0,0 +1,122 @@
"""Custom exceptions for API testing.
This module provides a hierarchy of exceptions used by the API client
for detailed error reporting during test execution.
Example:
>>> try:
... response.assert_response(UserProfile, status_code=200)
... except ApiStatusError as exc:
... print(f"Unexpected status: {exc.actual}")
... except ApiValidationError as exc:
... print(f"Validation failed: {exc.validation_error}")
"""
from __future__ import annotations
from collections.abc import Iterable
from typing import Any
from pydantic import BaseModel, ValidationError
class ApiTestError(Exception):
"""Base exception for all API testing failures.
Attributes:
message: Human-readable error description.
response: The ApiResponse that caused the error, if available.
Example:
>>> raise ApiTestError("Connection failed", response=response)
"""
def __init__(
self,
message: str,
*,
response: Any | None = None,
) -> None:
"""Initialize the exception.
Args:
message: Error description.
response: Associated ApiResponse instance.
"""
super().__init__(message)
self.response = response
class ApiStatusError(ApiTestError):
"""Raised when the HTTP status code does not match the expected value.
Attributes:
expected: Expected status code or collection of acceptable codes.
actual: Actual status code received from the server.
Example:
>>> raise ApiStatusError(
... expected=201,
... actual=400,
... response=response,
... )
"""
def __init__(
self,
expected: int | Iterable[int],
actual: int,
*,
response: Any,
) -> None:
"""Initialize the status error.
Args:
expected: Expected status code or container of acceptable codes.
actual: Actual status code from the response.
response: The ApiResponse that triggered the error.
"""
if isinstance(expected, int):
message = f"Expected status code {expected}, got {actual}"
else:
message = f"Expected status code in {tuple(expected)}, got {actual}"
super().__init__(message, response=response)
self.expected = expected
self.actual = actual
class ApiValidationError(ApiTestError):
"""Raised when response body fails Pydantic model validation.
Attributes:
model: The Pydantic model class used for validation.
validation_error: The original Pydantic ValidationError, if available.
Example:
>>> try:
... response.assert_response(UserProfile)
... except ApiValidationError as exc:
... print(exc.validation_error.errors())
"""
def __init__(
self,
model: type[BaseModel],
validation_error: ValidationError | None,
*,
response: Any,
) -> None:
"""Initialize the validation error.
Args:
model: Pydantic model class that validation failed against.
validation_error: Original Pydantic ValidationError or None if
the response body was empty or not JSON.
response: The ApiResponse that triggered the error.
"""
message = f"Response body validation failed for {model.__name__}"
if validation_error is not None:
message += f": {validation_error}"
super().__init__(message, response=response)
self.model = model
self.validation_error = validation_error

183
src/pytfm/api/response.py Normal file
View File

@@ -0,0 +1,183 @@
"""API response models and validation.
This module provides normalized response representation and validation
utilities for API testing.
Example:
>>> raw = RawResponse(
... status_code=200,
... headers={"content-type": "application/json"},
... text='{"id": 1, "name": "Alice"}',
... json_data={"id": 1, "name": "Alice"},
... )
>>> response = ApiResponse(raw)
>>> user = response.assert_response(UserProfile, status_code=200)
"""
from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any, TypeVar, overload
from pydantic import BaseModel, ValidationError
from pytfm.api.exceptions import ApiStatusError, ApiValidationError
T = TypeVar("T", bound=BaseModel)
@dataclass
class RawResponse:
"""Normalized raw HTTP response independent of the transport backend.
Attributes:
status_code: HTTP status code.
headers: Response headers dictionary.
text: Response body as text.
json_data: Parsed JSON body, or None if not JSON.
raw_bytes: Raw response body bytes, or None.
Example:
>>> raw = RawResponse(
... status_code=200,
... headers={},
... text="OK",
... json_data=None,
... )
"""
status_code: int
headers: dict[str, str]
text: str
json_data: dict[str, Any] | list[Any] | None = None
raw_bytes: bytes | None = None
class ApiResponse:
"""Wrapper for API responses with convenient accessors and validation.
Provides easy access to response data and methods for asserting
status codes and validating response bodies against Pydantic models.
Attributes:
status_code: HTTP status code of the response.
headers: Response headers dictionary.
text: Raw response body as string.
Example:
>>> response = client.get("/users/1")
>>> user = response.assert_response(UserProfile, status_code=200)
>>> print(user.name)
"""
def __init__(self, raw: RawResponse) -> None:
"""Initialize the response wrapper.
Args:
raw: Normalized RawResponse from the transport layer.
"""
self._raw = raw
self.status_code = raw.status_code
self.headers = raw.headers
self.text = raw.text
def json(self) -> dict[str, Any] | list[Any] | None:
"""Get parsed JSON response data.
Returns:
Parsed JSON data or None if the response body is not valid JSON.
Example:
>>> data = response.json()
>>> if data:
... print(data["id"])
"""
return self._raw.json_data
@overload
def assert_response(
self,
model: None = None,
status_code: int | Iterable[int] = 200,
) -> None: ...
@overload
def assert_response(
self,
model: type[T],
status_code: int | Iterable[int] = 200,
) -> T: ...
def assert_response(
self,
model: type[T] | None = None,
status_code: int | Iterable[int] = 200,
) -> T | None:
"""Assert status code and optionally validate response body.
Checks that the HTTP status code matches the expected value and,
if a Pydantic model is provided, parses and validates the response
body against that model.
Args:
model: Pydantic model class to validate the response body.
If None, only the status code is checked.
status_code: Expected status code or container of acceptable codes.
Defaults to 200.
Returns:
Validated Pydantic model instance if ``model`` is provided,
otherwise None.
Raises:
ApiStatusError: If the status code does not match.
ApiValidationError: If the response body fails model validation.
Example:
Status code only:
>>> response.assert_response(status_code=204)
With Pydantic model:
>>> user = response.assert_response(UserProfile, status_code=200)
>>> print(user.id)
Acceptable range of status codes:
>>> response.assert_response(status_code={200, 201})
"""
if isinstance(status_code, int):
if self.status_code != status_code:
raise ApiStatusError(
expected=status_code,
actual=self.status_code,
response=self,
)
elif self.status_code not in status_code:
raise ApiStatusError(
expected=status_code,
actual=self.status_code,
response=self,
)
if model is None:
return None
json_data = self.json()
if json_data is None:
raise ApiValidationError(
model=model,
validation_error=None,
response=self,
)
try:
return model.model_validate(json_data)
except ValidationError as exc:
raise ApiValidationError(
model=model,
validation_error=exc,
response=self,
) from exc

177
src/pytfm/api/transport.py Normal file
View File

@@ -0,0 +1,177 @@
"""Synchronous HTTP transports for API testing.
This module defines the transport protocol and provides concrete
implementations for different HTTP backends.
Example:
>>> with HttpxSyncTransport() as transport:
... raw = transport.send("GET", "http://api.example.com/health")
... print(raw.status_code)
"""
from __future__ import annotations
import contextlib
from typing import Any, Protocol
import httpx
from pytfm.api.response import RawResponse
class SyncTransport(Protocol):
"""Protocol for synchronous HTTP transports.
All sync transports must implement context manager protocol
for proper resource lifecycle management.
Example:
>>> class MyTransport:
... def __enter__(self):
... return self
... def __exit__(self, *args):
... pass
... def send(self, method, url, **kwargs):
... return RawResponse(status_code=200, headers={}, text="")
"""
def __enter__(self) -> SyncTransport:
"""Enter the runtime context."""
...
def __exit__(self, *args: Any) -> None:
"""Exit the runtime context."""
...
def send(
self,
method: str,
url: str,
*,
headers: dict[str, str] | None = None,
json: Any | None = None,
data: Any | None = None,
params: dict[str, Any] | None = None,
timeout: float | None = None,
) -> RawResponse:
"""Send an HTTP request and return a normalized response.
Args:
method: HTTP method (GET, POST, PUT, PATCH, DELETE).
url: Full request URL.
headers: Request headers.
json: JSON-serializable request body.
data: Form data.
params: Query parameters.
timeout: Request timeout in seconds.
Returns:
Normalized RawResponse.
"""
...
class HttpxSyncTransport:
"""Synchronous HTTP transport using ``httpx.Client``.
Provides a normalized interface over ``httpx.Client`` for use
with :class:`~pytfm.api.api_client.ApiClient`.
Attributes:
_client: Underlying ``httpx.Client`` instance.
Example:
>>> with HttpxSyncTransport() as transport:
... raw = transport.send(
... "POST",
... "http://api.example.com/users",
... json={"name": "Alice"},
... )
... print(raw.status_code)
With an existing client:
>>> client = httpx.Client(timeout=60.0)
>>> transport = HttpxSyncTransport(client=client)
>>> with transport:
... raw = transport.send("GET", "http://api.example.com/health")
"""
def __init__(self, client: httpx.Client | None = None) -> None:
"""Initialize the transport.
Args:
client: Optional existing ``httpx.Client`` instance.
If not provided, a new client is created on context enter.
"""
self._client = client
self._owns_client = client is None
def __enter__(self) -> HttpxSyncTransport:
"""Enter the runtime context and initialize the client.
Returns:
Self for use in context manager.
"""
if self._client is None:
self._client = httpx.Client()
return self
def __exit__(self, *args: Any) -> None:
"""Exit the runtime context and close the client if owned."""
if self._owns_client and self._client is not None:
self._client.close()
self._client = None
def send(
self,
method: str,
url: str,
*,
headers: dict[str, str] | None = None,
json: Any | None = None,
data: Any | None = None,
params: dict[str, Any] | None = None,
timeout: float | None = None,
) -> RawResponse:
"""Send an HTTP request via ``httpx.Client``.
Args:
method: HTTP method.
url: Full request URL.
headers: Request headers.
json: JSON body.
data: Form data.
params: Query parameters.
timeout: Request timeout in seconds.
Returns:
Normalized RawResponse.
Raises:
RuntimeError: If transport is not used as context manager.
"""
if self._client is None:
raise RuntimeError(
"HttpxSyncTransport not initialized. Use 'with HttpxSyncTransport() as transport:'."
)
response = self._client.request(
method=method,
url=url,
headers=headers,
json=json,
data=data,
params=params,
timeout=timeout,
)
json_data = None
content_type = response.headers.get("content-type", "")
if "application/json" in content_type or "text/json" in content_type:
with contextlib.suppress(ValueError):
json_data = response.json()
return RawResponse(
status_code=response.status_code,
headers=dict(response.headers),
text=response.text,
json_data=json_data,
)

225
src/pytfm/auth/__init__.py Normal file
View File

@@ -0,0 +1,225 @@
"""Authentication abstractions for UI testing.
This module provides the AuthProvider interface and concrete implementations
for managing test user authentication in browser-based tests.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
@dataclass
class TestUser:
"""Represents a test user for authentication scenarios.
Attributes:
id: Unique user identifier.
username: User login name.
email: User email address.
password: User password (plaintext for testing only).
roles: List of user roles.
meta: Additional provider-specific metadata.
"""
id: str
username: str
email: str
password: str
roles: list[str] = field(default_factory=list)
meta: dict[str, Any] = field(default_factory=dict)
class AuthProvider(ABC):
"""Abstract base class for authentication providers in tests.
Implementations handle user creation, login, and cookie generation
for browser-based authentication scenarios.
Example:
>>> class FakeKeycloakProvider(AuthProvider):
... def create_user(self, username, password, email, roles):
... return TestUser(id="1", username=username, email=email,
... password=password, roles=roles)
... def login(self, username, password):
... return "fake_token"
... def build_auth_cookie(self, token, domain):
... return {"name": "access_token", "value": token,
... "domain": domain, "path": "/", "httpOnly": True}
"""
@abstractmethod
def create_user(
self,
username: str,
password: str,
email: str,
roles: list[str] | None = None,
) -> TestUser:
"""Create a new test user.
Args:
username: Login name for the user.
password: Password for the user.
email: Email address.
roles: Optional list of roles.
Returns:
Created TestUser instance.
"""
@abstractmethod
def login(self, username: str, password: str) -> str:
"""Authenticate user and return token.
Args:
username: User login name.
password: User password.
Returns:
Authentication token string.
"""
@abstractmethod
def build_auth_cookie(self, token: str, domain: str) -> dict[str, Any]:
"""Build cookie dict for browser authentication.
Args:
token: Authentication token from login().
domain: Cookie domain (e.g. '127.0.0.1').
Returns:
Cookie dict compatible with Playwright's add_cookies().
"""
def get_api_auth_headers(self, token: str) -> dict[str, str]:
"""Return HTTP headers for API authentication.
Default implementation returns a Bearer token ``Authorization``
header. Override this method for custom authentication schemes
such as API-Key or X-Auth-Token.
Args:
token: Authentication token returned by :meth:`login`.
Returns:
Dictionary of HTTP headers to include in API requests.
Example:
>>> provider = InMemoryAuthProvider()
>>> headers = provider.get_api_auth_headers("my_token")
>>> headers
{'Authorization': 'Bearer my_token'}
Custom scheme:
>>> class ApiKeyProvider(AuthProvider):
... def get_api_auth_headers(self, token):
... return {"X-API-Key": token}
"""
return {"Authorization": f"Bearer {token}"}
class InMemoryAuthProvider(AuthProvider):
"""Simple in-memory authentication provider for testing.
Stores users and tokens in memory. Suitable for local test servers
where external auth services are not available.
Attributes:
_users: Dictionary of users by username.
_tokens: Dictionary of active tokens.
token_ttl: Token lifetime in seconds.
"""
def __init__(self, token_ttl: int = 3600) -> None:
"""Initialize the in-memory auth provider.
Args:
token_ttl: Token lifetime in seconds (default: 1 hour).
"""
self._users: dict[str, TestUser] = {}
self._tokens: dict[str, tuple[str, float]] = {}
self._token_ttl = token_ttl
self._counter = 0
def create_user(
self,
username: str,
password: str,
email: str,
roles: list[str] | None = None,
) -> TestUser:
"""Create a new in-memory test user.
Args:
username: Login name.
password: Password.
email: Email address.
roles: Optional list of roles.
Returns:
Created TestUser.
"""
self._counter += 1
user = TestUser(
id=str(self._counter),
username=username,
email=email,
password=password,
roles=roles or ["user"],
)
self._users[username] = user
return user
def login(self, username: str, password: str) -> str:
"""Authenticate user and return a simple token.
Args:
username: User login name.
password: User password.
Returns:
Simple token string.
Raises:
ValueError: If credentials are invalid.
"""
import secrets
import time
user = self._users.get(username)
if not user or user.password != password:
raise ValueError("Invalid credentials")
token = secrets.token_urlsafe(32)
self._tokens[token] = (user.id, time.time())
return token
def build_auth_cookie(self, token: str, domain: str) -> dict[str, Any]:
"""Build a standard auth cookie.
Args:
token: Authentication token.
domain: Cookie domain.
Returns:
Cookie dict for Playwright.
"""
return {
"name": "access_token",
"value": token,
"domain": domain,
"path": "/",
"httpOnly": True,
"secure": False,
"sameSite": "Lax",
}
def clear(self) -> None:
"""Clear all users and tokens."""
self._users.clear()
self._tokens.clear()
self._counter = 0

View File

@@ -1,13 +1,16 @@
"""Test data generators module.
This module provides utilities for generating test data
using libraries like mimesis or faker.
using the mimesis library.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from mimesis import Field, Locale
from mimesis.enums import Gender
if TYPE_CHECKING:
pass
@@ -16,7 +19,10 @@ class TestDataGenerator:
"""Generator for test data.
Provides methods to generate realistic test data for various
test scenarios.
test scenarios using mimesis.
Attributes:
_field: Mimesis Field instance for data generation.
Example:
>>> generator = TestDataGenerator()
@@ -24,22 +30,169 @@ test scenarios.
>>> name = generator.full_name()
"""
def __init__(self) -> None:
"""Initialize the test data generator."""
pass
def __init__(self, locale: Locale = Locale.EN) -> None:
"""Initialize the test data generator.
Args:
locale: Mimesis locale for data generation.
"""
self._field = Field(locale)
def email(self) -> str:
"""Generate a random email address.
Returns:
str: Random email address.
Random email address.
"""
return "test@example.com"
return self._field("email")
def full_name(self) -> str:
def full_name(self, gender: Gender | None = None) -> str:
"""Generate a random full name.
Args:
gender: Optional gender for the name.
Returns:
str: Random full name.
Random full name.
"""
return "Test User"
if gender:
return self._field("full_name", gender=gender)
return self._field("full_name")
def username(self) -> str:
"""Generate a random username.
Returns:
Random username.
"""
return self._field("username")
def password(self, length: int = 12) -> str:
"""Generate a random password.
Args:
length: Password length.
Returns:
Random password string.
"""
return self._field("password", length=length)
class PostDataGenerator:
"""Generator for blog post test data.
Provides methods to generate realistic blog post content
including titles, markdown content, and tags.
Attributes:
_field: Mimesis Field instance for data generation.
Example:
>>> generator = PostDataGenerator()
>>> post_data = generator.generate_post()
"""
_TECH_TOPICS: list[str] = [
"python",
"fastapi",
"docker",
"kubernetes",
"sqlalchemy",
"pytest",
"asyncio",
"typescript",
"react",
"postgres",
"redis",
"graphql",
"microservices",
"ci_cd",
"ddd",
]
_TECH_VERBS: list[str] = [
"Building",
"Deploying",
"Testing",
"Optimizing",
"Refactoring",
"Scaling",
"Monitoring",
"Securing",
"Automating",
"Integrating",
]
def __init__(self, locale: Locale = Locale.EN) -> None:
"""Initialize the post data generator.
Args:
locale: Mimesis locale for data generation.
"""
self._field = Field(locale)
def title(self) -> str:
"""Generate a random blog post title.
Returns:
Random post title related to technology.
"""
verb = self._field("choice", items=self._TECH_VERBS)
topic = self._field("choice", items=self._TECH_TOPICS)
suffix = self._field("choice", items=["", " in 2025", " Best Practices", " Guide"])
return f"{verb} {topic.replace('_', ' ').title()}{suffix}"
def content(self, paragraphs: int = 3) -> str:
"""Generate random markdown blog post content.
Args:
paragraphs: Number of paragraphs to generate.
Returns:
Markdown formatted post content.
"""
parts: list[str] = []
for _ in range(paragraphs):
parts.append(self._field("text.text", quantity=2))
return "\n\n".join(parts)
def tags(self, count: int = 3) -> list[str]:
"""Generate random post tags.
Args:
count: Number of tags to generate.
Returns:
List of unique tag strings.
"""
selected: list[str] = []
while len(selected) < min(count, len(self._TECH_TOPICS)):
tag = self._field("choice", items=self._TECH_TOPICS)
if tag not in selected:
selected.append(tag)
return selected
def generate_post(self) -> dict[str, str | list[str]]:
"""Generate a complete blog post data dictionary.
Returns:
Dictionary with title, content, and tags keys.
Example:
>>> generator = PostDataGenerator()
>>> post = generator.generate_post()
>>> post["title"]
'Building FastAPI Guide'
>>> post["content"]
'Lorem ipsum...'
>>> post["tags"]
['python', 'fastapi', 'docker']
"""
return {
"title": self.title(),
"content": self.content(),
"tags": self.tags(),
}

131
src/pytfm/pytest_plugin.py Normal file
View File

@@ -0,0 +1,131 @@
"""Pytest plugin for pytfm test framework.
Provides reusable fixtures for Playwright-based E2E testing:
- Browser lifecycle (via pytest-playwright)
- Test user data generation
- Authenticated browser contexts and pages
Projects must override ``pytfm_auth_provider`` fixture to supply
an :class:`~pytfm.auth.AuthProvider` instance.
Example project conftest::
import pytest
from pytfm.auth import AuthProvider
@pytest.fixture(scope="session")
def pytfm_auth_provider():
return MyKeycloakAuthProvider()
"""
from __future__ import annotations
import uuid
from collections.abc import Generator
from typing import TYPE_CHECKING, Any
import pytest
if TYPE_CHECKING:
from playwright.sync_api import Browser, BrowserContext, Page
from pytfm.auth import AuthProvider
# pytest-playwright provides browser, context, page fixtures.
# We extend it with authentication helpers.
pytest_plugins = ["pytest_playwright"]
@pytest.fixture(scope="session")
def browser_type_launch_args() -> dict[str, Any]:
"""Return default browser launch arguments ensuring headless mode."""
return {"headless": True}
@pytest.fixture(scope="session")
def pytfm_auth_provider() -> "AuthProvider":
"""Return an AuthProvider instance for test authentication.
Projects MUST override this fixture in their conftest.py.
Raises:
NotImplementedError: If not overridden by the project.
"""
raise NotImplementedError(
"Override the 'pytfm_auth_provider' fixture in your project's conftest.py "
"and return an AuthProvider instance (e.g. InMemoryAuthProvider)."
)
@pytest.fixture
def test_user_data() -> dict[str, str]:
"""Generate unique test user data.
Returns:
Dictionary with username, email, and password keys.
"""
unique_id = uuid.uuid4().hex[:8]
return {
"username": f"testuser_{unique_id}",
"email": f"test_{unique_id}@example.com",
"password": "TestPass123!",
}
@pytest.fixture
def authenticated_context(
browser: "Browser",
base_url: str,
pytfm_auth_provider: "AuthProvider",
test_user_data: dict[str, str],
) -> Generator["BrowserContext", None, None]:
"""Create an authenticated browser context with a logged-in user.
Creates a user via the auth provider, logs them in, and injects
the resulting auth cookie into a new browser context.
Args:
browser: Playwright Browser instance (from pytest-playwright).
base_url: Application base URL (must be provided by project).
pytfm_auth_provider: Auth provider (must be overridden by project).
test_user_data: Generated test user credentials.
Yields:
Authenticated BrowserContext.
"""
user = pytfm_auth_provider.create_user(
username=test_user_data["username"],
password=test_user_data["password"],
email=test_user_data["email"],
roles=["user"],
)
token = pytfm_auth_provider.login(user.username, user.password)
context = browser.new_context(
viewport={"width": 1280, "height": 720},
)
cookie_domain = base_url.replace("http://", "").replace("https://", "").split(":")[0]
cookie = pytfm_auth_provider.build_auth_cookie(token, cookie_domain)
context.add_cookies([cookie]) # type: ignore[list-item]
yield context
context.close()
@pytest.fixture
def authenticated_page(authenticated_context: "BrowserContext") -> Generator["Page", None, None]:
"""Create an authenticated page from an authenticated context.
Args:
authenticated_context: BrowserContext with active session.
Yields:
Authenticated Playwright Page.
"""
page = authenticated_context.new_page()
yield page
page.close()

View File

@@ -4,6 +4,13 @@ This module provides base classes and utilities for implementing
the Page Object Model pattern in web UI tests.
"""
from pytfm.web.pom import BasePage
from pytfm.web.locator import AsyncSmartLocator, LocatorConfig, SmartLocator
from pytfm.web.pom import AsyncBasePage, BasePage
__all__ = ["BasePage"]
__all__ = [
"AsyncBasePage",
"AsyncSmartLocator",
"BasePage",
"LocatorConfig",
"SmartLocator",
]

729
src/pytfm/web/locator.py Normal file
View File

@@ -0,0 +1,729 @@
"""Smart locator module for Playwright-based UI testing.
Provides both synchronous (:class:`SmartLocator`) and asynchronous
(:class:`AsyncSmartLocator`) variants for maximum flexibility.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, Self
if TYPE_CHECKING:
from playwright.async_api import Locator as AsyncLocator
from playwright.async_api import Page as AsyncPage
from playwright.sync_api import Locator as SyncLocator
from playwright.sync_api import Page as SyncPage
@dataclass(frozen=True)
class LocatorConfig:
"""Configuration for locator search strategy."""
selector: str
strategy: Literal["css", "xpath", "text", "testid", "role"] = "testid"
timeout: int = 5000
visible_only: bool = True
def to_playwright_selector(self) -> str:
"""Convert to Playwright-compatible selector string."""
if self.strategy == "testid":
return f'[data-testid="{self.selector}"]'
if self.strategy == "css":
return self.selector
if self.strategy == "xpath":
return f"xpath={self.selector}"
if self.strategy == "text":
return f"text={self.selector}"
if self.strategy == "role":
parts = self.selector.split(":", 1)
role = parts[0]
name = parts[1] if len(parts) > 1 else None
if name:
return f"role={role}[name='{name}']"
return f"role={role}"
return self.selector
class SmartLocator:
"""Synchronous enhanced locator with waiting and assertion capabilities.
This is the **default** sync implementation. For async tests use
:class:`AsyncSmartLocator`.
"""
def __init__(self, config: LocatorConfig, page: "SyncPage" | None = None) -> None:
self._config = config
self._page = page
self._selector = config.to_playwright_selector()
@property
def selector(self) -> str:
return self._selector
def __str__(self) -> str:
return self._selector
def __repr__(self) -> str:
return f"SmartLocator({self._config.strategy}='{self._config.selector}')"
# ============ Factory Methods ============
@classmethod
def by_testid(
cls,
testid: str,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
config = LocatorConfig(
selector=testid, strategy="testid", timeout=timeout, visible_only=visible_only
)
return cls(config)
@classmethod
def by_css(
cls,
selector: str,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
config = LocatorConfig(
selector=selector, strategy="css", timeout=timeout, visible_only=visible_only
)
return cls(config)
@classmethod
def by_xpath(
cls,
xpath: str,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
config = LocatorConfig(
selector=xpath, strategy="xpath", timeout=timeout, visible_only=visible_only
)
return cls(config)
@classmethod
def by_text(
cls,
text: str,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
config = LocatorConfig(
selector=text, strategy="text", timeout=timeout, visible_only=visible_only
)
return cls(config)
@classmethod
def by_role(
cls,
role: str,
name: str | None = None,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
selector = f"{role}:{name}" if name else role
config = LocatorConfig(
selector=selector, strategy="role", timeout=timeout, visible_only=visible_only
)
return cls(config)
# ============ Page Association ============
def with_page(self, page: "SyncPage") -> Self:
return self.__class__(self._config, page)
def _get_locator(self, page: "SyncPage" | None = None) -> "SyncLocator":
target_page = page or self._page
if not target_page:
raise RuntimeError("Page not attached to locator")
return target_page.locator(self._selector)
def _get_page(self, page: "SyncPage" | None = None) -> "SyncPage":
target_page = page or self._page
if not target_page:
raise RuntimeError("Page not attached to locator")
return target_page
# ============ Wait Methods ============
def wait_for_visible(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.wait_for(state="visible", timeout=timeout or self._config.timeout)
def wait_for_hidden(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.wait_for(state="hidden", timeout=timeout or self._config.timeout)
def wait_for_enabled(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.wait_for(state="visible", timeout=timeout or self._config.timeout)
target_page = page or self._page
if target_page:
target_page.wait_for_function(
f"!document.querySelector('{self._selector}').disabled",
timeout=timeout or self._config.timeout,
)
def wait_for_disabled(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.wait_for(state="visible", timeout=timeout or self._config.timeout)
# ============ Action Methods ============
def click(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.click(timeout=timeout or self._config.timeout)
def click_and_wait(
self,
page: "SyncPage" | None = None,
target_testid: str | None = None,
navigation: bool = False,
timeout: int | None = None,
) -> None:
target_page = self._get_page(page)
timeout_val = timeout or self._config.timeout
if navigation:
try:
with target_page.expect_navigation(
timeout=timeout_val * 2,
wait_until="networkidle",
):
target_page.click(self._selector)
except TimeoutError as e:
raise TimeoutError(
f"Navigation timeout after clicking '{self._selector}'. "
f"Current URL: {target_page.url}"
) from e
else:
target_page.click(self._selector)
if target_testid:
target_loc = SmartLocator.by_testid(target_testid)
try:
target_loc.wait_for_visible(target_page, timeout=timeout_val)
except TimeoutError as e:
raise TimeoutError(
f"Element with testid '{target_testid}' not visible after "
f"clicking '{self._selector}'. Current URL: {target_page.url}"
) from e
def fill(
self, page: "SyncPage" | None = None, value: str = "", timeout: int | None = None
) -> None:
loc = self._get_locator(page)
loc.fill(value, timeout=timeout or self._config.timeout)
def fill_and_submit(
self,
page: "SyncPage" | None = None,
value: str = "",
submit_testid: str | None = None,
timeout: int | None = None,
) -> None:
target_page = self._get_page(page)
timeout_val = timeout or self._config.timeout
self.fill(target_page, value, timeout_val)
if submit_testid:
submit_btn = SmartLocator.by_testid(submit_testid)
submit_btn.click(target_page, timeout_val)
else:
target_page.press(self._selector, "Enter")
def clear(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.clear(timeout=timeout or self._config.timeout)
def select_option(
self, page: "SyncPage" | None = None, value: str = "", timeout: int | None = None
) -> None:
loc = self._get_locator(page)
loc.select_option(value, timeout=timeout or self._config.timeout)
def check(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.check(timeout=timeout or self._config.timeout)
def uncheck(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.uncheck(timeout=timeout or self._config.timeout)
def hover(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.hover(timeout=timeout or self._config.timeout)
def scroll_into_view(self, page: "SyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
loc.scroll_into_view_if_needed(timeout=timeout or self._config.timeout)
# ============ Query Methods ============
def get_text(self, page: "SyncPage" | None = None) -> str:
loc = self._get_locator(page)
text = loc.text_content()
return text or ""
def get_input_value(self, page: "SyncPage" | None = None) -> str:
loc = self._get_locator(page)
val = loc.input_value()
return val or ""
def get_attribute(self, name: str, page: "SyncPage" | None = None) -> str | None:
loc = self._get_locator(page)
return loc.get_attribute(name)
def count(self, page: "SyncPage" | None = None) -> int:
loc = self._get_locator(page)
return loc.count()
def is_visible(self, page: "SyncPage" | None = None) -> bool:
loc = self._get_locator(page)
return loc.is_visible()
def is_enabled(self, page: "SyncPage" | None = None) -> bool:
loc = self._get_locator(page)
return loc.is_enabled()
def is_checked(self, page: "SyncPage" | None = None) -> bool:
loc = self._get_locator(page)
return loc.is_checked()
# ============ Assertion Methods ============
def assert_visible(self, page: "SyncPage" | None = None, visible: bool = True) -> None:
is_vis = self.is_visible(page)
assert is_vis == visible, f"Expected visible={visible}, got {is_vis}"
def assert_text(
self,
expected: str,
page: "SyncPage" | None = None,
case_sensitive: bool = True,
) -> None:
actual = self.get_text(page)
if not case_sensitive:
actual = actual.lower()
expected = expected.lower()
assert actual == expected, f"Expected '{expected}', got '{actual}'"
def assert_contains_text(
self,
expected: str,
page: "SyncPage" | None = None,
case_sensitive: bool = True,
) -> None:
actual = self.get_text(page)
if not case_sensitive:
actual = actual.lower()
expected = expected.lower()
assert expected in actual, f"Expected '{expected}' in '{actual}'"
def assert_count(self, expected: int, page: "SyncPage" | None = None) -> None:
actual = self.count(page)
assert actual == expected, f"Expected {expected} elements, got {actual}"
def assert_attribute(
self,
name: str,
expected: str,
page: "SyncPage" | None = None,
) -> None:
actual = self.get_attribute(name, page)
assert actual == expected, f"Expected attr '{name}'='{expected}', got '{actual}'"
# ============ Navigation Methods ============
def nth(self, index: int) -> Self:
new_selector = f"{self._selector} >> nth={index}"
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
def first(self) -> Self:
return self.nth(0)
def last(self) -> Self:
new_selector = f"{self._selector} >> nth=-1"
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
def filter(self, filter_testid: str) -> Self:
new_selector = f"{self._selector}:has([data-testid='{filter_testid}'])"
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
def child(self, child_testid: str) -> Self:
new_selector = f"{self._selector} [data-testid='{child_testid}']"
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
def parent(self) -> Self:
new_selector = f"{self._selector} >> .."
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
class AsyncSmartLocator:
"""Asynchronous variant of SmartLocator for async Playwright usage.
Use this when your test suite runs with ``async def`` test functions
and ``playwright.async_api``.
"""
def __init__(self, config: LocatorConfig, page: "AsyncPage" | None = None) -> None:
self._config = config
self._page = page
self._selector = config.to_playwright_selector()
@property
def selector(self) -> str:
return self._selector
def __str__(self) -> str:
return self._selector
def __repr__(self) -> str:
return f"AsyncSmartLocator({self._config.strategy}='{self._config.selector}')"
@classmethod
def by_testid(
cls,
testid: str,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
config = LocatorConfig(
selector=testid, strategy="testid", timeout=timeout, visible_only=visible_only
)
return cls(config)
@classmethod
def by_css(
cls,
selector: str,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
config = LocatorConfig(
selector=selector, strategy="css", timeout=timeout, visible_only=visible_only
)
return cls(config)
@classmethod
def by_xpath(
cls,
xpath: str,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
config = LocatorConfig(
selector=xpath, strategy="xpath", timeout=timeout, visible_only=visible_only
)
return cls(config)
@classmethod
def by_text(
cls,
text: str,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
config = LocatorConfig(
selector=text, strategy="text", timeout=timeout, visible_only=visible_only
)
return cls(config)
@classmethod
def by_role(
cls,
role: str,
name: str | None = None,
timeout: int = 5000,
visible_only: bool = True,
) -> Self:
selector = f"{role}:{name}" if name else role
config = LocatorConfig(
selector=selector, strategy="role", timeout=timeout, visible_only=visible_only
)
return cls(config)
def with_page(self, page: "AsyncPage") -> Self:
return self.__class__(self._config, page)
def _get_locator(self, page: "AsyncPage" | None = None) -> "AsyncLocator":
target_page = page or self._page
if not target_page:
raise RuntimeError("Page not attached to locator")
return target_page.locator(self._selector)
def _get_page(self, page: "AsyncPage" | None = None) -> "AsyncPage":
target_page = page or self._page
if not target_page:
raise RuntimeError("Page not attached to locator")
return target_page
async def wait_for_visible(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.wait_for(state="visible", timeout=timeout or self._config.timeout)
async def wait_for_hidden(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.wait_for(state="hidden", timeout=timeout or self._config.timeout)
async def wait_for_enabled(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.wait_for(state="visible", timeout=timeout or self._config.timeout)
target_page = page or self._page
if target_page:
await target_page.wait_for_function(
f"!document.querySelector('{self._selector}').disabled",
timeout=timeout or self._config.timeout,
)
async def wait_for_disabled(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.wait_for(state="visible", timeout=timeout or self._config.timeout)
async def click(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.click(timeout=timeout or self._config.timeout)
async def click_and_wait(
self,
page: "AsyncPage" | None = None,
target_testid: str | None = None,
navigation: bool = False,
timeout: int | None = None,
) -> None:
target_page = self._get_page(page)
timeout_val = timeout or self._config.timeout
if navigation:
try:
async with target_page.expect_navigation(
timeout=timeout_val * 2,
wait_until="networkidle",
):
await target_page.click(self._selector)
except TimeoutError as e:
raise TimeoutError(
f"Navigation timeout after clicking '{self._selector}'. "
f"Current URL: {target_page.url}"
) from e
else:
await target_page.click(self._selector)
if target_testid:
target_loc = AsyncSmartLocator.by_testid(target_testid)
try:
await target_loc.wait_for_visible(target_page, timeout=timeout_val)
except TimeoutError as e:
raise TimeoutError(
f"Element with testid '{target_testid}' not visible after "
f"clicking '{self._selector}'. Current URL: {target_page.url}"
) from e
async def fill(
self, page: "AsyncPage" | None = None, value: str = "", timeout: int | None = None
) -> None:
loc = self._get_locator(page)
await loc.fill(value, timeout=timeout or self._config.timeout)
async def fill_and_submit(
self,
page: "AsyncPage" | None = None,
value: str = "",
submit_testid: str | None = None,
timeout: int | None = None,
) -> None:
target_page = self._get_page(page)
timeout_val = timeout or self._config.timeout
await self.fill(target_page, value, timeout_val)
if submit_testid:
submit_btn = AsyncSmartLocator.by_testid(submit_testid)
await submit_btn.click(target_page, timeout_val)
else:
await target_page.press(self._selector, "Enter")
async def clear(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.clear(timeout=timeout or self._config.timeout)
async def select_option(
self, page: "AsyncPage" | None = None, value: str = "", timeout: int | None = None
) -> None:
loc = self._get_locator(page)
await loc.select_option(value, timeout=timeout or self._config.timeout)
async def check(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.check(timeout=timeout or self._config.timeout)
async def uncheck(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.uncheck(timeout=timeout or self._config.timeout)
async def hover(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.hover(timeout=timeout or self._config.timeout)
async def scroll_into_view(self, page: "AsyncPage" | None = None, timeout: int | None = None) -> None:
loc = self._get_locator(page)
await loc.scroll_into_view_if_needed(timeout=timeout or self._config.timeout)
async def get_text(self, page: "AsyncPage" | None = None) -> str:
loc = self._get_locator(page)
text = await loc.text_content()
return text or ""
async def get_input_value(self, page: "AsyncPage" | None = None) -> str:
loc = self._get_locator(page)
val = await loc.input_value()
return val or ""
async def get_attribute(self, name: str, page: "AsyncPage" | None = None) -> str | None:
loc = self._get_locator(page)
return await loc.get_attribute(name)
async def count(self, page: "AsyncPage" | None = None) -> int:
loc = self._get_locator(page)
return await loc.count()
async def is_visible(self, page: "AsyncPage" | None = None) -> bool:
loc = self._get_locator(page)
return await loc.is_visible()
async def is_enabled(self, page: "AsyncPage" | None = None) -> bool:
loc = self._get_locator(page)
return await loc.is_enabled()
async def is_checked(self, page: "AsyncPage" | None = None) -> bool:
loc = self._get_locator(page)
return await loc.is_checked()
async def assert_visible(self, page: "AsyncPage" | None = None, visible: bool = True) -> None:
is_vis = await self.is_visible(page)
assert is_vis == visible, f"Expected visible={visible}, got {is_vis}"
async def assert_text(
self,
expected: str,
page: "AsyncPage" | None = None,
case_sensitive: bool = True,
) -> None:
actual = await self.get_text(page)
if not case_sensitive:
actual = actual.lower()
expected = expected.lower()
assert actual == expected, f"Expected '{expected}', got '{actual}'"
async def assert_contains_text(
self,
expected: str,
page: "AsyncPage" | None = None,
case_sensitive: bool = True,
) -> None:
actual = await self.get_text(page)
if not case_sensitive:
actual = actual.lower()
expected = expected.lower()
assert expected in actual, f"Expected '{expected}' in '{actual}'"
async def assert_count(self, expected: int, page: "AsyncPage" | None = None) -> None:
actual = await self.count(page)
assert actual == expected, f"Expected {expected} elements, got {actual}"
async def assert_attribute(
self,
name: str,
expected: str,
page: "AsyncPage" | None = None,
) -> None:
actual = await self.get_attribute(name, page)
assert actual == expected, f"Expected attr '{name}'='{expected}', got '{actual}'"
def nth(self, index: int) -> Self:
new_selector = f"{self._selector} >> nth={index}"
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
def first(self) -> Self:
return self.nth(0)
def last(self) -> Self:
new_selector = f"{self._selector} >> nth=-1"
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
def filter(self, filter_testid: str) -> Self:
new_selector = f"{self._selector}:has([data-testid='{filter_testid}'])"
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
def child(self, child_testid: str) -> Self:
new_selector = f"{self._selector} [data-testid='{child_testid}']"
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)
def parent(self) -> Self:
new_selector = f"{self._selector} >> .."
new_config = LocatorConfig(
selector=new_selector,
strategy="css",
timeout=self._config.timeout,
visible_only=self._config.visible_only,
)
return self.__class__(new_config, self._page)

View File

@@ -1,7 +1,7 @@
"""Page Object Model base class for UI testing.
"""Page Object Model base classes for UI testing.
This module provides the foundation for implementing the Page Object Model pattern
with Playwright for web UI testing.
Provides both synchronous (:class:`BasePage`) and asynchronous
(:class:`AsyncBasePage`) base page objects.
"""
from __future__ import annotations
@@ -9,112 +9,86 @@ from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from playwright.async_api import Page
from playwright.async_api import Page as AsyncPage
from playwright.sync_api import Page as SyncPage
class BasePage:
"""Base class for all page objects.
"""Synchronous base class for all page objects.
Provides common functionality for page interactions and element management.
All page objects should inherit from this class.
Attributes:
page: Playwright Page instance for browser interactions.
base_url: Base URL of the application under test.
path: URL path for this specific page.
Example:
>>> class LoginPage(BasePage):
... path = "/login"
...
... async def login(self, username: str, password: str) -> None:
... await self.page.fill('[data-testid="input-username"]', username)
... await self.page.fill('[data-testid="input-password"]', password)
... await self.page.click('[data-testid="btn-login"]')
This is the **default** implementation. For async tests use
:class:`AsyncBasePage`.
"""
path: str = ""
def __init__(self, page: Page, base_url: str) -> None:
"""Initialize the page object.
Args:
page: Playwright Page instance for browser interactions.
base_url: Base URL of the application under test.
"""
def __init__(self, page: "SyncPage", base_url: str) -> None:
self.page = page
self.base_url = base_url.rstrip("/")
@property
def url(self) -> str:
"""Return the full URL for this page.
Returns:
str: Full URL combining base_url and path.
"""
return f"{self.base_url}{self.path}"
async def open(self) -> BasePage:
"""Navigate to this page.
def open(self) -> BasePage:
self.page.goto(self.url)
return self
Returns:
BasePage: Self for method chaining.
"""
def wait_for_element(self, selector: str, timeout: int = 5000) -> None:
self.page.wait_for_selector(selector, state="visible", timeout=timeout)
def click(self, selector: str) -> None:
self.page.click(selector)
def fill(self, selector: str, value: str) -> None:
self.page.fill(selector, value)
def get_text(self, selector: str) -> str:
element = self.page.wait_for_selector(selector)
if element:
return element.text_content() or ""
return ""
def is_visible(self, selector: str) -> bool:
element = self.page.query_selector(selector)
if element:
return element.is_visible()
return False
class AsyncBasePage:
"""Asynchronous base class for page objects using ``playwright.async_api``."""
path: str = ""
def __init__(self, page: "AsyncPage", base_url: str) -> None:
self.page = page
self.base_url = base_url.rstrip("/")
@property
def url(self) -> str:
return f"{self.base_url}{self.path}"
async def open(self) -> AsyncBasePage:
await self.page.goto(self.url)
return self
async def wait_for_element(self, selector: str, timeout: int = 5000) -> None:
"""Wait for an element to be visible.
Args:
selector: CSS selector or data-testid for the element.
timeout: Maximum time to wait in milliseconds.
Raises:
TimeoutError: If element doesn't appear within timeout.
"""
await self.page.wait_for_selector(selector, state="visible", timeout=timeout)
async def click(self, selector: str) -> None:
"""Click on an element.
Args:
selector: CSS selector or data-testid for the element.
"""
await self.page.click(selector)
async def fill(self, selector: str, value: str) -> None:
"""Fill an input field.
Args:
selector: CSS selector or data-testid for the input.
value: Value to fill into the input.
"""
await self.page.fill(selector, value)
async def get_text(self, selector: str) -> str:
"""Get text content of an element.
Args:
selector: CSS selector or data-testid for the element.
Returns:
str: Text content of the element.
"""
element = await self.page.wait_for_selector(selector)
if element:
return await element.text_content() or ""
return ""
async def is_visible(self, selector: str) -> bool:
"""Check if an element is visible.
Args:
selector: CSS selector or data-testid for the element.
Returns:
bool: True if element is visible, False otherwise.
"""
element = await self.page.query_selector(selector)
if element:
return await element.is_visible()