
Boost FastAPI Performance: Integrate Redis for Caching, Rate Limiting, and Queues

Learn how to set up Redis with Docker, integrate it into a FastAPI app using a custom caching decorator, and apply Redis for caching API responses, rate limiting, task queues, and distributed locks, complete with practical code examples and deployment tips.

Code Mala Tang

Redis is known for its ultra‑fast in‑memory data store and pairs well with high‑performance web applications.

Why Use Redis in FastAPI?

FastAPI is a modern Python framework for building APIs efficiently. Combining FastAPI with Redis brings several benefits:

Caching: Reduce database load by caching frequently accessed data.

Session Management: Efficiently store user sessions for authentication.

Rate Limiting: Prevent API abuse.

Pub/Sub: Enable real‑time communication features.

In this article we will implement Redis caching. First, though, we need Redis running, so we set it up with Docker; if Docker is not installed, visit the Docker website and install it for your OS.

Set Up Redis with Docker

Run the following command to pull the latest Redis image (if not already downloaded) and start Redis in a container:

<code>docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest</code>

Check that the Redis container is running:

<code>docker ps</code>

You should see output similar to:

<code>CONTAINER ID   IMAGE                      COMMAND                  CREATED          STATUS          PORTS                    NAMES
abcd1234efgh   redis/redis-stack:latest   "redis-server --load…"   10 seconds ago   Up 10 seconds   0.0.0.0:6379->6379/tcp   redis-stack</code>

To interact with Redis via its CLI, run:

<code>docker exec -it redis-stack redis-cli</code>

After execution you will be inside the Redis CLI:

<code>C:\Programming\fastapi\project\fastapi with redis>docker exec -it redis-stack redis-cli
127.0.0.1:6379></code>

Integrate Redis with FastAPI

First we create a simple FastAPI application and integrate Redis.

<code>## decorator.py
from functools import wraps
from hashlib import sha256
import json
import redis

# StrictRedis is a legacy alias; redis.Redis is the current name
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
app_cache_key = "app_cache"

def cache_decorator(expire=3600):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate a unique cache key from the function name and arguments
            # (the arguments must be JSON-serializable for this to work)
            key = sha256(json.dumps((func.__name__, args, kwargs), sort_keys=True).encode()).hexdigest()
            cached_data = redis_client.get(f"{key}_{app_cache_key}")
            if cached_data:
                return json.loads(cached_data)
            # Await the async function result
            result = await func(*args, **kwargs)
            # Serialize and cache the result
            redis_client.set(f"{key}_{app_cache_key}", json.dumps(result), ex=expire)
            return result
        return wrapper
    return decorator
</code>

This decorator generates a unique cache key from the function name and arguments, checks Redis for cached data, returns it if present, otherwise computes and caches the result.
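The key derivation can be checked in isolation. This standalone sketch (`make_cache_key` is a name introduced here; no Redis connection needed) mirrors the decorator's hashing step:

```python
import json
from hashlib import sha256

def make_cache_key(func_name, args, kwargs):
    """Mirror the decorator: hash the function name plus its arguments,
    so identical calls always map to the same cache key."""
    payload = json.dumps((func_name, args, kwargs), sort_keys=True).encode()
    return sha256(payload).hexdigest()

k1 = make_cache_key("get_details", (), {"body": {"user": 1}})
k2 = make_cache_key("get_details", (), {"body": {"user": 1}})
k3 = make_cache_key("get_details", (), {"body": {"user": 2}})
print(k1 == k2, k1 == k3)  # True False
```

One consequence worth noting: the arguments must be JSON-serializable, which holds for a plain dict request body.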

Each cache entry can have an expiration time, ensuring automatic cleanup, and can be deleted manually.

The suffix appended to the hash key makes cache entries easy to find and delete later, since reproducing the exact arguments to regenerate a hash can be difficult.
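SCAN's MATCH option uses glob-style patterns, so `*app_cache` matches any key ending with that suffix. Python's fnmatch implements similar glob semantics and can illustrate the matching (`keys_with_suffix` and the sample keys are made up for the example):

```python
from fnmatch import fnmatch

def keys_with_suffix(keys, suffix):
    """Filter keys the way SCAN with MATCH *<suffix> would."""
    return [k for k in keys if fnmatch(k, f"*{suffix}")]

keys = [
    "3fa9c1_app_cache",  # a cached endpoint result (placeholder hash)
    "rate:10.0.0.1",     # a rate-limit counter; different naming scheme
]
print(keys_with_suffix(keys, "app_cache"))  # ['3fa9c1_app_cache']
```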

<code>## main.py
from fastapi import FastAPI, HTTPException
import asyncio
from decorator import cache_decorator, redis_client

app = FastAPI()

@app.post("/get_details")
@cache_decorator(expire=3600)
async def get_details(body: dict):
    await asyncio.sleep(2)
    return {"data": body}

@app.delete("/delete_keys_with_suffix/{suffix}")
async def delete_keys_with_suffix(suffix: str):
    # Use SCAN to find keys ending with the given suffix
    keys_to_delete = []
    cursor = 0
    while True:
        cursor, keys = redis_client.scan(cursor=cursor, match=f"*{suffix}")
        keys_to_delete.extend(keys)
        if cursor == 0:
            break
    if not keys_to_delete:
        raise HTTPException(status_code=404, detail=f"No keys ending with '{suffix}' found")
    # Delete these keys
    deleted_count = redis_client.delete(*keys_to_delete)
    return {"message": f"Deleted {deleted_count} keys ending with '{suffix}'"}
</code>

/get_details demonstrates how caching can improve performance by simulating a time‑consuming operation and storing the result in Redis.

/delete_keys_with_suffix/{suffix} provides a tool to delete all Redis keys that end with a specific suffix.
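The cursor loop in the delete endpoint is the standard SCAN idiom: keep calling with the returned cursor until it comes back as 0. A toy model of that protocol (`fake_scan` and `collect_all` are stand-ins invented here, not Redis calls) makes the termination condition easy to see:

```python
def fake_scan(pages):
    """Build a scan(cursor) function that serves `pages` one batch at a
    time, mimicking Redis's cursor protocol (cursor 0 means "done")."""
    def scan(cursor):
        batch = pages[cursor]
        next_cursor = cursor + 1 if cursor + 1 < len(pages) else 0
        return next_cursor, batch
    return scan

def collect_all(scan):
    """Same loop shape as the endpoint: iterate until the cursor is 0."""
    keys, cursor = [], 0
    while True:
        cursor, batch = scan(cursor)
        keys.extend(batch)
        if cursor == 0:
            break
    return keys

print(collect_all(fake_scan([["k1_app_cache"], ["k2_app_cache"]])))
# ['k1_app_cache', 'k2_app_cache']
```

One real-world caveat: SCAN may return the same key more than once across a full iteration, so production code sometimes deduplicates before deleting.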

Application Scenarios

Thanks to Redis’s speed and atomic operations, it fits many scenarios beyond caching: rate limiting, task queues, distributed locks, and more.

Cache API Data to Reduce Database Load

<code>import json

@app.get("/user/{user_id}")
def get_user(user_id: int):
    cache_key = f"user:{user_id}"
    cached = redis_client.get(cache_key)
    if cached:
        # Deserialize so cache hits return the same shape as misses
        return {"data": json.loads(cached), "cached": True}
    # Simulate fetching from database
    user_data = {"id": user_id, "name": f"User{user_id}"}
    redis_client.set(cache_key, json.dumps(user_data), ex=300)  # Cache for 5 minutes
    return {"data": user_data, "cached": False}
</code>

Explanation: first check the cache, return if hit, otherwise fetch from DB and cache for 300 seconds.
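The same cache-aside flow works for any loader; here is a minimal sketch with a plain dict standing in for Redis (`get_user_cached` and `fake_db` are illustrative names, and TTL handling is omitted for brevity):

```python
cache = {}  # stand-in for Redis

def get_user_cached(user_id, load_from_db):
    """Cache-aside: return the cached value on a hit; otherwise load
    from the database, store the result, and return it."""
    cache_key = f"user:{user_id}"
    if cache_key in cache:
        return cache[cache_key], True    # (data, was_cached)
    data = load_from_db(user_id)
    cache[cache_key] = data
    return data, False

fake_db = lambda uid: {"id": uid, "name": f"User{uid}"}
print(get_user_cached(1, fake_db))  # miss: loads from the "database"
print(get_user_cached(1, fake_db))  # hit: served from the cache
```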

API Rate Limiting: Control User Access Frequency

<code>from fastapi import Request
from fastapi.responses import JSONResponse

@app.middleware("http")
async def rate_limiter(request: Request, call_next):
    ip = request.client.host
    key = f"rate:{ip}"
    count = redis_client.get(key)
    if count and int(count) >= 10:
        # An HTTPException raised in middleware bypasses FastAPI's
        # exception handlers, so return the 429 response directly
        return JSONResponse(status_code=429, content={"detail": "Too many requests"})
    if redis_client.incr(key) == 1:
        redis_client.expire(key, 10)  # start the window on the first request only
    return await call_next(request)
</code>

Explanation: each IP gets its own Redis counter key, incremented on every request; the key expires after 10 seconds, and any request beyond the threshold within that window receives a 429.
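The middleware above implements a fixed-window counter. The same logic can be sketched in-process without Redis (`FixedWindowLimiter` is a name made up here; the `now` parameter just makes time injectable for testing):

```python
import time

class FixedWindowLimiter:
    """One counter per client, reset every `window` seconds,
    at most `limit` requests allowed per window."""
    def __init__(self, limit=10, window=10):
        self.limit, self.window = limit, window
        self.counters = {}  # ip -> (window_start, count)

    def allow(self, ip, now=None):
        now = time.monotonic() if now is None else now
        start, count = self.counters.get(ip, (now, 0))
        if now - start >= self.window:  # window elapsed: reset the counter
            start, count = now, 0
        if count >= self.limit:
            return False                # over the limit -> reply with 429
        self.counters[ip] = (start, count + 1)
        return True
```

Fixed windows allow short bursts at window boundaries; a sliding-window or token-bucket scheme smooths that out at the cost of a little more state.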

Use Redis for an Asynchronous Task Queue

<code>@app.post("/register")
async def register_user(email: str):
    redis_client.lpush("mail_queue", email)
    return {"msg": "Registration successful, notification email will be sent shortly"}
</code>

Worker example:

<code>import time

def mail_worker():
    while True:
        email = redis_client.rpop("mail_queue")  # LPUSH + RPOP = FIFO order
        if email:
            print(f"Sending registration email to {email}...")
            time.sleep(1)
        else:
            time.sleep(0.5)  # avoid busy-polling; brpop() would block instead
</code>
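LPUSH pushes to the left of the list and RPOP pops from the right, which together give first-in, first-out ordering. A deque shows the same semantics without a running Redis:

```python
from collections import deque

queue = deque()
for email in ["a@example.com", "b@example.com", "c@example.com"]:
    queue.appendleft(email)      # analogous to LPUSH mail_queue <email>

drained = []
while queue:
    drained.append(queue.pop())  # analogous to RPOP mail_queue
print(drained)  # emails come out in the order they were pushed
```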

Distributed Lock to Prevent Duplicate Task Execution

<code>import uuid

@app.get("/lock")
def do_task():
    lock_key = "lock:task"
    lock_id = str(uuid.uuid4())
    # SET NX acquires the lock only if the key does not already exist
    if redis_client.set(lock_key, lock_id, nx=True, ex=5):
        try:
            # Execute task
            return {"status": "Task executed successfully"}
        finally:
            # Release only if we still own the lock (this check-then-delete
            # is not atomic; a Lua script would make it so)
            if redis_client.get(lock_key) == lock_id:
                redis_client.delete(lock_key)
    else:
        return {"status": "Task in progress, please try later"}
</code>

Explanation: set(..., nx=True) creates the lock only if it does not already exist; ex=5 auto‑expires it to avoid deadlocks; the uuid ensures the lock is released only by the caller that acquired it.
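The ownership check is the subtle part: if a slow task outlives the 5‑second expiry, the lock key may now belong to another caller, and an unconditional DELETE would release someone else's lock. A dict-based sketch of acquire/release with ownership (`acquire` and `release` are illustrative helpers; in Redis the final check-then-delete should itself be atomic, typically via a Lua script):

```python
import uuid

locks = {}  # stand-in for Redis: lock_key -> owner token

def acquire(lock_key):
    """Like SET key value NX: succeed only if the key does not exist."""
    if lock_key in locks:
        return None
    token = str(uuid.uuid4())
    locks[lock_key] = token
    return token

def release(lock_key, token):
    """Delete the lock only if the caller still owns it."""
    if locks.get(lock_key) == token:
        del locks[lock_key]
        return True
    return False

t1 = acquire("lock:task")
print(acquire("lock:task"))           # None: a second caller is rejected
print(release("lock:task", "other"))  # False: a non-owner cannot release
print(release("lock:task", t1))       # True: the owner releases cleanly
```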

Conclusion

Integrating Redis with FastAPI opens possibilities for building high‑performance and scalable applications. Whether for caching, session management, rate limiting, or real‑time updates, Redis is a powerful tool that complements FastAPI’s capabilities.
