How to Build a Robust Asynchronous Processing SDK with Spring and Kafka
This article explains the design, advantages, and implementation details of a generic asynchronous processing SDK that leverages Spring transactional events, Kafka messaging, XXL‑Job scheduling, and a non‑intrusive architecture to ensure data consistency, fault tolerance, and easy integration across services.
Good system design follows the open-closed principle: as the business evolves, every change to core code raises the risk of errors. Most new features extend existing functionality and must meet both performance and quality requirements, so the extra work is often handed to asynchronous thread pools, which introduces uncertainty of its own. To address this, a generic asynchronous processing SDK was designed so that different kinds of async tasks can be implemented easily.
Purpose
The SDK ensures that a method executes without blocking the main flow and, more importantly, relies on fallback mechanisms so that data is not lost, which yields eventual consistency.
Advantages
Non-intrusive design: the SDK has its own database, scheduled tasks, and message queue, plus a manual-execution UI protected by unified login authentication.
Built on the Spring transaction event mechanism, so business logic is unaffected even if parsing the async strategy fails.
If a method runs within a transaction, the event is processed after the transaction commits or rolls back.
Even if async strategy parsing fails after the transaction has committed, a fallback plan takes over (unless the database, the message queue, or the method itself is faulty).
Principle
After container initialization, all beans are scanned and methods annotated with @AsyncExec are cached.
When a method runs, an AOP aspect publishes an event.
The transaction event listener handles the asynchronous execution strategy.
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
fallbackExecution = true processes events even when no transaction is active.
TransactionPhase.AFTER_COMPLETION processes events after the transaction commits or rolls back.
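The scan, intercept, and publish steps above can be sketched with the JDK alone. This is a minimal illustration, not the SDK's code: the real project uses Spring AOP and Spring's event publisher, whereas this sketch swaps in a JDK dynamic proxy and an in-memory list. The `AsyncExec` annotation here is a simplified stand-in, and `AsyncExecEvent`, `AsyncExecRegistry`, and `DictService` are hypothetical names introduced only for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for the SDK's @AsyncExec annotation (assumption). */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface AsyncExec {
    String remark() default "";
}

/** Hypothetical event describing one intercepted call. */
final class AsyncExecEvent {
    final String methodSign;
    final Object[] args;

    AsyncExecEvent(String methodSign, Object[] args) {
        this.methodSign = methodSign;
        this.args = args;
    }
}

/** Caches annotated methods and wraps targets in a JDK dynamic proxy. */
final class AsyncExecRegistry {
    private final Map<String, Method> cache = new ConcurrentHashMap<>();
    /** Stands in for Spring's event publisher: events are only collected here. */
    final List<AsyncExecEvent> published = new ArrayList<>();

    static String sign(Method m) {
        return m.getDeclaringClass().getSimpleName() + "#" + m.getName();
    }

    /** Step 1: scan a type and cache every method carrying @AsyncExec. */
    void scan(Class<?> type) {
        for (Method m : type.getMethods()) {
            if (m.isAnnotationPresent(AsyncExec.class)) {
                cache.put(sign(m), m);
            }
        }
    }

    boolean isRegistered(String sign) {
        return cache.containsKey(sign);
    }

    /** Step 2: intercept calls; annotated ones become events instead of running inline. */
    @SuppressWarnings("unchecked")
    <T> T proxy(Class<T> iface, T target) {
        InvocationHandler handler = (self, method, args) -> {
            String sign = sign(method);
            if (cache.containsKey(sign)) {
                published.add(new AsyncExecEvent(sign, args));
                return null; // annotated methods are assumed to return void
            }
            return method.invoke(target, args);
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}

/** Hypothetical business interface used only for the example. */
interface DictService {
    @AsyncExec(remark = "data dictionary")
    void refresh(String key);

    String name();
}
```

In the SDK, the listener annotated with @TransactionalEventListener would consume such events after the surrounding transaction completes; the `published` list merely shows where that hand-off happens.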
Components
Kafka message queue
XXL‑Job scheduler
MySQL database
Spring AOP aspect
Vue UI
Design Patterns
Strategy
Template Method
Dynamic Proxy
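As a rough illustration of how Strategy and Template Method can combine in an SDK like this, the sketch below fixes the processing skeleton in a base class and selects a concrete strategy by type. Only SAVE_ASYNC is named in this article; the THREAD type, the class names, and the reading of SAVE_ASYNC as "persist first, then enqueue" are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** SAVE_ASYNC comes from the article; THREAD is a hypothetical second type. */
enum AsyncType { SAVE_ASYNC, THREAD }

/** Template Method: handle() fixes the skeleton, subclasses supply execute(). */
abstract class AsyncStrategy {
    final List<String> trace = new ArrayList<>();

    final boolean handle(String payload) {
        trace.add("begin");
        try {
            execute(payload);
            trace.add("ok");
            return true;
        } catch (RuntimeException e) {
            // A real SDK would persist the request here for a later retry.
            trace.add("fallback");
            return false;
        }
    }

    protected abstract void execute(String payload);
}

/** Assumed meaning of SAVE_ASYNC: persist the request, then enqueue it. */
final class SaveThenQueueStrategy extends AsyncStrategy {
    @Override
    protected void execute(String payload) {
        trace.add("save:" + payload);
        trace.add("enqueue:" + payload);
    }
}

/** Hypothetical strategy that runs the work on a thread pool only. */
final class ThreadStrategy extends AsyncStrategy {
    @Override
    protected void execute(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        trace.add("run:" + payload);
    }
}

/** Strategy: picks the implementation registered for the requested type. */
final class StrategyRouter {
    private final Map<AsyncType, AsyncStrategy> strategies = new EnumMap<>(AsyncType.class);

    void register(AsyncType type, AsyncStrategy strategy) {
        strategies.put(type, strategy);
    }

    boolean dispatch(AsyncType type, String payload) {
        AsyncStrategy strategy = strategies.get(type);
        if (strategy == null) {
            throw new IllegalArgumentException("No strategy for " + type);
        }
        return strategy.handle(payload);
    }
}
```

Keeping the try/catch in the base class means every strategy gets the same fallback behavior, and adding a new async type only requires a new subclass and one `register` call.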
Flowchart
Database Scripts
CREATE TABLE `async_scene` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Primary key ID',
  `application_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'Application name',
  `method_sign` varchar(50) NOT NULL DEFAULT '' COMMENT 'Method signature',
  `scene_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'Business scenario description',
  `async_type` varchar(50) NOT NULL DEFAULT '' COMMENT 'Async strategy type',
  `queue_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'Queue name',
  `theme_value` varchar(100) NOT NULL DEFAULT '' COMMENT 'Consumer topic',
  `exec_count` int NOT NULL DEFAULT '0' COMMENT 'Retry count on failure',
  `exec_deleted` int NOT NULL DEFAULT '0' COMMENT 'Whether to delete after execution',
  `async_version` varchar(50) NOT NULL DEFAULT '' COMMENT 'Component version',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
  `cdc_crt_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Record creation time',
  `cdc_upd_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Record modification time',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `uk_application_sign` (`application_name`,`method_sign`) USING BTREE,
  KEY `idx_cdc_upd_time` (`cdc_upd_time`)
) ENGINE=InnoDB COMMENT='Async scenario table';
CREATE TABLE `async_req` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Primary key ID',
  `application_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'Application name',
  `sign` varchar(50) NOT NULL DEFAULT '' COMMENT 'Method signature',
  `class_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'Fully qualified class name',
  `method_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'Method name',
  `async_type` varchar(50) NOT NULL DEFAULT '' COMMENT 'Async strategy type',
  `exec_status` tinyint NOT NULL DEFAULT '0' COMMENT 'Execution status 0: initialized, 1: failed, 2: succeeded',
  `exec_count` int NOT NULL DEFAULT '0' COMMENT 'Execution count',
  `param_json` longtext COMMENT 'Request parameters',
  `remark` varchar(200) NOT NULL DEFAULT '' COMMENT 'Business description',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_applocation_name` (`application_name`) USING BTREE,
  KEY `idx_exec_status` (`exec_status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Async processing request';
CREATE TABLE `async_log` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Primary key ID',
  `async_id` bigint NOT NULL DEFAULT '0' COMMENT 'Async request ID',
  `error_data` longtext COMMENT 'Execution error details',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_async_id` (`async_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Async processing log';
Async Strategy
Security Level
Execution Status
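The exec_status column of async_req documents three codes: 0 (initialized), 1 (failed), and 2 (succeeded). One way to keep those codes out of business logic is a small enum, sketched here; the enum and its method names are hypothetical and not taken from the SDK.

```java
/** Maps the exec_status codes from the async_req table comment (names are assumptions). */
enum ExecStatus {
    INIT(0),
    FAILED(1),
    SUCCESS(2);

    private final int code;

    ExecStatus(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    /** Converts a stored tinyint back to the enum, rejecting unknown codes. */
    static ExecStatus fromCode(int code) {
        for (ExecStatus status : values()) {
            if (status.code == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("Unknown exec_status: " + code);
    }
}
```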
Flowchart
Apollo Configuration
# Switch: default off
async.enabled=true
# Application name
spring.application.name=xxx
# Data source (Druid)
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true
spring.datasource.username=user
spring.datasource.password=xxxx
spring.datasource.filters=config
spring.datasource.connectionProperties=config.decrypt=true;config.decrypt.key=yyy
# Static resources
spring.resources.add-mappings=true
spring.resources.static-locations=classpath:/static/
# Core thread count
async.executor.thread.corePoolSize=10
# Max thread count
async.executor.thread.maxPoolSize=50
# Queue capacity
async.executor.thread.queueCapacity=10000
# Keep‑alive seconds
async.executor.thread.keepAliveSeconds=600
# Delete record after success (default true)
async.exec.deleted=true
# Custom queue name prefix (default application name)
async.topic=${spring.application.name}
# Retry count (default 5)
async.exec.count=5
# Retry query limit
async.retry.limit=100
# Compensation query limit
async.comp.limit=100
# Login interception (default false)
async.login=false
Usage
1. Async switch
scm.async.enabled=true
2. Add the annotation to methods (the bean must be proxied by Spring)
@AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "data dictionary")
3. Manual handling URL
http://localhost:8004/async/index.html
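The async.executor.thread.* keys in the Apollo configuration map naturally onto a JDK ThreadPoolExecutor. The sketch below shows one plausible wiring. The `AsyncExecutorFactory` class, the use of `java.util.Properties`, the fallback values (copied from the sample configuration, not confirmed SDK defaults), and the caller-runs rejection policy are all assumptions for illustration.

```java
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that builds a pool from the async.executor.thread.* keys. */
final class AsyncExecutorFactory {
    private AsyncExecutorFactory() {
    }

    static ThreadPoolExecutor fromProperties(Properties props) {
        // Fallbacks mirror the sample configuration in this article.
        int core = intProp(props, "async.executor.thread.corePoolSize", 10);
        int max = intProp(props, "async.executor.thread.maxPoolSize", 50);
        int queueCapacity = intProp(props, "async.executor.thread.queueCapacity", 10000);
        int keepAliveSeconds = intProp(props, "async.executor.thread.keepAliveSeconds", 600);

        return new ThreadPoolExecutor(
                core,
                max,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                // Assumption: run the task on the submitting thread when saturated,
                // rather than dropping it.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private static int intProp(Properties props, String key, int fallback) {
        String value = props.getProperty(key);
        return value == null ? fallback : Integer.parseInt(value.trim());
    }
}
```

With a bounded queue, ThreadPoolExecutor only grows beyond the core size once the queue is full, so with a capacity of 10000 the pool will normally stay at 10 threads.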
Notes
1. Application name
spring.application.name
2. Queue name
${async.topic:${spring.application.name}}_async_queue
3. Ensure business logic is idempotent
4. One queue per application
Self‑producing, self‑consuming.
5. Scheduled tasks
Async retry task (every 2 minutes, configurable retry count)
Async compensation task (every hour, processes records older than one hour)
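The two scheduled tasks can be thought of as two selection rules over async_req. The following sketch expresses them in memory. It assumes that the retry task picks failed records whose exec_count is still below async.exec.count, and that the compensation task picks records still in the initialized state after one hour; the article does not spell out these status filters, and `AsyncReq` and `RetryPlanner` are hypothetical names.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

/** Minimal in-memory view of an async_req row (only the fields used here). */
final class AsyncReq {
    final long id;
    final int execStatus;   // 0: initialized, 1: failed, 2: succeeded
    final int execCount;
    final LocalDateTime createTime;

    AsyncReq(long id, int execStatus, int execCount, LocalDateTime createTime) {
        this.id = id;
        this.execStatus = execStatus;
        this.execCount = execCount;
        this.createTime = createTime;
    }
}

/** Hypothetical selection rules for the retry and compensation tasks. */
final class RetryPlanner {
    static final int INIT = 0;
    static final int FAILED = 1;

    private RetryPlanner() {
    }

    /** Retry task: failed records with attempts left, capped like async.retry.limit. */
    static List<AsyncReq> retryCandidates(List<AsyncReq> all, int maxCount, int limit) {
        return all.stream()
                .filter(r -> r.execStatus == FAILED && r.execCount < maxCount)
                .limit(limit)
                .collect(Collectors.toList());
    }

    /** Compensation task: records still initialized and older than minAge. */
    static List<AsyncReq> compensationCandidates(
            List<AsyncReq> all, LocalDateTime now, Duration minAge, int limit) {
        LocalDateTime cutoff = now.minus(minAge);
        return all.stream()
                .filter(r -> r.execStatus == INIT && r.createTime.isBefore(cutoff))
                .limit(limit)
                .collect(Collectors.toList());
    }
}
```

Because a record can be picked up by a retry after the original attempt partially succeeded, these rules only work safely if the annotated business method is idempotent, as noted in item 3.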
Effect Demonstration
Code Repository
https://github.com/xiongyanokok/fc-async
