Build Your Own Python API Test Framework: A Complete Guide with Ready‑to‑Run Code

This article explains why a custom API test framework is needed, presents a modular architecture, and provides step‑by‑step implementations for multi‑environment configuration, colored logging, cookie management, unified API client, database assertions, Redis support, dynamic test data factories, and enterprise notification, all with full runnable code examples.

Test Development Learning Exchange
Test Development Learning Exchange
Test Development Learning Exchange
Build Your Own Python API Test Framework: A Complete Guide with Ready‑to‑Run Code

1. Why Do We Need a Self‑Developed Framework?

Many teams use Postman + Newman for automation, but these tools have obvious limitations.

2. Overall Architecture Design

We adopt modular, high‑cohesion, low‑coupling design principles, resulting in a clear structure that is easy to extend.

Directory layout:

py-api-test-framework/
├── api_client.py          # encapsulate Requests, unified calls
├── conftest.py           # pytest hooks, global config
├── requirements.txt      # dependency management
├── config/
│   ├── env.ini           # environment config (test/prod)
│   ├── users.ini         # user credentials
│   ├── db.ini            # MySQL config
│   ├── redis.ini         # Redis config
│   └── notification.ini  # webhook URLs
├── data/
│   ├── cookies.json      # multi‑user cookies storage
│   └── test_users.json   # generated test data
├── scripts/
│   ├── generate_cookies.py
│   └── send_notification.py
├── tests/
│   ├── test_login.py
│   └── test_order.py
├── utils/
│   ├── config_loader.py
│   ├── logger.py
│   ├── auth_manager.py
│   ├── db.py
│   ├── redis_client.py
│   ├── assertions.py
│   └── notification.py
├── factories/
│   ├── user_factory.py
│   └── order_factory.py
├── reports/
│   ├── allure-results/
│   └── allure-report/
└── README.md            # usage documentation

3. Core Feature Implementations (Full Runnable Code)

3.1 Multi‑Environment Configuration Management (.ini files)

# config/env.ini
[test]
base_url = https://api.test.example.com
timeout = 10

[prod]
base_url = https://api.prod.example.com
timeout = 5

# config/users.ini
[A用户]
username = testuser1
password = 123456

[B用户]
username = testuser2
password = 123456

# config/db.ini
[mysql]
host = 192.168.1.100
port = 3306
user = test_user
password = test_pass
database = test_db

# config/redis.ini
[redis]
host = 192.168.1.101
port = 6379
password = redis_pass
db = 0

# config/notification.ini
[test]
webhook = https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxx

[prod]
webhook = https://oapi.dingtalk.com/robot/send?access_token=yyyyyyyy

3.2 Config Loader

import configparser
import os
from pathlib import Path

class ConfigLoader:
    """Load the framework's ``.ini`` files into one merged ``ConfigParser``.

    The files ``env.ini``, ``users.ini``, ``db.ini``, ``redis.ini`` and
    ``notification.ini`` are looked up in the ``config`` directory that sits
    two levels above this module. Files that are missing are skipped; the
    remaining ones are read into a single parser, so sections with the same
    name coming from different files end up merged.

    Attributes:
        env: Name of the environment this loader was created for.
        config: The underlying ``configparser.ConfigParser`` instance.
    """

    def __init__(self, env="test"):
        """Read every existing config file.

        Args:
            env: Environment name to remember on the instance.

        Raises:
            FileNotFoundError: If none of the expected files exists.
        """
        self.env = env
        self.config = configparser.ConfigParser()
        config_dir = Path(__file__).parent.parent / "config"
        config_files = [
            config_dir / "env.ini",
            config_dir / "users.ini",
            config_dir / "db.ini",
            config_dir / "redis.ini",
            config_dir / "notification.ini",
        ]
        existing_files = [f for f in config_files if f.exists()]
        if not existing_files:
            raise FileNotFoundError(f"Configuration files not found in {config_dir}")
        self.config.read(existing_files, encoding='utf-8')

    def get(self, section, key, fallback=None):
        """Return the string value of ``key`` in ``section``, or ``fallback``."""
        try:
            return self.config.get(section, key, fallback=fallback)
        except (configparser.NoSectionError, configparser.NoOptionError):
            return fallback

    def getint(self, section, key, fallback=0):
        """Return ``key`` in ``section`` as an int.

        ``fallback`` is returned when the section or option is missing or the
        value cannot be parsed as an integer.
        """
        try:
            return self.config.getint(section, key)
        except (configparser.Error, ValueError):
            # configparser.Error covers missing section/option and
            # interpolation problems; ValueError covers a non-numeric value.
            return fallback

    def getboolean(self, section, key, fallback=False):
        """Return ``key`` in ``section`` as a bool.

        ``fallback`` is returned when the section or option is missing or the
        value is not a recognised boolean literal.
        """
        try:
            return self.config.getboolean(section, key)
        except (configparser.Error, ValueError):
            return fallback

    def has_section(self, section):
        """Return True when ``section`` exists in the merged configuration."""
        return self.config.has_section(section)

    def sections(self):
        """Return the list of section names."""
        return self.config.sections()

    def options(self, section):
        """Return the option names of ``section`` (raises if it is missing)."""
        return self.config.options(section)

    def items(self, section):
        """Return ``(name, value)`` pairs of ``section`` (raises if it is missing)."""
        return self.config.items(section)

