How to Achieve Exactly‑Once Processing with RabbitMQ: 6 Practical Strategies
This article presents a comprehensive, end‑to‑end solution for preventing duplicate consumption in RabbitMQ by combining architecture design, routing strategies, idempotent controls, state machines, and ACK mechanisms, and details six concrete implementation options with code examples.
Core Premise: Global Uniqueness
All deduplication approaches require that each message have a globally unique messageId generated by the producer. RabbitMQ only guarantees at‑least‑once delivery; exactly‑once must be enforced by the surrounding system.
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.messageId(UUID.randomUUID().toString())
.timestamp(new Date())
.deliveryMode(2) // persistent
.build();
channel.basicPublish(exchange, routingKey, props, message.getBytes());Do not rely on RabbitMQ‑generated IDs because they are unstable across clusters, uncontrollable on retries, and unsuitable as idempotency keys.
Solution 1 – Use a Work Queue Instead of Pub/Sub
When each message only needs a single consumer, replace the fan‑out pattern with a simple work queue.
// Producer
channel.queueDeclare("task_queue", true, false, false, null);
channel.basicPublish("", "task_queue", null, message.getBytes());
// Multiple consumers share the same queue
channel.basicConsume("task_queue", false, deliverCallback, cancelCallback);Advantages : No duplicate delivery, highest performance, simplest logic.
Disadvantages : Cannot broadcast, limited scalability, no business partitioning.
Solution 2 – Consistent‑Hash Exchange (Business‑Level Unique Consumption)
Route messages with the same business key (e.g., orderId) to the same consumer.
channel.exchangeDeclare("consistent-exchange", "x-consistent-hash", true);
channel.queueBind("queue-1", "consistent-exchange", "1");
channel.queueBind("queue-2", "consistent-exchange", "1");
String routingKey = orderId; // business key
channel.basicPublish("consistent-exchange", routingKey, null, body);Natural routing isolation – same business key ends up in the same queue.
Horizontal scaling – add more queues as needed.
Reduces duplicate probability by limiting concurrent processing of the same key.
Solution 3 – Redis Idempotent Deduplication (Primary Strategy)
Upgrade the simple SETNX approach to an atomic Lua state‑machine that tracks three states: non‑existent , processing , and processed .
-- KEYS[1] = msgKey
-- ARGV[1] = expireSeconds
if redis.call("EXISTS", KEYS[1]) == 0 then
redis.call("SET", KEYS[1], "processing", "EX", ARGV[1])
return 1
end
local status = redis.call("GET", KEYS[1])
if status == "processed" then
return 2
end
return 0Java integration:
int result = redisLua.execute(key, 300);
if (result == 1) {
// first processing
} else if (result == 2) {
channel.basicAck(tag, false); // already processed
return;
} else {
channel.basicNack(tag, false, true); // still processing
return;
}Atomicity
High concurrency safety
Lock‑free
Excellent performance
Solution 4 – Database Unique Index (Final Consistency)
Create a table with a primary key on message_id to guarantee uniqueness at the storage layer.
CREATE TABLE processed_messages (
message_id VARCHAR(64) PRIMARY KEY,
status TINYINT NOT NULL COMMENT '0-processing,1-done',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);Consumption flow:
INSERT message_id, status=0
→ process business logic
→ UPDATE status=1
→ ACKJava implementation uses a @Transactional method to insert, process, and update the status.
Crash‑recoverable consumption
Auditable and traceable
Provides a final safety net for exactly‑once guarantees
Solution 5 – Message Grouping (Logical Ordering)
Attach a groupId header to the message and combine it with consistent‑hash routing so that all messages of the same group are processed by the same consumer.
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.messageId(messageId)
.headers(Map.of("groupId", groupId))
.build();Solution 6 – RabbitMQ Plugins
Message Deduplication : MQ‑level automatic deduplication.
Sharding Plugin : Queue sharding for ultra‑high concurrency scenarios.
Suitable for very large clusters and unified MQ governance.
Must Enable Manual ACK
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manualRule: ACK only after business success and successful Redis and DB markings.
Ultimate Consumption Template
@RabbitListener(queues = "${rabbit.queue}")
public void handle(Message message, Channel channel) throws Exception {
String msgId = message.getMessageProperties().getMessageId();
long tag = message.getMessageProperties().getDeliveryTag();
// 1. Redis atomic idempotency check
int r = redisLua.execute("msg:" + msgId, 300);
if (r == 2) { channel.basicAck(tag, false); return; }
if (r == 0) { channel.basicNack(tag, false, true); return; }
try {
// 2. DB idempotent fallback
insertProcessing(msgId);
// 3. Business logic
processBusiness(message);
// 4. Update DB
markDone(msgId);
// 5. Mark Redis as processed
redisTemplate.opsForValue().set("msg:" + msgId, "processed", 1, TimeUnit.DAYS);
// 6. ACK
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
}Solution Selection Summary
Routing Layer : Work queue or consistent‑hash to reduce natural duplicates.
Cache Layer : Redis idempotency for high‑performance concurrency control.
Storage Layer : DB unique index as the ultimate fallback.
Protocol Layer : Manual ACK to avoid message loss.
Architecture Layer : Plugins for massive‑scale governance.
Key Takeaway
RabbitMQ only delivers messages; idempotency is a business responsibility, and achieving exactly‑once is an engineering art.
Ray's Galactic Tech
Practice together, never alone. We cover programming languages, development tools, learning methods, and pitfall notes. We simplify complex topics, guiding you from beginner to advanced. Weekly practical content—let's grow together!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
