Build a Robust Asynchronous Processing SDK with Spring, Kafka and MySQL
This article introduces a generic asynchronous processing SDK for Java back‑ends, explaining its design principles, advantages, component architecture, database schema, configuration via Apollo, usage steps, and practical demonstrations, while providing complete code snippets and a GitHub repository for reference.
Introduction
Good system design follows the open‑closed principle. As the business evolves, core code changes frequently, and every change raises the risk of errors. Most new features extend existing functionality, and they have to do so without degrading performance or quality. The author therefore created a generic asynchronous processing SDK that simplifies the implementation of various async tasks.
Purpose
The SDK ensures that methods are executed reliably without blocking the main flow, and guarantees data is not lost through fallback mechanisms, achieving eventual consistency.
Advantages
Non‑intrusive design: the SDK has its own database, scheduled tasks, message queue, and manual execution UI (with single sign‑on).
Uses Spring's transactional event mechanism, so business logic is unaffected even if parsing the async strategy fails.
If a method runs within a transaction, the event is processed after the transaction commits or rolls back.
If the transaction commits but parsing the async strategy fails, a fallback still executes, unless the database, the message queue, or the method itself is faulty.
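The fallback idea in the last point can be pictured as an ordered list of hand-over routes. The following is a minimal sketch in plain Java, not the SDK's code: `FallbackChain`, `Route`, and the route names are hypothetical, and the order in which the SDK actually tries its routes is not stated in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the SDK: tries hand-over routes in order.
class FallbackChain {
    // One hand-over route, e.g. "send to queue" or "save to database".
    interface Route {
        void accept(String payload) throws Exception;
    }

    // LinkedHashMap keeps the routes in registration order.
    private final Map<String, Route> routes = new LinkedHashMap<>();

    FallbackChain then(String name, Route route) {
        routes.put(name, route);
        return this;
    }

    // Returns the name of the first route that accepted the payload.
    String submit(String payload) {
        List<String> failures = new ArrayList<>();
        for (Map.Entry<String, Route> e : routes.entrySet()) {
            try {
                e.getValue().accept(payload);
                return e.getKey();
            } catch (Exception ex) {
                failures.add(e.getKey() + ": " + ex.getMessage());
            }
        }
        // Every route failed: surface the problem instead of dropping the task.
        throw new IllegalStateException("no route accepted the task: " + failures);
    }
}
```

A task is lost only when every registered route fails, which matches the caveat about the database, the message queue, and the method itself.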
Principle
After Spring finishes bean initialization, it scans all methods for the @AsyncExec annotation and caches them. When such a method runs, an AOP aspect publishes an event, and a transactional event listener processes the async execution strategy.
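The scan-and-cache step can be illustrated with plain reflection. This is a sketch under assumptions rather than the SDK's implementation: the `AsyncExec` annotation and `AsyncExecEnum` below are simplified stand-ins (only `SAVE_ASYNC` appears in this article), and `DictService`, `AsyncMethodRegistry`, and the cache key format are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the SDK's strategy enum; other constants are omitted on purpose.
enum AsyncExecEnum { SAVE_ASYNC }

// Stand-in for the SDK's annotation; it must be retained at runtime to be scannable.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface AsyncExec {
    AsyncExecEnum type();
    String remark() default "";
}

// Hypothetical business bean with one annotated and one plain method.
class DictService {
    @AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "Data Dictionary")
    public void saveDict(String name) { }

    public void plainMethod() { }
}

// Hypothetical registry: scans a class once and caches its annotated methods.
class AsyncMethodRegistry {
    // Key: "fully.qualified.ClassName#methodName" (overloads are not distinguished here).
    private final Map<String, Method> cache = new ConcurrentHashMap<>();

    void register(Class<?> beanClass) {
        for (Method m : beanClass.getDeclaredMethods()) {
            if (m.isAnnotationPresent(AsyncExec.class)) {
                cache.put(beanClass.getName() + "#" + m.getName(), m);
            }
        }
    }

    Method lookup(String key) { return cache.get(key); }

    int size() { return cache.size(); }
}
```

In a Spring application the same loop would run over the beans in the context once initialization has finished; the sketch registers a class by hand to stay free of framework dependencies.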
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
The fallbackExecution = true flag ensures the event is handled even when no transaction is active, while TransactionPhase.AFTER_COMPLETION processes the event after either commit or rollback.
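To make the timing concrete, here is a small plain-Java model of those two settings. It is not Spring code and does not use Spring's API: `TxModel` and its methods are hypothetical and only mimic when a listener would run.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the listener timing; hypothetical, not Spring code.
class TxModel {
    private boolean active;
    private final List<Runnable> afterCompletion = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    void begin() { active = true; }

    // Mirrors fallbackExecution = true: with no transaction, handle the event right away.
    void publish(String event) {
        Runnable handler = () -> log.add("handled " + event);
        if (active) {
            afterCompletion.add(handler); // defer until the transaction ends
        } else {
            handler.run();
        }
    }

    // Mirrors AFTER_COMPLETION: deferred handlers run after commit and after rollback alike.
    void complete(boolean committed) {
        active = false;
        log.add(committed ? "commit" : "rollback");
        List<Runnable> pending = new ArrayList<>(afterCompletion);
        afterCompletion.clear();
        pending.forEach(Runnable::run);
    }
}
```

Publishing outside a transaction logs the event immediately, while publishing inside one logs it only after `complete` is called, whether the outcome is a commit or a rollback.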
Components
Kafka message queue
XXL‑Job scheduled tasks
MySQL database
Spring AOP
Vue front‑end UI
Design Patterns
Strategy
Template Method
Dynamic Proxy
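Two of the patterns above, Strategy and Dynamic Proxy, can be combined in a few lines of JDK-only Java. This is a sketch, not how the SDK wires things (the article says it relies on Spring AOP): `OrderService`, `ExecStrategy`, and `ProxySketch` are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical business interface; JDK proxies can only wrap interfaces.
interface OrderService {
    void notifyWarehouse(String orderId);
}

// Strategy: decides how a captured call gets executed (now, later, on a pool, ...).
interface ExecStrategy {
    void execute(Runnable call);
}

class ProxySketch {
    // Wraps target so that every call is routed through the chosen strategy.
    // Object methods such as toString are not special-cased in this sketch.
    static OrderService wrap(OrderService target, ExecStrategy strategy) {
        InvocationHandler handler = (proxy, method, args) -> {
            strategy.execute(() -> {
                try {
                    method.invoke(target, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            });
            return null; // the intercepted method returns void
        };
        return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                handler);
    }
}
```

Passing `Runnable::run` as the strategy executes the call inline, while passing an executor's `execute` method hands it to a thread pool. The caller's code stays the same in both cases, which is the point of keeping the strategy behind the proxy.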
Flowchart (illustrative)
Database Scripts
CREATE TABLE `async_scene` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key ID',
`application_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'application name',
`method_sign` varchar(50) NOT NULL DEFAULT '' COMMENT 'method signature',
`scene_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'business scenario description',
`async_type` varchar(50) NOT NULL DEFAULT '' COMMENT 'async strategy type',
`queue_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'queue name',
`theme_value` varchar(100) NOT NULL DEFAULT '' COMMENT 'consumer topic',
`exec_count` int NOT NULL DEFAULT '0' COMMENT 'number of retries on failure',
`exec_deleted` int NOT NULL DEFAULT '0' COMMENT 'whether to delete after execution',
`async_version` varchar(50) NOT NULL DEFAULT '' COMMENT 'component version',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'update time',
`cdc_crt_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'record creation time',
`cdc_upd_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'record modification time',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `uk_application_sign` (`application_name`,`method_sign`) USING BTREE,
KEY `idx_cdc_upd_time` (`cdc_upd_time`)
) ENGINE=InnoDB COMMENT='async scenario table';
CREATE TABLE `async_req` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key ID',
`application_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'application name',
`sign` varchar(50) NOT NULL DEFAULT '' COMMENT 'method signature',
`class_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'fully qualified class name',
`method_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'method name',
`async_type` varchar(50) NOT NULL DEFAULT '' COMMENT 'async strategy type',
`exec_status` tinyint NOT NULL DEFAULT '0' COMMENT 'execution status 0: initialized 1: failed 2: succeeded',
`exec_count` int NOT NULL DEFAULT '0' COMMENT 'execution count',
`param_json` longtext COMMENT 'request parameters',
`remark` varchar(200) NOT NULL DEFAULT '' COMMENT 'business description',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_applocation_name` (`application_name`) USING BTREE,
KEY `idx_exec_status` (`exec_status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='async processing request';
CREATE TABLE `async_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key ID',
`async_id` bigint NOT NULL DEFAULT '0' COMMENT 'async request ID',
`error_data` longtext COMMENT 'execution error message',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_async_id` (`async_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='async processing log';
Asynchronous Strategy
Security Level
Execution Status
Flowchart
Apollo Configuration
# Switch (default off)
async.enabled=true
# Application name
spring.application.name=xxx
# Data source (Druid)
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true
spring.datasource.username=user
spring.datasource.password=xxxx
spring.datasource.filters=config
spring.datasource.connectionProperties=config.decrypt=true;config.decrypt.key=yyy
# Static resources
spring.resources.add-mappings=true
spring.resources.static-locations=classpath:/static/
# Thread pool defaults
async.executor.thread.corePoolSize=10
async.executor.thread.maxPoolSize=50
async.executor.thread.queueCapacity=10000
async.executor.thread.keepAliveSeconds=600
# Delete record after success (default true)
async.exec.deleted=true
# Custom queue name prefix (default application name)
async.topic=${spring.application.name}_async_queue
# Retry settings
async.exec.count=5
async.retry.limit=100
async.comp.limit=100
# Login interception (default false)
async.login=false
Usage
Enable async processing by setting async.enabled=true in the configuration (the switch is off by default).
Annotate methods that need asynchronous execution with @AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "Data Dictionary"). The annotated method must belong to a Spring‑managed bean and be called through its proxy.
The manual handling UI is available at http://localhost:8004/async/index.html for monitoring and retry.
Important Notes
Set the correct spring.application.name to identify the application.
Define the queue name, e.g., ${async.topic:${spring.application.name}}_async_queue.
Ensure business logic is idempotent to avoid duplicate processing.
One application should share a single queue (self‑produce, self‑consume).
Configure scheduled tasks: async retry (every 2 minutes, configurable retry count) and async compensation (hourly, processing records older than one hour).
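The two scheduled tasks differ mainly in which rows they pick up. The sketch below shows one plausible selection rule over in-memory rows. It rests on assumptions the article does not spell out: that the retry job takes failed rows whose `exec_count` is below a limit (for example the `async.exec.count` value), and that the compensation job takes rows still in the initialized state more than one hour after creation. `AsyncReqRow` and `JobSelector` are hypothetical names; the status codes come from the `async_req` table.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory view of a row in async_req.
class AsyncReqRow {
    final long id;
    final int execStatus;           // 0: initialized, 1: failed, 2: succeeded
    final int execCount;            // how many times execution was attempted
    final LocalDateTime createTime;

    AsyncReqRow(long id, int execStatus, int execCount, LocalDateTime createTime) {
        this.id = id;
        this.execStatus = execStatus;
        this.execCount = execCount;
        this.createTime = createTime;
    }
}

class JobSelector {
    static final int INIT = 0;
    static final int FAILED = 1;

    // Retry job (assumed rule): failed rows that still have attempts left.
    static List<AsyncReqRow> retryCandidates(List<AsyncReqRow> rows, int maxExecCount) {
        return rows.stream()
                .filter(r -> r.execStatus == FAILED && r.execCount < maxExecCount)
                .collect(Collectors.toList());
    }

    // Compensation job (assumed rule): rows never picked up, older than one hour.
    static List<AsyncReqRow> compensationCandidates(List<AsyncReqRow> rows, LocalDateTime now) {
        LocalDateTime cutoff = now.minus(Duration.ofHours(1));
        return rows.stream()
                .filter(r -> r.execStatus == INIT && r.createTime.isBefore(cutoff))
                .collect(Collectors.toList());
    }
}
```

Because either job may run a task that has in fact already succeeded, the idempotency requirement in the notes above applies to both.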
Effect Demonstration
Code Repository
https://github.com/xiongyanokok/fc-async
