- Refactor Redis backend connection handling and pool management - Update algorithm implementations with improved type annotations - Enhance config loader validation with stricter Pydantic schemas - Improve decorator and middleware error handling - Expand example scripts with better docstrings and usage patterns - Add new 00_basic_usage.py example for quick start - Reorganize examples directory structure - Fix type annotation inconsistencies across core modules - Update dependencies in pyproject.toml
368 lines
10 KiB
ReStructuredText
368 lines
10 KiB
ReStructuredText
Testing
|
|
=======
|
|
|
|
Testing rate-limited endpoints requires some care. You don't want your tests to
|
|
be flaky because of timing issues, and you need to verify that limits actually work.
|
|
|
|
Basic Testing Setup
|
|
-------------------
|
|
|
|
Use pytest with pytest-asyncio for async tests:
|
|
|
|
.. code-block:: python
|
|
|
|
# conftest.py
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from fastapi_traffic import MemoryBackend, RateLimiter
|
|
from fastapi_traffic.core.limiter import set_limiter
|
|
|
|
@pytest.fixture
|
|
def app():
|
|
"""Create a fresh app for each test."""
|
|
from myapp import create_app
|
|
return create_app()
|
|
|
|
@pytest.fixture
|
|
def client(app):
|
|
"""Test client with fresh rate limiter."""
|
|
backend = MemoryBackend()
|
|
limiter = RateLimiter(backend)
|
|
set_limiter(limiter)
|
|
|
|
with TestClient(app) as client:
|
|
yield client
|
|
|
|
Testing Rate Limit Enforcement
|
|
------------------------------
|
|
|
|
Verify that the limit is actually enforced:
|
|
|
|
.. code-block:: python
|
|
|
|
def test_rate_limit_enforced(client):
|
|
"""Test that requests are blocked after limit is reached."""
|
|
# Make requests up to the limit
|
|
for i in range(10):
|
|
response = client.get("/api/data")
|
|
assert response.status_code == 200, f"Request {i+1} should succeed"
|
|
|
|
# Next request should be rate limited
|
|
response = client.get("/api/data")
|
|
assert response.status_code == 429
|
|
assert "retry_after" in response.json()
|
|
|
|
Testing Rate Limit Headers
|
|
--------------------------
|
|
|
|
Check that headers are included correctly:
|
|
|
|
.. code-block:: python
|
|
|
|
def test_rate_limit_headers(client):
|
|
"""Test that rate limit headers are present."""
|
|
response = client.get("/api/data")
|
|
|
|
assert "X-RateLimit-Limit" in response.headers
|
|
assert "X-RateLimit-Remaining" in response.headers
|
|
assert "X-RateLimit-Reset" in response.headers
|
|
|
|
# Verify values make sense
|
|
limit = int(response.headers["X-RateLimit-Limit"])
|
|
remaining = int(response.headers["X-RateLimit-Remaining"])
|
|
|
|
assert limit == 100 # Your configured limit
|
|
assert remaining == 99 # One request made
|
|
|
|
Testing Different Clients
|
|
-------------------------
|
|
|
|
Verify that different clients have separate limits:
|
|
|
|
.. code-block:: python
|
|
|
|
def test_separate_limits_per_client(client):
|
|
"""Test that different IPs have separate limits."""
|
|
# Client A makes requests
|
|
for _ in range(10):
|
|
response = client.get(
|
|
"/api/data",
|
|
headers={"X-Forwarded-For": "1.1.1.1"}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
# Client A is now limited
|
|
response = client.get(
|
|
"/api/data",
|
|
headers={"X-Forwarded-For": "1.1.1.1"}
|
|
)
|
|
assert response.status_code == 429
|
|
|
|
# Client B should still have full quota
|
|
response = client.get(
|
|
"/api/data",
|
|
headers={"X-Forwarded-For": "2.2.2.2"}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
Testing Window Reset
|
|
--------------------
|
|
|
|
Test that limits reset after the window expires:
|
|
|
|
.. code-block:: python
|
|
|
|
import time
|
|
from unittest.mock import patch
|
|
|
|
def test_limit_resets_after_window(client):
|
|
"""Test that limits reset after window expires."""
|
|
# Exhaust the limit
|
|
for _ in range(10):
|
|
client.get("/api/data")
|
|
|
|
# Should be limited
|
|
response = client.get("/api/data")
|
|
assert response.status_code == 429
|
|
|
|
# Fast-forward time (mock time.time)
|
|
with patch('time.time') as mock_time:
|
|
# Move 61 seconds into the future
|
|
mock_time.return_value = time.time() + 61
|
|
|
|
# Should be allowed again
|
|
response = client.get("/api/data")
|
|
assert response.status_code == 200
|
|
|
|
Testing Exemptions
|
|
------------------
|
|
|
|
Verify that exemptions work:
|
|
|
|
.. code-block:: python
|
|
|
|
def test_exempt_paths(client):
|
|
"""Test that exempt paths bypass rate limiting."""
|
|
# Exhaust limit on a regular endpoint
|
|
for _ in range(100):
|
|
client.get("/api/data")
|
|
|
|
# Regular endpoint should be limited
|
|
response = client.get("/api/data")
|
|
assert response.status_code == 429
|
|
|
|
# Health check should still work
|
|
response = client.get("/health")
|
|
assert response.status_code == 200
|
|
|
|
def test_exempt_ips(client):
|
|
"""Test that exempt IPs bypass rate limiting."""
|
|
# Make many requests from exempt IP
|
|
for _ in range(1000):
|
|
response = client.get(
|
|
"/api/data",
|
|
headers={"X-Forwarded-For": "127.0.0.1"}
|
|
)
|
|
assert response.status_code == 200 # Never limited
|
|
|
|
Testing with Async Client
|
|
-------------------------
|
|
|
|
For async endpoints, use httpx:
|
|
|
|
.. code-block:: python
|
|
|
|
import pytest
|
|
import httpx
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_async_rate_limiting():
|
|
"""Test rate limiting with async client."""
|
|
async with httpx.AsyncClient(app=app, base_url="http://test") as client:
|
|
# Make concurrent requests
|
|
responses = await asyncio.gather(*[
|
|
client.get("/api/data")
|
|
for _ in range(15)
|
|
])
|
|
|
|
successes = sum(1 for r in responses if r.status_code == 200)
|
|
limited = sum(1 for r in responses if r.status_code == 429)
|
|
|
|
assert successes == 10 # Limit
|
|
assert limited == 5 # Over limit
|
|
|
|
Testing Backend Failures
|
|
------------------------
|
|
|
|
Test behavior when the backend fails:
|
|
|
|
.. code-block:: python
|
|
|
|
from unittest.mock import AsyncMock, patch
|
|
from fastapi_traffic import BackendError
|
|
|
|
def test_skip_on_error(client):
|
|
"""Test that requests are allowed when backend fails and skip_on_error=True."""
|
|
with patch.object(
|
|
MemoryBackend, 'get',
|
|
side_effect=BackendError("Connection failed")
|
|
):
|
|
# With skip_on_error=True, should still work
|
|
response = client.get("/api/data")
|
|
assert response.status_code == 200
|
|
|
|
def test_fail_on_error(client):
|
|
"""Test that requests fail when backend fails and skip_on_error=False."""
|
|
with patch.object(
|
|
MemoryBackend, 'get',
|
|
side_effect=BackendError("Connection failed")
|
|
):
|
|
# With skip_on_error=False (default), should fail
|
|
response = client.get("/api/strict-data")
|
|
assert response.status_code == 500
|
|
|
|
Mocking the Rate Limiter
|
|
------------------------
|
|
|
|
For unit tests, you might want to mock the rate limiter entirely:
|
|
|
|
.. code-block:: python
|
|
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
from fastapi_traffic.core.limiter import set_limiter
|
|
from fastapi_traffic.core.models import RateLimitInfo, RateLimitResult
|
|
|
|
def test_with_mocked_limiter(client):
|
|
"""Test endpoint logic without actual rate limiting."""
|
|
mock_limiter = MagicMock()
|
|
mock_limiter.hit = AsyncMock(return_value=RateLimitResult(
|
|
allowed=True,
|
|
info=RateLimitInfo(
|
|
limit=100,
|
|
remaining=99,
|
|
reset_at=time.time() + 60,
|
|
window_size=60,
|
|
),
|
|
key="test",
|
|
))
|
|
|
|
set_limiter(mock_limiter)
|
|
|
|
response = client.get("/api/data")
|
|
assert response.status_code == 200
|
|
mock_limiter.hit.assert_called_once()
|
|
|
|
Integration Testing with Redis
|
|
------------------------------
|
|
|
|
For integration tests with Redis:
|
|
|
|
.. code-block:: python
|
|
|
|
import pytest
|
|
from fastapi_traffic.backends.redis import RedisBackend
|
|
|
|
@pytest.fixture
|
|
async def redis_backend():
|
|
"""Create a Redis backend for testing."""
|
|
backend = await RedisBackend.from_url(
|
|
"redis://localhost:6379/15", # Use a test database
|
|
key_prefix="test:",
|
|
)
|
|
yield backend
|
|
await backend.clear() # Clean up after test
|
|
await backend.close()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_redis_rate_limiting(redis_backend):
|
|
"""Test rate limiting with real Redis."""
|
|
limiter = RateLimiter(redis_backend)
|
|
await limiter.initialize()
|
|
|
|
config = RateLimitConfig(limit=5, window_size=60)
|
|
request = create_mock_request("1.1.1.1")
|
|
|
|
# Make requests up to limit
|
|
for _ in range(5):
|
|
result = await limiter.check(request, config)
|
|
assert result.allowed
|
|
|
|
# Next should be blocked
|
|
result = await limiter.check(request, config)
|
|
assert not result.allowed
|
|
|
|
await limiter.close()
|
|
|
|
Fixtures for Common Scenarios
|
|
-----------------------------
|
|
|
|
.. code-block:: python
|
|
|
|
# conftest.py
|
|
import pytest
|
|
from fastapi_traffic import MemoryBackend, RateLimiter, RateLimitConfig
|
|
from fastapi_traffic.core.limiter import set_limiter
|
|
|
|
@pytest.fixture
|
|
def fresh_limiter():
|
|
"""Fresh rate limiter for each test."""
|
|
backend = MemoryBackend()
|
|
limiter = RateLimiter(backend)
|
|
set_limiter(limiter)
|
|
return limiter
|
|
|
|
@pytest.fixture
|
|
def rate_limit_config():
|
|
"""Standard rate limit config for tests."""
|
|
return RateLimitConfig(
|
|
limit=10,
|
|
window_size=60,
|
|
)
|
|
|
|
@pytest.fixture
|
|
def mock_request():
|
|
"""Create a mock request."""
|
|
def _create(ip="127.0.0.1", path="/test"):
|
|
request = MagicMock()
|
|
request.client.host = ip
|
|
request.url.path = path
|
|
request.method = "GET"
|
|
request.headers = {}
|
|
return request
|
|
return _create
|
|
|
|
Avoiding Flaky Tests
|
|
--------------------
|
|
|
|
Rate limiting tests can be flaky due to timing. Tips:
|
|
|
|
1. **Use short windows for tests:**
|
|
|
|
.. code-block:: python
|
|
|
|
@rate_limit(10, 1) # 10 per second, not 10 per minute
|
|
|
|
2. **Mock time instead of sleeping:**
|
|
|
|
.. code-block:: python
|
|
|
|
with patch('time.time', return_value=future_time):
|
|
# Test window reset
|
|
|
|
3. **Reset state between tests:**
|
|
|
|
.. code-block:: python
|
|
|
|
@pytest.fixture(autouse=True)
|
|
async def reset_limiter():
|
|
yield
|
|
limiter = get_limiter()
|
|
await limiter.backend.clear()
|
|
|
|
4. **Use unique keys per test:**
|
|
|
|
.. code-block:: python
|
|
|
|
def test_something(mock_request):
|
|
request = mock_request(ip=f"test-{uuid.uuid4()}")
|