Storage Backends
================

FastAPI Traffic needs somewhere to store rate limit state — how many requests each client has made, when their window resets, and so on. That's what backends are for.

You have three options, each suited to different deployment scenarios.

Choosing a Backend
------------------

Here's the quick guide:

.. list-table::
   :header-rows: 1
   :widths: 20 30 50

   * - Backend
     - Use When
     - Limitations
   * - **Memory**
     - Development, single-process apps
     - Lost on restart, doesn't share across processes
   * - **SQLite**
     - Single-node production
     - Doesn't share across machines
   * - **Redis**
     - Distributed systems, multiple nodes
     - Requires Redis infrastructure

Memory Backend
--------------

The default backend. It stores everything in memory using a dictionary with LRU eviction and automatic TTL cleanup.

.. code-block:: python

   from fastapi_traffic import MemoryBackend, RateLimiter
   from fastapi_traffic.core.limiter import set_limiter

   # This is what happens by default, but you can configure it:
   backend = MemoryBackend(
       max_size=10000,       # Maximum number of keys to store
       cleanup_interval=60,  # How often to clean expired entries (seconds)
   )
   limiter = RateLimiter(backend)
   set_limiter(limiter)

**When to use it:**

- Local development
- Single-process applications
- Testing and CI/CD pipelines
- When you don't need persistence

**Limitations:**

- State is lost when the process restarts
- Doesn't work with multiple workers (each worker has its own memory)
- Not suitable for ``gunicorn`` with multiple workers or Kubernetes pods

**Memory management:**

The backend automatically evicts old entries when it hits ``max_size``. It uses LRU (Least Recently Used) eviction, so inactive clients get cleaned up first.

SQLite Backend
--------------

For single-node production deployments where you need persistence. Rate limits survive restarts and work across multiple processes on the same machine.
.. code-block:: python

   from fastapi_traffic import SQLiteBackend, RateLimiter
   from fastapi_traffic.core.limiter import set_limiter

   backend = SQLiteBackend(
       "rate_limits.db",      # Database file path
       cleanup_interval=300,  # Clean expired entries every 5 minutes
   )
   limiter = RateLimiter(backend)
   set_limiter(limiter)

   @app.on_event("startup")
   async def startup():
       await limiter.initialize()

   @app.on_event("shutdown")
   async def shutdown():
       await limiter.close()

**When to use it:**

- Single-server deployments
- When you need rate limits to survive restarts
- Multiple workers on the same machine (gunicorn, uvicorn with workers)
- When Redis is overkill for your use case

**Performance notes:**

- Uses WAL (Write-Ahead Logging) mode for better concurrent performance
- Connection pooling is handled automatically
- Writes are batched where possible

**File location:**

Put the database file somewhere persistent. For Docker deployments, mount a volume:

.. code-block:: yaml

   # docker-compose.yml
   services:
     api:
       volumes:
         - ./data:/app/data
       environment:
         - RATE_LIMIT_DB=/app/data/rate_limits.db

Redis Backend
-------------

The go-to choice for distributed systems. All your application instances share the same rate limit state.

.. code-block:: python

   from fastapi_traffic import RateLimiter
   from fastapi_traffic.backends.redis import RedisBackend
   from fastapi_traffic.core.limiter import set_limiter

   @app.on_event("startup")
   async def startup():
       backend = await RedisBackend.from_url(
           "redis://localhost:6379/0",
           key_prefix="myapp:ratelimit",  # Optional prefix for all keys
       )
       limiter = RateLimiter(backend)
       set_limiter(limiter)
       await limiter.initialize()

   @app.on_event("shutdown")
   async def shutdown():
       await limiter.close()

**When to use it:**

- Multiple application instances (Kubernetes, load-balanced servers)
- When you need rate limits shared across your entire infrastructure
- High-availability requirements

