Designing a Robust Asynchronous Processing SDK with Spring, Kafka and MySQL

This article explains the design, principles, components, database schema, configuration, and usage of a generic asynchronous processing SDK that leverages Spring AOP, transactional events, Kafka, and a Vue UI to achieve reliable async execution and eventual consistency in Java backend systems.

Top Architect
Top Architect
Top Architect
Designing a Robust Asynchronous Processing SDK with Spring, Kafka and MySQL

Introduction

Good system design should follow the open‑closed principle; as business evolves, core code changes, increasing the risk of errors. To keep performance and quality, asynchronous thread pools are often used, but they introduce uncertainty. This article presents a generic asynchronous processing SDK that simplifies async handling while preserving reliability.

Goal

Ensure that methods execute effectively without blocking the main flow and achieve eventual consistency through fallback mechanisms.

Advantages

Non‑intrusive design with independent database, scheduler, message queue, and UI (single sign‑on).

Uses Spring transaction event mechanism so that async failures do not affect business logic.

If a method runs inside a transaction, the async event is processed after commit or rollback.

Even when the transaction commits and async parsing fails, a fallback strategy persists data unless the database, message queue, or method itself fails.

Principle

After Spring finishes bean initialization, it scans for methods annotated with @AsyncExec and caches them. At runtime, an AOP aspect publishes an event for the annotated method. A @TransactionalEventListener with fallbackExecution=true and phase=TransactionPhase.AFTER_COMPLETION processes the async strategy.

@TransactionalEventListener(fallbackExecution=true, phase=TransactionPhase.AFTER_COMPLETION)

Components

Kafka message queue

XXL‑Job scheduler

MySQL database

Spring AOP

Vue UI

Design Patterns

Strategy pattern for async handling

Template method

Dynamic proxy

Database Scripts

The SDK defines two main tables. async_scene stores async request metadata, while async_req and async_log record request details and execution logs.

CREATE TABLE `async_scene` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Primary ID',
  `application_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'Application name',
  `method_sign` varchar(50) NOT NULL DEFAULT '' COMMENT 'Method signature',
  `scene_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'Business scene description',
  `async_type` varchar(50) NOT NULL DEFAULT '' COMMENT 'Async strategy type',
  `queue_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'Queue name',
  `theme_value` varchar(100) NOT NULL DEFAULT '' COMMENT 'Consumer topic',
  `exec_count` int NOT NULL DEFAULT '0' COMMENT 'Retry count',
  `exec_deleted` int NOT NULL DEFAULT '0' COMMENT 'Delete after execution',
  `async_version` varchar(50) NOT NULL DEFAULT '' COMMENT 'Component version',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
  `cdc_crt_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Record insert time',
  `cdc_upd_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Record update time',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `uk_application_sign` (`application_name`,`method_sign`) USING BTREE,
  KEY `idx_cdc_upd_time` (`cdc_upd_time`)
) ENGINE=InnoDB COMMENT='Async scene table';

CREATE TABLE `async_req` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Primary ID',
  `application_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'Application name',
  `sign` varchar(50) NOT NULL DEFAULT '' COMMENT 'Method signature',
  `class_name` varchar(200) NOT NULL DEFAULT '' COMMENT 'Full class name',
  `method_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'Method name',
  `async_type` varchar(50) NOT NULL DEFAULT '' COMMENT 'Async strategy type',
  `exec_status` tinyint NOT NULL DEFAULT '0' COMMENT '0: init, 1: fail, 2: success',
  `exec_count` int NOT NULL DEFAULT '0' COMMENT 'Execution count',
  `param_json` longtext COMMENT 'Request parameters',
  `remark` varchar(200) NOT NULL DEFAULT '' COMMENT 'Business description',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_applocation_name` (`application_name`) USING BTREE,
  KEY `idx_exec_status` (`exec_status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Async request table';

CREATE TABLE `async_log` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Primary ID',
  `async_id` bigint NOT NULL DEFAULT '0' COMMENT 'Async request ID',
  `error_data` longtext COMMENT 'Error information',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_async_id` (`async_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Async log table';

Apollo Configuration

# Switch (default off)
async.enabled=true

# Application name
spring.application.name=xxx

# Data source (Druid)
async.datasource.driver-class-name=com.mysql.jdbc.Driver
async.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true
async.datasource.username=user
async.datasource.password=xxxx
async.datasource.filters=config
async.datasource.connectionProperties=config.decrypt=true;config.decrypt.key=yyy

# Static resources
spring.resources.add-mappings=true
spring.resources.static-locations=classpath:/static/

# Thread pool (defaults can be overridden)
async.executor.thread.corePoolSize=10
async.executor.thread.maxPoolSize=50
async.executor.thread.queueCapacity=10000
async.executor.thread.keepAliveSeconds=600

# Delete record after success (default true)
async.exec.deleted=true

# Topic name (default uses application name)
async.topic=${spring.application.name}_async_queue

# Retry settings
async.exec.count=5
async.retry.limit=100
async.comp.limit=100
async.login=false

Usage

Enable async processing by setting async.enabled=true in the configuration.

Annotate Spring‑managed methods with

@AsyncExec(type=AsyncExecEnum.SAVE_ASYNC, remark="Data dictionary")

. The method must be proxied by Spring.

Provide a manual handling UI, e.g., http://localhost:8004/async/index.html, for operators to view and retry tasks.

Precautions

Set spring.application.name correctly; it is used to build the queue name.

Define the queue name, typically ${async.topic:${spring.application.name}}_async_queue.

Ensure business logic is idempotent to avoid duplicate processing.

One queue per application is recommended.

Two scheduled jobs are provided: an async retry task (every 2 minutes, configurable retry count) and a compensation task (hourly, processing records older than one hour).

Demo

Images illustrate the async strategy, security level, execution status, and overall flow.

Async strategy diagram
Async strategy diagram
Security level
Security level
Execution status
Execution status
Flowchart
Flowchart
Effect demonstration
Effect demonstration
design-patternsSDKSpringAsynchronousMySQL
Top Architect
Written by

Top Architect

Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.