
Ensuring Cache‑DB Consistency with Canal, RabbitMQ, and Redis in SpringBoot

This guide walks through building a SpringBoot architecture that updates MySQL first, deletes Redis cache asynchronously via Canal and RabbitMQ, resolves common Canal meta‑data mismatches, and configures manual RabbitMQ acknowledgments to guarantee reliable cache invalidation.

macrozheng

Architecture

The solution follows three principles: update the database first and then delete the cache; retry asynchronously so that the cache deletion eventually succeeds; and subscribe to the MySQL change log (binlog) to learn which Redis entries to clear.

The application reads from Redis and writes updates to MySQL.

Canal captures the MySQL binlog and pushes the change events to an MQ.

A consumer reads the messages from the MQ and deletes the corresponding Redis cache entries.
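
The flow above can be sketched with in-memory stand-ins before any real infrastructure is involved. This is only an illustration under stated assumptions: the class `CacheAsideSketch`, its maps, and its queue are hypothetical substitutes for MySQL, Redis, and Canal plus the MQ, not part of the project's code.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** In-memory stand-ins for MySQL, Redis and the MQ; all names here are hypothetical. */
class CacheAsideSketch {
    final Map<Long, String> db = new ConcurrentHashMap<>();      // plays MySQL
    final Map<Long, String> cache = new ConcurrentHashMap<>();   // plays Redis
    final Queue<Long> changeLog = new ConcurrentLinkedQueue<>(); // plays Canal + MQ

    /** Read path: cache first, fall back to the database and populate the cache. */
    String read(long id) {
        String cached = cache.get(id);
        if (cached != null) {
            return cached;
        }
        String fromDb = db.get(id);
        if (fromDb != null) {
            cache.put(id, fromDb);
        }
        return fromDb;
    }

    /** Write path: only the database is touched; the change is published as an event. */
    void update(long id, String value) {
        db.put(id, value);
        changeLog.add(id); // in the real setup Canal derives this from the binlog
    }

    /** Consumer: drain pending change events and delete the matching cache entries. */
    void consumeChanges() {
        Long id;
        while ((id = changeLog.poll()) != null) {
            cache.remove(id);
        }
    }

    public static void main(String[] args) {
        CacheAsideSketch s = new CacheAsideSketch();
        s.db.put(1L, "v1");
        System.out.println(s.read(1L)); // v1, now cached
        s.update(1L, "v2");
        System.out.println(s.read(1L)); // still v1 until the event is consumed
        s.consumeChanges();
        System.out.println(s.read(1L)); // v2, reloaded from the database
    }
}
```

The second read shows the short window in which the cache is stale: consistency here is eventual, bounded by how quickly the change event is consumed.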

Environment Preparation

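Install MySQL, Canal, RabbitMQ, and Redis by following their official guides. Canal needs MySQL binary logging enabled in ROW format and a MySQL account with replication privileges. In this setup RabbitMQ was already running on the server, so it serves as the message queue.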

Canal Configuration

Modify conf/canal.properties:

<code># Specify mode
canal.serverMode = rabbitMQ
# Instances (comma‑separated)
canal.destinations = example

# RabbitMQ settings
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = xxx
rabbitmq.username = xxx
rabbitmq.password = xxx
</code>

Modify conf/example/instance.properties:

<code># Slave ID (any custom value, but it must differ from the MySQL server_id)
canal.instance.mysql.slaveId=10

# MySQL address
canal.instance.master.address=ip:port

# Credentials
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx

# Filter (all databases/tables)
canal.instance.filter.regex=.*\\..*

# MQ configuration (with RabbitMQ, this value acts as the routing key)
canal.mq.topic=xxx
</code>

Database

Table creation:

<code>CREATE TABLE `product_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `price` decimal(10,4) DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `update_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
</code>

Initial data:

<code>INSERT INTO product_info (id, name, price, create_date, update_date) VALUES
(1, '从你的全世界路过', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39'),
(2, '乔布斯传', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42'),
(3, 'java开发', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');
</code>

Practical Implementation

RabbitMQ Configuration (SpringBoot)

<code>@Configuration
public class RabbitMQConfig {
    public static final String CANAL_QUEUE = "canal_queue"; // queue
    public static final String DIRECT_EXCHANGE = "canal"; // exchange, must match Canal config
    public static final String ROUTING_KEY = "routingkey"; // routing key, must match Canal config

    @Bean
    public Queue canalQueue() {
        return new Queue(CANAL_QUEUE, true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
    }
}
</code>

Cache Product Information

<code>/**
 * Get product info: first try cache, if missing query DB and store in cache.
 */
@Override
public ProductInfo findProductInfo(Long productInfoId) {
    // 1. Try Redis
    Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
    if (ObjectUtil.isNotEmpty(object)) {
        return (ProductInfo) object;
    }
    // 2. Query MySQL
    ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
    if (productInfo != null) {
        // 3. Cache result
        redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY + productInfoId, productInfo,
                REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
        return productInfo;
    }
    return null;
}
</code>

Update Data and Push to MQ

<code>@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo) {
    productInfoService.updateById(productInfo);
    return AjaxResult.success();
}
</code>

On the first attempt, calling the update endpoint produced no message in RabbitMQ: Canal could not read the binlog.

Problem Diagnosis

The meta.dat file stored a binlog position that did not match the MySQL instance, and conf/example/instance.properties lacked the correct canal.instance.master.address setting.

Solution

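Stop the Canal service.

Set canal.instance.master.address in conf/example/instance.properties to the correct MySQL address.

Delete the meta.dat file so that Canal discards the stale binlog position.

Restart Canal.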

After restarting, the update request generated a message that could be seen in RabbitMQ.

MQ Consumer

<code>@RabbitListener(queues = "canal_queue")
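public void getMsg(Message message, Channel channel, String msg) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        log.info("Consumer queue: " + message.getMessageProperties().getConsumerQueue());
        ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
        if (productInfoDetail != null && productInfoDetail.getData() != null) {
            // One binlog event can carry several changed rows; invalidate each of them
            for (ProductInfo productInfo : productInfoDetail.getData()) {
                if (productInfo != null && productInfo.getId() != null) {
                    redisTemplate.delete(REDIS_PRODUCT_KEY + productInfo.getId());
                }
            }
        }
        // Ack only this delivery (multiple = false). Messages without row data are
        // acked as well: nothing needs invalidating, and requeueing them would loop forever.
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        log.error("Failed to process Canal message: " + msg, e);
        // requeue = false avoids a poison-message loop; unless the queue has a
        // dead-letter exchange, the message is dropped and the entry lives until its TTL.
        channel.basicReject(deliveryTag, false);
    }
}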
</code>

The log shows the database name, table name, and confirms that the Redis key has been removed.
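
The article does not show the ProductInfoDetail class that the consumer deserializes into. As a rough sketch only, the mapping might resemble the class below, which mirrors a few fields of Canal's flat JSON message (`database`, `table`, `type`, `isDdl`, `data`). The names `CanalMessageSketch`, `ProductRow`, and `keysToInvalidate`, as well as the `product:` key prefix, are assumptions made for illustration. The table check matters because the filter `.*\..*` forwards changes from every table, not only product_info.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical row type; only the field needed to build the cache key is shown. */
class ProductRow {
    Long id;

    ProductRow(Long id) {
        this.id = id;
    }
}

/** Hypothetical mapping of the Canal flat-message fields used for cache invalidation. */
class CanalMessageSketch {
    String database;        // schema name
    String table;           // table name
    String type;            // INSERT, UPDATE, DELETE, ...
    boolean isDdl;          // true for DDL statements, which carry no row data
    List<ProductRow> data;  // changed rows; may be null

    /** Returns the cache keys to delete, or an empty list when the message does not apply. */
    List<String> keysToInvalidate(String keyPrefix, String expectedTable) {
        List<String> keys = new ArrayList<>();
        if (isDdl || data == null || !expectedTable.equals(table)) {
            return keys;
        }
        for (ProductRow row : data) {
            if (row != null && row.id != null) {
                keys.add(keyPrefix + row.id);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        CanalMessageSketch msg = new CanalMessageSketch();
        msg.table = "product_info";
        msg.type = "UPDATE";
        msg.data = Arrays.asList(new ProductRow(1L), new ProductRow(3L));
        // "product:" is an assumed prefix standing in for REDIS_PRODUCT_KEY
        System.out.println(msg.keysToInvalidate("product:", "product_info"));
    }
}
```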

Extended Discussion

RabbitMQ relies on consumer acknowledgments. Three acknowledgment modes exist:

manual: the consumer explicitly calls basicAck after successful processing.

auto: Spring acks automatically if the listener returns without throwing an exception.

none: the broker treats a message as acknowledged as soon as it is delivered, so a failure during processing loses the message.

To avoid message loss, the example uses manual mode:

<code>spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
</code>

The corresponding code calls channel.basicAck on success and channel.basicReject on failure.
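
One detail worth checking in manual mode is the second argument of basicAck, the multiple flag. With multiple set to true, the broker acknowledges every outstanding delivery on the channel up to and including the given tag; with false, only that single delivery. The toy model below illustrates the difference. It is a sketch, not the RabbitMQ client API: `AckModelSketch` and its methods are hypothetical and only imitate the bookkeeping of delivery tags.

```java
import java.util.TreeSet;

/** Toy model of one channel's unacknowledged deliveries; not the RabbitMQ client API. */
class AckModelSketch {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    /** Simulates delivering a message and returns its delivery tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Mirrors the (deliveryTag, multiple) shape of basicAck. */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including the tag is acknowledged
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Only this one delivery is acknowledged
            unacked.remove(deliveryTag);
        }
    }

    /** Number of deliveries still waiting for an acknowledgment. */
    int pending() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckModelSketch channel = new AckModelSketch();
        channel.deliver();
        channel.deliver();
        long third = channel.deliver();

        channel.ack(third, false);
        System.out.println(channel.pending()); // 2: earlier deliveries are still unacknowledged

        channel.ack(third - 1, true);
        System.out.println(channel.pending()); // 0: both remaining deliveries acked at once
    }
}
```

When a consumer prefetches several messages, acknowledging with multiple set to true can confirm deliveries whose cache deletion has not run yet, so false is usually the safer choice when each message is handled individually.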

With this setup, every committed change in MySQL triggers a cache deletion through Canal and RabbitMQ, and manual acknowledgments keep each message on the queue until the consumer has explicitly acknowledged or rejected it.

Written by

macrozheng

Dedicated to Java tech sharing and dissecting top open-source projects. Topics include Spring Boot, Spring Cloud, Docker, Kubernetes and more. Author’s GitHub project “mall” has 50K+ stars.
