Design and Implementation of a Real‑Time Cache Update System Based on Kafka and Distributed Cache
This article presents a comprehensive design and implementation of a real‑time cache update system that leverages Kafka‑driven database change streams, a centralized cache scheduling center, executor registration, broadcast and fail‑over scheduling, and a lightweight SDK to achieve millisecond‑level cache consistency for C‑end services.
1. Introduction
In C‑end project practice, interface performance is paramount. To improve TP99 and other latency metrics, C‑end services make extensive use of distributed and local caches, but keeping those caches up to date remains a challenge. Traditional approaches (periodic refresh, expiration policies, and database change listeners) suffer from update latency, wasted resources, or complex coordination.
2. System Architecture
2.1 Overall Design
The solution adopts a real‑time data flow service (Home DTS) to push database binlog changes to Kafka. The cache scheduling center subscribes to these Kafka messages, maintains cache key lineage, and supports both single‑point and broadcast scheduling. It also provides a non‑intrusive SDK for developers. The implementation builds on the open‑source XXL‑JOB project with deep customisation.
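To make the data flow concrete, here is a small sketch of what a change event pushed by such a pipeline could look like once consumed from Kafka. The class and field names (`BinlogChangeEvent`, `table`, `operateType`, `primaryKeys`) and the JSON shape are assumptions for illustration, not the actual Home DTS message schema:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative model of a binlog change event consumed from Kafka.
// Field names are assumptions for this sketch, not the real Home DTS schema.
public class BinlogChangeEvent {

    private final String table;           // source table, e.g. "aa.test_1"
    private final int operateType;        // 1-add, 2-update, 3-delete
    private final List<Long> primaryKeys; // ids of the changed rows

    public BinlogChangeEvent(String table, int operateType, List<Long> primaryKeys) {
        this.table = table;
        this.operateType = operateType;
        this.primaryKeys = primaryKeys;
    }

    public String getTable() { return table; }
    public int getOperateType() { return operateType; }
    public List<Long> getPrimaryKeys() { return primaryKeys; }

    // Minimal hand-rolled parser for a flat JSON payload such as:
    // {"table":"aa.test_1","operateType":2,"primaryKeys":[101,102]}
    public static BinlogChangeEvent parse(String json) {
        String table = extract(json, "\"table\":\"", "\"");
        int op = Integer.parseInt(extract(json, "\"operateType\":", ","));
        List<Long> keys = Arrays.stream(extract(json, "\"primaryKeys\":[", "]").split(","))
                .map(String::trim)
                .map(Long::valueOf)
                .collect(Collectors.toList());
        return new BinlogChangeEvent(table, op, keys);
    }

    private static String extract(String src, String start, String end) {
        int i = src.indexOf(start) + start.length();
        return src.substring(i, src.indexOf(end, i));
    }
}
```

In production the parsing would be done with a real JSON library and the full binlog envelope; the point here is only the shape of the data that the scheduling center works with.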
2.2 Module Design
2.2.1 Executor Registration and Update Handling
When a business application starts, it registers its executor, cache keys, key dependencies, and scheduling mode with the cache center. The cache center creates or updates the executor record, stores target‑source key relationships, and registers method handlers.
// Executor registration example
public class CacheSpringExecutor extends CacheExecutor implements SmartInitializingSingleton {

    private List<CacheBloodParam> cacheBloodParams = null;

    @Override
    public void afterSingletonsInstantiated() {
        // Register cache update handlers found in the Spring container
        initCacheUpdateHandlerRepository(applicationContext);
        try {
            if (!CollectionUtils.isEmpty(cacheBloodParams)) {
                // Register cache keys and lineage with the remote cache center
                super.start(cacheBloodParams);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void initCacheUpdateHandlerRepository(ApplicationContext applicationContext) {
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        cacheBloodParams = new ArrayList<>();
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);
            // Find methods annotated with @CacheUpdate
            Map<Method, CacheUpdate> annotatedMethods = MethodIntrospector.selectMethods(
                    bean.getClass(),
                    (MethodIntrospector.MetadataLookup<CacheUpdate>) method ->
                            AnnotatedElementUtils.findMergedAnnotation(method, CacheUpdate.class)
            );
            if (annotatedMethods == null || annotatedMethods.isEmpty()) {
                continue;
            }
            annotatedMethods.forEach((method, annotation) -> {
                if (!check(annotation)) {
                    throw new RuntimeException("cache-center method-cacheHandler param invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
                }
                // Build the key lineage parameter object from the annotation
                CacheBloodParam param = initCacheBloodParam(annotation);
                cacheBloodParams.add(param);
                method.setAccessible(true);
                // Register the method handler under its target key
                registHandler(annotation.targetKey(), new MethodCacheHandler(bean, method, null, null));
            });
        }
    }
}

2.2.2 Update Message Consumption and Scheduling
The cache center listens to Kafka change messages (e.g., MySQL binlog events), extracts the primary key and operation type, and packages them into a change object that is placed in a dispatch queue. A scheduling thread consumes the queue, resolves target keys via lineage, determines executor IPs, and dispatches updates according to the configured routing strategy.
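The listener/queue/scheduler split described above can be sketched as follows. The class and method names here (`CacheDispatchQueue`, `Change`, `drainOnce`) are illustrative assumptions, not the production code:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative dispatch queue: the Kafka listener offers change objects,
// and a scheduling thread drains them and hands each one to a trigger callback.
public class CacheDispatchQueue {

    // Change object holding what the article says is extracted from the
    // binlog message: the affected target key, operation type, and row ids.
    public static class Change {
        public final long targetKeyId;
        public final int operateType; // 1-add, 2-update, 3-delete
        public final List<Long> bizIds;

        public Change(long targetKeyId, int operateType, List<Long> bizIds) {
            this.targetKeyId = targetKeyId;
            this.operateType = operateType;
            this.bizIds = bizIds;
        }
    }

    // Callback that performs the actual dispatch, e.g. CacheTrigger.trigger(...).
    public interface Trigger {
        void fire(Change change);
    }

    private final BlockingQueue<Change> queue = new LinkedBlockingQueue<>();
    private final Trigger trigger;

    public CacheDispatchQueue(Trigger trigger) {
        this.trigger = trigger;
    }

    // Called by the Kafka message listener for each parsed change.
    public void submit(Change change) {
        queue.offer(change);
    }

    // One iteration of the scheduling loop; in production this would run
    // continuously on a dedicated thread.
    public void drainOnce() throws InterruptedException {
        trigger.fire(queue.take());
    }
}
```

The blocking queue decouples Kafka consumption speed from dispatch speed, so a slow executor cannot back-pressure the Kafka listener directly.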
// Dispatch implementation example
public class CacheTrigger {

    /**
     * @param targetKeyId target key id
     * @param changeInfo  data change information
     */
    public static void trigger(Long targetKeyId, CacheChangeInfo changeInfo) {
        CacheTargetInfo info = CacheAdminConfig.getAdminConfig().getCacheTargetInfoDao().loadById(targetKeyId);
        if (info == null) {
            return;
        }
        CacheGroup group = CacheAdminConfig.getAdminConfig().getXxlJobGroupDao().loadByAppName(info.getAppName());
        if (group == null) {
            logger.error("Executor not found, dispatch aborted, changeInfo:{}", GsonTool.toJson(changeInfo));
            return;
        }
        String addressList = group.getAddressList();
        if (addressList != null && addressList.trim().length() > 0) {
            group.setAddressType(1);
            group.setAddressList(addressList.trim());
        }
        ExecutorRouteStrategyEnum routeStrategy = ExecutorRouteStrategyEnum.match(info.getRouteStrategy());
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST.equals(routeStrategy)) {
            // Broadcast the update to every registered executor
            group.getRegistryList().forEach(address -> processTrigger(info, changeInfo, address));
        } else {
            // Route the update to a single executor according to the strategy
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setId(info.getId().intValue());
            ReturnT<String> routeRes = routeStrategy.getRouter().route(triggerParam, group.getRegistryList());
            if (routeRes.getCode() == ReturnT.SUCCESS_CODE) {
                processTrigger(info, changeInfo, routeRes.getContent());
            } else {
                logger.error("Routing address resolution error, msg:{}", routeRes.getMsg());
            }
        }
    }
}

2.2.3 Cache Update SDK Usage
Developers add the SDK dependency, configure the cache center address, and annotate business methods with @CacheUpdate. At runtime the SDK extracts the change parameters and triggers the cache update flow.
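Before the annotation takes effect, the SDK needs to know where the cache center lives. A hypothetical configuration might look like the following; the property keys are assumptions modeled on XXL-JOB-style executor configuration, not the SDK's actual keys:

```properties
# Address of the cache scheduling center (assumed key names, for illustration only)
cache.center.addresses=http://cache-center.example.com:8080
# Executor app name used during registration
cache.executor.appname=demo-app
# Port the executor listens on for dispatch callbacks
cache.executor.port=9999
```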
// Demo usage
public class Demo {

    @CacheUpdate(targetKey = "ne:test:cache", targetKeyDesc = "test",
            sourceKey = {"aa.test_1", "aa.test_2"},
            roteType = RouteStrategyEnum.SHARDING_BROADCAST)
    public void cacheUpdate() {
        CacheChangeParam changeParam = XxlCacheHelper.getChangeParam();
        List<Long> bizIds = changeParam.getBizIds();        // business IDs to update
        Integer operateType = changeParam.getOperateType(); // 1-add, 2-update, 3-delete
        logger.info(GsonTool.toJson(changeParam));
        logger.info("test");
    }
}

3. Application Effect
The system was applied in the "Home Car Selection" VR configurator. Business data entered in the back office is cached locally for the H5 components, reducing interface latency. When a database record changes, the corresponding cache is updated end to end in approximately 900 ms, delivering sub-second consistency.
4. Conclusion By decoupling cache update into an independent service that reacts to real‑time data changes, the solution replaces inefficient periodic refreshes with proactive updates, improving cache timeliness from days/hours to seconds and boosting overall system efficiency and developer productivity.
HomeTech