3.3 Colored Logging

import logging
from colorlog import ColoredFormatter

def get_logger(name="test"):
    """Return the logger called ``name``, attaching a colored handler once.

    A logger that already owns at least one handler is returned untouched, so
    calling this repeatedly with the same name does not stack handlers. On
    first use the logger level is set to INFO and a ``StreamHandler`` with a
    ``ColoredFormatter`` is attached.
    """
    log = logging.getLogger(name)
    if not log.handlers:
        log.setLevel(logging.INFO)
        # One color per level name; CRITICAL additionally gets a background.
        level_colors = {
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'red,bg_white',
        }
        color_formatter = ColoredFormatter(
            "%(log_color)s[%(levelname)s] %(asctime)s %(name)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
            log_colors=level_colors,
        )
        stream = logging.StreamHandler()
        stream.setFormatter(color_formatter)
        log.addHandler(stream)
    return log

Install the dependency with `pip install colorlog`.

3.4 Multi‑User Cookies Management (Concurrent Support)

import json
import requests
from pathlib import Path
from utils.config_loader import ConfigLoader
from utils.logger import get_logger

# Module-level logger, named after this module, obtained from utils.logger.
logger = get_logger(__name__)

class AuthManager:
    """Log configured users in and persist their cookies per user key.

    Cookies are stored as a JSON object in ``data/cookies.json`` (two levels
    above this module), mapping each user key to a ``{name: value}`` dict.

    Attributes:
        config: ``ConfigLoader`` for the chosen environment.
        env: Environment name.
        base_url: API base URL for the environment.
        timeout: Request timeout in seconds.
        cookies_file: Path of the JSON cookie store.
        session: ``requests.Session`` used for login requests.
    """

    def __init__(self, env="test"):
        """Load configuration for ``env`` and make sure the cookie store exists."""
        self.config = ConfigLoader(env)
        self.env = env
        # Environment settings live in a section named after the environment
        # ([test] / [prod]); a literal [env] section is still honoured as a
        # fallback so older config files keep working.
        self.base_url = (
            self.config.get(env, "base_url") or self.config.get("env", "base_url")
        )
        self.timeout = self.config.getint(
            env, "timeout", self.config.getint("env", "timeout", 10)
        )
        self.cookies_file = Path(__file__).parent.parent / "data" / "cookies.json"
        self.session = requests.Session()
        self._ensure_data_dir()

    def _ensure_data_dir(self):
        """Create the data directory and an empty ``{}`` cookie store if absent."""
        data_dir = self.cookies_file.parent
        # parents=True so a missing intermediate directory is not an error.
        data_dir.mkdir(parents=True, exist_ok=True)
        if not self.cookies_file.exists():
            self.cookies_file.write_text("{}", encoding='utf-8')

    def _load_cookies(self):
        """Return the whole cookie store as a dict; ``{}`` if it cannot be read."""
        try:
            content = self.cookies_file.read_text(encoding='utf-8')
            return json.loads(content)
        except Exception as e:
            logger.warning(f"Failed to read cookies: {e}")
            return {}

    def _save_cookies(self, all_cookies):
        """Write the whole cookie store back to disk; failures are only logged."""
        try:
            self.cookies_file.write_text(
                json.dumps(all_cookies, ensure_ascii=False, indent=2),
                encoding='utf-8'
            )
            logger.info(f"✅ Cookies saved to {self.cookies_file}")
        except Exception as e:
            logger.error(f"❌ Failed to save cookies: {e}")

    def login(self, user_key):
        """Login and store cookies for a given user key (e.g., 'A用户').

        Args:
            user_key: Name of the config section holding ``username`` and
                ``password`` for the user.

        Returns:
            The cookie dict obtained from the login, or ``None`` when the user
            is not configured, the API reports failure, or the request errors.
        """
        # Membership in sections() is used rather than a has_section() helper
        # so this works with any loader that exposes sections().
        if user_key not in self.config.sections():
            logger.error(f"❌ User {user_key} not configured in users.ini")
            return None
        user_info = dict(self.config.items(user_key))
        username = user_info.get('username')
        password = user_info.get('password')
        login_url = f"{self.base_url}/login"
        payload = {"username": username, "password": password}
        try:
            # The session is reused for every user; drop cookies left over
            # from a previous login so they are neither sent with this login
            # request nor stored under this user's key.
            self.session.cookies.clear()
            resp = self.session.post(login_url, json=payload, timeout=self.timeout)
            resp.raise_for_status()
            result = resp.json()
            if result.get("success"):
                cookies = dict(self.session.cookies)
                all_cookies = self._load_cookies()
                all_cookies[user_key] = cookies
                self._save_cookies(all_cookies)
                logger.info(f"✅ User {username} logged in successfully")
                return cookies
            else:
                logger.error(f"❌ Login failed: {result.get('message')}")
                return None
        except Exception as e:
            logger.error(f"❌ Request error: {e}")
            return None

    def get_cookies(self, user_key):
        """Retrieve stored cookies for a user, or ``None`` if none are stored."""
        all_cookies = self._load_cookies()
        return all_cookies.get(user_key)

