Seven RabbitMQ Messaging Patterns and Their Use Cases with Java Code Examples
This article introduces seven RabbitMQ messaging patterns (Simple, Work Queues, Publish/Subscribe, Routing, Topics, RPC, and Publisher Confirms), explains their typical application scenarios, provides Java code examples, and summarizes the four exchange types.
Seven RabbitMQ Messaging Patterns and Their Application Scenarios
Simple Mode (Hello World)
A single producer sends a message to a single consumer; RabbitMQ acts as the message broker that forwards the producer's message to the consumer. Use case: enqueue email messages for a mail service to consume and send.
Work Queues Mode
Distributes tasks among multiple competing consumers; one producer, many consumers. Suitable for resource‑intensive tasks that need parallel processing, such as handling many orders simultaneously.
Publish/Subscribe Mode
A single producer broadcasts messages to multiple consumers. Use case: after updating product inventory, notify multiple caches and databases via fanout exchange.
Routing Mode
Messages are delivered to queues bound with a specific routing key. Use case: only consumers interested in a particular product (e.g., iPhone 12 promotion) receive messages with that routing key.
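The binding rule described above can be illustrated without a broker. The following is a minimal in-memory sketch, not RabbitMQ client code: the class name DirectExchangeModel and its methods are hypothetical, and the queue and key names are only borrowed from the routing example in this article. It shows that a direct exchange delivers a message to every queue bound with exactly the message's routing key, and that one queue may hold several bindings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of direct-exchange routing (not the RabbitMQ API).
final class DirectExchangeModel {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    // Records a binding between a queue and a routing key.
    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    // Returns the queues that would receive a message published with this routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("queue_routing", "key");
        exchange.bind("queue_routing", "key2");
        System.out.println(exchange.route("key"));
        System.out.println(exchange.route("unbound_key"));
    }
}
```

A message whose routing key matches no binding is routed to no queue in this model, which mirrors how an unroutable message is dropped by default.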
Topics Mode
Uses pattern matching on routing keys, which are made up of dot-separated words: * matches exactly one word and # matches zero or more words. Use case: a promotion consumer bound with iphone.# receives messages routed as iphone, iphone.12, iphone.13, and so on.
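To make the wildcard semantics concrete, here is a small stand-alone sketch of topic matching. It is an illustrative re-implementation under the assumption that words are separated by dots, that * stands for exactly one word, and that # stands for zero or more words; the class name TopicMatcher is hypothetical and the code is not how the broker itself is implemented.

```java
// Hypothetical stand-alone matcher illustrating topic binding keys (not the RabbitMQ API).
final class TopicMatcher {

    // Returns true if the routing key satisfies the binding key.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern exhausted: succeed only if all words are consumed
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word but none is left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1); // consume exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("iphone.#", "iphone.12.promo"));
        System.out.println(matches("iphone.*", "iphone.12.promo"));
    }
}
```

Note that a key without dots, such as iphone12, is a single word, so it is matched by neither iphone.* nor iphone.# in this model.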
Remote Procedure Call (RPC) Mode
Enables a client to invoke a function on a remote server and wait for the result, useful for operations like order payment where a response is required.
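RPC over a message broker is commonly built from two message properties: a reply queue the server should answer to, and a correlation ID that lets the client pair a reply with its request. The sketch below models that flow in memory with java.util.concurrent queues instead of a broker. Everything in it (RpcSketch, Message, startServer, call) is a hypothetical name chosen for illustration, and the "paid:" handler is an invented stand-in for a payment service.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical in-memory model of the request/reply flow (not the RabbitMQ API).
final class RpcSketch {

    // A message with the two properties the pattern relies on.
    static final class Message {
        final String correlationId;
        final BlockingQueue<Message> replyTo; // null on replies
        final String body;

        Message(String correlationId, BlockingQueue<Message> replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    // Stands in for the shared request queue on the broker.
    private final BlockingQueue<Message> requestQueue = new LinkedBlockingQueue<>();

    // Server side: take a request, compute the result, reply with the same correlation ID.
    void startServer(Function<String, String> handler) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Message request = requestQueue.take();
                    String result = handler.apply(request.body);
                    request.replyTo.put(new Message(request.correlationId, null, result));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Client side: send a request and wait for the matching reply; returns null on timeout.
    String call(String body, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            requestQueue.put(new Message(correlationId, replyQueue, body));
            while (true) {
                Message reply = replyQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (reply == null) {
                    return null; // no reply before the deadline
                }
                if (correlationId.equals(reply.correlationId)) {
                    return reply.body;
                }
                // A reply with a different correlation ID is stale; ignore it and keep waiting.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        RpcSketch rpc = new RpcSketch();
        rpc.startServer(order -> "paid:" + order);
        System.out.println(rpc.call("order-42", 2000));
    }
}
```

The timeout matters in practice: a caller that waits forever for a payment result will hang if the server is down, so the sketch returns null instead.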
Publisher Confirms
Provides reliable publish acknowledgments; after enabling confirms on a channel, RabbitMQ asynchronously confirms that messages have been processed on the server side, essential for high‑reliability scenarios such as wallet deductions.
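With the Java client, confirms are enabled through Channel.confirmSelect(), each publish is numbered via Channel.getNextPublishSeqNo(), and acknowledgments arrive through callbacks registered with Channel.addConfirmListener(...), each carrying a sequence number and a "multiple" flag. The bookkeeping a publisher needs around those callbacks can be sketched without a broker. The class below is a hypothetical helper (OutstandingConfirms and its method names are assumptions, not part of the client library) that tracks unconfirmed messages and clears them when an acknowledgment arrives.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical bookkeeping helper for asynchronous publisher confirms (not the RabbitMQ API).
final class OutstandingConfirms {
    // sequence number -> message body, ordered so ranges can be cleared at once
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Call right before publishing, with the sequence number assigned to the message.
    void published(long sequenceNumber, String body) {
        outstanding.put(sequenceNumber, body);
    }

    // Mirrors the shape of a confirm callback: (sequence number, multiple).
    void handleAck(long sequenceNumber, boolean multiple) {
        if (multiple) {
            // The broker confirmed every message up to and including sequenceNumber.
            outstanding.headMap(sequenceNumber, true).clear();
        } else {
            outstanding.remove(sequenceNumber);
        }
    }

    boolean isPending(long sequenceNumber) {
        return outstanding.containsKey(sequenceNumber);
    }

    int pendingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        confirms.published(1, "deduct 10");
        confirms.published(2, "deduct 20");
        confirms.published(3, "deduct 30");
        confirms.handleAck(2, true);
        System.out.println("still pending: " + confirms.pendingCount());
    }
}
```

Whatever remains in the map after a timeout, or is reported through a negative acknowledgment, is a candidate for republishing, which is the property a wallet-deduction flow depends on.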
Code Demonstrations
Below are Java examples using the RabbitMQ client library for each pattern.
Simple Mode
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "simplest mode message";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver {
// Must match the queue name used by the Sender, otherwise no messages arrive
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
Work Queues Mode
/* Sender for work queue */
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
private static final String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
String message = "work mode message" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
/* Receiver for work queue */
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver1 {
private static final String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
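// Prefetch of 1: the broker sends the next message only after the previous one is acknowledged
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
// Acknowledge manually; the prefetch limit has no effect when autoAck is true
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// autoAck = false so that basicQos(1) actually limits unacknowledged deliveries
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});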
}
}
Publish/Subscribe Mode
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Receive1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
Routing Mode
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver1 {
private static final String QUEUE_NAME = "queue_routing";
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
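// Declare the direct exchange first; binding to an exchange that does not exist fails
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");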
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
Topics Mode
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver1 {
private static final String QUEUE_NAME = "queue_topic";
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
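// Declare the topic exchange first; binding to an exchange that does not exist fails
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// "key.*" matches routing keys with exactly one word after "key.", such as "key.a"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");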
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
Four Types of Exchanges
Direct exchange: Routes messages based on an exact routing key.
Fanout exchange: Broadcasts messages to all bound queues without routing logic.
Topic exchange: Uses pattern matching on dot-separated routing keys, where * matches exactly one word and # matches zero or more words.
Headers exchange: Matches messages using header attributes instead of routing keys, allowing more flexible rules.
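Of the four types, the headers exchange is the one without a code example in this article, so a brief sketch of its matching rule may help. This is a simplified in-memory model, not client code: HeadersMatcher is a hypothetical name, values are compared with plain equality, and arguments whose names start with x- (such as x-match) are treated as control arguments rather than match conditions. With x-match set to all (the assumed default here) every binding header must be present with an equal value; with any, one matching header is enough.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical simplified model of headers-exchange matching (not the RabbitMQ API).
final class HeadersMatcher {

    // Returns true if the message headers satisfy the binding arguments.
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        boolean any = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> condition : bindingArgs.entrySet()) {
            if (condition.getKey().startsWith("x-")) {
                continue; // control arguments are not compared against the message
            }
            boolean hit = messageHeaders.containsKey(condition.getKey())
                    && Objects.equals(condition.getValue(), messageHeaders.get(condition.getKey()));
            if (any && hit) {
                return true;  // "any": one matching header is enough
            }
            if (!any && !hit) {
                return false; // "all": a single mismatch rejects the message
            }
        }
        return !any; // "all" with no mismatch succeeds; "any" with no hit fails
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("format", "pdf");
        binding.put("type", "report");

        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "report");

        System.out.println(matches(binding, headers));
    }
}
```

Because the routing key plays no part in this decision, headers exchanges suit cases where the routing criteria do not fit into a single dot-separated string.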
Conclusion
Each RabbitMQ pattern has distinct strengths; developers should choose the one that best fits their specific use‑case as illustrated in the examples above.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn