FastAPI in Production: Auth, Rate Limiting, and Zero‑Downtime with One Codebase
This article walks through a complete production‑ready FastAPI setup, covering secure OIDC/JWKS authentication, Redis‑backed token‑bucket rate limiting, zero‑downtime rolling deployments on Docker/Kubernetes, and observability best practices such as request‑ID middleware and structured JSON logging.
Core Analogy: Running a Restaurant
Authentication is like a door guard that only lets customers with a membership card in, rate limiting is the kitchen’s limited capacity that serves one table at a time, and zero‑downtime deployment is changing the menu while diners keep eating.
1. Authentication – Giving Every Request an "ID Card"
JWT is the mainstream choice, but hard‑coding a secret key is a mistake because key rotation would lock out all users. The correct approach is to use OpenID Connect (OIDC) and fetch public keys dynamically from the JWKS endpoint.
```python
# auth.py
import time

from fastapi import HTTPException, Request, status
from httpx import AsyncClient
from jose import jwt  # python-jose: its decode() accepts a JWKS dict directly

JWKS_CACHE = {"keys": [], "exp": 0}
ISSUER = "https://accounts.example.com"
AUD = "api"
ALGOS = ["RS256"]
JWKS_URL = f"{ISSUER}/.well-known/jwks.json"


async def load_jwks():
    """Load public keys from the auth server, caching them for 10 minutes."""
    global JWKS_CACHE
    if time.time() < JWKS_CACHE["exp"]:
        return JWKS_CACHE["keys"]
    async with AsyncClient(timeout=3) as c:
        resp = await c.get(JWKS_URL)
        resp.raise_for_status()
        data = resp.json()
    JWKS_CACHE = {"keys": data["keys"], "exp": time.time() + 600}
    return JWKS_CACHE["keys"]


async def current_user(request: Request):
    """Dependency: extract and verify the current user."""
    auth = request.headers.get("authorization", "")
    if not auth.startswith("Bearer "):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Missing or invalid token format")
    token = auth.split(" ", 1)[1]
    keys = await load_jwks()
    try:
        claims = jwt.decode(token, {"keys": keys}, algorithms=ALGOS, audience=AUD, issuer=ISSUER)
        return {"sub": claims["sub"], "scopes": claims.get("scope", "").split()}
    except Exception as e:
        # Report only the exception type -- never log or echo the token itself
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, f"Invalid token: {type(e).__name__}")
```

Use the dependency in routes:
```python
@app.get("/me")
async def me(user=Depends(current_user)):
    """Get current user info."""
    return {"user": user["sub"]}
```

Production‑Grade Authentication Checklist
Token lifetime: Access ≤ 15 min, Refresh 7‑30 days
Key management: fetch from JWKS, never hard‑code
Revocation: store JWT IDs (jti) in Redis with TTL equal to token lifetime
Cookie vs Header: use HttpOnly cookie + CSRF token, avoid plain cookies
Logging: record only error type, never the full token
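The revocation item above can be sketched without any framework code. The class below is a toy in‑memory stand‑in of our own (not a library API): in production you would instead call Redis `SETEX` with a TTL equal to the token's remaining lifetime and check membership with `EXISTS`, but the expiry semantics are the same.

```python
import time


class RevocationList:
    """Toy in-memory stand-in for a Redis jti blacklist."""

    def __init__(self):
        self._store = {}  # jti -> expiry timestamp

    def revoke(self, jti: str, ttl: float):
        # TTL should equal the token's remaining lifetime: after that,
        # the token is expired anyway and the entry can be dropped
        self._store[jti] = time.time() + ttl

    def is_revoked(self, jti: str) -> bool:
        exp = self._store.get(jti)
        if exp is None:
            return False
        if exp < time.time():
            del self._store[jti]  # entry outlived the token; clean it up
            return False
        return True
```

`current_user` would call `is_revoked(claims["jti"])` after signature verification and reject with 401 if it returns true.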
2. Rate Limiting – Installing a "Pressure Valve"
Without rate limiting, a single malicious client can bring the service down, just like a car without brakes.
Why Redis + Token Bucket?
Allows short bursts (burst capacity)
Controls long‑term average rate
Redis guarantees atomic updates in a distributed environment
Analogy: Subway Turnstile
Each passenger consumes one token when swiping
The turnstile refills a fixed number of tokens per second (e.g., 50 tokens/s)
During rush hour tokens are consumed quickly but the bucket still holds tokens, allowing bursts
When tokens run out the turnstile blocks entry and returns “please try later”
```python
# rate_limit.py
import aioredis
from fastapi import HTTPException, Request, status

LUA_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local refill_rate = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])

local data = redis.call("HMGET", key, "tokens", "last_update")
local tokens = tonumber(data[1]) or capacity
local last_update = tonumber(data[2]) or now

local delta = (now - last_update) * refill_rate
tokens = math.min(capacity, tokens + delta)

local allowed = 0
if tokens >= cost then
    tokens = tokens - cost
    allowed = 1
end

redis.call("HMSET", key, "tokens", tokens, "last_update", now)
redis.call("EXPIRE", key, 3600)
return {allowed, tokens}
"""


class RateLimiter:
    def __init__(self, redis: aioredis.Redis, capacity=100, refill_rate=50):
        self.redis = redis
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.script = self.redis.register_script(LUA_SCRIPT)

    async def check(self, key: str, cost=1) -> bool:
        # Use the Redis server clock so every app instance agrees on "now"
        redis_time = await self.redis.time()
        now = float(redis_time[0]) + redis_time[1] / 1_000_000
        allowed, _ = await self.script(
            keys=[f"rl:{key}"],
            args=[now, self.capacity, self.refill_rate, cost],
        )
        return bool(int(allowed))


_limiter: RateLimiter | None = None


async def get_limiter() -> RateLimiter:
    # Reuse one connection pool; opening a new one per request would be a bug
    global _limiter
    if _limiter is None:
        redis = await aioredis.from_url("redis://localhost:6379", decode_responses=True)
        _limiter = RateLimiter(redis, capacity=100, refill_rate=50)
    return _limiter


async def rate_limit(request: Request):
    limiter = await get_limiter()
    user_id = request.headers.get("x-user-id")
    key = f"user:{user_id}" if user_id else f"ip:{request.client.host}"
    if not await limiter.check(key):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Rate limit exceeded. Please slow down.",
            headers={"Retry-After": "30"},
        )
```

Apply the dependency to routes:
```python
@app.get("/search", dependencies=[Depends(rate_limit)])
async def search(q: str):
    """Search endpoint: refills 50 tokens/s with bursts up to 100."""
    return {"results": []}
```

Rate‑Limit Strategy Matrix
Anonymous IP: 10‑30 req/s – prevents crawlers
Authenticated user ID: 50‑200 req/s – adjust per business
API key: 100‑1000 req/s – for partners
Login endpoint (IP): 5 req/min – blocks brute‑force attacks
Payment endpoint (user ID): 2 req/s – low‑frequency sensitive ops
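Before committing numbers from this matrix to production, it helps to unit‑test them. The helper below is a pure‑Python mirror of the Lua script's arithmetic (a sketch of our own, no Redis involved); because the caller supplies `now`, refill behaviour is fully deterministic in tests.

```python
def token_bucket(state: dict, now: float, capacity: float,
                 refill_rate: float, cost: float = 1.0):
    """Mirror of the Lua script: returns (allowed, remaining_tokens)."""
    tokens = state.get("tokens", capacity)          # full bucket on first use
    last = state.get("last_update", now)
    tokens = min(capacity, tokens + (now - last) * refill_rate)  # refill
    allowed = tokens >= cost
    if allowed:
        tokens -= cost
    state["tokens"] = tokens
    state["last_update"] = now
    return allowed, tokens
```

For example, with `capacity=2, refill_rate=1`, two calls at t=0 drain the bucket, a third is refused, and a call one second later succeeds again.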
3. Zero‑Downtime Deployment – Keeping the Service Alive
Restart‑based releases work in development but can drop dozens of in‑flight requests in production.
Graceful Shutdown
When a termination signal is received, the server stops accepting new requests, finishes processing the current ones, and then exits.
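Uvicorn performs this drain for you, but the sequence is easy to see in miniature. The class below is a toy sketch of our own (not a real server): refuse new work once draining starts, wait for in‑flight work to finish, then exit.

```python
import asyncio


class GracefulServer:
    """Toy sketch of the shutdown sequence a real server runs on SIGTERM."""

    def __init__(self):
        self.accepting = True
        self.in_flight = 0
        self._idle = asyncio.Event()
        self._idle.set()

    async def handle(self, work):
        if not self.accepting:
            raise RuntimeError("shutting down")  # draining: refuse new requests
        self.in_flight += 1
        self._idle.clear()
        try:
            return await work()
        finally:
            self.in_flight -= 1
            if self.in_flight == 0:
                self._idle.set()  # last in-flight request just finished

    async def shutdown(self, timeout: float = 30.0):
        self.accepting = False                              # 1. stop accepting
        await asyncio.wait_for(self._idle.wait(), timeout)  # 2. drain, then exit
```

The `timeout` plays the same role as uvicorn's graceful‑shutdown timeout: a ceiling on how long draining may take before the process gives up.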
```python
# main.py
from contextlib import asynccontextmanager

import aioredis
import asyncpg
from fastapi import FastAPI, HTTPException, status


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifecycle: open connection pools on startup, close them on shutdown."""
    app.state.db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/db", min_size=5, max_size=20
    )
    app.state.redis = await aioredis.from_url("redis://localhost:6379", decode_responses=True)
    yield
    await app.state.db_pool.close()
    await app.state.redis.close()


app = FastAPI(lifespan=lifespan)


@app.get("/health/live")
async def liveness():
    """Liveness probe: is the process running?"""
    return {"status": "alive"}


@app.get("/health/ready")
async def readiness():
    """Readiness probe: are the dependencies reachable?"""
    try:
        async with app.state.db_pool.acquire() as conn:
            await conn.execute("SELECT 1")
        await app.state.redis.ping()
        return {"status": "ready"}
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Not ready: {e}",
        )
```

Docker + Kubernetes Configuration
```dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", \
     "--workers", "4", "--timeout-graceful-shutdown", "30"]
```

Note that uvicorn's flag is `--timeout-graceful-shutdown`; `--graceful-timeout` is the gunicorn equivalent.

```yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-app
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1        # allow one extra pod during rollout
      maxUnavailable: 0  # never drop below desired capacity
  template:
    spec:
      containers:
        - name: app
          image: fastapi:latest
          ports:
            - containerPort: 8000
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
```

Database Migration – Expand/Contract Pattern
Directly adding a NOT NULL column locks the table and can cause request timeouts.
```sql
-- Wrong approach
ALTER TABLE users ADD COLUMN role VARCHAR(50) NOT NULL;
-- Fails outright on a non-empty table (existing rows have no value);
-- adding a DEFAULT instead takes a heavy lock while the table is rewritten
```

Correct Expand/Contract steps:

```python
# 1. Add the column as nullable
op.add_column('users', sa.Column('role', sa.String(50), nullable=True))
# 2. Deploy new code that writes to both old and new columns
# 3. Backfill data in the background
# 4. Switch reads to the new column
# 5. Alter the column to NOT NULL
op.alter_column('users', 'role', nullable=False)
```

4. Production‑Grade Middleware
Security Headers
```python
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

app.add_middleware(HTTPSRedirectMiddleware)
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com"])


@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    return response
```

Precise CORS Configuration
```python
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com", "https://admin.example.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
    expose_headers=["X-Request-ID"],
    max_age=3600,  # let browsers cache preflight responses for an hour
)
```

5. Observability – The Lifeline at 2 AM
Request‑ID Propagation
```python
from starlette.middleware.base import BaseHTTPMiddleware
import uuid


class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Reuse the caller's ID if present, otherwise mint a new one
        request_id = request.headers.get("x-request-id", uuid.uuid4().hex)
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["x-request-id"] = request_id
        return response


app.add_middleware(RequestIDMiddleware)
```

Structured JSON Logging
```python
import json
import logging
from datetime import datetime, timezone


class JSONFormatter(logging.Formatter):
    def format(self, record):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", "unknown"),
            "route": getattr(record, "route", "unknown"),
            "duration_ms": getattr(record, "duration_ms", 0),
        }
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
logger = logging.getLogger(__name__)
```

Real‑World Case Study
A startup runs FastAPI services for three regions of mobile clients with the following production configuration:
Authentication: OIDC + JWKS, access token valid for 12 minutes
Rate limiting: Authenticated users 60 req/s, anonymous IP 10 req/s, token‑bucket capacity double the rate
Deployment: Rolling update, 20 % of pods replaced at a time, 20 s wait for new pods to become ready
Results after applying the guide:
Monthly availability = 99.95 %
P95 latency stabilized at 120‑180 ms
Deployment‑time error rate dropped to zero (previously 5‑10 timeouts per release)
A later schema change caused a 500‑storm in staging, but the Expand/Contract migration prevented any impact on production.
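As a sanity check on that availability figure: a 99.95 % monthly target leaves very little room for deploy‑time errors, which is why they had to go to zero. The arithmetic is two lines:

```python
# Downtime budget implied by a 99.95% availability target over a 30-day month
minutes_per_month = 30 * 24 * 60              # 43,200 minutes in the month
budget = minutes_per_month * (1 - 0.9995)     # fraction of time allowed down
print(f"{budget:.1f} minutes of downtime per month")  # 21.6 minutes
```

A single botched rolling update can easily burn that entire 21.6‑minute budget.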
Final Checklist (Copy‑able to README)
JWT validated via JWKS with automatic key rotation
Access token ≤ 15 min, Refresh token ≤ 30 days
Redis stores jti blacklist on logout
Distributed rate limiting using Redis token bucket
Return Retry-After header on 429 responses
Liveness and readiness probes check DB + Redis
Uvicorn started with --timeout-graceful-shutdown ≥ p99 request latency
Database migrations follow Expand/Contract pattern
Full‑trace request ID propagated via middleware
JSON‑structured logs include duration, status, route
Security headers + strict CORS configuration
Lifespan manages connection‑pool lifecycle
Key Takeaways
Authentication is more than decoding a JWT – use JWKS, short‑lived tokens, and revocation.
Rate limiting is the last defense line – Redis + token bucket is production‑grade.
Zero‑downtime is achievable with graceful shutdown, readiness probes, and phased migrations.
FastAPI makes development fast, but a production‑ready service requires predictable performance, robust security, and observability so that releases no longer depend on prayers and midnight emergency calls.