3.5 Generate Cookies Script

"""Batch generate user cookies"""
from utils.auth_manager import AuthManager

if __name__ == "__main__":
    # Log each configured user in against the test environment; the manager
    # stores the resulting cookies itself.
    manager = AuthManager(env="test")
    user_keys = ("A用户", "B用户")
    for key in user_keys:
        manager.login(key)

3.6 Unified API Client

import requests
from utils.logger import get_logger
from utils.auth_manager import AuthManager

# Module-level logger, named after this module, obtained from utils.logger.
logger = get_logger(__name__)

class APIClient:
    """Thin wrapper around ``requests.Session`` with logging and user switching.

    The base URL and default timeout are taken from the ``AuthManager`` passed
    in; stored cookies for a user are loaded through the same manager.
    """

    def __init__(self, auth_manager: AuthManager):
        """Create a client with its own session, configured from ``auth_manager``."""
        self.auth = auth_manager
        self.session = requests.Session()
        self.base_url = auth_manager.base_url
        self.timeout = auth_manager.timeout

    def _request(self, method, endpoint, **kwargs):
        """Send a request to ``base_url + endpoint`` and return the response.

        The client's default timeout is applied unless the caller passes one.
        Any exception raised by the session is logged and re-raised. The
        response is returned whatever its status code; no status check is
        performed here.
        """
        url = self.base_url + endpoint
        if 'timeout' not in kwargs:
            kwargs['timeout'] = self.timeout
        try:
            logger.info(f"{method.upper()} {url}")
            resp = self.session.request(method, url, **kwargs)
            logger.info(f"Status: {resp.status_code}")
            return resp
        except Exception as e:
            logger.error(f"❌ Request error: {e}")
            raise

    def get(self, endpoint, **kwargs):
        """Send a GET request to ``endpoint``."""
        return self._request('get', endpoint, **kwargs)

    def post(self, endpoint, json=None, files=None, **kwargs):
        """Send a POST request with an optional JSON body and/or files."""
        return self._request('post', endpoint, json=json, files=files, **kwargs)

    def put(self, endpoint, json=None, **kwargs):
        """Send a PUT request with an optional JSON body."""
        return self._request('put', endpoint, json=json, **kwargs)

    def delete(self, endpoint, **kwargs):
        """Send a DELETE request to ``endpoint``."""
        return self._request('delete', endpoint, **kwargs)

    def set_user(self, user_key):
        """Switch the active user by loading stored cookies.

        When cookies are found for ``user_key`` the session's cookie jar is
        emptied first, so cookies belonging to the previously active user
        cannot leak into requests made as the new user. When none are found
        the session is left untouched and a warning is logged.
        """
        cookies = self.auth.get_cookies(user_key)
        if cookies:
            # update() alone only overwrites same-named cookies; anything the
            # previous user had under a different name would survive.
            self.session.cookies.clear()
            self.session.cookies.update(cookies)
            logger.info(f"✅ Switched to user: {user_key}")
        else:
            logger.warning(f"⚠️ No cookies for user {user_key}; please log in first")

