Implementing Delayed Message Queues with RabbitMQ Plugin in Spring Boot
This article demonstrates how to use RabbitMQ's delayed‑queue plugin together with Spring Boot to create a lazy exchange, configure a durable queue, send messages with a specified delay, and consume them, providing complete Java code examples and explaining why this approach outperforms traditional Redis, database polling, or JVM DelayQueue solutions.
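The article explains how to implement delayed message delivery in RabbitMQ using the delayed-message exchange plugin (rabbitmq_delayed_message_exchange) and Spring Boot.
It first describes common business scenarios such as automatic order confirmation and delayed payment settlement, and why traditional solutions fit them poorly: database polling issues repeated queries and is only as precise as the polling interval, a JVM DelayQueue lives in one process's memory and loses pending tasks on restart, and Redis key-expiration events are not delivered reliably.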
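For comparison, the JVM DelayQueue approach mentioned above can be sketched as follows. This is a minimal illustration rather than code from the original article; DelayedTask and InMemoryDelayDemo are hypothetical names. It shows why the approach is fragile: the pending tasks exist only on the heap of a single JVM.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory delayed task, used only to illustrate the JVM DelayQueue approach. */
class DelayedTask implements Delayed {
    private final String payload;
    private final long fireAtNanos; // System.nanoTime() value at which the task becomes available

    DelayedTask(String payload, long delayMillis) {
        this.payload = payload;
        this.fireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    String payload() {
        return payload;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until the task may be taken; zero or negative means "ready".
        return unit.convert(fireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class InMemoryDelayDemo {
    /** Blocks until the earliest task's delay has elapsed, then returns its payload. */
    static String takeNext(DelayQueue<DelayedTask> queue) {
        try {
            return queue.take().payload();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a delayed task", e);
        }
    }

    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("confirm order 2", 300));
        queue.put(new DelayedTask("confirm order 1", 100));
        // Tasks come out in order of remaining delay, not insertion order.
        System.out.println(takeNext(queue));
        System.out.println(takeNext(queue));
    }
}
```

If the process stops before a task fires, the task is gone, and a second application instance cannot see the queue at all. A broker-side delay avoids both problems.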
For RabbitMQ 3.6+, the plugin file is copied into the plugins directory and enabled with rabbitmq-plugins enable rabbitmq_delayed_message_exchange, after which an exchange is declared with exchange.setDelayed(true). The configuration class defines constants for the exchange, queue and binding pattern, creates a TopicExchange, a durable queue, and binds them:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    @Bean
    public TopicExchange lazyExchange() {
        // No extra arguments are needed: setDelayed(true) makes RabbitAdmin declare the
        // exchange with type "x-delayed-message" and the "x-delayed-type" argument set to "topic".
        Map<String, Object> pros = new HashMap<>();
        // Durable, not auto-deleted.
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue() {
        // Durable queue: survives a broker restart.
        return new Queue(LAZY_QUEUE, true);
    }

    @Bean
    public Binding lazyBinding() {
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
}

When sending a message, the MessagePostProcessor marks the message as persistent and specifies the delay in milliseconds via message.getMessageProperties().setDelay(6000) (or the x-delay header). The sender also sets correlation data and uses rabbitTemplate.convertAndSend to publish to the lazy exchange:
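
import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;

@Component
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // The original omits the two callbacks; the logging bodies below are placeholders.
    // Invoked when the broker confirms (ack) or rejects (nack) a published message.
    private final RabbitTemplate.ConfirmCallback confirmCallback =
            (correlationData, ack, cause) ->
                    System.out.println("confirm " + correlationData + " ack=" + ack + " cause=" + cause);

    // Invoked when a mandatory message is returned as unroutable.
    private final RabbitTemplate.ReturnCallback returnCallback =
            (message, replyCode, replyText, exchange, routingKey) ->
                    System.out.println("returned " + replyCode + " " + replyText + " from " + exchange);

    public void sendLazy(Object message) {
        // Note: the plugin's README lists the mandatory flag as unsupported for delayed
        // exchanges, so the return callback may fire even for messages delivered later.
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData("12345678909" + new Date());
        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // Persist the message and delay delivery by 6000 ms.
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setDelay(6000);
                        return message;
                    }
                }, correlationData);
    }
}

The MessageProperties.setDelay method internally adds the x-delay header; if the delay is null or negative, the header is removed: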

public void setDelay(Integer delay) {
    if (delay == null || delay < 0) {
        this.headers.remove(X_DELAY);
    } else {
        this.headers.put(X_DELAY, delay);
    }
}

On the consumer side, a @RabbitListener on the lazy queue receives the message and acknowledges it manually (this assumes the listener container runs with spring.rabbitmq.listener.simple.acknowledge-mode=manual):

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Component
public class MQReceiver {

    // @RabbitHandler is only needed with a class-level @RabbitListener, so it is omitted here.
    @RabbitListener(queues = "MQ.LazyQueue")
    public void onLazyMessage(Message msg, Channel channel) throws IOException {
        System.out.println("lazy receive " + new String(msg.getBody(), StandardCharsets.UTF_8));
        // Acknowledge only this delivery (multiple = false), after it has been handled.
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}

A JUnit test demonstrates sending a message and receiving it after a 6-second delay:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

    @Autowired
    private MQSender mqSender;

    @Test
    public void sendLazy() throws Exception {
        String msg = "hello spring boot";
        mqSender.sendLazy(msg + ":");
        // Keep the test context alive past the 6-second delay so the listener can print the message.
        Thread.sleep(8000);
    }
}

When the test runs, the listener prints the message about 6 seconds after it is sent, showing that the lazy queue works as intended.
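The sender above hard-codes setDelay(6000). In scenarios such as automatic order confirmation, the delay usually follows from a business deadline instead. A minimal sketch of that calculation is shown below; DelayCalculator is a hypothetical helper, not part of Spring AMQP, and it clamps the result to the int range only because setDelay takes an Integer.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper that turns a business deadline into a value for the x-delay header. */
final class DelayCalculator {
    private DelayCalculator() {
    }

    /**
     * Returns the milliseconds from now until deadline, clamped to [0, Integer.MAX_VALUE]
     * so the result can be passed to MessageProperties.setDelay(Integer).
     */
    static int delayMillis(Instant now, Instant deadline) {
        long millis = Duration.between(now, deadline).toMillis();
        if (millis <= 0) {
            return 0; // deadline already passed: deliver without delay
        }
        return (int) Math.min(millis, Integer.MAX_VALUE);
    }
}
```

Inside the MessagePostProcessor this could be used as message.getMessageProperties().setDelay(DelayCalculator.delayMillis(Instant.now(), deadline)), where deadline is whatever instant the business rule produces. Integer.MAX_VALUE milliseconds is roughly 24.8 days, so longer deadlines would need a different mechanism, for example re-publishing with the remaining delay.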
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.