Master Kafka Topic Management and Message Production on CentOS

Learn how to create and manage Kafka topics on CentOS, increase partition counts, and produce and consume messages with the command-line tools and Java clients, then explore related utilities for consumer groups, log inspection, and remote-access configuration.

Spring Full-Stack Practical Cases

Environment: CentOS 8.1 with Zookeeper 3.6.2, Kafka 2.7.0 (Scala 2.13) and JDK 8.

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 4 --replication-factor 2

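This creates a topic named test with 4 partitions and a replication factor of 2, so each partition has two replicas in total: one leader and one follower. The cluster needs at least two brokers for the command to succeed.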

View topic information

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

The command displays details such as partition count, replication factor, leader, replicas, and ISR for each partition.

List all topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

Increase partition count

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 6

Kafka only allows the partition count of an existing topic to grow; it cannot be reduced.
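
One consequence of adding partitions is that keyed messages may start landing on different partitions, because the partition is derived from a hash of the key modulo the partition count. The sketch below illustrates the effect only. It substitutes String.hashCode() for the murmur2 hash that Kafka's default partitioner applies to the serialized key, so the concrete partition numbers will not match a real cluster. The class and method names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;

// Illustration of hash-modulo partition selection.
// ASSUMPTION: String.hashCode() is a stand-in, not Kafka's actual murmur2 hash.
class PartitionMappingSketch {

    // Maps a key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("1", "2", "3", "4", "5", "6");
        for (String key : keys) {
            int before = partitionFor(key, 4); // topic created with 4 partitions
            int after = partitionFor(key, 6);  // topic altered to 6 partitions
            System.out.printf("key=%s: 4 partitions -> %d, 6 partitions -> %d%s%n",
                    key, before, after, before == after ? "" : " (moved)");
        }
    }
}
```

Records already written stay where they are; only new records follow the new mapping, so per-key ordering across the change is not guaranteed.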

Check partition offsets

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
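
With --time -1 the tool reports the latest offset of every partition, one topic:partition:offset line per partition. If you want to post-process that output, a small parser along the following lines may help. The class name and the sample offsets are hypothetical, and the sum equals the number of retained messages only if no segments have been deleted or compacted yet.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Parses GetOffsetShell-style output lines of the form topic:partition:offset.
class OffsetOutputSketch {

    // Returns a partition -> offset map, preserving the order of the input lines.
    static Map<Integer, Long> parse(String output) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        for (String line : output.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] parts = trimmed.split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Unexpected line: " + trimmed);
            }
            offsets.put(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }
        return offsets;
    }

    // Sums the offsets of all partitions.
    static long total(Map<Integer, Long> offsets) {
        long sum = 0;
        for (long offset : offsets.values()) {
            sum += offset;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Hypothetical sample output, not captured from a real cluster.
        String sample = "test:0:3\ntest:1:2\ntest:2:3\ntest:3:2\n";
        Map<Integer, Long> offsets = parse(sample);
        System.out.println("Latest offsets per partition: " + offsets);
        System.out.println("Sum of latest offsets: " + total(offsets));
    }
}
```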

Delete a topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

Produce messages (CLI)

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

Consume messages (CLI)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group g1

Options: --from-beginning starts consumption from the earliest offset when the group has no committed offset yet; --group specifies the consumer group.

List consumer groups

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Describe a consumer group

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1 --members
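
The --members view lists each member of the group together with the number of partitions it currently owns. To get a feel for how a topic's partitions are split across members, here is a rough sketch of range-style assignment for a single topic. It assumes the group uses a range assignor and identifies members simply by index, whereas Kafka orders them by member id; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Range-style split of one topic's partitions across the members of a group.
// ASSUMPTION: members are identified by index; Kafka sorts them by member id.
class RangeAssignmentSketch {

    // Returns, for each member, the list of partition indexes it would own.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers; // base share per member
        int extra = numPartitions % numConsumers;       // first 'extra' members get one more
        for (int c = 0; c < numConsumers; c++) {
            int start = perConsumer * c + Math.min(c, extra);
            int length = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("4 partitions, 3 members: " + assign(4, 3));
        System.out.println("6 partitions, 4 members: " + assign(6, 4));
        // With more members than partitions, the surplus members stay idle.
        System.out.println("4 partitions, 5 members: " + assign(4, 5));
    }
}
```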

Inspect log directories

bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list test

Dump a log file

bin/kafka-dump-log.sh --files /root/sf/datas/kafka/9094/test-0/00000000000000000000.log --print-data-log
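
The path passed to --files encodes two things: the directory name is the topic and partition (test-0), and the file name is the offset of the first record in that segment, zero-padded to 20 digits. The helper below is a small sketch for picking those apart or building a segment name from an offset; the class and method names are invented for this example.

```java
import java.nio.file.Paths;

// Helpers for reading Kafka log segment paths.
class SegmentNameSketch {

    // Base offset encoded in a segment file name such as 00000000000000000000.log.
    static long baseOffset(String segmentPath) {
        String fileName = Paths.get(segmentPath).getFileName().toString();
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    // Name of the partition directory (<topic>-<partition>) that holds the segment.
    static String partitionDir(String segmentPath) {
        return Paths.get(segmentPath).getParent().getFileName().toString();
    }

    // File name of the segment whose first record has the given offset.
    static String segmentFileName(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    public static void main(String[] args) {
        String path = "/root/sf/datas/kafka/9094/test-0/00000000000000000000.log";
        System.out.println("Partition directory: " + partitionDir(path));
        System.out.println("Base offset: " + baseOffset(path));
        System.out.println("Segment starting at offset 170: " + segmentFileName(170L));
    }
}
```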

Java producer example

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // try-with-resources closes the producer, which flushes buffered records before exit.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 10; i++) {
                // The key is the counter as a string; the value is "msg <counter>".
                ProducerRecord<String, String> record = new ProducerRecord<>("test", String.valueOf(i), "msg " + i);
                // send() is asynchronous; the callback runs once the send is acknowledged or fails.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
}

Java consumer example

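import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
        // Commit consumed offsets automatically every 2 seconds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest offset when group g1 has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // poll() waits up to 100 ms for new records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            }
        }
    }
}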

Remote access configuration

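When Kafka runs on a different machine from your client, edit config/server.properties so that the broker listens on a reachable network interface (listeners) and advertises an address the client can resolve and connect to (advertised.listeners). Otherwise the client may reach the bootstrap address but then fail when the broker metadata points it at an unreachable host name.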
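
As a rough sketch of the kind of change involved, the relevant entries in config/server.properties might look like the following. This assumes a single PLAINTEXT listener on port 9092 and that 10.100.101.105, the address used in the Java examples, is the broker's externally reachable IP; adjust both to your environment, and use each broker's own port when several brokers share one host.

```properties
# Accept connections on all network interfaces (assumption: no TLS/SASL).
listeners=PLAINTEXT://0.0.0.0:9092
# Address returned to clients in metadata; must be reachable from the client machine.
advertised.listeners=PLAINTEXT://10.100.101.105:9092
```

Restart the broker after editing the file so the new listener settings take effect.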

Future work will cover Spring Boot integration.
