Build Your Own Python API Test Framework: A Complete Guide with Ready‑to‑Run Code
This article explains why a custom API test framework is needed, presents a modular architecture, and provides step‑by‑step implementations for multi‑environment configuration, colored logging, cookie management, unified API client, database assertions, Redis support, dynamic test data factories, and enterprise notification, all with full runnable code examples.
1. Why We Need a Self‑Developed Framework?
Many teams use Postman + Newman for automation, but these tools have obvious limitations.
2. Overall Architecture Design
We adopt modular, high‑cohesion, low‑coupling design principles, resulting in a clear structure that is easy to extend.
Directory layout:
py-api-test-framework/
├── api_client.py # encapsulate Requests, unified calls
├── conftest.py # pytest hooks, global config
├── requirements.txt # dependency management
├── config/
│ ├── env.ini # environment config (test/prod)
│ ├── users.ini # user credentials
│ ├── db.ini # MySQL config
│ ├── redis.ini # Redis config
│ └── notification.ini # webhook URLs
├── data/
│ ├── cookies.json # multi‑user cookies storage
│ └── test_users.json # generated test data
├── scripts/
│ ├── generate_cookies.py
│ └── send_notification.py
├── tests/
│ ├── test_login.py
│ └── test_order.py
├── utils/
│ ├── config_loader.py
│ ├── logger.py
│ ├── auth_manager.py
│ ├── db.py
│ ├── redis_client.py
│ ├── assertions.py
│ └── notification.py
├── factories/
│ ├── user_factory.py
│ └── order_factory.py
├── reports/
│ ├── allure-results/
│ └── allure-report/
└── README.md # usage documentation3. Core Feature Implementations (Full Runnable Code)
3.1 Multi‑Environment Configuration Management (.ini files)
[test]
base_url = https://api.test.example.com
timeout = 10
[prod]
base_url = https://api.prod.example.com
timeout = 5 [A用户]
username = testuser1
password = 123456
[B用户]
username = testuser2
password = 123456 [mysql]
host = 192.168.1.100
port = 3306
user = test_user
password = test_pass
database = test_db
[redis]
host = 192.168.1.101
port = 6379
password = redis_pass
db = 0 [test]
webhook = https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxx
[prod]
webhook = https://oapi.dingtalk.com/robot/send?access_token=yyyyyyyy3.2 Config Loader
import configparser
import os
from pathlib import Path
class ConfigLoader:
def __init__(self, env="test"):
self.env = env
self.config = configparser.ConfigParser()
config_dir = Path(__file__).parent.parent / "config"
config_files = [
config_dir / "env.ini",
config_dir / "users.ini",
config_dir / "db.ini",
config_dir / "redis.ini",
config_dir / "notification.ini",
]
existing_files = [f for f in config_files if f.exists()]
if not existing_files:
raise FileNotFoundError(f"Configuration files not found in {config_dir}")
self.config.read(existing_files, encoding='utf-8')
def get(self, section, key, fallback=None):
try:
return self.config.get(section, key, fallback=fallback)
except (configparser.NoSectionError, configparser.NoOptionError):
return fallback
def getint(self, section, key, fallback=0):
try:
return self.config.getint(section, key)
except Exception:
return fallback
def getboolean(self, section, key, fallback=False):
try:
return self.config.getboolean(section, key)
except Exception:
return fallback
def sections(self):
return self.config.sections()
def options(self, section):
return self.config.options(section)
def items(self, section):
return self.config.items(section)3.3 Colored Logging
import logging
from colorlog import ColoredFormatter
def get_logger(name="test"):
logger = logging.getLogger(name)
if logger.handlers:
return logger # avoid duplicate handlers
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = ColoredFormatter(
"%(log_color)s[%(levelname)s] %(asctime)s %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return loggerInstall with pip install colorlog.
3.4 Multi‑User Cookies Management (Concurrent Support)
import json
import requests
from pathlib import Path
from utils.config_loader import ConfigLoader
from utils.logger import get_logger
logger = get_logger(__name__)
class AuthManager:
def __init__(self, env="test"):
self.config = ConfigLoader(env)
self.env = env
self.base_url = self.config.get("env", "base_url")
self.timeout = self.config.getint("env", "timeout", 10)
self.cookies_file = Path(__file__).parent.parent / "data" / "cookies.json"
self.session = requests.Session()
self._ensure_data_dir()
def _ensure_data_dir(self):
data_dir = self.cookies_file.parent
data_dir.mkdir(exist_ok=True)
if not self.cookies_file.exists():
self.cookies_file.write_text("{}")
def _load_cookies(self):
try:
content = self.cookies_file.read_text(encoding='utf-8')
return json.loads(content)
except Exception as e:
logger.warning(f"Failed to read cookies: {e}")
return {}
def _save_cookies(self, all_cookies):
try:
self.cookies_file.write_text(
json.dumps(all_cookies, ensure_ascii=False, indent=2),
encoding='utf-8'
)
logger.info(f"✅ Cookies saved to {self.cookies_file}")
except Exception as e:
logger.error(f"❌ Failed to save cookies: {e}")
def login(self, user_key):
"""Login and store cookies for a given user key (e.g., 'A用户')."""
if not self.config.has_section(user_key):
logger.error(f"❌ User {user_key} not configured in users.ini")
return None
user_info = dict(self.config.items(user_key))
username = user_info.get('username')
password = user_info.get('password')
login_url = f"{self.base_url}/login"
payload = {"username": username, "password": password}
try:
resp = self.session.post(login_url, json=payload, timeout=self.timeout)
resp.raise_for_status()
result = resp.json()
if result.get("success"):
cookies = dict(self.session.cookies)
all_cookies = self._load_cookies()
all_cookies[user_key] = cookies
self._save_cookies(all_cookies)
logger.info(f"✅ User {username} logged in successfully")
return cookies
else:
logger.error(f"❌ Login failed: {result.get('message')}")
return None
except Exception as e:
logger.error(f"❌ Request error: {e}")
return None
def get_cookies(self, user_key):
"""Retrieve stored cookies for a user."""
all_cookies = self._load_cookies()
return all_cookies.get(user_key)3.5 Generate Cookies Script
"""Batch generate user cookies"""
from utils.auth_manager import AuthManager
if __name__ == "__main__":
auth = AuthManager(env="test")
for user_key in ["A用户", "B用户"]:
auth.login(user_key)3.6 Unified API Client
import requests
from utils.logger import get_logger
from utils.auth_manager import AuthManager
logger = get_logger(__name__)
class APIClient:
def __init__(self, auth_manager: AuthManager):
self.auth = auth_manager
self.session = requests.Session()
self.base_url = auth_manager.base_url
self.timeout = auth_manager.timeout
def _request(self, method, endpoint, **kwargs):
url = self.base_url + endpoint
if 'timeout' not in kwargs:
kwargs['timeout'] = self.timeout
try:
logger.info(f"{method.upper()} {url}")
resp = self.session.request(method, url, **kwargs)
logger.info(f"Status: {resp.status_code}")
return resp
except Exception as e:
logger.error(f"❌ Request error: {e}")
raise
def get(self, endpoint, **kwargs):
return self._request('get', endpoint, **kwargs)
def post(self, endpoint, json=None, files=None, **kwargs):
return self._request('post', endpoint, json=json, files=files, **kwargs)
def put(self, endpoint, json=None, **kwargs):
return self._request('put', endpoint, json=json, **kwargs)
def delete(self, endpoint, **kwargs):
return self._request('delete', endpoint, **kwargs)
def set_user(self, user_key):
"""Switch the active user by loading stored cookies."""
cookies = self.auth.get_cookies(user_key)
if cookies:
self.session.cookies.update(cookies)
logger.info(f"✅ Switched to user: {user_key}")
else:
logger.warning(f"⚠️ No cookies for user {user_key}; please log in first")3.7 Database Assertions (MySQL + Redis)
import pymysql
import pymysql.cursors
from utils.config_loader import ConfigLoader
from utils.logger import get_logger
logger = get_logger(__name__)
class MySQLClient:
def __init__(self, env="test"):
self.config = ConfigLoader(env)
self.connection = None
self._connect()
def _connect(self):
try:
self.connection = pymysql.connect(
host=self.config.get("mysql", "host"),
port=self.config.getint("mysql", "port"),
user=self.config.get("mysql", "user"),
password=self.config.get("mysql", "password"),
database=self.config.get("mysql", "database"),
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
)
logger.info("✅ MySQL connection successful")
except Exception as e:
logger.error(f"❌ MySQL connection failed: {e}")
raise
def query(self, sql, params=None):
"""Execute a SELECT query and return results."""
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
result = cursor.fetchall()
logger.info(f"SQL query successful: {sql}")
return result
except Exception as e:
logger.error(f"❌ SQL query failed: {e}")
raise
def execute(self, sql, params=None):
"""Execute INSERT/UPDATE/DELETE statements."""
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
self.connection.commit()
logger.info(f"SQL execution successful: {sql}")
except Exception as e:
self.connection.rollback()
logger.error(f"❌ SQL execution failed: {e}")
raise
def close(self):
if self.connection:
self.connection.close()
class RedisClient:
def __init__(self, env="test"):
self.config = ConfigLoader(env)
try:
self.client = redis.StrictRedis(
host=self.config.get("redis", "host"),
port=self.config.getint("redis", "port"),
db=self.config.getint("redis", "db"),
password=self.config.get("redis", "password"),
decode_responses=True,
socket_connect_timeout=5,
)
self.client.ping()
logger.info("✅ Redis connection successful")
except Exception as e:
logger.error(f"❌ Redis connection failed: {e}")
raise
def get(self, key):
try:
value = self.client.get(key)
logger.info(f"Redis GET {key} = {value}")
return value
except Exception as e:
logger.error(f"❌ Redis GET failed: {e}")
return None
def set(self, key, value, ex=None):
try:
self.client.set(key, value, ex=ex)
logger.info(f"Redis SET {key} = {value}")
except Exception as e:
logger.error(f"❌ Redis SET failed: {e}")
def exists(self, key):
return self.client.exists(key)3.8 Dynamic Test Data Generation (factory_boy + Faker)
import factory
from factory import Faker
class UserFactory(factory.DictFactory):
username = Faker('user_name')
email = Faker('email')
full_name = Faker('name')
age = factory.Iterator([18, 25, 30, 35])
city = Faker('city')
phone = Faker('phone_number')
class OrderFactory(factory.DictFactory):
product_name = factory.Iterator(["iPhone", "MacBook", "iPad"])
price = factory.Iterator([999, 1999, 799])
quantity = factory.Iterator([1, 2, 3])
status = "pending"3.9 Generate Test Data Script
"""Generate test data and save to JSON"""
from factories.user_factory import UserFactory
from pathlib import Path
import json
if __name__ == "__main__":
users = [UserFactory() for _ in range(5)]
file_path = Path(__file__).parent.parent / "data" / "test_users.json"
file_path.write_text(
json.dumps(users, ensure_ascii=False, indent=2),
encoding='utf-8'
)
print(f"✅ Generated 5 test users: {file_path}")3.10 Enterprise WeChat / DingTalk Notification (Full Implementation)
import requests
import json
import os
from datetime import datetime
from utils.config_loader import ConfigLoader
from utils.logger import get_logger
logger = get_logger(__name__)
class Notification:
def __init__(self, env="test"):
self.env = env
self.config = ConfigLoader(env)
self.webhook = self.config.get("notification", "webhook")
if not self.webhook:
logger.warning("⚠️ No webhook configured")
def _is_wecom(self):
return "qyapi.weixin.qq.com" in self.webhook
def _is_dingtalk(self):
return "dingtalk.com" in self.webhook
def send_markdown(self, title, content):
if not self.webhook:
return
if self._is_wecom():
data = {"msgtype": "markdown", "markdown": {"content": content}}
elif self._is_dingtalk():
data = {"msgtype": "markdown", "markdown": {"title": title, "text": content}}
else:
logger.warning("❌ Unsupported webhook type")
return
try:
resp = requests.post(self.webhook, json=data, timeout=10)
result = resp.json()
if result.get("errcode") == 0 or result.get("status") == "success":
logger.info("✅ Message sent successfully")
else:
logger.error(f"❌ Message send failed: {result}")
except Exception as e:
logger.error(f"❌ Sending message exception: {e}")
def send_test_report(self, passed, failed, total, duration):
success_rate = (passed / total * 100) if total > 0 else 0
status = "✅ All Passed" if failed == 0 else "❌ Failures Present"
title = f"【Automated Test Report】{status}"
content = f"""
## {title}
- ✅ Passed: {passed}
- ❌ Failed: {failed}
- 📊 Total: {total}
- ⏱️ Duration: {duration:.2f}s
- 📈 Success Rate: {success_rate:.1f}%
> Environment: {self.env}
> Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
self.send_markdown(title, content)3.11 Pytest Hooks for Result Collection
import pytest
from datetime import datetime
from utils.notification import Notification
test_results = {"passed": 0, "failed": 0}
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
outcome = yield
report = outcome.get_result()
if report.when == "call":
if report.passed:
test_results["passed"] += 1
elif report.failed:
test_results["failed"] += 1
def pytest_sessionstart(session):
session.start_time = datetime.now()
def pytest_sessionfinish(session, exitstatus):
session.finish_time = datetime.now()
duration = (session.finish_time - session.start_time).total_seconds()
passed = test_results["passed"]
failed = test_results["failed"]
total = passed + failed
notifier = Notification(env="test")
notifier.send_test_report(passed, failed, total, duration)4. Conclusion
The article provides a comprehensive approach to building an API automation framework; readers are encouraged to try it out and adapt it to their own projects.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
