How to Implement Delayed Queues in RabbitMQ: TTL, DLX, and Plugin Methods
This article explains what delayed queues are, details two RabbitMQ implementations (per-message TTL combined with a dead-letter exchange, and the rabbitmq-delayed-message-exchange plugin), provides full Java configuration and producer/consumer code for each, and outlines common use cases such as order timeout and refund processing.
What is a Delayed Queue
A delayed queue is a special queue where messages are not consumed immediately; they become available to consumers only after a specified delay, making it suitable for tasks that must be executed at a particular time or after a certain interval.
RabbitMQ Implementation Methods
Using TTL (Time‑To‑Live) and Dead‑Letter Queue (DLQ)
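Principle: Set a TTL on a queue or on individual messages, and attach no consumers to that queue (delay.queue in the code below). When the TTL expires, the message becomes a dead letter. Because the queue is declared with a dead-letter exchange, the expired message is republished to that exchange and routed to a second queue (dlx.queue), which is the queue consumers actually read from. The delay is therefore the time a message spends waiting in the first queue. Note that RabbitMQ only expires a message with a per-message TTL once it reaches the head of the queue, so a message with a short TTL can be held back by an earlier message with a longer TTL.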
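The ordering behaviour of per-message TTL is easy to overlook, so here is a minimal sketch of it. It is a simplified model in plain Java, not RabbitMQ code: it assumes a single FIFO queue without consumers, where a message is dead-lettered only when its own TTL has elapsed and every message ahead of it has already left. The class and method names are hypothetical.

```java
import java.util.Arrays;

public class TtlHeadOfLineSketch {

    /**
     * Simplified model of a TTL queue without consumers: message i leaves the
     * queue at the later of (its own expiry) and (the time the message before
     * it left), because expiry is only acted on at the head of the queue.
     */
    public static long[] deadLetterTimes(long[] publishedAtMs, long[] ttlMs) {
        long[] result = new long[publishedAtMs.length];
        long previous = Long.MIN_VALUE;
        for (int i = 0; i < publishedAtMs.length; i++) {
            long ownExpiry = publishedAtMs[i] + ttlMs[i];
            result[i] = Math.max(ownExpiry, previous);
            previous = result[i];
        }
        return result;
    }

    public static void main(String[] args) {
        long[] publishedAt = {0, 0, 0};

        // TTLs in ascending order: every message is released on time.
        System.out.println(Arrays.toString(
                deadLetterTimes(publishedAt, new long[] {5000, 10000, 15000})));
        // prints [5000, 10000, 15000]

        // Longest TTL first: the two shorter messages wait behind it.
        System.out.println(Arrays.toString(
                deadLetterTimes(publishedAt, new long[] {15000, 5000, 10000})));
        // prints [15000, 15000, 15000]
    }
}
```

If all messages share the same delay, or are always published in ascending delay order, this effect does not show up. For mixed delays, one queue per delay value or the plugin approach may be a better fit.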
1. Define RabbitMQ Configuration, Declare the Delayed Queue with a DLX (TTL Is Set per Message by the Producer)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class RabbitMQConfig {
    // Delayed queue: holds messages until their TTL expires; nothing consumes from it
    public static final String DELAY_QUEUE = "delay.queue";
    // Delayed exchange: producers publish here
    public static final String DELAY_EXCHANGE = "delay.exchange";
    // Delayed routing key
    public static final String DELAY_ROUTING_KEY = "delay.routing.key";
    // Dead-letter queue: expired messages end up here; consumers read from it
    public static final String DLX_QUEUE = "dlx.queue";
    // Dead-letter exchange
    public static final String DLX_EXCHANGE = "dlx.exchange";
    // Dead-letter routing key
    public static final String DLX_ROUTING_KEY = "dlx.routing.key";

    public static Connection createConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxxxx");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        return factory.newConnection();
    }

    public static void init() throws Exception {
        try (Connection connection = createConnection();
             Channel channel = connection.createChannel()) {
            // Declare dead-letter exchange
            channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);
            // Declare dead-letter queue and bind it to the dead-letter exchange
            channel.queueDeclare(DLX_QUEUE, true, false, false, null);
            channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);
            // Declare delayed exchange
            channel.exchangeDeclare(DELAY_EXCHANGE, "direct", true);
            // Declare delayed queue with DLX parameters
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", DLX_EXCHANGE);
            args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
            // Optional: to give every message the same delay, set a queue-level TTL
            // here instead of a per-message one, e.g. args.put("x-message-ttl", 30000);
            channel.queueDeclare(DELAY_QUEUE, true, false, false, args);
            // Bind delayed queue to delayed exchange
            channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);
        }
    }
}
2. Producer and Consumer
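// Imports used by the two classes below
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

// Producer
@Slf4j
public class MessageProducer {
    public void sendMessage(String message, int delayTime) throws Exception {
        // A connection per message keeps the example short; reuse one in real code
        try (Connection connection = RabbitMQConfig.createConnection();
             Channel channel = connection.createChannel()) {
            log.info("Sending delayed message: {}, delay: {}ms", message, delayTime);
            // Per-message TTL in milliseconds, passed as a string
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration(String.valueOf(delayTime))
                    .build();
            channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE,
                    RabbitMQConfig.DELAY_ROUTING_KEY,
                    properties,
                    message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

// Consumer
@Slf4j
public class MessageConsumer {
    public void consumeDelayQueue() throws Exception {
        // Connection and channel are intentionally left open so the consumer keeps receiving
        Connection connection = RabbitMQConfig.createConnection();
        Channel channel = connection.createChannel();
        // Deliver at most one unacknowledged message at a time
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            log.info("Received delayed message: {}", message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // Consume from the dead-letter queue, not from the delayed queue
        channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}
Testing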
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DelayQueueTest {
    public static void main(String[] args) throws Exception {
        RabbitMQConfig.init();
        MessageProducer producer = new MessageProducer();
        MessageConsumer consumer = new MessageConsumer();
        new Thread(() -> {
            try { consumer.consumeDelayQueue(); }
            catch (Exception e) { log.error("Consumer error", e); }
        }).start();
        // Published in ascending TTL order, so no message is held back by an earlier one
        producer.sendMessage("Delay 5 seconds", 5000);
        producer.sendMessage("Delay 10 seconds", 10000);
        producer.sendMessage("Delay 15 seconds", 15000);
        log.info("All messages sent, waiting for delayed processing...");
        Thread.sleep(20000);
    }
}
Using the rabbitmq-delayed-message-exchange Plugin
Principle
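The plugin adds a custom exchange type, x-delayed-message, to RabbitMQ. Producers specify a delay in milliseconds in the x-delay message header when publishing; the exchange holds the message for that time and then routes it to the bound queues, using the routing behaviour named in the exchange's x-delayed-type argument (direct in the code below).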
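When a task has to run at a particular time rather than after a fixed interval, the delay has to be computed at publish time. The sketch below shows one way to do that and to build the header map. It only uses the JDK; the class name, method names, and the choice to cap the delay at Integer.MAX_VALUE milliseconds (roughly 24 days, chosen because the producer in this article passes the delay as an int) are assumptions of this example. Check the plugin's README for the limit the plugin itself enforces.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class DelayHeaderSketch {

    /**
     * Milliseconds from now until target, never negative.
     * A target in the past yields 0. Delays that do not fit in an int are
     * rejected (an assumption of this sketch, not a rule taken from the plugin).
     */
    public static int delayUntil(Instant now, Instant target) {
        long millis = Duration.between(now, target).toMillis();
        if (millis <= 0) {
            return 0;
        }
        if (millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Delay too long for this sketch: " + millis + " ms");
        }
        return (int) millis;
    }

    /** Header map carrying the delay, for AMQP.BasicProperties.Builder#headers. */
    public static Map<String, Object> delayHeaders(int delayMs) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayMs);
        return headers;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T00:00:00Z");
        Instant deadline = Instant.parse("2024-01-01T00:30:00Z");
        System.out.println(delayHeaders(delayUntil(now, deadline)));
        // prints {x-delay=1800000}
    }
}
```

The resulting map would be passed to the properties builder in the producer in place of a hard-coded delay.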
Implementation Steps
Plugin address: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
Install the version of rabbitmq-delayed-message-exchange that matches your RabbitMQ server, then enable it on the server by running: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
RabbitMQ Configuration for Plugin
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class RabbitMQConfig {
    public static final String DELAY_QUEUE = "plugin.delay.queue";
    public static final String DELAY_EXCHANGE = "plugin.delay.exchange";
    public static final String DELAY_ROUTING_KEY = "plugin.delay.routing.key";

    public static Connection createConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxxxxxx");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        return factory.newConnection();
    }

    public static void init() throws Exception {
        try (Connection connection = createConnection();
             Channel channel = connection.createChannel()) {
            // x-delayed-type tells the plugin how to route once the delay has elapsed
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            // Declare the delayed exchange (type provided by the plugin)
            channel.exchangeDeclare(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
            // A plain durable queue; no TTL or DLX arguments are needed
            channel.queueDeclare(DELAY_QUEUE, true, false, false, null);
            channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);
        }
    }
}
Producer and Consumer (Plugin Version)
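// Imports used by the two classes below
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Producer
@Slf4j
public class MessageProducer {
    public void sendMessage(String message, int delayTime) throws Exception {
        // A connection per message keeps the example short; reuse one in real code
        try (Connection connection = RabbitMQConfig.createConnection();
             Channel channel = connection.createChannel()) {
            log.info("Sending delayed message: {}, delay: {}ms", message, delayTime);
            // The plugin reads the delay (in milliseconds) from the x-delay header
            Map<String, Object> headers = new HashMap<>();
            headers.put("x-delay", delayTime);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .headers(headers)
                    .build();
            channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE,
                    RabbitMQConfig.DELAY_ROUTING_KEY,
                    properties,
                    message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

// Consumer
@Slf4j
public class MessageConsumer {
    public void consumeDelayQueue() throws Exception {
        // Connection and channel are intentionally left open so the consumer keeps receiving
        Connection connection = RabbitMQConfig.createConnection();
        Channel channel = connection.createChannel();
        // Deliver at most one unacknowledged message at a time
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            log.info("Received delayed message: {}", message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(RabbitMQConfig.DELAY_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}
Testing (Plugin Version)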
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DelayQueuePluginTest {
    public static void main(String[] args) throws Exception {
        RabbitMQConfig.init();
        MessageProducer producer = new MessageProducer();
        MessageConsumer consumer = new MessageConsumer();
        new Thread(() -> {
            try { consumer.consumeDelayQueue(); }
            catch (Exception e) { log.error("Consumer error", e); }
        }).start();
        producer.sendMessage("Delay 5 seconds", 5000);
        producer.sendMessage("Delay 10 seconds", 10000);
        producer.sendMessage("Delay 15 seconds", 15000);
        log.info("All messages sent, waiting for delayed processing...");
        Thread.sleep(20000);
    }
}
Use Cases
Order timeout handling: Place orders into a delayed queue; if payment is not completed within a set time (e.g., 30 minutes), the order is automatically cancelled and inventory released.
Refund processing: After a refund request, put it into a delayed queue; if no status change occurs within a period (e.g., 72 hours), the system automatically issues the refund.
Repayment reminders: Schedule reminder messages in a delayed queue so users receive notifications shortly before the repayment deadline.
Transfer confirmation timeout: For cross-bank transfers, if confirmation is not received within a defined window (e.g., 24 hours), the delayed message triggers a rollback and notifies the user.
Package pickup overdue alerts: If a package remains uncollected after a certain number of days, a delayed message sends a reminder to the recipient.
Delivery delay warnings: When logistics predict a later arrival, a delayed message alerts couriers or customers if the package status has not updated by the expected time.
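Most of these use cases share one pattern: when the delayed message finally arrives, the consumer checks the current state and acts only if nothing has changed in the meantime. For order timeouts, that could be sketched as follows. The in-memory map stands in for a real order store, and all class, enum, and method names are hypothetical; in the consumers above, onTimeout would be called from the DeliverCallback with the order ID carried in the message body.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OrderTimeoutSketch {

    public enum Status { PENDING_PAYMENT, PAID, CANCELLED }

    // Stand-in for a database table of orders (assumption of this sketch)
    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    public void createOrder(String orderId) {
        orders.put(orderId, Status.PENDING_PAYMENT);
    }

    /** Marks the order as paid only if it is still waiting for payment. */
    public boolean markPaid(String orderId) {
        return orders.replace(orderId, Status.PENDING_PAYMENT, Status.PAID);
    }

    /**
     * Called when the delayed message for an order arrives.
     * Cancels the order only if it is still unpaid, so a late or duplicate
     * delivery cannot cancel a paid order. Returns true if it was cancelled.
     */
    public boolean onTimeout(String orderId) {
        return orders.replace(orderId, Status.PENDING_PAYMENT, Status.CANCELLED);
    }

    public Status statusOf(String orderId) {
        return orders.get(orderId);
    }

    public static void main(String[] args) {
        OrderTimeoutSketch service = new OrderTimeoutSketch();

        service.createOrder("order-1");
        System.out.println(service.onTimeout("order-1")); // prints true (unpaid, so cancelled)

        service.createOrder("order-2");
        service.markPaid("order-2");
        System.out.println(service.onTimeout("order-2")); // prints false (already paid)
    }
}
```

The conditional update makes the handler safe to run more than once for the same order, which matters because a message can be redelivered if the consumer fails before acknowledging it.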
Xuanwu Backend Tech Stack
Primarily covers fundamental Java concepts, mainstream frameworks, deep dives into underlying principles, and JVM internals.
