Ensuring Cache‑DB Consistency with Canal, RabbitMQ, and Redis in SpringBoot
This guide walks through building a SpringBoot architecture that updates MySQL first, deletes Redis cache asynchronously via Canal and RabbitMQ, resolves common Canal meta‑data mismatches, and configures manual RabbitMQ acknowledgments to guarantee reliable cache invalidation.
Architecture
The solution follows three principles: update the database first then delete the cache, use asynchronous retries to ensure both steps succeed, and subscribe to change logs to clear Redis entries.
APP reads from Redis and writes updates to MySQL.
Canal captures MySQL binlog and pushes data to an MQ.
A consumer reads the messages from the MQ and deletes the corresponding Redis cache entries.
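The flow above can be sketched with in-memory stand-ins. This is a minimal simulation rather than the article's implementation: the two maps play the roles of MySQL and Redis, the queue plays the role of RabbitMQ, and the class and method names (`CacheFlowSketch`, `read`, `update`, `drainQueue`) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory stand-ins: "db" for MySQL, "cache" for Redis, "mq" for RabbitMQ.
final class CacheFlowSketch {
    final Map<Long, String> db = new HashMap<>();
    final Map<Long, String> cache = new HashMap<>();
    final Queue<Long> mq = new ArrayDeque<>(); // carries the ids of changed rows

    // Read path: try the cache first, then fall back to the database and cache the result.
    String read(long id) {
        String cached = cache.get(id);
        if (cached != null) {
            return cached;
        }
        String fromDb = db.get(id);
        if (fromDb != null) {
            cache.put(id, fromDb);
        }
        return fromDb;
    }

    // Write path: update the database only. The queued id stands in for the
    // change event that Canal would publish after reading the binlog.
    void update(long id, String value) {
        db.put(id, value);
        mq.add(id);
    }

    // Consumer: delete the cache entry for every changed row.
    void drainQueue() {
        Long id;
        while ((id = mq.poll()) != null) {
            cache.remove(id);
        }
    }

    public static void main(String[] args) {
        CacheFlowSketch s = new CacheFlowSketch();
        s.db.put(1L, "price=14");
        System.out.println(s.read(1L)); // loads from db and fills the cache
        s.update(1L, "price=15");
        System.out.println(s.read(1L)); // cache not yet evicted, so the old value is returned
        s.drainQueue();
        System.out.println(s.read(1L)); // cache was evicted, so the new value is read from db
    }
}
```

The second read illustrates the short window in which the cache is stale: the design is eventually consistent, and the window closes once the consumer has processed the change event.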
Environment Preparation
Install MySQL, Canal, RabbitMQ and Redis according to their official guides. RabbitMQ is already available on the server and will be used as the message queue.
Canal Configuration
Modify conf/canal.properties:
<code># Specify mode
canal.serverMode = rabbitMQ
# Instances (comma‑separated)
canal.destinations = example
# RabbitMQ settings
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = xxx
rabbitmq.username = xxx
rabbitmq.password = xxx
</code>
Modify conf/example/instance.properties:
<code># Slave ID (any value that differs from the MySQL server_id and from other replicas)
canal.instance.mysql.slaveId=10
# MySQL address
canal.instance.master.address=ip:port
# Credentials
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
# Filter (all databases/tables)
canal.instance.filter.regex=.*\\..*
# MQ configuration (with RabbitMQ, the topic is used as the routing key)
canal.mq.topic=xxx
</code>
Database
Table creation:
<code>CREATE TABLE `product_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`price` decimal(10,4) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
</code>
Initial data:
<code>INSERT INTO product_info (id, name, price, create_date, update_date) VALUES
(1, '从你的全世界路过', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39'),
(2, '乔布斯传', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42'),
(3, 'java开发', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');
</code>
Practical Implementation
RabbitMQ Configuration (SpringBoot)
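<code>import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String CANAL_QUEUE = "canal_queue";   // queue name
    public static final String DIRECT_EXCHANGE = "canal";     // exchange, must match rabbitmq.exchange in the Canal config
    public static final String ROUTING_KEY = "routingkey";    // routing key, must match canal.mq.topic in the Canal config

    // Durable queue that receives the Canal change events
    @Bean
    public Queue canalQueue() {
        return new Queue(CANAL_QUEUE, true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    // Bind the queue to the exchange with the routing key Canal publishes under
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
    }
}
</code>
Cache Product Information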
<code>/**
 * Get product info: first try cache, if missing query DB and store in cache.
 */
@Override
public ProductInfo findProductInfo(Long productInfoId) {
    // 1. Try Redis
    Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
    if (ObjectUtil.isNotEmpty(object)) {
        return (ProductInfo) object;
    }
    // 2. Query MySQL
    ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
    if (productInfo != null) {
        // 3. Cache result
        redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY + productInfoId, productInfo,
                REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
        return productInfo;
    }
    return null;
}
</code>
Update Data and Push to MQ
<code>@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo) {
    productInfoService.updateById(productInfo);
    return AjaxResult.success();
}
</code>
After calling the update endpoint, no message appeared in RabbitMQ because Canal could not read the binlog.
Problem Diagnosis
The meta.dat file stored a binlog position that did not match the MySQL instance, and conf/example/instance.properties lacked the correct canal.instance.master.address setting.
Solution
Stop the Canal service.
Delete the meta.dat file.
Restart Canal.
After restarting, the update request generated a message that could be seen in RabbitMQ.
MQ Consumer
<code>@RabbitListener(queues = "canal_queue")
public void getMsg(Message message, Channel channel, String msg) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        log.info("Consumer queue: " + message.getMessageProperties().getConsumerQueue());
        ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
        if (productInfoDetail != null && productInfoDetail.getData() != null) {
            // One change event may carry several rows, so evict the key for each of them
            for (ProductInfo productInfo : productInfoDetail.getData()) {
                if (productInfo != null && productInfo.getId() != null) {
                    redisTemplate.delete(REDIS_PRODUCT_KEY + productInfo.getId());
                }
            }
            // multiple=false: acknowledge only this delivery, not every earlier unacked one
            channel.basicAck(deliveryTag, false);
            return;
        }
        // No row data, so nothing to evict; requeueing would only redeliver the same message
        channel.basicReject(deliveryTag, false);
    } catch (Exception e) {
        log.error("Failed to process Canal message", e);
        // requeue=false: the message is dropped unless the queue has a dead-letter exchange
        channel.basicReject(deliveryTag, false);
    }
}
</code>
The log shows the database name and table name and confirms that the Redis key has been removed.
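The `ProductInfoDetail` class used by the consumer is not shown. A minimal sketch of what it might look like follows, assuming Canal is publishing its flat JSON message format, whose top-level fields include `database`, `table`, `type`, and `data`. The nested `ProductInfo` is a cut-down stand-in for the real entity, and `changedIds` is a hypothetical helper that is not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical shape of ProductInfoDetail: only the fields this consumer needs.
final class ProductInfoDetail {
    private String database;        // schema the change came from
    private String table;           // table the change came from
    private String type;            // e.g. INSERT, UPDATE, DELETE
    private List<ProductInfo> data; // changed rows; may be null

    public String getDatabase() { return database; }
    public void setDatabase(String database) { this.database = database; }
    public String getTable() { return table; }
    public void setTable(String table) { this.table = table; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public List<ProductInfo> getData() { return data; }
    public void setData(List<ProductInfo> data) { this.data = data; }

    // Hypothetical helper: ids of all changed rows, skipping null rows and null ids.
    public List<Long> changedIds() {
        List<Long> ids = new ArrayList<>();
        if (data == null) {
            return ids;
        }
        for (ProductInfo row : data) {
            if (row != null && row.getId() != null) {
                ids.add(row.getId());
            }
        }
        return ids;
    }

    // Cut-down stand-in for the article's ProductInfo entity.
    static final class ProductInfo {
        private Long id;
        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
    }

    public static void main(String[] args) {
        ProductInfo row = new ProductInfo();
        row.setId(1L);
        ProductInfoDetail detail = new ProductInfoDetail();
        detail.setTable("product_info");
        detail.setType("UPDATE");
        detail.setData(Arrays.asList(row, null));
        System.out.println(detail.getTable() + " " + detail.changedIds());
    }
}
```

Because the instance filter subscribes to every database and table, a consumer built on this class would probably want to check `getTable()` before evicting product keys, so that changes to unrelated tables do not touch the product cache.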
Extended Discussion
RabbitMQ relies on consumer acknowledgments. Three acknowledgment modes exist:
manual: the consumer explicitly calls basicAck after successful processing.
auto: Spring acknowledges the message automatically when the listener returns without throwing an exception.
none: the broker treats a message as acknowledged as soon as it is delivered, so a failure in the consumer loses the message.
To avoid message loss, the example uses manual mode:
<code>spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
</code>
The corresponding code uses channel.basicAck for successful handling and channel.basicReject for failures.
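The effect of these calls on a queue can be illustrated with a toy in-memory model. It is a simplified sketch of manual acknowledgment semantics, not RabbitMQ's implementation; the class `ManualAckSketch` and its methods are hypothetical and only mirror the names `basicAck` and `basicReject`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy broker: messages are either ready for delivery or delivered but unacknowledged.
final class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Hands the next ready message to the consumer and tracks it as unacknowledged.
    // Returns its delivery tag, or -1 when nothing is ready.
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    // Successful processing: the message is removed for good.
    void basicAck(long tag) {
        unacked.remove(tag);
    }

    // Failed processing: requeue=true makes the message ready again, requeue=false drops it.
    void basicReject(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    int readyCount() { return ready.size(); }
    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        ManualAckSketch broker = new ManualAckSketch();
        broker.publish("evict product 1");
        long first = broker.deliver();
        broker.basicReject(first, true);  // e.g. Redis was unreachable: try again later
        long second = broker.deliver();   // same message, new delivery tag
        broker.basicAck(second);          // eviction succeeded
        System.out.println(broker.readyCount() + " ready, " + broker.unackedCount() + " unacked");
    }
}
```

Deleting a Redis key is idempotent, so a message that is delivered more than once does no harm. What matters is that a message never disappears before at least one delivery has succeeded.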
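With manual acknowledgments, RabbitMQ removes a cache-invalidation message from the queue only after the consumer explicitly acknowledges or rejects it, so a consumer that crashes mid-processing does not silently lose the invalidation, and Redis stays consistent with MySQL.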
macrozheng
Dedicated to Java tech sharing and dissecting top open-source projects. Topics include Spring Boot, Spring Cloud, Docker, Kubernetes and more. Author’s GitHub project “mall” has 50K+ stars.