style: clean up unused parameters and imports in examples
This commit is contained in:
@@ -13,8 +13,8 @@ from fastapi.responses import JSONResponse
|
||||
from fastapi_traffic import (
|
||||
Algorithm,
|
||||
MemoryBackend,
|
||||
RateLimitExceeded,
|
||||
RateLimiter,
|
||||
RateLimitExceeded,
|
||||
rate_limit,
|
||||
)
|
||||
from fastapi_traffic.core.decorator import RateLimitDependency
|
||||
@@ -25,7 +25,7 @@ limiter = RateLimiter(backend)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
async def lifespan(_: FastAPI):
|
||||
await limiter.initialize()
|
||||
set_limiter(limiter)
|
||||
yield
|
||||
@@ -36,7 +36,7 @@ app = FastAPI(title="Advanced Patterns", lifespan=lifespan)
|
||||
|
||||
|
||||
@app.exception_handler(RateLimitExceeded)
|
||||
async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> JSONResponse:
|
||||
async def rate_limit_handler(_: Request, exc: RateLimitExceeded) -> JSONResponse:
|
||||
return JSONResponse(
|
||||
status_code=429,
|
||||
content={"error": "rate_limit_exceeded", "retry_after": exc.retry_after},
|
||||
@@ -49,30 +49,31 @@ async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> JSONRe
|
||||
# Different operations consume different amounts of quota
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@app.get("/api/list")
|
||||
@rate_limit(limit=100, window_size=60, cost=1)
|
||||
async def list_items(request: Request) -> dict[str, Any]:
|
||||
async def list_items(_: Request) -> dict[str, Any]:
|
||||
"""Cheap operation - costs 1 token."""
|
||||
return {"items": ["a", "b", "c"], "cost": 1}
|
||||
|
||||
|
||||
@app.get("/api/details/{item_id}")
|
||||
@rate_limit(limit=100, window_size=60, cost=5)
|
||||
async def get_details(request: Request, item_id: str) -> dict[str, Any]:
|
||||
async def get_details(_: Request, item_id: str) -> dict[str, Any]:
|
||||
"""Medium operation - costs 5 tokens."""
|
||||
return {"item_id": item_id, "details": "...", "cost": 5}
|
||||
|
||||
|
||||
@app.post("/api/generate")
|
||||
@rate_limit(limit=100, window_size=60, cost=20)
|
||||
async def generate_content(request: Request) -> dict[str, Any]:
|
||||
async def generate_content(_: Request) -> dict[str, Any]:
|
||||
"""Expensive operation - costs 20 tokens."""
|
||||
return {"generated": "AI-generated content...", "cost": 20}
|
||||
|
||||
|
||||
@app.post("/api/bulk-export")
|
||||
@rate_limit(limit=100, window_size=60, cost=50)
|
||||
async def bulk_export(request: Request) -> dict[str, Any]:
|
||||
async def bulk_export(_: Request) -> dict[str, Any]:
|
||||
"""Very expensive operation - costs 50 tokens."""
|
||||
return {"export_url": "https://...", "cost": 50}
|
||||
|
||||
@@ -82,6 +83,7 @@ async def bulk_export(request: Request) -> dict[str, Any]:
|
||||
# Gradually reduce limits instead of hard blocking
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def get_request_priority(request: Request) -> int:
|
||||
"""Determine request priority (higher = more important)."""
|
||||
# Premium users get higher priority
|
||||
@@ -122,6 +124,7 @@ async def priority_endpoint(request: Request) -> dict[str, Any]:
|
||||
# Prevent abuse of specific resources
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def resource_key_extractor(request: Request) -> str:
|
||||
"""Rate limit by resource ID + user."""
|
||||
resource_id = request.path_params.get("resource_id", "unknown")
|
||||
@@ -135,7 +138,7 @@ def resource_key_extractor(request: Request) -> str:
|
||||
window_size=60,
|
||||
key_extractor=resource_key_extractor,
|
||||
)
|
||||
async def get_resource(request: Request, resource_id: str) -> dict[str, str]:
|
||||
async def get_resource(_: Request, resource_id: str) -> dict[str, str]:
|
||||
"""Each user can access each resource 10 times per minute."""
|
||||
return {"resource_id": resource_id, "data": "..."}
|
||||
|
||||
@@ -145,6 +148,7 @@ async def get_resource(request: Request, resource_id: str) -> dict[str, str]:
|
||||
# Prevent brute force attacks
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def login_key_extractor(request: Request) -> str:
|
||||
"""Rate limit by IP + username to prevent brute force."""
|
||||
ip = request.client.host if request.client else "unknown"
|
||||
@@ -161,7 +165,7 @@ def login_key_extractor(request: Request) -> str:
|
||||
key_extractor=login_key_extractor,
|
||||
error_message="Too many login attempts. Please try again in 5 minutes.",
|
||||
)
|
||||
async def login(request: Request) -> dict[str, str]:
|
||||
async def login(_: Request) -> dict[str, str]:
|
||||
"""Login endpoint with brute force protection."""
|
||||
return {"message": "Login successful", "token": "..."}
|
||||
|
||||
@@ -179,7 +183,7 @@ def password_reset_key(request: Request) -> str:
|
||||
key_extractor=password_reset_key,
|
||||
error_message="Too many password reset requests. Please try again later.",
|
||||
)
|
||||
async def password_reset(request: Request) -> dict[str, str]:
|
||||
async def password_reset(_: Request) -> dict[str, str]:
|
||||
"""Password reset with strict rate limiting."""
|
||||
return {"message": "Password reset email sent"}
|
||||
|
||||
@@ -197,23 +201,24 @@ webhook_rate_limit = RateLimitDependency(
|
||||
|
||||
|
||||
async def check_webhook_limit(
|
||||
request: Request,
|
||||
_: Request,
|
||||
webhook_url: str,
|
||||
) -> None:
|
||||
"""Check rate limit before sending webhook."""
|
||||
# Create key based on destination domain
|
||||
from urllib.parse import urlparse
|
||||
|
||||
domain = urlparse(webhook_url).netloc
|
||||
_key = f"webhook:{domain}" # Would be used with limiter in production
|
||||
|
||||
# Manually check limit (simplified example)
|
||||
# In production, you'd use the limiter directly
|
||||
_ = _key # Suppress unused variable warning
|
||||
__ = _key # Suppress unused variable warning
|
||||
|
||||
|
||||
@app.post("/api/send-webhook")
|
||||
async def send_webhook(
|
||||
request: Request,
|
||||
_: Request,
|
||||
webhook_url: str = "https://example.com/webhook",
|
||||
rate_info: Any = Depends(webhook_rate_limit),
|
||||
) -> dict[str, Any]:
|
||||
@@ -231,6 +236,7 @@ async def send_webhook(
|
||||
# Detect and limit similar requests (e.g., spam prevention)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def request_fingerprint(request: Request) -> str:
|
||||
"""Create fingerprint based on request characteristics."""
|
||||
ip = request.client.host if request.client else "unknown"
|
||||
@@ -251,7 +257,7 @@ def request_fingerprint(request: Request) -> str:
|
||||
key_extractor=request_fingerprint,
|
||||
error_message="Too many submissions from this device.",
|
||||
)
|
||||
async def submit_form(request: Request) -> dict[str, str]:
|
||||
async def submit_form(_: Request) -> dict[str, str]:
|
||||
"""Form submission with fingerprint-based rate limiting."""
|
||||
return {"message": "Form submitted successfully"}
|
||||
|
||||
@@ -261,13 +267,14 @@ async def submit_form(request: Request) -> dict[str, str]:
|
||||
# Different limits during peak vs off-peak hours
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def is_peak_hours() -> bool:
|
||||
"""Check if current time is during peak hours (9 AM - 6 PM UTC)."""
|
||||
current_hour = time.gmtime().tm_hour
|
||||
return 9 <= current_hour < 18
|
||||
|
||||
|
||||
def peak_aware_exempt(request: Request) -> bool:
|
||||
def peak_aware_exempt(_: Request) -> bool:
|
||||
"""Exempt requests during off-peak hours."""
|
||||
return not is_peak_hours()
|
||||
|
||||
@@ -278,7 +285,7 @@ def peak_aware_exempt(request: Request) -> bool:
|
||||
window_size=60,
|
||||
exempt_when=peak_aware_exempt, # No limit during off-peak
|
||||
)
|
||||
async def peak_aware_endpoint(request: Request) -> dict[str, Any]:
|
||||
async def peak_aware_endpoint(_: Request) -> dict[str, Any]:
|
||||
"""Stricter limits during peak hours."""
|
||||
return {
|
||||
"message": "Success",
|
||||
@@ -297,7 +304,7 @@ per_hour = RateLimitDependency(limit=1000, window_size=3600, key_prefix="hour")
|
||||
|
||||
|
||||
async def cascading_limits(
|
||||
request: Request,
|
||||
_: Request,
|
||||
sec_info: Any = Depends(per_second),
|
||||
min_info: Any = Depends(per_minute),
|
||||
hour_info: Any = Depends(per_hour),
|
||||
@@ -312,7 +319,7 @@ async def cascading_limits(
|
||||
|
||||
@app.get("/api/cascading")
|
||||
async def cascading_endpoint(
|
||||
request: Request,
|
||||
_: Request,
|
||||
limits: dict[str, Any] = Depends(cascading_limits),
|
||||
) -> dict[str, Any]:
|
||||
"""Endpoint with per-second, per-minute, and per-hour limits."""
|
||||
|
||||
Reference in New Issue
Block a user