Backend Development 9 min read

Master Kafka Topic Management & Message Flow on CentOS with Java

This guide walks through setting up Kafka on CentOS, creating and configuring topics, inspecting topic and consumer group details, adjusting partitions, and implementing Java producer and consumer applications, complete with command-line examples, code snippets, and troubleshooting tips for remote server access.

Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Master Kafka Topic Management & Message Flow on CentOS with Java

Environment: CentOS Linux 8.1.1911 (Core) with Zookeeper 3.6.2, Kafka 2.7.0 (Scala 2.13) and JDK 8.

Create a topic

<code>bin/kafka-topics.sh --create --zookeeper localhost:2182 --topic test --partitions 4 --replication-factor 2</code>

Creates a topic named

test

with 4 partitions and a replication factor of 2 (one leader and one replica).

View topic information

<code>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test</code>

Displays the configuration of the

test

topic. Example output is shown below.

The first line shows the topic name, partition count (4) and replication factor (2). The second line shows details for each partition, including the leader broker, replica list and in‑sync replicas (ISR).

List all topics

<code>bin/kafka-topics.sh --list --zookeeper localhost:2181</code>

Alter the number of partitions

<code>bin/kafka-topics.sh --alter --zookeeper localhost:2182 --topic test --partitions 6</code>

Note: The number of partitions can only be increased, not decreased.

Delete a topic

<code>bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test</code>

Produce messages

<code>bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test</code>

Consume messages

<code>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group g1</code>

--from-beginning starts consumption from the earliest offset; --group specifies the consumer group name.

View consumer group information

<code>bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list</code>
<code>bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1 --members</code>

Shows each partition’s offset, leader, replica and ISR status for the specified group.

View log directories

<code>bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list test</code>

Dump a log file

<code>bin/kafka-dump-log.sh --files /root/sf/datas/kafka/9094/test-0/00000000000000000000.log --print-data-log</code>

Java code test for producing and consuming

pom.xml dependency:

<code>&lt;dependency&gt;
  &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
  &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
&lt;/dependency&gt;</code>

Producer example:

<code>public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        props.put("group.id", "g1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "2000");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        int i = 0;
        for (int n = 0; n < 10; n++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test", ++i + "", "msg + " + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }
    }
}</code>

Consumer example:

<code>public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.100.101.105:9092, 10.100.101.105:9093, 10.100.101.105:9094");
        props.put("group.id", "g1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "2000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }
}</code>

Important note

If Kafka and the test program run on different machines, modify

server.properties

to enable remote access.

Next article will cover Spring Boot integration.

KafkaConsumercommand lineCentOSTopic ManagementJava Producer
Spring Full-Stack Practical Cases
Written by

Spring Full-Stack Practical Cases

Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.