**Connection options:**
.. code-block:: python

   # Simple connection
   backend = await RedisBackend.from_url("redis://localhost:6379/0")

   # With authentication
   backend = await RedisBackend.from_url("redis://:password@localhost:6379/0")

   # Redis Sentinel for HA
   backend = await RedisBackend.from_url(
       "redis://sentinel1:26379/0",
       sentinel_master="mymaster",
   )

   # Redis Cluster
   backend = await RedisBackend.from_url("redis://node1:6379,node2:6379,node3:6379/0")

**Atomic operations:**

The Redis backend uses Lua scripts to ensure atomic operations. This means rate limit checks are accurate even under high concurrency — no race conditions.

**Key expiration:**

Keys automatically expire based on the rate limit window. You don't need to worry about Redis filling up with stale data.

Switching Backends
------------------

You can switch backends without changing your rate limiting code. Just configure a different backend at startup:

.. code-block:: python

   import os

   from fastapi_traffic import RateLimiter, MemoryBackend, SQLiteBackend
   from fastapi_traffic.core.limiter import set_limiter

   async def get_backend():
       """Choose backend based on environment."""
       env = os.getenv("ENVIRONMENT", "development")
       if env == "production":
           redis_url = os.getenv("REDIS_URL")
           if redis_url:
               from fastapi_traffic.backends.redis import RedisBackend
               return await RedisBackend.from_url(redis_url)
           return SQLiteBackend("/app/data/rate_limits.db")
       return MemoryBackend()

   @app.on_event("startup")
   async def startup():
       backend = await get_backend()
       limiter = RateLimiter(backend)
       set_limiter(limiter)
       await limiter.initialize()

Note that ``get_backend`` must be ``async`` so that ``RedisBackend.from_url`` (a coroutine) can be awaited; the other backends are constructed synchronously.

Custom Backends
---------------

Need something different? Maybe you want to use PostgreSQL, DynamoDB, or some other storage system. You can implement your own backend:
.. code-block:: python

   from typing import Any

   from fastapi_traffic.backends.base import Backend

   class MyCustomBackend(Backend):
       async def get(self, key: str) -> dict[str, Any] | None:
           """Retrieve state for a key."""
           # Your implementation here
           ...

       async def set(self, key: str, value: dict[str, Any], *, ttl: float) -> None:
           """Store state with TTL."""
           ...

       async def delete(self, key: str) -> None:
           """Delete a key."""
           ...

       async def exists(self, key: str) -> bool:
           """Check if key exists."""
           ...

       async def increment(self, key: str, amount: int = 1) -> int:
           """Atomically increment a counter."""
           ...

       async def clear(self) -> None:
           """Clear all data."""
           ...

       async def close(self) -> None:
           """Clean up resources."""
           ...

The key methods are ``get``, ``set``, and ``delete``. The state is stored as a dictionary, and the backend is responsible for serialization.

Backend Comparison
------------------

.. list-table::
   :header-rows: 1

   * - Feature
     - Memory
     - SQLite
     - Redis
   * - Persistence
     - ❌
     - ✅
     - ✅
   * - Multi-process
     - ❌
     - ✅
     - ✅
   * - Multi-node
     - ❌
     - ❌
     - ✅
   * - Setup complexity
     - None
     - Low
     - Medium
   * - Latency
     - ~0.01ms
     - ~0.1ms
     - ~1ms
   * - Dependencies
     - None
     - None
     - ``redis`` package

Best Practices
--------------

1. **Start with Memory, upgrade when needed.** Don't over-engineer. Memory is fine for development and many production scenarios.

2. **Use Redis for distributed systems.** If you have multiple application instances, Redis is the only option that works correctly.

3. **Handle backend errors gracefully.** Set ``skip_on_error=True`` if you'd rather allow requests through than fail when the backend is down:

   .. code-block:: python

      @rate_limit(100, 60, skip_on_error=True)
      async def endpoint(request: Request):
          return {"status": "ok"}

4. **Monitor your backend.** Keep an eye on memory usage (Memory backend), disk space (SQLite), or Redis memory and connections.