Master Spring Boot & RabbitMQ: From Setup to Advanced Patterns
This guide walks you through installing RabbitMQ, configuring Spring Boot, building basic producers and consumers, exploring work, fanout, direct and topic patterns, and applying advanced features like confirmations, manual ACKs, TTL, dead‑letter, delayed and priority queues, plus clustering, monitoring and troubleshooting tips.
Environment Preparation
Run RabbitMQ in a Docker container (management UI enabled):
docker run -d --hostname my-rabbit --name some-rabbit \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management
Management UI: http://localhost:15672 (default credentials guest/guest).
Create a Spring Boot Project
Add the AMQP starter dependency:
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configure the connection in application.yml:
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
Basic Send/Receive Example
Configuration
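import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "demo.queue";
    public static final String EXCHANGE_NAME = "demo.exchange";
    public static final String ROUTING_KEY = "demo.routingkey";

    // Durable queue: the queue definition survives a broker restart.
    @Bean
    Queue queue() { return new Queue(QUEUE_NAME, true); }

    @Bean
    DirectExchange exchange() { return new DirectExchange(EXCHANGE_NAME); }

    // Bind the queue to the exchange under ROUTING_KEY.
    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
Producer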
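import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Publish to the exchange; the routing key selects the bound queue.
    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                RabbitMQConfig.ROUTING_KEY,
                message);
        System.out.println("Sent message: " + message);
    }
}
Consumer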
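import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {
    // Invoked for every message delivered from demo.queue.
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
Test Controller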
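// Requires spring-boot-starter-web on the classpath.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rabbit")
public class RabbitMQController {
    @Autowired
    private MessageProducer producer;

    // Example: GET /rabbit/send?message=hello
    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        producer.sendMessage(message);
        return "Message sent successfully: " + message;
    }
}
Common Messaging Patterns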
Work Queue – multiple consumers compete for messages; suitable for asynchronous task processing (e.g., image transcoding).
Fanout (Publish/Subscribe) – broadcast to all bound queues; used for log distribution or system notifications.
Direct (Routing) – exact routing‑key match; useful for multi‑level log handling.
Topic – wildcard routing keys; fits e‑commerce order/payment/logistics classification.
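The routing rules behind the fanout, direct and topic patterns can be sketched in plain Java without a broker. This is only an illustration of the matching semantics (fanout ignores the key, direct needs an exact match, and in topic patterns `*` stands for exactly one dot-separated word while `#` stands for zero or more words); it is not how RabbitMQ implements routing internally. The class name `ExchangeRoutingSketch` and its methods are hypothetical, and the queue names and patterns are borrowed from the topic configuration in this guide.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExchangeRoutingSketch {

    /** Direct exchange: the binding key must equal the routing key exactly. */
    public static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** Topic exchange: '*' matches exactly one word, '#' matches zero or more words. */
    public static boolean topicMatches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // pattern still needs a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    /** Returns the queues (map keys) whose binding key (map value) accepts the routing key. */
    public static List<String> route(String exchangeType,
                                     Map<String, String> bindings,
                                     String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            boolean hit;
            switch (exchangeType) {
                case "fanout": hit = true; break;   // the routing key is ignored
                case "direct": hit = directMatches(b.getValue(), routingKey); break;
                case "topic":  hit = topicMatches(b.getValue(), routingKey); break;
                default: throw new IllegalArgumentException("unknown type: " + exchangeType);
            }
            if (hit) {
                queues.add(b.getKey());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("topic.queue1", "topic.key1.*");
        bindings.put("topic.queue2", "topic.key2.#");

        System.out.println(route("topic", bindings, "topic.key1.created")); // [topic.queue1]
        System.out.println(route("topic", bindings, "topic.key2.a.b"));     // [topic.queue2]
        System.out.println(route("fanout", bindings, "anything"));          // [topic.queue1, topic.queue2]
    }
}
```

A work queue needs no entry here because it is not a routing rule: several consumers simply listen on the same queue and the broker hands each message to one of them.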
Work Queue Configuration
// Each public class below belongs in its own file.
@Configuration
public class WorkQueueConfig {
    public static final String WORK_QUEUE = "work.queue";

    @Bean
    Queue workQueue() { return new Queue(WORK_QUEUE, true); }
}

@Component
public class WorkMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // No exchange given: the default exchange routes by queue name.
    public void sendWorkMessage(String msg) {
        rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, msg);
    }
}

// Both workers listen on the same queue, so each message goes to only one of them.
@Component
public class Worker1 {
    @RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
    public void worker1(String msg) { System.out.println("Worker1 received message: " + msg); }
}

@Component
public class Worker2 {
    @RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
    public void worker2(String msg) { System.out.println("Worker2 received message: " + msg); }
}
Fanout Configuration
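import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    @Bean
    FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); }

    @Bean
    Queue fanoutQueue1() { return new Queue(FANOUT_QUEUE1, false); }

    @Bean
    Queue fanoutQueue2() { return new Queue(FANOUT_QUEUE2, false); }

    // Bean names must be unique across configuration classes, hence the "fanout" prefix.
    // A fanout binding has no routing key.
    @Bean
    Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
Direct Configuration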
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";
    public static final String ROUTING_KEY1 = "routing.key1";
    public static final String ROUTING_KEY2 = "routing.key2";

    @Bean
    DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); }

    @Bean
    Queue directQueue1() { return new Queue(DIRECT_QUEUE1, false); }

    @Bean
    Queue directQueue2() { return new Queue(DIRECT_QUEUE2, false); }

    // Bean names must be unique across configuration classes, hence the "direct" prefix.
    @Bean
    Binding directBinding1(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with(ROUTING_KEY1);
    }

    @Bean
    Binding directBinding2(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with(ROUTING_KEY2);
    }
}
Topic Configuration
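import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    // Binding patterns: '*' matches exactly one word, '#' matches zero or more words.
    public static final String ROUTING_KEY1 = "topic.key1.*";
    public static final String ROUTING_KEY2 = "topic.key2.#";

    @Bean
    TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); }

    @Bean
    Queue topicQueue1() { return new Queue(TOPIC_QUEUE1, false); }

    @Bean
    Queue topicQueue2() { return new Queue(TOPIC_QUEUE2, false); }

    // Bean names must be unique across configuration classes, hence the "topic" prefix.
    @Bean
    Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with(ROUTING_KEY1);
    }

    @Bean
    Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ROUTING_KEY2);
    }
}
Advanced Features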
Confirm & Return
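Enable publisher confirms and returns (spring.rabbitmq.publisher-confirm-type=correlated and spring.rabbitmq.publisher-returns=true), then register callbacks on RabbitTemplate: setConfirmCallback reports whether the broker accepted each message at the exchange, and setReturnsCallback reports messages that reached the exchange but could not be routed to any queue. The callbacks detect failed deliveries; re-sending or logging them is up to the application.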
Manual ACK
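// Requires spring.rabbitmq.listener.simple.acknowledge-mode=manual.
// Imports: com.rabbitmq.client.Channel, org.springframework.amqp.support.AmqpHeaders,
// org.springframework.messaging.handler.annotation.Header, java.io.IOException.
@RabbitListener(queues = "demo.queue")
public void receive(String msg, Channel channel,
                    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    try {
        // business logic
        channel.basicAck(tag, false);   // acknowledge this delivery only
    } catch (Exception e) {
        // requeue=true puts the message back on the queue. A message that always
        // fails will loop, so consider requeue=false together with a dead-letter queue.
        channel.basicNack(tag, false, true);
    }
}
JSON Message Conversion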
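// Spring Boot applies a single MessageConverter bean to the auto-configured
// RabbitTemplate and listener containers, so payloads are sent and read as JSON.
@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}
Reliability Guarantees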
Durable queues and exchanges: new Queue("durable.queue", true) and new DirectExchange("durable.exchange", true, false).
Message TTL: set x-message-ttl argument (e.g., 60000 ms) when declaring a queue.
Dead‑Letter Queue (DLQ): configure x-dead-letter-exchange and x-dead-letter-routing-key arguments.
Delayed Queue (requires the rabbitmq_delayed_message_exchange plugin): declare a CustomExchange of type x-delayed-message with the argument x-delayed-type=direct. Set the per-message delay in milliseconds via msg.getMessageProperties().setDelay(5000).
Priority Queue: set x-max-priority argument (e.g., 10) when creating the queue.
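The TTL, dead-letter and priority settings above are all plain queue arguments, so they can be collected in one map. The sketch below only builds that map; in Spring AMQP it would be passed as the last parameter of `new Queue(name, durable, exclusive, autoDelete, arguments)`. The class name `QueueArgumentsSketch` and the dead-letter names `dlx.exchange` and `dlx.routingkey` are placeholders invented for this example, not names from the original setup.

```java
import java.util.HashMap;
import java.util.Map;

public class QueueArgumentsSketch {

    /** Builds the optional arguments for a queue with TTL, dead-lettering and priorities. */
    public static Map<String, Object> queueArguments() {
        Map<String, Object> arguments = new HashMap<>();
        // Messages older than 60 seconds expire (value is in milliseconds).
        arguments.put("x-message-ttl", 60_000);
        // Expired or rejected messages are republished to this exchange (placeholder name).
        arguments.put("x-dead-letter-exchange", "dlx.exchange");
        // Routing key used when republishing to the dead-letter exchange (placeholder name).
        arguments.put("x-dead-letter-routing-key", "dlx.routingkey");
        // Highest priority a message on this queue may carry.
        arguments.put("x-max-priority", 10);
        return arguments;
    }

    public static void main(String[] args) {
        // Print each argument so the declaration can be checked before use.
        queueArguments().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Queue arguments are fixed when the queue is first declared, so changing any of them on an existing queue generally means deleting and re-declaring it.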
Practical Scenarios
Order timeout cancellation using a delayed queue (30‑minute delay).
Flash‑sale spike‑shaving with a work queue to asynchronously decrement stock.
Log collection via fanout exchange broadcasting to Elasticsearch, Kafka, or file storage.
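For the order-timeout scenario, one point is easy to miss: by the time the delayed message arrives, the order may already have been paid, so the handler should cancel only orders that are still pending. The following broker-free sketch shows that check under assumed names (`OrderTimeoutSketch`, `Status`, `onTimeout` are all hypothetical); an in-memory map stands in for the order table.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OrderTimeoutSketch {

    public enum Status { PENDING, PAID, CANCELLED }

    // 30 minutes expressed in milliseconds, the unit used for the message delay.
    public static final long DELAY_MS = Duration.ofMinutes(30).toMillis();

    // Stand-in for the order table.
    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    public void createOrder(String orderId) {
        orders.put(orderId, Status.PENDING);
        // In the real flow, a delayed message carrying orderId would be published here.
    }

    public void markPaid(String orderId) {
        // Only a pending order can become paid.
        orders.replace(orderId, Status.PENDING, Status.PAID);
    }

    /**
     * Called when the delayed message for an order is consumed.
     * Cancels the order only if it is still PENDING; returns the resulting status,
     * or null if the order is unknown.
     */
    public Status onTimeout(String orderId) {
        return orders.computeIfPresent(orderId,
                (id, status) -> status == Status.PENDING ? Status.CANCELLED : status);
    }

    public static void main(String[] args) {
        OrderTimeoutSketch sketch = new OrderTimeoutSketch();
        sketch.createOrder("order-1");
        sketch.createOrder("order-2");
        sketch.markPaid("order-2");

        System.out.println(DELAY_MS);                    // 1800000
        System.out.println(sketch.onTimeout("order-1")); // CANCELLED
        System.out.println(sketch.onTimeout("order-2")); // PAID
    }
}
```

Because the status check and the update happen in one atomic step, a payment that lands just before the timeout is not overwritten by the cancellation.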
Cluster & High Availability
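Enable mirrored queues on all nodes (classic queue mirroring applies to RabbitMQ 3.x; it is deprecated and was removed in RabbitMQ 4.0, where quorum queues replace it):
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
Load-balance broker nodes (e.g., Nginx) and configure multiple addresses in Spring Boot:
spring:
  rabbitmq:
    addresses: host1:5672,host2:5672,host3:5672
Monitoring & Troubleshooting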
RabbitMQ Management UI (http://localhost:15672) for queues, exchanges, consumers.
Spring Boot Actuator: expose the health and metrics endpoints; with the AMQP starter present, the health endpoint reports a rabbit component.
Prometheus + Grafana with rabbitmq_exporter to monitor backlog, consumption rate, and connections.
Common Issues & Solutions
Duplicate consumption – enforce idempotency (e.g., store processed message IDs in Redis).
Message backlog – increase consumer count or use lazy queues (x-queue-mode=lazy).
Message ordering – use a single consumer per queue or enforce order in business logic.
Connection loss – tune requested-heartbeat and connection-timeout settings.
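The idempotency fix for duplicate consumption can be sketched as a small guard around the business logic. This is a minimal in-process version, assuming each message carries a unique ID: a concurrent set stands in for Redis (where the equivalent step would typically be a `SET` with the `NX` option plus an expiry). The class name `IdempotentHandler` and its methods are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class IdempotentHandler {

    // In-memory stand-in for a shared store such as Redis.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> businessLogic;

    public IdempotentHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /**
     * Runs the business logic at most once per message ID.
     * Returns true if the message was processed, false if it was a duplicate.
     */
    public boolean handle(String messageId, String payload) {
        // add() is atomic: only the first caller for a given ID gets true.
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            businessLogic.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so a redelivery of the same message can retry.
            processedIds.remove(messageId);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentHandler handler =
                new IdempotentHandler(payload -> System.out.println("Processing " + payload));

        System.out.println(handler.handle("msg-1", "order created")); // true
        System.out.println(handler.handle("msg-1", "order created")); // false, duplicate skipped
    }
}
```

An in-memory set only protects a single JVM and is lost on restart, which is why a shared store is the usual choice once several consumer instances run in parallel.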
Conclusion
Select the appropriate messaging pattern (Work, Fanout, Direct, Topic, delayed, priority) based on business requirements. Ensure reliability through persistence, ACKs, DLQ, and delayed queues. Handle high concurrency with asynchronous processing, clustering, and connection pooling. Observe the system via the management UI, Actuator endpoints, and Prometheus metrics.
Ray's Galactic Tech