3.7 Database Assertions (MySQL + Redis)

import pymysql
import pymysql.cursors
from utils.config_loader import ConfigLoader
from utils.logger import get_logger

# Module-level logger, named after this module, obtained from utils.logger.
logger = get_logger(__name__)

class MySQLClient:
    """Small PyMySQL wrapper used for database assertions in tests.

    Connection settings are read from the ``[mysql]`` config section. Rows are
    returned as dictionaries (``DictCursor``). The client can be used as a
    context manager, in which case the connection is closed on exit.
    """

    def __init__(self, env="test"):
        """Load configuration and open the connection immediately."""
        self.config = ConfigLoader(env)
        self.connection = None
        self._connect()

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the connection; exceptions from the body are not suppressed."""
        self.close()

    def _connect(self):
        """Open the MySQL connection; log and re-raise on failure."""
        try:
            self.connection = pymysql.connect(
                host=self.config.get("mysql", "host"),
                port=self.config.getint("mysql", "port"),
                user=self.config.get("mysql", "user"),
                password=self.config.get("mysql", "password"),
                database=self.config.get("mysql", "database"),
                charset="utf8mb4",
                cursorclass=pymysql.cursors.DictCursor,
            )
            logger.info("✅ MySQL connection successful")
        except Exception as e:
            logger.error(f"❌ MySQL connection failed: {e}")
            raise

    def query(self, sql, params=None):
        """Execute a SELECT query and return results.

        Args:
            sql: SQL text, with ``%s`` placeholders for ``params`` if any.
            params: Optional parameters passed to ``cursor.execute``.

        Returns:
            All fetched rows.

        Raises:
            Exception: Whatever the driver raises; it is logged first.
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, params)
                result = cursor.fetchall()
            # The connection is not opened in autocommit mode, so a SELECT
            # leaves a transaction open. Ending it here lets the next query
            # see rows committed by the application under test in the
            # meantime, instead of re-reading the same snapshot.
            self.connection.commit()
            logger.info(f"SQL query successful: {sql}")
            return result
        except Exception as e:
            logger.error(f"❌ SQL query failed: {e}")
            raise

    def execute(self, sql, params=None):
        """Execute INSERT/UPDATE/DELETE statements.

        The statement is committed on success and rolled back on failure; the
        original exception is logged and re-raised.
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, params)
            self.connection.commit()
            logger.info(f"SQL execution successful: {sql}")
        except Exception as e:
            self.connection.rollback()
            logger.error(f"❌ SQL execution failed: {e}")
            raise

    def close(self):
        """Close the connection if one is open; safe to call more than once."""
        if self.connection:
            self.connection.close()
            # Forget the handle so a second close() is a no-op.
            self.connection = None

class RedisClient:
    """Small redis-py wrapper configured from the ``[redis]`` config section.

    Responses are decoded to ``str`` (``decode_responses=True``). ``get`` and
    ``set`` are best-effort: errors are logged rather than raised.
    """

    def __init__(self, env="test"):
        """Connect to Redis and verify the connection with ``PING``.

        Raises:
            Exception: Whatever the driver raises while connecting or pinging;
                it is logged first.
        """
        # Imported here because the enclosing module never imports ``redis``;
        # without this the constructor fails with NameError.
        import redis

        self.config = ConfigLoader(env)
        try:
            self.client = redis.StrictRedis(
                host=self.config.get("redis", "host"),
                port=self.config.getint("redis", "port"),
                db=self.config.getint("redis", "db"),
                password=self.config.get("redis", "password"),
                decode_responses=True,
                socket_connect_timeout=5,
            )
            self.client.ping()
            logger.info("✅ Redis connection successful")
        except Exception as e:
            logger.error(f"❌ Redis connection failed: {e}")
            raise

    def get(self, key):
        """Return the value stored at ``key``.

        Returns ``None`` both when the client returns no value and when the
        call fails (the failure is logged).
        """
        try:
            value = self.client.get(key)
            logger.info(f"Redis GET {key} = {value}")
            return value
        except Exception as e:
            logger.error(f"❌ Redis GET failed: {e}")
            return None

    def set(self, key, value, ex=None):
        """Store ``value`` at ``key``; ``ex`` is forwarded to the client's ``set``.

        Failures are logged and not raised.
        """
        try:
            self.client.set(key, value, ex=ex)
            logger.info(f"Redis SET {key} = {value}")
        except Exception as e:
            logger.error(f"❌ Redis SET failed: {e}")

    def exists(self, key):
        """Return the client's ``exists`` result for ``key`` (errors propagate)."""
        return self.client.exists(key)

