Master Three‑Layer Caching in FastAPI: HTTP, In‑Memory, and Redis
This guide explains the three‑layer caching strategy—HTTP cache headers, in‑memory caching, and Redis—detailing when to use each layer, how to implement them with FastAPI, common pitfalls, performance benchmarks, and best‑practice patterns for production‑grade applications.
Caching may seem simple until you realize there are three layers to consider. Teams often waste time implementing Redis for data that could be cached in memory, or forget HTTP cache headers, missing client‑side optimization opportunities.
This guide covers practical caching strategies: when to use each layer, how to implement them, and real‑world performance comparisons.
Three‑Layer Cache
HTTP cache headers (browser/CDN) — free; cached responses never reach your server.
In‑memory cache (application instance) — fast but not shared across instances.
Redis cache (distributed) — cross‑instance persistence and shared state.
Most production apps need all three layers; they complement each other rather than replace one another.
Layer 1: HTTP Cache Headers (the overlooked cache)
This is the cheapest optimization: let browsers and CDNs cache your responses so they never hit your server.
Basic Cache‑Control example
from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/products/{product_id}")
async def get_product(product_id: int, response: Response):
    """Rarely changing product data"""
    product = {"id": product_id, "name": "Widget", "price": 99.99}
    response.headers["Cache-Control"] = "public, max-age=3600"
    return product

Common Cache‑Control values
max-age=3600: cache for 3600 seconds (1 hour)
public: any cache (browser, CDN, proxy) may store the response
private: only the browser may cache it, not CDNs or shared proxies (see the sketch below)
no-cache: must revalidate with the server before using the cached copy
no-store: never cache the response
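For user‑specific responses, the same mechanism works with private, so CDNs and shared proxies never store the payload. A minimal sketch (the /me endpoint and its payload are hypothetical, not from the original):

from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/me")
async def get_current_user(response: Response):
    """Per-user data: cache only in the browser, and only briefly"""
    # private: shared caches (CDNs, proxies) must not store this response
    response.headers["Cache-Control"] = "private, max-age=60"
    return {"id": 1, "name": "Alice"}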
Using ETag for smart revalidation
ETag lets the browser reuse cached data without downloading it again. If the data hasn't changed, the server returns 304 Not Modified with no body.
from fastapi import FastAPI, Request
from fastapi.responses import Response as FastAPIResponse
import hashlib, json

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int, request: Request):
    """User data supporting ETag"""
    user_data = {"id": user_id, "name": "Alice", "email": "alice@example.com"}
    data_str = json.dumps(user_data, sort_keys=True)
    etag = f'"{hashlib.md5(data_str.encode()).hexdigest()}"'
    client_etag = request.headers.get("if-none-match")
    if client_etag == etag:
        return FastAPIResponse(status_code=304, headers={"ETag": etag})
    return FastAPIResponse(
        content=json.dumps(user_data),
        media_type="application/json",
        headers={"ETag": etag, "Cache-Control": "public, max-age=300"},
    )

How it works:
Server sends an ETag hash in the response.
Browser caches the response.
Subsequent request includes If-None-Match: <etag>.
Server compares ETags; if identical, returns 304.
Browser uses the cached copy.
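You can watch this round‑trip from a client. A minimal sketch using httpx, assuming the app above is running on localhost:8000:

import httpx

# First request: full 200 response carrying an ETag header
first = httpx.get("http://localhost:8000/users/1")
etag = first.headers["etag"]

# Replay with If-None-Match: the server answers 304 with an empty body
second = httpx.get("http://localhost:8000/users/1",
                   headers={"If-None-Match": etag})
print(second.status_code)  # 304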
Typical HTTP‑cache scenarios
Public data (product catalogs, blog posts)
Data that changes infrequently
Static or semi‑static content
Any GET endpoint that returns the same data for a period
Layer 2: In‑Memory Cache
Suitable for single‑instance apps or expensive computations that can be lost on restart.
Using functools.lru_cache
from functools import lru_cache
from fastapi import FastAPI
import time

app = FastAPI()

@lru_cache(maxsize=128)
def get_user_permissions(user_id: int) -> list[str]:
    """Automatically cached permission calculation"""
    time.sleep(0.5)  # simulate expensive operation
    return ["read", "write", "delete"]

@app.get("/users/{user_id}/permissions")
def get_permissions(user_id: int):
    """Endpoint that uses the cached function"""
    permissions = get_user_permissions(user_id)
    return {"user_id": user_id, "permissions": permissions}

@app.post("/users/{user_id}/permissions/clear")
def clear_permissions_cache(user_id: int):
    """Clear the permissions cache (lru_cache cannot evict a single key)"""
    get_user_permissions.cache_clear()  # clears entries for all users
    return {"message": "Cache cleared"}

Note: lru_cache works only with regular functions; applied to an async def it would cache coroutine objects, which cannot be awaited twice. Keep cached functions synchronous.
TTL cache with cachetools
from fastapi import FastAPI
from cachetools import TTLCache
import time

app = FastAPI()

cache = TTLCache(maxsize=1000, ttl=300)

def expensive_computation(user_id: int) -> dict:
    """Simulate a costly operation"""
    time.sleep(1)
    return {"user_id": user_id, "result": "expensive data"}

@app.get("/data/{user_id}")
async def get_data(user_id: int):
    """Endpoint with in‑memory TTL cache"""
    cache_key = f"user_data_{user_id}"
    if cache_key in cache:
        return {"cached": True, "data": cache[cache_key]}
    # blocking call: fine for a demo, but it stalls the event loop
    result = expensive_computation(user_id)
    cache[cache_key] = result
    return {"cached": False, "data": result}

Performance: cache hit ≈ 0.5 ms, full computation ≈ 1000 ms.
When to use in‑memory cache
Single‑instance deployments
Configuration data, permissions, lookup tables
Results of expensive calculations
Data that can be lost on restart
Total cache size under 100 MB
Limitations
Not shared across application instances
Lost on restart
Consumes application memory
Layer 3: Redis Cache
Use Redis when you need cross‑instance shared cache or persistence after restarts.
from fastapi import FastAPI
from contextlib import asynccontextmanager
import redis.asyncio as redis
import json

redis_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client
    redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    try:
        await redis_client.ping()
        print("✓ Connected to Redis")
    except redis.ConnectionError:
        print("✗ Redis connection failed")
        redis_client = None
    yield
    if redis_client:
        await redis_client.aclose()

app = FastAPI(lifespan=lifespan)

@app.get("/posts/{post_id}")
async def get_post(post_id: int):
    """Post data stored in Redis"""
    cache_key = f"post:{post_id}"
    if redis_client:
        try:
            cached = await redis_client.get(cache_key)
            if cached:
                return json.loads(cached)
        except redis.RedisError as e:
            print(f"Redis error: {e}")
    post = {"id": post_id, "title": "Post Title", "content": "Post content here..."}
    if redis_client:
        try:
            await redis_client.setex(cache_key, 3600, json.dumps(post))
        except redis.RedisError as e:
            print(f"Cache write failed: {e}")
    return post

Reusable cache helper
from typing import Any, Awaitable, Callable
import json

async def cached(key: str, ttl: int, fetch_func: Callable[[], Awaitable[Any]]) -> Any:
    """Generic cache helper that falls back to the fetch function"""
    if redis_client:
        try:
            hit = await redis_client.get(key)
            if hit:
                return json.loads(hit)
        except redis.RedisError:
            pass
    data = await fetch_func()
    if redis_client:
        try:
            await redis_client.setex(key, ttl, json.dumps(data))
        except redis.RedisError:
            pass
    return data

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Endpoint using the helper"""
    async def fetch_user():
        return {"id": user_id, "name": "Alice"}
    user = await cached(key=f"user:{user_id}", ttl=1800, fetch_func=fetch_user)
    return user

Warning: redis_client.flushdb() clears the entire Redis database; use only in development.
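For targeted invalidation, deleting keys by prefix is safer than flushdb. A minimal sketch using scan_iter against the redis_client above (the user: prefix is taken from the examples here):

async def delete_by_prefix(prefix: str) -> int:
    """Delete only the keys matching a prefix instead of flushing everything"""
    deleted = 0
    # scan_iter walks the keyspace incrementally without blocking Redis
    async for key in redis_client.scan_iter(match=f"{prefix}*"):
        await redis_client.delete(key)
        deleted += 1
    return deleted

# e.g. await delete_by_prefix("user:")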
When to use Redis
Multi‑instance deployments
Cross‑service shared state
Cache that survives restarts
Session data
Rate‑limit data
Cache larger than any single instance’s memory
Combined Production Strategy
Use all three layers together:
from fastapi import FastAPI, Response, Request
from fastapi.responses import Response as FastAPIResponse
from contextlib import asynccontextmanager
from functools import lru_cache
import redis.asyncio as redis
import json, hashlib

redis_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client
    redis_client = redis.Redis(host="localhost", decode_responses=True)
    try:
        await redis_client.ping()
    except redis.ConnectionError:
        redis_client = None
    yield
    if redis_client:
        await redis_client.aclose()

app = FastAPI(lifespan=lifespan)

@lru_cache(maxsize=1)
def get_app_config() -> dict:
    """In‑memory cached app config"""
    return {"feature_flags": {"new_ui": True}, "api_version": "1.0"}

async def fetch_product_from_db(product_id: int) -> dict:
    """Simulated DB fetch"""
    return {"id": product_id, "name": f"Product {product_id}", "price": 99.99}

@app.get("/products/{product_id}")
async def get_product(product_id: int, request: Request):
    """Product endpoint with three‑layer cache"""
    redis_key = f"product:{product_id}"
    data = None
    # Layer 3: check Redis first
    if redis_client:
        try:
            cached = await redis_client.get(redis_key)
            if cached:
                data = json.loads(cached)
        except redis.RedisError:
            pass
    # Cache miss: fetch from the database and write back to Redis
    if not data:
        data = await fetch_product_from_db(product_id)
        if redis_client:
            try:
                await redis_client.setex(redis_key, 3600, json.dumps(data))
            except redis.RedisError:
                pass
    # Layer 1: ETag revalidation plus Cache-Control for browsers/CDNs
    data_str = json.dumps(data, sort_keys=True)
    etag = f'"{hashlib.md5(data_str.encode()).hexdigest()}"'
    if request.headers.get("if-none-match") == etag:
        return FastAPIResponse(status_code=304, headers={"ETag": etag})
    return FastAPIResponse(
        content=data_str,
        media_type="application/json",
        headers={"ETag": etag, "Cache-Control": "public, max-age=1800"},
    )

@app.get("/config")
async def get_config(response: Response):
    """Config cached in memory (Layer 2) and in the browser (Layer 1)"""
    config = get_app_config()
    response.headers["Cache-Control"] = "public, max-age=86400"
    return config

What not to cache
Real‑time data (stock prices, sensor feeds)
User‑specific sensitive data unless you have a careful expiry policy
Large responses that can be optimized elsewhere
Common Mistakes
Error 1: Redis keys without TTL
await redis_client.set("user_profile", user_data) # no TTL
await redis_client.setex("user_profile", 3600, user_data) # correctError 2: Forgetting to invalidate cache
@app.put("/users/{user_id}")
async def update_user(user_id: int, name: str):
await db.update_user(user_id, name)
# missing cache invalidation
return {"updated": True}
@app.put("/users/{user_id}")
async def update_user(user_id: int, name: str):
await db.update_user(user_id, name)
await redis_client.delete(f"user:{user_id}")
return {"updated": True}Error 3: Using in‑memory cache in a multi‑instance setup
cache = {}  # local to one instance
await redis_client.get(key)  # mixing approaches leads to inconsistency

Error 4: No error handling for Redis
# Bad: an unreachable Redis fails the whole request
cached = await redis_client.get(key)

# Good: degrade to a cache miss instead
try:
    cached = await redis_client.get(key)
except redis.RedisError:
    cached = None

Error 5: Cache stampede
When a cache expires, many requests hit the database simultaneously.
import asyncio

async def get_with_lock(key: str, ttl: int, fetch_func):
    """Lock to prevent cache stampede"""
    cached = await redis_client.get(key)
    if cached:
        return json.loads(cached)
    lock_key = f"{key}:lock"
    # nx=True: only the first request acquires the lock and recomputes
    lock_acquired = await redis_client.set(lock_key, "1", ex=10, nx=True)
    if lock_acquired:
        try:
            data = await fetch_func()
            await redis_client.setex(key, ttl, json.dumps(data))
            return data
        finally:
            await redis_client.delete(lock_key)
    else:
        # another request holds the lock: wait briefly, then retry
        await asyncio.sleep(0.1)
        return await get_with_lock(key, ttl, fetch_func)
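A usage sketch (fetch_popular_posts is a hypothetical stand‑in for whatever expensive query you are protecting):

async def fetch_popular_posts() -> list[dict]:
    """Hypothetical expensive query that many requests trigger at once"""
    return [{"id": 1, "title": "Post Title"}]

@app.get("/popular")
async def popular_posts():
    return await get_with_lock("popular_posts", ttl=60,
                               fetch_func=fetch_popular_posts)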
Caching is powerful but adds complexity. Start with the simplest layer, solve your problem, and add more layers as needed.