Say Goodbye to Flask: 7 FastAPI Extensions That Boost Productivity Tenfold
After struggling with Flask's heavyweight middleware and limited plugin ecosystem, the author switched to FastAPI and demonstrates how seven extensions—FastAPI Users, FastAPI-Mail, FastAPI‑SocketIO, FastAPI‑Limiter, FastAPI‑Cache, FastAPI‑CrudRouter, and FastAPI‑Plugins—dramatically simplify authentication, email, real‑time communication, rate limiting, caching, CRUD generation, and ecosystem integration, turning weeks of debugging into minutes of development.
I spent a sleepless night wrestling with Flask's clunky middleware and half‑baked plugins while building a simple task‑manager API, only to realize that fixing Flask took longer than writing the actual application.
1. FastAPI Users – painless authentication
FastAPI Users provides ready‑made JWT, OAuth2, and social‑login support, eliminating the need to hand‑craft authentication code.
from fastapi import FastAPI, Depends
from fastapi_users import FastAPIUsers, models
from fastapi_users.db import SQLAlchemyUserDatabase
from fastapi_users.authentication import JWTAuthentication
import sqlalchemy as sa
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
from sqlalchemy.orm import sessionmaker, Session
# Create DB model
Base: DeclarativeMeta = declarative_base()
class UserTable(Base, models.BaseUserTable):
name = sa.Column(sa.String, nullable=True)
# FastAPI app
app = FastAPI()
DATABASE_URL = "sqlite:///./test.db"
engine = sa.create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)
class User(models.BaseUser):
name: str = None
class UserCreate(models.BaseUserCreate):
name: str = None
class UserUpdate(models.BaseUserUpdate):
name: str = None
class UserDB(User, models.BaseUserDB):
pass
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_user_db(session: Session = Depends(get_db)):
yield SQLAlchemyUserDatabase(UserDB, session, UserTable)
SECRET = "YOUR_SECRET_KEY"
jwt_authentication = JWTAuthentication(secret=SECRET, lifetime_seconds=3600, tokenUrl="auth/jwt/login")
fastapi_users = FastAPIUsers(
get_user_db,
[jwt_authentication],
User,
UserCreate,
UserUpdate,
UserDB,
)
app.include_router(fastapi_users.get_auth_router(jwt_authentication), prefix="/auth/jwt", tags=["auth"])
app.include_router(fastapi_users.get_register_router(), prefix="/auth", tags=["auth"])
@app.get("/protected-route")
async def protected_route(user: User = Depends(fastapi_users.current_user())):
return {"message": f"Hello {user.email}, you are authenticated!"}Now I can spin up OAuth2 authentication in minutes instead of nights.
2. FastAPI‑Mail – elegant email sending
FastAPI‑Mail abstracts SMTP configuration and supports async background tasks, making password‑reset and notification emails trivial.
from fastapi import FastAPI, BackgroundTasks
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
from pydantic import EmailStr, BaseModel
from typing import List
app = FastAPI()
conf = ConnectionConfig(
MAIL_USERNAME="[email protected]",
MAIL_PASSWORD="your-app-password",
MAIL_FROM="[email protected]",
MAIL_PORT=587,
MAIL_SERVER="smtp.gmail.com",
MAIL_TLS=True,
MAIL_SSL=False,
USE_CREDENTIALS=True,
VALIDATE_CERTS=True,
)
fm = FastMail(conf)
class EmailSchema(BaseModel):
email: List[EmailStr]
subject: str = "FastAPI Mail"
body: str
async def send_email_async(email: EmailSchema):
message = MessageSchema(
subject=email.subject,
recipients=email.email,
body=email.body,
subtype="html",
)
await fm.send_message(message)
@app.post("/send-email")
async def send_email(background_tasks: BackgroundTasks, email_data: EmailSchema):
"""Asynchronously send email"""
background_tasks.add_task(send_email_async, email_data)
return {"message": "邮件已加入发送队列"}In production it reliably handles password resets and notifications.
3. FastAPI‑SocketIO – painless real‑time features
FastAPI‑SocketIO integrates WebSocket handling with FastAPI, allowing clean event handling for chat, private messages, and broadcast.
from fastapi import FastAPI
from fastapi_socketio import SocketManager
import asyncio
app = FastAPI()
socket_manager = SocketManager(app=app, mount_location="/ws/")
@socket_manager.on("connect")
async def handle_connect(sid, environ, auth):
print(f"客户端 {sid} 已连接")
await socket_manager.emit("welcome", {"msg": "欢迎加入聊天室"}, to=sid)
@socket_manager.on("message")
async def handle_message(sid, data):
print(f"来自 {sid} 的消息: {data}")
await socket_manager.emit("response", {"from": sid[:8], "msg": data["message"]})
@socket_manager.on("private_message")
async def handle_private_message(sid, data):
target_sid = data.get("target_sid")
message = data.get("message")
if target_sid:
await socket_manager.emit("private", {"from": sid[:8], "msg": message}, to=target_sid)
else:
await socket_manager.emit("error", {"msg": "目标用户未指定"}, to=sid)
@socket_manager.on("disconnect")
async def handle_disconnect(sid):
print(f"客户端 {sid} 已断开连接")
await socket_manager.emit("user_left", {"user": sid[:8]})
@app.get("/online-users")
async def get_online_users():
"""获取在线用户列表"""
return {"online_count": len(socket_manager.get_participants("/"))}
@app.post("/broadcast")
async def broadcast_message(message: str):
"""向所有客户端广播消息"""
await socket_manager.emit("broadcast", {"from": "server", "msg": message, "timestamp": asyncio.get_event_loop().time()})
return {"status": "广播发送成功"}Building a chat panel felt smoother than any Flask solution I tried.
4. FastAPI‑Limiter – professional rate limiting
FastAPI‑Limiter plugs a Redis‑backed limiter into FastAPI, supporting per‑minute limits, premium tiers, IP‑based limits, and custom exception handling.
from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
import aioredis, asyncio
app = FastAPI()
@app.on_event("startup")
async def startup():
redis = await aioredis.create_redis_pool("redis://localhost")
await FastAPILimiter.init(redis)
@app.get("/api/data", dependencies=[Depends(RateLimiter(times=5, seconds=60))])
async def get_data():
return {"message": "您在速率限制内!"}
@app.get("/api/premium-data", dependencies=[Depends(RateLimiter(times=10, seconds=60))])
async def get_premium_data(request: Request):
"""针对付费用户的高限额"""
user_type = request.headers.get("X-User-Type", "free")
if user_type == "premium":
pass # dynamic adjustment could be added here
return {"data": "高级数据内容"}
@app.get("/api/public")
@RateLimiter(times=2, seconds=30, key_func=lambda request: request.client.host)
async def public_api():
return {"message": "公共 API,限制较严格"}
@app.exception_handler(429)
async def rate_limit_exception_handler(request: Request, exc):
return JSONResponse(status_code=429, content={"error": "请求过多", "message": "请稍后再试", "retry_after": 60})
@app.get("/rate-limit-info")
async def rate_limit_info(request: Request):
client_ip = request.client.host
return {"client_ip": client_ip, "rate_limited": False, "remaining": 5}No more midnight alarms from millions of requests.
5. FastAPI‑Cache – effortless acceleration
FastAPI‑Cache provides Redis‑backed response caching with per‑endpoint TTL, conditional caching, and manual cache control.
from fastapi import FastAPI, Query
from fastapi_cache import FastAPICache, JsonCoder
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import aioredis, asyncio
from datetime import timedelta
app = FastAPI()
@app.on_event("startup")
async def startup():
redis = aioredis.from_url("redis://localhost")
FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
@app.get("/products")
@cache(expire=60)
async def get_products():
"""获取产品列表 - 结果缓存60秒"""
await asyncio.sleep(2)
return {"products": ["手机", "电脑", "平板", "耳机"]}
@app.get("/product/{product_id}")
@cache(expire=30, key_builder=lambda *args, **kwargs: f"product:{kwargs['product_id']}")
async def get_product(product_id: int):
await asyncio.sleep(1)
return {"id": product_id, "name": f"产品{product_id}", "price": 99.99}
@app.get("/search")
@cache(expire=60, unless=lambda response: response.status_code != 200)
async def search_products(q: str = Query(None, min_length=1), page: int = Query(1, ge=1)):
if not q:
return {"error": "请输入搜索关键词"}
await asyncio.sleep(1.5)
return {"query": q, "page": page, "results": [f"{q}结果{i}" for i in range(10)], "total": 100}
@app.get("/manual-cache")
async def manual_cache_demo():
backend = FastAPICache.get_backend()
cache_key = "manual:data"
cached_data = await backend.get(cache_key)
if cached_data:
return {"source": "cache", "data": JsonCoder().decode(cached_data)}
new_data = {"id": 1, "value": "新生成的数据"}
await backend.set(cache_key, JsonCoder().encode(new_data), expire=120)
return {"source": "database", "data": new_data}For high‑load APIs this cache layer is a lifesaver.
6. FastAPI‑CrudRouter – CRUD in five lines
FastAPI‑CrudRouter auto‑generates full CRUD routes from SQLAlchemy models, cutting hours of boilerplate.
from fastapi import FastAPI, Depends
from fastapi_crudrouter import SQLAlchemyCRUDRouter
from pydantic import BaseModel
from typing import Optional, List
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from datetime import datetime
Base = declarative_base()
class ItemModel(Base):
__tablename__ = "items"
id = sa.Column(sa.Integer, primary_key=True, index=True)
name = sa.Column(sa.String, index=True)
description = sa.Column(sa.String, nullable=True)
price = sa.Column(sa.Float)
created_at = sa.Column(sa.DateTime, default=datetime.utcnow)
updated_at = sa.Column(sa.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class ItemBase(BaseModel):
name: str
description: Optional[str] = None
price: float
class ItemCreate(ItemBase):
pass
class ItemUpdate(ItemBase):
name: Optional[str] = None
price: Optional[float] = None
class ItemResponse(ItemBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
DATABASE_URL = "sqlite:///./crud.db"
engine = sa.create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
app = FastAPI()
item_router = SQLAlchemyCRUDRouter(
schema=ItemResponse,
create_schema=ItemCreate,
update_schema=ItemUpdate,
db_model=ItemModel,
db=get_db,
prefix="items",
tags=["商品管理"],
)
app.include_router(item_router)
@app.get("/items/search/{keyword}")
def search_items(keyword: str, db: Session = Depends(get_db)):
"""自定义搜索端点"""
results = db.query(ItemModel).filter(ItemModel.name.contains(keyword)).all()
return results
@app.get("/items/expensive/{min_price}")
def get_expensive_items(min_price: float, db: Session = Depends(get_db)):
"""获取价格高于指定值的商品"""
items = db.query(ItemModel).filter(ItemModel.price >= min_price).order_by(ItemModel.price.desc()).all()
return items
@app.get("/items/stats")
def get_item_stats(db: Session = Depends(get_db)):
"""获取商品统计信息"""
total_items = db.query(ItemModel).count()
total_value = db.query(sa.func.sum(ItemModel.price)).scalar() or 0
avg_price = db.query(sa.func.avg(ItemModel.price)).scalar() or 0
return {
"total_items": total_items,
"total_value": total_value,
"average_price": round(avg_price, 2),
"most_expensive": db.query(ItemModel).order_by(ItemModel.price.desc()).first(),
}The entire CRUD API was ready before my coffee went cold.
7. FastAPI‑Plugins – the all‑in‑one toolbox
FastAPI‑Plugins bundles Redis, caching, scheduling, and logging utilities, providing a unified lifecycle management interface.
from fastapi import FastAPI, Depends
from fastapi_plugins import (
RedisSettings,
depends_redis,
redis_plugin,
RedisPlugin,
)
from fastapi_plugins.cache import cache_plugin, CacheSettings
import aioredis
from contextlib import asynccontextmanager
from datetime import datetime
class AppSettings(RedisSettings, CacheSettings):
api_name: str = "fastapi-plugins-demo"
redis_url: str = "redis://localhost:6379/0"
cache_ttl: int = 300 # seconds
settings = AppSettings()
@asynccontextmanager
async def lifespan(app: FastAPI):
await redis_plugin.init_app(app, config=settings)
await redis_plugin.init()
await cache_plugin.init_app(app, config=settings)
await cache_plugin.init()
yield
await redis_plugin.terminate()
await cache_plugin.terminate()
app = FastAPI(lifespan=lifespan)
@app.get("/redis-demo")
async def redis_demo(redis: aioredis.Redis = Depends(depends_redis)):
"""演示基本的 Redis 操作"""
await redis.set("my_key", "Hello from FastAPI!")
value = await redis.get("my_key")
await redis.setex("temp_key", 60, "临时数据")
await redis.incr("counter")
counter = await redis.get("counter")
await redis.lpush("my_list", "item1", "item2", "item3")
list_items = await redis.lrange("my_list", 0, -1)
await redis.hset("user:1000", mapping={"name": "Alice", "age": "30"})
user_data = await redis.hgetall("user:1000")
return {
"simple_value": value.decode() if value else None,
"counter": counter.decode() if counter else None,
"list_items": [item.decode() for item in list_items],
"user_data": {k.decode(): v.decode() for k, v in user_data.items()},
}
@app.get("/cached-data")
@cache_plugin.cached(ttl=60)
async def get_cached_data():
"""自动缓存响应的端点"""
import asyncio
await asyncio.sleep(2)
return {"data": "这是缓存的数据", "timestamp": datetime.utcnow().isoformat()}
@app.get("/publish/{channel}")
async def publish_message(channel: str, message: str, redis: aioredis.Redis = Depends(depends_redis)):
"""向 Redis 频道发布消息"""
subscribers = await redis.publish(channel, message)
return {"channel": channel, "message": message, "subscribers": subscribers}
@app.get("/redis-info")
async def redis_info(redis: aioredis.Redis = Depends(depends_redis)):
"""获取 Redis 服务器信息"""
info = await redis.info()
keys = await redis.keys("*")
return {
"redis_version": info.get("redis_version"),
"connected_clients": info.get("connected_clients"),
"used_memory_human": info.get("used_memory_human"),
"total_keys": len(keys),
"sample_keys": [key.decode() for key in keys[:10]] if keys else [],
}
# Optional scheduler extension (requires fastapi‑plugins‑scheduler)
try:
from fastapi_plugins.scheduler import scheduler_plugin, SchedulerSettings
class ExtendedSettings(AppSettings, SchedulerSettings):
pass
settings = ExtendedSettings()
@scheduler_plugin.task("interval", seconds=30)
async def scheduled_task():
"""每30秒运行一次的定时任务"""
print(f"定时任务执行于 {datetime.utcnow()}")
@app.get("/scheduler/jobs")
async def get_scheduled_jobs():
"""获取所有计划任务"""
scheduler = scheduler_plugin.get_scheduler()
jobs = scheduler.get_jobs()
return {"jobs": [str(job) for job in jobs]}
except ImportError:
print("注意:未安装调度器扩展,相关功能不可用")FastAPI‑Plugins turns FastAPI into a one‑stop solution, far beyond a simple Flask replacement.
Conclusion
Reflecting on the journey, Flask felt like driving an old sedan, while FastAPI with these extensions feels like piloting a Tesla—same concept, dramatically different experience. The takeaway: mastering FastAPI’s ecosystem is essential for modern Python API development.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Data STUDIO
Click to receive the "Python Study Handbook"; reply "benefit" in the chat to get it. Data STUDIO focuses on original data science articles, centered on Python, covering machine learning, data analysis, visualization, MySQL and other practical knowledge and project case studies.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
