Implementing Delayed Message Queues in RabbitMQ with Spring Boot
This article explains why traditional delayed‑message solutions like Redis expiration, database polling, or JVM DelayQueue are inefficient, and demonstrates how to use RabbitMQ's delayed‑queue plugin with Spring Boot to configure exchanges, send delayed messages, and consume them reliably.
Many common applications require delayed message delivery, such as Taobao's automatic receipt after seven days and 12306's ticket order cancellation after 30 minutes.
Traditional approaches like using Redis expiration, database polling, or JVM DelayQueue suffer from low performance, high memory pressure, or lack of persistence.
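To make the persistence problem concrete, here is a minimal sketch of the JVM DelayQueue approach. The class and order names are hypothetical and used only for illustration. Every pending task lives on the heap of a single process, so a restart loses all of them.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayQueueDemo {

    /** Hypothetical task: an order to cancel once its deadline has passed. */
    static final class CancelOrder implements Delayed {
        final String orderId;
        final long dueAtNanos;

        CancelOrder(String orderId, long delayMillis) {
            this.orderId = orderId;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the task is due; zero or negative means "ready".
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Earlier deadlines sort first.
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Enqueues two tasks and returns their ids in the order they become due. */
    static String[] drainInDueOrder() throws InterruptedException {
        DelayQueue<CancelOrder> queue = new DelayQueue<>();
        queue.put(new CancelOrder("order-slow", 200));
        queue.put(new CancelOrder("order-fast", 50));
        // take() blocks until the delay of the head element has expired.
        return new String[] { queue.take().orderId, queue.take().orderId };
    }

    public static void main(String[] args) throws InterruptedException {
        for (String id : drainInDueOrder()) {
            System.out.println("cancel " + id);
        }
    }
}
```

The queue is unbounded and purely in memory, which is where both the memory pressure and the lack of persistence come from.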
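Since RabbitMQ 3.6.x, the official delayed-message plugin (rabbitmq_delayed_message_exchange) can be placed in the plugins directory and enabled with rabbitmq-plugins enable rabbitmq_delayed_message_exchange, which adds delayed messaging to the broker.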
First, define an exchange and a queue in a Spring configuration class:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    // Durable, non-auto-delete topic exchange, marked as delayed
    @Bean
    public TopicExchange lazyExchange() {
        Map<String, Object> pros = new HashMap<>();
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
        exchange.setDelayed(true);
        return exchange;
    }

    // Durable queue that receives messages once their delay has elapsed
    @Bean
    public Queue lazyQueue() {
        return new Queue(LAZY_QUEUE, true);
    }

    // Route every key matching "lazy.#" from the exchange to the queue
    @Bean
    public Binding lazyBinding() {
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
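}
Calling exchange.setDelayed(true) makes Spring AMQP declare the exchange as a delayed exchange; alternatively, the exchange can be declared explicitly with the type x-delayed-message and an x-delayed-type argument that names the underlying routing type.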
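The explicit declaration could look roughly like the sketch below, which would replace the lazyExchange() and lazyBinding() beans rather than sit alongside them. The class and bean names are hypothetical. It assumes Spring AMQP's CustomExchange and the plugin's x-delayed-message exchange type.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQCustomExchangeConfig { // hypothetical class name

    @Bean
    public CustomExchange lazyCustomExchange() {
        Map<String, Object> args = new HashMap<>();
        // Routing behaviour the plugin applies once the delay has elapsed
        args.put("x-delayed-type", "topic");
        // Arguments: name, type, durable, autoDelete, arguments
        return new CustomExchange("Ex.LazyExchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding lazyCustomBinding(Queue lazyQueue, CustomExchange lazyCustomExchange) {
        // A generic exchange binding needs an explicit noargs() to finish the builder
        return BindingBuilder.bind(lazyQueue).to(lazyCustomExchange).with("lazy.#").noargs();
    }
}
```

Either style should end up with the same exchange on the broker. The setDelayed(true) form is shorter, while the explicit form makes the plugin's exchange type visible in the configuration.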
When sending a message, specify the delay via a MessagePostProcessor:
import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;

@Component
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Called when the broker confirms or rejects a publish
    // (publisher confirms must be enabled on the connection factory).
    private final RabbitTemplate.ConfirmCallback confirmCallback =
            (correlationData, ack, cause) ->
                    System.out.println("confirm " + correlationData + " ack=" + ack + " cause=" + cause);

    // Called when a mandatory message cannot be routed. In Spring AMQP 2.3 and later,
    // ReturnsCallback and setReturnsCallback replace this API.
    private final RabbitTemplate.ReturnCallback returnCallback =
            (returned, replyCode, replyText, exchange, routingKey) ->
                    System.out.println("returned " + replyCode + " " + replyText + " from " + exchange);

    public void sendLazy(Object message) {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData("12345678909" + new Date());
        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
                msg -> {
                    msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    // Delay in milliseconds (6 seconds)
                    msg.getMessageProperties().setDelay(6000);
                    return msg;
                }, correlationData);
    }
}
The setDelay method ultimately sets the x-delay header, which the plugin reads as the delay in milliseconds.
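Business rules are usually phrased as deadlines ("cancel 30 minutes after creation") rather than as millisecond counts. The sketch below shows one way to turn a deadline into a value suitable for setDelay. The DelayCalculator class and its method are hypothetical helpers, not part of Spring AMQP, and the int range check reflects the Integer parameter of setDelay.

```java
import java.time.Duration;
import java.time.Instant;

final class DelayCalculator {

    /** Milliseconds from now until dueAt, clamped at zero, for use as a delay value. */
    static int delayMillisUntil(Instant now, Instant dueAt) {
        long millis = Duration.between(now, dueAt).toMillis();
        if (millis <= 0) {
            return 0; // already due: deliver without delay
        }
        if (millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("delay does not fit in an int: " + millis + " ms");
        }
        return (int) millis;
    }

    public static void main(String[] args) {
        Instant createdAt = Instant.parse("2024-01-01T10:00:00Z");
        Instant cancelAt = createdAt.plus(Duration.ofMinutes(30));
        System.out.println(delayMillisUntil(createdAt, cancelAt)); // prints 1800000
    }
}
```

The returned value can be passed directly to setDelay inside the MessagePostProcessor.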
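The consumer simply listens to the queue and acknowledges each message after handling it:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Component
public class MQReceiver {

    // Manual acks assume the listener container runs in manual acknowledge mode
    // (for example spring.rabbitmq.listener.simple.acknowledge-mode=manual).
    @RabbitListener(queues = "MQ.LazyQueue")
    public void onLazyMessage(Message msg, Channel channel) throws IOException {
        System.out.println("lazy receive " + new String(msg.getBody(), StandardCharsets.UTF_8));
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        // Acknowledge only this delivery, after it has been processed
        channel.basicAck(deliveryTag, false);
    }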
}
Running a test shows that the message is received after the configured six-second delay.
Java High-Performance Architecture
Sharing Java development articles and resources, including SSM architecture and the Spring ecosystem (Spring Boot, Spring Cloud, MyBatis, Dubbo, Docker), Zookeeper, Redis, architecture design, microservices, message queues, Git, etc.
