Building a Robust Asynchronous Processing SDK with Spring, Kafka and MySQL

This article introduces a non‑intrusive asynchronous processing SDK that leverages Spring transaction events, Kafka queues, and MySQL tables to ensure reliable execution, eventual consistency, and automatic retry without impacting the main business flow.

Top Architect

Preface

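Good system design should follow the open-closed principle: as the business iterates, core code keeps changing, and every change raises the probability of errors. Most new features only extend existing functionality, so the extra work is often handed to an asynchronous thread pool. That keeps the main flow fast, but it introduces uncertainty about whether the work actually ran.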

Purpose

The goal is to execute methods reliably without affecting the main flow and to guarantee data persistence, achieving eventual consistency.

Advantages

Non-intrusive design: the SDK has its own database, scheduled tasks, message queue, and a manual-handling UI (with single sign-on).

It uses Spring's transaction event mechanism, so even if parsing the async strategy fails, the business logic is unaffected.

If the method runs inside a transaction, the event is processed after the transaction commits or rolls back.

A fallback path guarantees eventual execution: processing completes unless the database, the queue, or the method itself is faulty.

Principle

After container initialization, all beans are scanned; methods annotated with @AsyncExec are cached. At runtime, an AOP aspect publishes an event, and a transaction event listener handles the asynchronous execution strategy.
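The scan-and-cache step can be sketched in plain Java with reflection. This is a minimal illustration, not the SDK's code: the annotation and enum below are local stand-ins (only the names AsyncExec, AsyncExecEnum, and SAVE_ASYNC come from this article), and DictService, AsyncMethodRegistry, and the key format are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-ins for the SDK's enum and annotation (assumed shape).
enum AsyncExecEnum { SAVE_ASYNC }

@Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
@Target(ElementType.METHOD)
@interface AsyncExec {
    AsyncExecEnum type();
    String remark() default "";
}

// Hypothetical business bean with one annotated and one plain method.
class DictService {
    @AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "Data Dictionary")
    public void saveDict(String key) { }

    public void plainMethod() { }
}

// Scans beans once and caches every @AsyncExec method under a string key.
class AsyncMethodRegistry {
    private final Map<String, Method> cache = new ConcurrentHashMap<>();

    void scan(List<?> beans) {
        for (Object bean : beans) {
            for (Method m : bean.getClass().getDeclaredMethods()) {
                if (m.isAnnotationPresent(AsyncExec.class)) {
                    cache.put(keyOf(m), m);
                }
            }
        }
    }

    // Assumed key format: fully qualified class name + "#" + method name.
    // Overloaded methods would collide under this simplified key.
    static String keyOf(Method m) {
        return m.getDeclaringClass().getName() + "#" + m.getName();
    }

    boolean isAsync(String key) { return cache.containsKey(key); }

    int size() { return cache.size(); }
}
```

In a Spring container the beans are often proxies, so a real scan would have to resolve the target class first; this sketch skips that step.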

@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
fallbackExecution = true: process the event even when no transaction is active.

TransactionPhase.AFTER_COMPLETION: handle the event after the transaction commits or rolls back.
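The effect of these two settings can be modeled in a few lines of plain Java. The sketch below is only a model of the semantics, not Spring's implementation; MiniTransaction and its methods are hypothetical names. A listener published inside a transaction is deferred until completion, whether the outcome is a commit or a rollback, and a listener published with no active transaction runs immediately (the fallback case).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "after completion, with fallback execution".
class MiniTransaction {
    private static final ThreadLocal<MiniTransaction> CURRENT = new ThreadLocal<>();
    private final List<Runnable> afterCompletion = new ArrayList<>();

    static void begin() { CURRENT.set(new MiniTransaction()); }

    // Commit and rollback both count as "completion" for the listeners.
    static void commit() { complete(); }

    static void rollback() { complete(); }

    private static void complete() {
        MiniTransaction tx = CURRENT.get();
        CURRENT.remove();
        if (tx != null) {
            tx.afterCompletion.forEach(Runnable::run);
        }
    }

    // Defers the listener while a transaction is active; otherwise runs it
    // right away, which mirrors fallbackExecution = true.
    static void publish(Runnable listener) {
        MiniTransaction tx = CURRENT.get();
        if (tx != null) {
            tx.afterCompletion.add(listener);
        } else {
            listener.run();
        }
    }
}
```

Because the listener fires after rollback as well as after commit, the handler has to cope with the case where the business data it refers to was never persisted.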

Components

Kafka message queue

XXL‑Job scheduled tasks

MySQL database

Spring AOP

Design Patterns

Strategy

Template Method

Dynamic Proxy
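How Strategy and Template Method might fit together here can be shown with a small sketch. Everything in it is illustrative: StrategyType, THREAD_ONLY, the class names, and the step names are hypothetical, and reading SAVE_ASYNC as "save the request first, then send it to the queue" is an assumption based only on the name.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical strategy types; only SAVE_ASYNC is named in this article.
enum StrategyType { SAVE_ASYNC, THREAD_ONLY }

// Template Method: handle() fixes the skeleton, subclasses supply execute().
abstract class AbstractAsyncStrategy {
    final List<String> trace = new ArrayList<>(); // records steps for illustration

    final boolean handle(String request) {
        trace.add("validate:" + request);
        try {
            execute(request);
            trace.add("done:" + request);
            return true;
        } catch (RuntimeException e) {
            trace.add("failed:" + request);
            return false;
        }
    }

    abstract StrategyType type();

    protected abstract void execute(String request);
}

class SaveAsyncStrategy extends AbstractAsyncStrategy {
    StrategyType type() { return StrategyType.SAVE_ASYNC; }

    protected void execute(String request) {
        trace.add("save-to-db:" + request);    // stand-in for an insert into async_req
        trace.add("send-to-queue:" + request); // stand-in for a Kafka send
    }
}

class ThreadOnlyStrategy extends AbstractAsyncStrategy {
    StrategyType type() { return StrategyType.THREAD_ONLY; }

    protected void execute(String request) {
        trace.add("submit-to-pool:" + request); // stand-in for a thread pool submit
    }
}

// Strategy: the caller picks an implementation by type at runtime.
class StrategyRegistry {
    private final Map<StrategyType, AbstractAsyncStrategy> strategies =
            new EnumMap<>(StrategyType.class);

    void register(AbstractAsyncStrategy strategy) {
        strategies.put(strategy.type(), strategy);
    }

    AbstractAsyncStrategy get(StrategyType type) {
        AbstractAsyncStrategy strategy = strategies.get(type);
        if (strategy == null) {
            throw new IllegalArgumentException("no strategy for " + type);
        }
        return strategy;
    }
}
```

With this split, adding a new async type means adding one subclass and registering it, which keeps the existing strategies closed to modification.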

Database Scripts

CREATE TABLE `async_scene` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key ID',
  `application_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'application name',
  `method_sign` varchar(50) NOT NULL DEFAULT '' COMMENT 'method signature',
  `scene_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'business scenario description',
  `async_type` varchar(50) NOT NULL DEFAULT '' COMMENT 'async strategy type',
  `queue_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'queue name',
  `theme_value` varchar(100) NOT NULL DEFAULT '' COMMENT 'consumer topic',
  `exec_count` int NOT NULL DEFAULT '0' COMMENT 'retry count on failure',
  `exec_deleted` int NOT NULL DEFAULT '0' COMMENT 'whether to delete after execution',
  `async_version` varchar(50) NOT NULL DEFAULT '' COMMENT 'component version',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'update time',
  `cdc_crt_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'record insert time',
  `cdc_upd_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'record modification time',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `uk_application_sign` (`application_name`,`method_sign`) USING BTREE,
  KEY `idx_cdc_upd_time` (`cdc_upd_time`)
) ENGINE=InnoDB COMMENT='async scene table';

CREATE TABLE `async_req` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key ID',
  `application_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'application name',
  `sign` varchar(50) NOT NULL DEFAULT '' COMMENT 'method signature',
  `class_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'fully qualified class name',
  `method_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'method name',
  `async_type` varchar(50) NOT NULL DEFAULT '' COMMENT 'async strategy type',
  `exec_status` tinyint NOT NULL DEFAULT '0' COMMENT 'execution status 0: initialized 1: failed 2: succeeded',
  `exec_count` int NOT NULL DEFAULT '0' COMMENT 'execution count',
  `param_json` longtext COMMENT 'request parameters',
  `remark` varchar(200) NOT NULL DEFAULT '' COMMENT 'business description',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_applocation_name` (`application_name`) USING BTREE,
  KEY `idx_exec_status` (`exec_status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='async processing request';

CREATE TABLE `async_log` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key ID',
  `async_id` bigint NOT NULL DEFAULT '0' COMMENT 'async request ID',
  `error_data` longtext COMMENT 'execution error message',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_async_id` (`async_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='async processing log';

Async Strategy

[Figure: Async Strategy Diagram]

Security Level

[Figure: Security Level]

Execution Status

[Figure: Execution Status]

Flowchart

[Figure: Flowchart]

Apollo Configuration

# Switch (default off)
async.enabled=true

# Application name
spring.application.name=xxx

# Data source (Druid)
async.datasource.driver-class-name=com.mysql.jdbc.Driver
async.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true
async.datasource.username=user
async.datasource.password=xxxx
async.datasource.filters=config
async.datasource.connectionProperties=config.decrypt=true;config.decrypt.key=yyy

# Static resources
spring.resources.add-mappings=true
spring.resources.static-locations=classpath:/static/

# Thread pool settings (defaults)
async.executor.thread.corePoolSize=10
async.executor.thread.maxPoolSize=50
async.executor.thread.queueCapacity=10000
async.executor.thread.keepAliveSeconds=600

# Delete record after success (default true)
async.exec.deleted=true

# Topic prefix (default application name)
async.topic=${spring.application.name}_async_queue

# Retry settings
async.exec.count=5
async.retry.limit=100
async.comp.limit=100

# Login interception (default false)
async.login=false
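The four async.executor.thread.* values map naturally onto a java.util.concurrent.ThreadPoolExecutor. The sketch below shows one plausible wiring; AsyncExecutorFactory is a hypothetical name, and the CallerRunsPolicy rejection handler is an assumption, since the article does not say what the SDK does when the queue is full.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Builds an executor from the thread pool settings listed above.
class AsyncExecutorFactory {

    static ThreadPoolExecutor create(int corePoolSize, int maxPoolSize,
                                     int queueCapacity, long keepAliveSeconds) {
        return new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity), // bounded work queue
                // Assumption: run rejected tasks on the submitting thread
                // instead of dropping them.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Defaults from the configuration above: 10 / 50 / 10000 / 600.
    static ThreadPoolExecutor createWithDefaults() {
        return create(10, 50, 10000, 600);
    }
}
```

With a bounded queue, ThreadPoolExecutor only grows past corePoolSize once the queue is full, so under these defaults the pool stays at 10 threads until 10,000 tasks are waiting.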

Usage

Enable the switch: async.enabled=true

Add the annotation to each method that should run asynchronously. The method must be invoked through a Spring proxy:

@AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "Data Dictionary")

Access the manual handling UI at http://localhost:8004/async/index.html.

Notes

Set spring.application.name correctly; it is used as the default queue name.

Ensure business logic is idempotent.

Each application uses a single queue, shared by all of its asynchronous methods.

Scheduled tasks:

Async retry task runs every 2 minutes (configurable retry count).

Async compensation task runs hourly for records older than one hour.
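One pass of the retry task can be pictured as follows. This is a sketch under stated assumptions, not the SDK's scheduler code: AsyncRequest and RetryTask are hypothetical names, the status codes follow the exec_status column of async_req, maxExecCount stands for async.exec.count, and treating async.retry.limit as a per-run batch size is a guess.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// In-memory stand-in for a row of async_req.
class AsyncRequest {
    static final int INIT = 0;    // exec_status 0: initialized
    static final int FAILED = 1;  // exec_status 1: failed
    static final int SUCCESS = 2; // exec_status 2: succeeded

    final long id;
    int execStatus;
    int execCount;

    AsyncRequest(long id, int execStatus, int execCount) {
        this.id = id;
        this.execStatus = execStatus;
        this.execCount = execCount;
    }
}

class RetryTask {
    private final int maxExecCount; // assumed meaning of async.exec.count
    private final int batchLimit;   // assumed meaning of async.retry.limit

    RetryTask(int maxExecCount, int batchLimit) {
        this.maxExecCount = maxExecCount;
        this.batchLimit = batchLimit;
    }

    // Picks failed requests that still have attempts left, up to the batch limit.
    List<AsyncRequest> select(List<AsyncRequest> all) {
        List<AsyncRequest> picked = new ArrayList<>();
        for (AsyncRequest r : all) {
            if (picked.size() >= batchLimit) {
                break;
            }
            if (r.execStatus == AsyncRequest.FAILED && r.execCount < maxExecCount) {
                picked.add(r);
            }
        }
        return picked;
    }

    // Re-runs each selected request and records the outcome.
    // Returns how many requests succeeded in this pass.
    int runOnce(List<AsyncRequest> all, Predicate<AsyncRequest> handler) {
        int succeeded = 0;
        for (AsyncRequest r : select(all)) {
            r.execCount++;
            boolean ok;
            try {
                ok = handler.test(r);
            } catch (RuntimeException e) {
                ok = false; // a real task would also write to async_log here
            }
            r.execStatus = ok ? AsyncRequest.SUCCESS : AsyncRequest.FAILED;
            if (ok) {
                succeeded++;
            }
        }
        return succeeded;
    }
}
```

Since the same request may be executed more than once across passes, the handler passed to runOnce has to be idempotent, which is why the note above asks for idempotent business logic.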

Effect Demonstration

[Figure: Effect Demo]

GitHub Repository

https://github.com/xiongyanokok/fc-async