Implementing Delayed Message Delivery with RabbitMQ and Spring Boot
This article explains how to implement delayed message delivery in RabbitMQ using the official delayed‑queue plugin and Spring Boot, compares traditional approaches such as Redis expiration and database polling, and provides complete configuration and code examples for producers and consumers.
The article introduces delayed‑message scenarios such as automatic order confirmation in e‑commerce (e.g., Taobao) and ticket‑booking cancellations (e.g., 12306), and points out the drawbacks of using Redis expiration, database polling, or JVM DelayQueue for these cases.
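For comparison, the in-JVM approach mentioned above can be sketched with the JDK's java.util.concurrent.DelayQueue. This is only a minimal illustration, not code from the article: DelayedTask and InMemoryDelayDemo are hypothetical names. The queue lives entirely in the memory of one process, so pending tasks are lost on restart and are not shared between application instances, which is a likely reason the approach fits poorly for order or ticket timeouts.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical task type for illustration; not part of the article's code.
final class DelayedTask implements Delayed {
    private final String payload;
    private final long readyAtNanos; // absolute time at which the task becomes available

    DelayedTask(String payload, long delayMillis) {
        this.payload = payload;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    String payload() {
        return payload;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the task may be taken.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical demo class; the queue exists only inside this JVM.
final class InMemoryDelayDemo {
    // Enqueues one task and blocks until its delay has elapsed.
    static String sendAndReceive(String payload, long delayMillis) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask(payload, delayMillis));
        try {
            return queue.take().payload(); // take() waits until the head task is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delayed task", e);
        }
    }
}
```

Calling InMemoryDelayDemo.sendAndReceive("order-42", 6000) would block for roughly six seconds before returning the payload, mirroring the 6-second delay used in the RabbitMQ example later in the article.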
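It then describes RabbitMQ's delayed-message plugin, rabbitmq_delayed_message_exchange (usable from version 3.6.x), which has to be installed and enabled on the broker (rabbitmq-plugins enable rabbitmq_delayed_message_exchange), and shows how to use it by declaring a delayed exchange and binding a queue to it.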
The Spring AMQP configuration below defines a delayed exchange, a queue, and a binding, and calls exchange.setDelayed(true) to activate delayed messaging.
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {
    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    @Bean
    public TopicExchange lazyExchange() {
        // No extra arguments are needed here: with setDelayed(true), Spring declares the
        // exchange as type "x-delayed-message" with the argument "x-delayed-type" = "topic".
        Map<String, Object> pros = new HashMap<>();
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue() {
        // Durable queue that receives messages once their delay has elapsed
        return new Queue(LAZY_QUEUE, true);
    }

    @Bean
    public Binding lazyBinding() {
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
}

The producer sets the message delay via a MessagePostProcessor: it marks the message as persistent and calls message.getMessageProperties().setDelay(6000) (6 seconds) before sending.
import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;

@Component
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // confirmCallback and returnCallback omitted for brevity

    public void sendLazy(Object message) {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Newer Spring AMQP versions replace setReturnCallback with setReturnsCallback
        rabbitTemplate.setReturnCallback(returnCallback);
        // id + timestamp, intended to be globally unique
        CorrelationData correlationData = new CorrelationData("12345678909" + new Date());
        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
            new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // Persist the message so it survives a broker restart
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    // Delay in milliseconds; equivalent to setHeader("x-delay", 6000)
                    message.getMessageProperties().setDelay(6000);
                    return message;
                }
            }, correlationData);
    }
}

The consumer listens on the lazy queue, acknowledges the message, and prints the received payload.
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class MQReceiver {
    @RabbitListener(queues = "MQ.LazyQueue")
    @RabbitHandler
    public void onLazyMessage(Message msg, Channel channel) throws IOException {
        // Manual ack: assumes the listener container runs in manual acknowledge mode
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        // multiple = true also acknowledges every earlier unacked delivery on this channel
        channel.basicAck(deliveryTag, true);
        System.out.println("lazy receive " + new String(msg.getBody()));
    }
}

Under the hood, the setDelay method in Spring's MessageProperties simply sets the x-delay header, which the plugin reads to schedule delivery.
public void setDelay(Integer delay) {
    if (delay == null || delay < 0) {
        this.headers.remove(X_DELAY);
    } else {
        this.headers.put(X_DELAY, delay);
    }
}

A JUnit test sends a lazy message and confirms that the consumer receives it after the configured 6‑second delay, as shown by the console output and the accompanying screenshot.
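Since setDelay, as shown above, takes the delay as an Integer number of milliseconds, a small conversion helper can make delay values easier to read and guard against out-of-range input. The sketch below is an assumption for illustration only: DelayMillis is a hypothetical class, not part of Spring AMQP or of the article's code.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of Spring AMQP or the article's code.
final class DelayMillis {
    private DelayMillis() {
    }

    // Converts a Duration to the millisecond value expected by setDelay(Integer).
    static int of(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        long millis = delay.toMillis();
        if (millis > Integer.MAX_VALUE) {
            // An Integer holds at most about 24.8 days' worth of milliseconds
            throw new IllegalArgumentException("delay does not fit in an Integer: " + delay);
        }
        return (int) millis;
    }
}
```

In the producer, the call would then read message.getMessageProperties().setDelay(DelayMillis.of(Duration.ofSeconds(6))), which yields the same 6000 ms used in the article.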
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.