Eight Common Use Cases of Message Queues (MQ) with Code Examples
This article walks through eight typical scenarios for using message queues (MQ): asynchronous processing, service decoupling, traffic shaping, delayed tasks, log collection, distributed transactions, remote invocation, and broadcast notifications. Each scenario gets a short explanation, and most come with a Java code snippet.
Message queues come up constantly in day-to-day development. The sections below take the scenarios one at a time.
1. Asynchronous Processing
MQ is commonly used to offload time‑consuming tasks such as sending SMS or email after a user registration, preventing the registration API from being blocked.
// User registration method
public void registerUser(String username, String email, String phoneNumber) {
    // Save user (simplified)
    userService.add(buildUser(username, email, phoneNumber));
    // Build message
    String registrationMessage = "User " + username + " has registered successfully.";
    // Send to queue
    rabbitTemplate.convertAndSend("registrationQueue", registrationMessage);
}
The consumer reads the message and sends the notification:
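import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class NotificationService {
    // Listen to the registration queue and send SMS/email
    @RabbitListener(queues = "registrationQueue")
    public void handleRegistrationNotification(String message) {
        System.out.println("Sending registration notification: " + message);
        // Send SMS (sendSms is assumed to be implemented elsewhere in this class)
        sendSms(message);
        // Send email (sendEmail is assumed to be implemented elsewhere as well)
        sendEmail(message);
    }
}
2. Decoupling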
In micro‑service architectures MQ helps decouple services. For example, a payment service publishes a message to reduce inventory instead of calling the inventory API directly.
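import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class PaymentService {
    private final DefaultMQProducer producer;
    public PaymentService() throws Exception {
        producer = new DefaultMQProducer("PaymentProducerGroup");
        producer.setNamesrvAddr("localhost:9876"); // RocketMQ NameServer address
        producer.start();
    }
    public void processPayment(String orderId, int quantity) throws Exception {
        // Simulate payment call (callPayment is assumed to be defined elsewhere)
        boolean paymentSuccessful = callPayment(orderId, quantity);
        if (paymentSuccessful) {
            // Publish a message instead of calling the inventory API directly
            String messageBody = "OrderId: " + orderId + ", Quantity: " + quantity;
            Message message = new Message("paymentTopic", "paymentTag",
                    messageBody.getBytes(StandardCharsets.UTF_8));
            producer.send(message);
        }
    }
}
The inventory service subscribes to the same topic and reduces stock when a payment message arrives:
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class InventoryService {
    private final DefaultMQPushConsumer consumer;
    public InventoryService() throws Exception {
        consumer = new DefaultMQPushConsumer("InventoryConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("paymentTopic", "paymentTag");
        // The cast selects the concurrent-listener overload of registerMessageListener
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                // reduceStock is assumed to be defined elsewhere in this class
                reduceStock(messageBody);
            }
            // Acknowledge the batch explicitly; returning null does not signal success
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.println("InventoryService started...");
    }
}
3. Traffic Shaping (Peak‑shaving)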
During spikes such as ticket booking or flash‑sale events, MQ can absorb burst traffic and process requests at a steady rate, preventing system overload.
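The buffering idea can be sketched without a real broker. The example below is a minimal, single-threaded simulation, not a production setup: a bounded in-memory queue stands in for the MQ, a burst of requests is offered all at once, and the consumer drains at most a fixed number of requests per "tick". The class name `TrafficShapingSketch`, the capacity, and the rate are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TrafficShapingSketch {

    /** In-memory stand-in for a broker queue with bounded capacity. */
    private final BlockingQueue<String> queue;
    /** Maximum number of requests the downstream system handles per tick. */
    private final int ratePerTick;

    public TrafficShapingSketch(int capacity, int ratePerTick) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.ratePerTick = ratePerTick;
    }

    /** Producer side: enqueue without blocking; false means the buffer is full. */
    public boolean submit(String request) {
        return queue.offer(request);
    }

    /** Consumer side: process at most ratePerTick requests, however many are waiting. */
    public List<String> drainOneTick() {
        List<String> processed = new ArrayList<>();
        queue.drainTo(processed, ratePerTick);
        return processed;
    }

    /** Number of requests still waiting in the buffer. */
    public int backlog() {
        return queue.size();
    }

    public static void main(String[] args) {
        TrafficShapingSketch shaper = new TrafficShapingSketch(1000, 100);
        int accepted = 0;
        for (int i = 0; i < 1200; i++) { // a burst of 1200 requests arrives at once
            if (shaper.submit("order-" + i)) {
                accepted++;
            }
        }
        System.out.println("accepted=" + accepted + ", rejected=" + (1200 - accepted));
        int ticks = 0;
        while (shaper.backlog() > 0) { // downstream works through the backlog steadily
            shaper.drainOneTick();
            ticks++;
        }
        System.out.println("backlog drained in " + ticks + " ticks");
    }
}
```

With these numbers the burst is cut down to what the buffer can hold, and the downstream side never sees more than 100 requests per tick. A real broker typically persists the backlog to disk, so the buffer can be far larger than an in-memory queue.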
4. Delayed Tasks
MQ delayed queues can automatically cancel unpaid orders after a timeout. The order service sends a delayed message to RocketMQ, and a consumer processes the cancellation.
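One detail matters before sending: classic RocketMQ 4.x delayed messages take a delay level (a 1-based index into a fixed table of delays) rather than an arbitrary duration. The helper below is a small sketch that maps a timeout to the smallest level that covers it. It assumes the broker's default table (`1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`), which an operator can change through the `messageDelayLevel` broker setting; the class and method names are hypothetical.

```java
import java.time.Duration;

public class DelayLevels {

    // Assumption: the broker uses RocketMQ's default delay table, in seconds.
    // Level 1 is the first entry, level 18 the last.
    private static final long[] DEFAULT_DELAYS_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    /**
     * Returns the smallest 1-based delay level whose delay is at least the
     * requested timeout. Timeouts longer than the table allows are capped at
     * the highest level.
     */
    public static int levelFor(Duration timeout) {
        // Round partial seconds up so the message is never delivered early
        long seconds = timeout.getSeconds() + (timeout.getNano() > 0 ? 1 : 0);
        for (int i = 0; i < DEFAULT_DELAYS_SECONDS.length; i++) {
            if (DEFAULT_DELAYS_SECONDS[i] >= seconds) {
                return i + 1;
            }
        }
        return DEFAULT_DELAYS_SECONDS.length;
    }

    public static void main(String[] args) {
        System.out.println(levelFor(Duration.ofMinutes(30))); // prints 16
        System.out.println(levelFor(Duration.ofMinutes(15))); // prints 15 (rounded up to 20m)
    }
}
```

Because levels are coarse, a 15-minute timeout ends up as a 20-minute delay here. If exact timeouts matter, the consumer can re-check the order's deadline when the message arrives.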
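import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
    // Level 16 is 30 minutes in RocketMQ's default delay-level table
    private static final int DELAY_LEVEL_30_MIN = 16;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public void createOrder(Order order) {
        // Save order (omitted)
        // Send delayed message. The last argument is a delay level (an index into
        // the broker's delay table), not a duration in seconds, so a per-order
        // timeout such as order.getTimeout() has to be mapped to a level first.
        rocketMQTemplate.syncSend("orderCancelTopic",
                MessageBuilder.withPayload(order).build(),
                10000, // send timeout in milliseconds
                DELAY_LEVEL_30_MIN);
    }
}
The consumer receives the message once the delay has elapsed and cancels the order:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "orderCancelTopic", consumerGroup = "order-cancel-consumer-group")
public class OrderCancelListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        // Cancel order logic (only if the order is still unpaid)
        System.out.println("Cancelling order: " + order.getOrderId());
        // ... actual cancellation code
    }
}
5. Log Collection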
Applications can publish logs to a log‑processing system (e.g., Kafka → ELK) for centralized storage and analysis.
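import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
// Send log to Kafka topic "app-logs". config is assumed to hold bootstrap.servers
// and String key/value serializers (setup omitted).
KafkaProducer<String, String> producer = new KafkaProducer<>(config);
String logMessage = "{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}";
producer.send(new ProducerRecord<>("app-logs", "log-key", logMessage));
A consumer then reads the log entries from the topic:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class LogConsumer {
    // Consume logs from Kafka
    @KafkaListener(topics = "app-logs", groupId = "log-consumer-group")
    public void consumeLog(String logMessage) {
        System.out.println("Received log: " + logMessage);
    }
}
6. Distributed Transactions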
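MQ can help coordinate distributed transactions through half-messages. The producer first sends a half-message that consumers cannot see yet, then runs its local transaction. If the local transaction succeeds, the message is committed and becomes deliverable; if it fails, the message is rolled back and discarded.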
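The flow can be illustrated with a toy in-memory model. This is only a sketch of the protocol's shape, not RocketMQ's API: `ToyBroker`, `sendInTransaction`, and the transaction IDs are hypothetical names. In RocketMQ itself the equivalent roles are played by `TransactionMQProducer` and a `TransactionListener`, and the broker additionally checks back with the producer when it never hears the outcome, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

public class HalfMessageSketch {

    /** Toy broker: half-messages are stored but hidden from consumers until committed. */
    static class ToyBroker {
        private final Map<String, String> halfMessages = new HashMap<>();
        private final List<String> visibleToConsumers = new ArrayList<>();

        void sendHalf(String txId, String body) {
            halfMessages.put(txId, body);
        }

        void commit(String txId) {
            String body = halfMessages.remove(txId);
            if (body != null) {
                visibleToConsumers.add(body); // only now can consumers see it
            }
        }

        void rollback(String txId) {
            halfMessages.remove(txId); // discarded, never delivered
        }

        List<String> visible() {
            return visibleToConsumers;
        }

        int pendingHalfMessages() {
            return halfMessages.size();
        }
    }

    /** Sends a half-message, runs the local transaction, then commits or rolls back. */
    static boolean sendInTransaction(ToyBroker broker, String txId, String body,
                                     BooleanSupplier localTransaction) {
        broker.sendHalf(txId, body);
        boolean succeeded;
        try {
            succeeded = localTransaction.getAsBoolean();
        } catch (RuntimeException e) {
            succeeded = false; // treat an exception as a failed local transaction
        }
        if (succeeded) {
            broker.commit(txId);
        } else {
            broker.rollback(txId);
        }
        return succeeded;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        sendInTransaction(broker, "tx-1", "order 1001 paid", () -> true);
        sendInTransaction(broker, "tx-2", "order 1002 paid", () -> false);
        System.out.println(broker.visible()); // prints [order 1001 paid]
    }
}
```

Only the message whose local transaction succeeded ever reaches consumers, which is the guarantee the half-message mechanism is meant to provide.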
7. Remote Invocation
RocketMQ can be used as a transport layer for a custom remote‑call framework, providing features such as message query and high reliability required by financial scenarios.
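A common way to build a remote call on top of a queue is request/reply with correlation IDs: the caller publishes a request tagged with a unique ID, the server publishes its reply with the same ID, and the caller matches replies to pending calls. The sketch below shows that pattern with in-memory queues standing in for the request and reply topics. It is an assumption about how such a framework could look, not a description of any particular one; all class and method names are hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

public class MqRpcSketch {

    /** A message tagged with a correlation ID so a reply can be matched to its request. */
    static final class Envelope {
        final String correlationId;
        final String payload;

        Envelope(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // In-memory stand-ins for a request topic and a reply topic
    private final BlockingQueue<Envelope> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Envelope> replyQueue = new LinkedBlockingQueue<>();
    // Calls still waiting for a reply, keyed by correlation ID
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Caller: publish a request and return a future that the matching reply completes. */
    public CompletableFuture<String> call(String payload) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        requestQueue.add(new Envelope(id, payload));
        return future;
    }

    /** Server: handle one queued request and publish the reply under the same ID. */
    public boolean serveOne(Function<String, String> handler) {
        Envelope request = requestQueue.poll();
        if (request == null) {
            return false; // nothing to serve
        }
        replyQueue.add(new Envelope(request.correlationId, handler.apply(request.payload)));
        return true;
    }

    /** Caller: consume one reply and complete the future waiting on its correlation ID. */
    public boolean dispatchOneReply() {
        Envelope reply = replyQueue.poll();
        if (reply == null) {
            return false; // no reply available yet
        }
        CompletableFuture<String> future = pending.remove(reply.correlationId);
        if (future != null) {
            future.complete(reply.payload);
        }
        return true;
    }

    public static void main(String[] args) {
        MqRpcSketch rpc = new MqRpcSketch();
        CompletableFuture<String> reply = rpc.call("getBalance:42");
        rpc.serveOne(request -> "handled " + request);
        rpc.dispatchOneReply();
        System.out.println(reply.join()); // prints "handled getBalance:42"
    }
}
```

In a real deployment the two queues would be broker topics, the server and the reply dispatcher would run as separate consumers, and the caller would put a timeout on the future so that a lost reply does not block it forever.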
8. Broadcast Notifications (Event‑driven Messaging)
MQ can broadcast events (e.g., order payment success) to multiple subscribers like inventory, points, and finance systems.
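// Create order payment success event message
String orderEventData = "{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}";
Message event = new Message("order_event_topic", "order_payment_success",
        orderEventData.getBytes(StandardCharsets.UTF_8));
producer.send(event);
// Each subscriber below runs in its own system under its own consumer group, so
// every group receives a copy of the event. The group names are illustrative;
// setNamesrvAddr(...) and start() are omitted for brevity.
// Inventory system consumer
DefaultMQPushConsumer inventoryConsumer = new DefaultMQPushConsumer("inventory_consumer_group");
inventoryConsumer.subscribe("order_event_topic", "order_payment_success");
inventoryConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        String eventData = new String(msg.getBody(), StandardCharsets.UTF_8);
        System.out.println("Inventory system received: " + eventData);
        // updateInventory(eventData);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// Points system consumer
DefaultMQPushConsumer pointsConsumer = new DefaultMQPushConsumer("points_consumer_group");
pointsConsumer.subscribe("order_event_topic", "order_payment_success");
pointsConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        String eventData = new String(msg.getBody(), StandardCharsets.UTF_8);
        System.out.println("Points system received: " + eventData);
        // updateUserPoints(eventData);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// Finance system consumer
DefaultMQPushConsumer financeConsumer = new DefaultMQPushConsumer("finance_consumer_group");
financeConsumer.subscribe("order_event_topic", "order_payment_success");
financeConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        String eventData = new String(msg.getBody(), StandardCharsets.UTF_8);
        System.out.println("Finance system received: " + eventData);
        // recordPaymentTransaction(eventData);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
IT Services Circle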