3.8 Dynamic Test Data Generation (factory_boy + Faker)

import factory
from factory import Faker

class UserFactory(factory.DictFactory):
    """Build plain ``dict`` objects describing a fake user.

    Every field except ``age`` is produced by a Faker provider; ``age`` is
    taken from a fixed list of values through ``factory.Iterator``.
    """

    username = factory.Faker('user_name')
    email = factory.Faker('email')
    full_name = factory.Faker('name')
    age = factory.Iterator([18, 25, 30, 35])
    city = factory.Faker('city')
    phone = factory.Faker('phone_number')

class OrderFactory(factory.DictFactory):
    """Build plain ``dict`` objects describing an order.

    Product name, price and quantity are each taken from their own fixed
    sequence through ``factory.Iterator``; ``status`` is the constant
    ``"pending"``.
    """

    product_name = factory.Iterator(("iPhone", "MacBook", "iPad"))
    price = factory.Iterator((999, 1999, 799))
    quantity = factory.Iterator((1, 2, 3))
    status = "pending"

3.9 Generate Test Data Script

"""Generate test data and save to JSON"""
from factories.user_factory import UserFactory
from pathlib import Path
import json

if __name__ == "__main__":
    # Build five user dicts with the factory and dump them as pretty JSON.
    users = [UserFactory() for _ in range(5)]
    file_path = Path(__file__).parent.parent / "data" / "test_users.json"
    # The data directory may not exist on a fresh checkout; create it rather
    # than failing in write_text with FileNotFoundError.
    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_path.write_text(
        json.dumps(users, ensure_ascii=False, indent=2),
        encoding='utf-8'
    )
    # Report the real count so the message stays right if the range changes.
    print(f"✅ Generated {len(users)} test users: {file_path}")

3.10 Enterprise WeChat / DingTalk Notification (Full Implementation)

import requests
import json
import os
from datetime import datetime
from utils.config_loader import ConfigLoader
from utils.logger import get_logger

# Module-level logger, named after this module, obtained from utils.logger.
logger = get_logger(__name__)

