Why Use Message Queues and an Introduction to Kafka with Practical Examples
This article explains the motivations for adopting message queues, outlines core concepts and protocols, compares mainstream MQ products, and provides a detailed walkthrough of Kafka architecture, cluster setup, native Java APIs, and Spring Boot integration with extensive code examples.
Message queues decouple services, shorten response times by moving slow work off the request path, absorb bursts of concurrent traffic, and make systems more elastic. These properties make them well suited to scalable, asynchronous workflows such as order processing.
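The decoupling described above can be illustrated with a minimal in-process sketch. It assumes a plain java.util.concurrent.BlockingQueue as a stand-in for a real broker; the class name OrderQueueSketch, the STOP sentinel, and the "processed:" prefix are hypothetical and do not come from the original article. The caller only enqueues order ids, and a background worker handles them at its own pace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OrderQueueSketch {
    // Hypothetical sentinel value that tells the worker to stop.
    private static final String STOP = "__STOP__";

    /** Enqueues orders on the caller's thread and processes them on a background worker. */
    public static List<String> processAsync(List<String> orderIds) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        // The worker plays the role of the downstream service (the consumer).
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String orderId = queue.take(); // blocks until a message arrives
                    if (STOP.equals(orderId)) {
                        return;
                    }
                    processed.add("processed:" + orderId);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // The caller plays the role of the upstream service (the producer):
        // it hands each order to the queue and does not wait for processing.
        for (String orderId : orderIds) {
            queue.add(orderId); // the queue is unbounded, so add() does not block
        }
        queue.add(STOP);

        try {
            worker.join(); // wait here only so the sketch can return the results
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        // Prints [processed:order-1, processed:order-2, processed:order-3]
        System.out.println(processAsync(Arrays.asList("order-1", "order-2", "order-3")));
    }
}
```

In a real system the queue lives in a separate broker process, so the producer and consumer can be deployed, scaled, and restarted independently; this sketch only shows the hand-off pattern.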
At its core, a message queue is middleware that lets applications communicate by exchanging messages. Two dominant standards are AMQP (Advanced Message Queuing Protocol), a wire-level protocol, and JMS (Java Message Service), which is a Java API specification rather than a wire protocol. Each has its own ecosystem.
Popular MQ products include RabbitMQ (Erlang, AMQP), ActiveMQ (Java, JMS), RocketMQ (Java, custom protocol), and Kafka (Scala/Java, high‑throughput distributed streaming). A comparison table highlights their performance, latency, and feature sets.
Kafka Overview – Kafka is an Apache open‑source distributed messaging system. It has traditionally relied on Zookeeper for cluster coordination (newer releases can also run without it in KRaft mode) and offers high throughput, low latency, persistence, reliability, fault tolerance, and high concurrency. It supports both pure messaging and stream processing.
Key Kafka characteristics:
High throughput (millions of messages per second)
Low latency (milliseconds)
Persistent storage for TB‑scale data
Replication for fault tolerance
Kafka Internal Structure
Components:
Producer – sends messages to topics
Consumer – receives messages from topics
Broker – a Kafka server instance; clusters contain multiple brokers
Topic – logical channel for a category of messages
Partition – splits a topic’s data across brokers for scalability
Replication – creates leader and follower replicas for each partition
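When creating a topic, you specify the number of partitions and the replication factor. Each partition has one leader replica, which handles writes and (by default) reads, while follower replicas sync data from the leader so that one of them can take over if the leader fails.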
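The relationship between keys, partitions, and replicas can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Kafka's actual implementation: String.hashCode() stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key bytes, and the round-robin replica placement is only an approximation of Kafka's assignment logic. The class name PartitionSketch and the broker ids 1, 2, 3 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {

    /** Maps a message key to a partition so that equal keys always land in the same partition. */
    public static int partitionFor(String key, int numPartitions) {
        // Assumption: String.hashCode() replaces Kafka's murmur2 hash of the key bytes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /**
     * Returns, for each partition, the broker ids that hold its replicas.
     * The first id in each list is treated as the leader; the rest are followers.
     */
    public static List<List<Integer>> assignReplicas(int numPartitions,
                                                     int replicationFactor,
                                                     List<Integer> brokerIds) {
        // A partition cannot have more replicas than there are brokers.
        if (replicationFactor > brokerIds.size()) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Shift the starting broker per partition so leaders are spread out.
                replicas.add(brokerIds.get((p + r) % brokerIds.size()));
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions, replication factor 3, three brokers.
        List<List<Integer>> assignment = assignReplicas(3, 3, Arrays.asList(1, 2, 3));
        for (int p = 0; p < assignment.size(); p++) {
            List<Integer> replicas = assignment.get(p);
            System.out.println("partition " + p
                    + ": leader=" + replicas.get(0)
                    + ", followers=" + replicas.subList(1, replicas.size()));
        }
        System.out.println("key \"a\" -> partition " + partitionFor("a", 3));
    }
}
```

With three brokers, each broker ends up leading one partition and following the other two, which is why losing a single broker leaves every partition with surviving replicas. Messages that share a key stay in one partition, which preserves their relative order.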
Cluster Setup Example
Three broker instances are configured on ports 7000, 8000, and 9000 with distinct log directories. Example configuration snippets:
broker.id=1
listeners=PLAINTEXT://192.168.200.100:7000
advertised.listeners=PLAINTEXT://192.168.200.100:7000
log.dirs=/opt/k-cluster/log7000
Cluster start commands:
kafka-server-start.sh -daemon /opt/k-cluster/server7000.properties
kafka-server-start.sh -daemon /opt/k-cluster/server8000.properties
kafka-server-start.sh -daemon /opt/k-cluster/server9000.properties
Topic creation with replication:
kafka-topics.sh \
  --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \
  --create \
  --partitions 3 \
  --replication-factor 3 \
  --topic my-cluster-topic
Native Java API Example
Producer code:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MyProducerDemo {
    public static final String TOPIC_NAME = "topic-java-client";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker address; adjust it to match your own setup (the cluster example uses ports 7000, 8000, and 9000).
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.100:9092");
        // Keys and values are sent as plain strings.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // send() is asynchronous; records are sent without a key.
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, "hello-kafka-from-java-client~" + i));
        }
        System.out.println("----MyProducerDemo finished sending");
        // close() flushes any buffered records before releasing resources.
        kafkaProducer.close();
    }
}
Consumer code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class MyConsumerDemo {
    public static final String TOPIC_NAME = "topic-java-client";

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.200.100:9092");
        // Consumers that share a group.id split the topic's partitions among themselves.
        properties.setProperty("group.id", "test");
        // Offsets are committed automatically once per second.
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        // Poll forever; each poll waits up to 100 ms for new records.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            TimeUnit.SECONDS.sleep(1);
            System.out.println("....in progress");
        }
    }
}
Spring Boot Integration
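Dependencies (pom.xml) include spring-kafka, hutool-all, and Lombok. The application YAML configures the bootstrap servers, the serializers, and the consumer group. The producer uses JsonSerializer for values so that objects can be sent as JSON, while the consumer reads values back as strings with StringDeserializer.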
spring:
  kafka:
    bootstrap-servers: 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: consumer-group
Java configuration creates a topic programmatically:
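import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfig {
    // Declares the topic as a bean so it is created at startup if it does not exist yet.
    @Bean
    public NewTopic springTestTopic() {
        return TopicBuilder.name("topic-spring-boot")
                .partitions(3)
                .replicas(3)
                .build();
    }
}
Message sending via KafkaTemplate: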
@Resource
private KafkaTemplate<String, String> kafkaTemplate;

@Test
public void testSendMessage() {
    kafkaTemplate.send("topic-spring-boot", "hello spring boot message");
}
Message consumption with @KafkaListener:
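import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageListener {
    // Invoked once for every record received from the topic.
    @KafkaListener(topics = {"topic-spring-boot"})
    public void simpleConsumerPartition(ConsumerRecord<String, String> record) {
        System.out.println("Entered simpleConsumer method");
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s, timestamp = %d%n",
                record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
    }
}
The article also covers troubleshooting steps, such as deleting the __consumer_offsets node in Zookeeper when consumers fail to receive messages. Note that __consumer_offsets is the internal topic where Kafka stores committed consumer offsets.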
Rare Earth Juejin Tech Community
