Why Use Message Queues and an Introduction to Kafka with Practical Examples

This article explains the motivations for adopting message queues, outlines core concepts and protocols, compares mainstream MQ products, and provides a detailed walkthrough of Kafka architecture, cluster setup, native Java APIs, and Spring Boot integration with extensive code examples.

Rare Earth Juejin Tech Community

Message queues decouple services, let callers return without waiting for downstream work (which improves response time), absorb traffic spikes by buffering requests, and make a system more elastic. These properties make them a common building block for scalable, asynchronous systems such as order processing.
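
To make the decoupling concrete, here is a minimal in-process sketch. It assumes a java.util.concurrent.BlockingQueue as a stand-in for a real message queue (which would run as a separate service), and the names OrderQueueSketch, placeOrder, and processOrders are hypothetical. The order-placing side only enqueues and moves on, while a worker thread processes orders at its own pace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-process stand-in for a message queue.
class OrderQueueSketch {

    // Producer side: enqueue the order and return immediately.
    static void placeOrder(BlockingQueue<String> queue, String orderId) {
        queue.add(orderId);
    }

    // Consumer side: take `count` orders from the queue and "process" them.
    static List<String> processOrders(BlockingQueue<String> queue, int count) {
        List<String> processed = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                processed.add("processed:" + queue.take()); // blocks until an order arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early, keep what was processed
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> results = new ArrayList<>();

        // The worker runs on its own thread, so the producer never calls it directly.
        Thread worker = new Thread(() -> results.addAll(processOrders(queue, 3)));
        worker.start();

        for (int i = 1; i <= 3; i++) {
            placeOrder(queue, "order-" + i);
        }
        worker.join(); // wait for the worker before reading its results
        System.out.println(results);
    }
}
```

The producer and the worker share only the queue, so either side can be slowed down, restarted, or replaced without changing the other.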

A message queue is middleware that lets applications communicate by exchanging messages instead of calling each other directly. The two standards encountered most often are AMQP (Advanced Message Queuing Protocol), a language-neutral wire protocol, and JMS (Java Message Service), a Java API specification; each has its own ecosystem.

Popular MQ products include RabbitMQ (Erlang, AMQP), ActiveMQ (Java, JMS), RocketMQ (Java, custom protocol), and Kafka (Scala/Java, high‑throughput distributed streaming). A comparison table highlights their performance, latency, and feature sets.

Kafka Overview – Kafka is an Apache open‑source distributed messaging system that has traditionally relied on ZooKeeper for cluster coordination (recent releases can also run without it, in KRaft mode). It offers high throughput, low latency, persistence, reliability, fault tolerance, and high concurrency, and it supports both pure messaging and stream processing.

Key Kafka characteristics:

High throughput (a well‑provisioned cluster can handle millions of messages per second)

Low latency (milliseconds)

Persistent storage for TB‑scale data

Replication for fault tolerance

Kafka Internal Structure

Components:

Producer – sends messages to topics

Consumer – receives messages from topics

Broker – a Kafka server instance; clusters contain multiple brokers

Topic – logical channel for a category of messages

Partition – splits a topic’s data across brokers for scalability

Replication – creates leader and follower replicas for each partition

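When creating a topic, you specify the number of partitions and the replication factor (which cannot exceed the number of brokers). By default, each partition's leader handles writes and reads, while followers replicate data from the leader so that one of them can take over if the leader fails.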
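
The way a keyed record ends up in one partition can be sketched as a hash of the key modulo the partition count. The sketch below is an illustration only: PartitionSketch and partitionFor are hypothetical names, and Arrays.hashCode stands in for the murmur2 hash that Kafka's default partitioner applies to the key bytes, so the partition numbers it prints will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitionSketch {

    // Maps a record key to a partition index in [0, numPartitions).
    // Assumption: Arrays.hashCode replaces Kafka's murmur2 hash of the key bytes.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions); // floorMod keeps the result non-negative
    }

    public static void main(String[] args) {
        // The same key always lands in the same partition, which preserves per-key ordering.
        for (String key : new String[] {"order-1", "order-2", "order-3", "order-1"}) {
            System.out.printf("key=%s -> partition %d%n", key, partitionFor(key, 3));
        }
    }
}
```

Because the mapping depends on the partition count, changing the number of partitions later changes which partition a given key maps to.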

Cluster Setup Example

Three broker instances run on the same host on ports 7000, 8000, and 9000. Each broker needs a unique broker.id, its own listener port, and its own log directory. Example configuration for the broker on port 7000:

broker.id=1
listeners=PLAINTEXT://192.168.200.100:7000
advertised.listeners=PLAINTEXT://192.168.200.100:7000
log.dirs=/opt/k-cluster/log7000
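
The settings that must differ per broker can be laid out side by side with a small generator. This is a sketch under assumptions: the source shows only the port 7000 file, so the broker ids 2 and 3 and the directories log8000 and log9000 are guesses that follow the same naming pattern, and BrokerConfigSketch is a hypothetical helper, not part of Kafka.

```java
import java.util.Properties;

class BrokerConfigSketch {

    // Builds the per-broker settings that must be unique within the cluster.
    static Properties brokerConfig(int brokerId, String host, int port) {
        Properties p = new Properties();
        String listener = "PLAINTEXT://" + host + ":" + port;
        p.setProperty("broker.id", Integer.toString(brokerId));
        p.setProperty("listeners", listener);
        p.setProperty("advertised.listeners", listener);
        // Assumption: log directories follow the log<port> pattern of the 7000 example.
        p.setProperty("log.dirs", "/opt/k-cluster/log" + port);
        return p;
    }

    public static void main(String[] args) {
        int[] ports = {7000, 8000, 9000};
        String[] keys = {"broker.id", "listeners", "advertised.listeners", "log.dirs"};
        for (int i = 0; i < ports.length; i++) {
            Properties p = brokerConfig(i + 1, "192.168.200.100", ports[i]);
            System.out.println("# server" + ports[i] + ".properties");
            for (String key : keys) {
                System.out.println(key + "=" + p.getProperty(key));
            }
            System.out.println();
        }
    }
}
```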

Cluster start commands:

kafka-server-start.sh -daemon /opt/k-cluster/server7000.properties
kafka-server-start.sh -daemon /opt/k-cluster/server8000.properties
kafka-server-start.sh -daemon /opt/k-cluster/server9000.properties

Topic creation with replication:

kafka-topics.sh \
  --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \
  --create \
  --partitions 3 \
  --replication-factor 3 \
  --topic my-cluster-topic
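
With three partitions and a replication factor of 3 on three brokers, every broker holds a replica of every partition, and leadership can be spread across the brokers. The sketch below models that with a plain round-robin layout. It is a simplification: Kafka's real assignment also randomizes the starting broker, the broker ids 1, 2, and 3 are assumed, and ReplicaLayoutSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicaLayoutSketch {

    // Returns the brokers holding replicas of a partition; the first entry is the preferred leader.
    static List<Integer> replicasFor(int partition, List<Integer> brokerIds, int replicationFactor) {
        if (replicationFactor > brokerIds.size()) {
            throw new IllegalArgumentException("replication factor exceeds number of brokers");
        }
        List<Integer> replicas = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++) {
            // Simplified round-robin placement starting at the partition's own index.
            replicas.add(brokerIds.get((partition + i) % brokerIds.size()));
        }
        return replicas;
    }

    public static void main(String[] args) {
        List<Integer> brokers = List.of(1, 2, 3); // assumed broker ids
        for (int partition = 0; partition < 3; partition++) {
            List<Integer> replicas = replicasFor(partition, brokers, 3);
            System.out.printf("partition %d: leader=%d replicas=%s%n",
                    partition, replicas.get(0), replicas);
        }
    }
}
```

On a running cluster, the actual layout can be inspected with kafka-topics.sh --describe against the same bootstrap servers.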

Native Java API Example

Producer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class MyProducerDemo {
    public static final String TOPIC_NAME = "topic-java-client";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker address used to discover the cluster; adjust it to match your listeners.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.100:9092");
        // Keys and values are sent as plain strings.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer, which flushes any buffered records.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                // send() is asynchronous and returns before the broker acknowledges the record.
                kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, "hello-kafka-from-java-client~" + i));
            }
        }
        System.out.println("----MyProducerDemo finished sending");
    }
}

Consumer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class MyConsumerDemo {
    public static final String TOPIC_NAME = "topic-java-client";

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.200.100:9092");
        // Consumers that share a group.id split the topic's partitions among themselves.
        properties.setProperty("group.id", "test");
        // Commit consumed offsets automatically, about once per second.
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        // Poll in a loop until the process is stopped.
        while (true) {
            // Wait up to 100 ms for new records.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            TimeUnit.SECONDS.sleep(1);
            System.out.println("....polling");
        }
    }
}
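
The effect of committing offsets periodically (enable.auto.commit in the consumer above) can be illustrated with a toy model. This is a sketch, not the Kafka client: OffsetCommitSketch is a hypothetical class that keeps one partition's log in a list, tracks a read position and a committed offset, and shows that records read after the last commit are delivered again when the consumer restarts.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetCommitSketch {
    private final List<String> log; // stand-in for one partition's append-only log
    private int position = 0;       // next offset to read
    private int committed = 0;      // last committed offset; survives a restart

    OffsetCommitSketch(List<String> log) {
        this.log = log;
    }

    // Returns up to maxRecords records starting at the current position.
    List<String> poll(int maxRecords) {
        int end = Math.min(position + maxRecords, log.size());
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }

    // Records the current position as the point to resume from.
    void commit() {
        committed = position;
    }

    // Simulates a crash and restart: reading resumes from the last committed offset.
    void restart() {
        position = committed;
    }

    public static void main(String[] args) {
        OffsetCommitSketch consumer = new OffsetCommitSketch(List.of("m0", "m1", "m2", "m3", "m4"));
        System.out.println("first poll:    " + consumer.poll(3));
        consumer.commit();
        System.out.println("second poll:   " + consumer.poll(2)); // read but never committed
        consumer.restart();
        System.out.println("after restart: " + consumer.poll(5)); // the uncommitted records again
    }
}
```

In this model the second batch is processed twice, which is why handlers behind an auto-committing consumer are usually written to tolerate duplicates.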

Spring Boot Integration

Dependencies (pom.xml) include spring-kafka, hutool-all, and Lombok. Application YAML configures bootstrap servers and serializers, using JsonSerializer for object messages.

spring:
  kafka:
    bootstrap-servers: 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: consumer-group

Java configuration creates a topic programmatically:

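import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfig {
    // Declares the topic as a bean so that it is created on startup if it does not exist yet.
    @Bean
    public NewTopic springTestTopic() {
        return TopicBuilder.name("topic-spring-boot")
                .partitions(3)
                .replicas(3) // requires at least three brokers
                .build();
    }
}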

Message sending via KafkaTemplate:

@Resource
private KafkaTemplate<String, String> kafkaTemplate;

@Test
public void testSendMessage() {
    kafkaTemplate.send("topic-spring-boot", "hello spring boot message");
}

Message consumption with @KafkaListener:

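import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageListener {
    // Invoked once for each record received from the topic.
    @KafkaListener(topics = {"topic-spring-boot"})
    public void simpleConsumerPartition(ConsumerRecord<String, String> record) {
        System.out.println("Entered simpleConsumer method");
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s, timestamp = %d%n",
                record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
    }
}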
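
Listeners that share a group id (consumer-group in the YAML above) divide the topic's partitions among themselves. The sketch below models that with a simple round-robin split. It is a simplification: Kafka's built-in assignors use different strategies, and GroupAssignmentSketch and the consumer names c1 to c4 are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupAssignmentSketch {

    // Distributes partitions 0..numPartitions-1 over the group's consumers in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions, as configured for topic-spring-boot.
        System.out.println(assign(List.of("c1"), 3));
        System.out.println(assign(List.of("c1", "c2"), 3));
        // With more consumers than partitions, the extra consumer stays idle.
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
    }
}
```

Each partition has exactly one owner within the group, so adding consumers beyond the partition count adds no parallelism.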

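The article also covers troubleshooting steps, such as deleting the ZooKeeper node for the internal __consumer_offsets topic when consumers fail to receive messages. Because that topic stores the committed offsets of every consumer group, this is a last resort better suited to test environments.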

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