class Notification:
    """Send markdown messages to a WeCom or DingTalk robot webhook.

    The webhook type is recognised from the host name contained in the URL.

    Attributes:
        env: Environment name, also shown in the test report.
        config: ``ConfigLoader`` for the environment.
        webhook: Webhook URL, or ``None`` when none is configured.
    """

    def __init__(self, env="test"):
        """Load the webhook URL for ``env``; warn when there is none."""
        self.env = env
        self.config = ConfigLoader(env)
        # The webhook is configured per environment ([test] / [prod]); a
        # literal [notification] section is still honoured as a fallback.
        self.webhook = (
            self.config.get(env, "webhook")
            or self.config.get("notification", "webhook")
        )
        if not self.webhook:
            logger.warning("⚠️ No webhook configured")

    def _is_wecom(self):
        """Return True when the webhook URL points at WeCom; False if unset."""
        return bool(self.webhook) and "qyapi.weixin.qq.com" in self.webhook

    def _is_dingtalk(self):
        """Return True when the webhook URL points at DingTalk; False if unset."""
        return bool(self.webhook) and "dingtalk.com" in self.webhook

    def send_markdown(self, title, content):
        """Post a markdown message to the configured webhook.

        Does nothing when no webhook is configured. The payload shape depends
        on the webhook type; ``title`` is only used for DingTalk. All errors
        are logged, never raised.
        """
        if not self.webhook:
            return
        if self._is_wecom():
            data = {"msgtype": "markdown", "markdown": {"content": content}}
        elif self._is_dingtalk():
            data = {"msgtype": "markdown", "markdown": {"title": title, "text": content}}
        else:
            logger.warning("❌ Unsupported webhook type")
            return
        try:
            resp = requests.post(self.webhook, json=data, timeout=10)
            result = resp.json()
            if result.get("errcode") == 0 or result.get("status") == "success":
                logger.info("✅ Message sent successfully")
            else:
                logger.error(f"❌ Message send failed: {result}")
        except Exception as e:
            logger.error(f"❌ Sending message exception: {e}")

    def send_test_report(self, passed, failed, total, duration):
        """Format a test summary and send it via :meth:`send_markdown`.

        Args:
            passed: Number of passed tests.
            failed: Number of failed tests.
            total: Total number of counted tests; 0 yields a 0% success rate.
            duration: Run duration in seconds.
        """
        success_rate = (passed / total * 100) if total > 0 else 0
        status = "✅ All Passed" if failed == 0 else "❌ Failures Present"
        title = f"【Automated Test Report】{status}"
        # The literal is deliberately flush-left: its text is sent verbatim.
        content = f"""
## {title}
- ✅ Passed: {passed}
- ❌ Failed: {failed}
- 📊 Total: {total}
- ⏱️ Duration: {duration:.2f}s
- 📈 Success Rate: {success_rate:.1f}%
> Environment: {self.env}
> Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
        self.send_markdown(title, content)

3.11 Pytest Hooks for Result Collection

import pytest
from datetime import datetime
from utils.notification import Notification

# Running tally of test outcomes for the session, read by pytest_sessionfinish.
test_results = {"passed": 0, "failed": 0}


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Count each test's outcome into ``test_results``.

    The "call" phase decides pass/fail for a test that actually ran. A failure
    in the "setup" phase (for example a fixture error) is counted as failed
    too: such a test never reaches "call", so without this it would be
    missing from the totals and a broken run could be reported as all passed.
    Teardown reports are not counted, which keeps every test at one entry.
    """
    outcome = yield
    report = outcome.get_result()
    if report.when == "call":
        if report.passed:
            test_results["passed"] += 1
        elif report.failed:
            test_results["failed"] += 1
    elif report.when == "setup" and report.failed:
        test_results["failed"] += 1

def pytest_sessionstart(session):
    """Record on the session object the local time at which the run began."""
    started_at = datetime.now()
    session.start_time = started_at

def pytest_sessionfinish(session, exitstatus):
    """Send the test report once the session is over.

    The finish time is stored on the session, the duration is measured from
    ``session.start_time``, and the pass/fail counts come from the
    module-level ``test_results`` tally. The report is sent through a
    ``Notification`` created for the "test" environment.
    """
    session.finish_time = datetime.now()
    elapsed = session.finish_time - session.start_time
    ok_count, ko_count = test_results["passed"], test_results["failed"]
    Notification(env="test").send_test_report(
        ok_count,
        ko_count,
        ok_count + ko_count,
        elapsed.total_seconds(),
    )

4. Conclusion

The article provides a comprehensive approach to building an API automation framework; readers are encouraged to try it out and adapt it to their own projects.

Tags: backend · Python · Automation · Framework · API testing
Test Development Learning Exchange
Written by

Test Development Learning Exchange

Test Development Learning Exchange

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.