Master Kafka Topic Management & Message Flow on CentOS with Java
This guide walks through setting up Kafka on CentOS, creating and configuring topics, inspecting topic and consumer group details, adjusting partitions, and implementing Java producer and consumer applications, complete with command-line examples, code snippets, and troubleshooting tips for remote server access.
Environment: CentOS Linux 8.1.1911 (Core) with Zookeeper 3.6.2, Kafka 2.7.0 (Scala 2.13) and JDK 8.
Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2182 --topic test --partitions 4 --replication-factor 2
Creates a topic named test with 4 partitions and a replication factor of 2, so each partition has a leader and one follower replica. The cluster needs at least 2 brokers for this to succeed.
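As a rough sanity check on the numbers in the create command, the sketch below computes how many partition replicas the cluster has to host and whether a given broker count can satisfy the replication factor. The class name TopicPlan and the broker count of 3 are assumptions made for illustration (three broker ports appear in the Java examples later in this article); this is not part of Kafka's API.

```java
class TopicPlan {
    /** Total partition replicas the cluster must store for one topic. */
    static int totalReplicas(int partitions, int replicationFactor) {
        return partitions * replicationFactor;
    }

    /** A replication factor is only satisfiable if there are at least that many brokers. */
    static boolean isFeasible(int replicationFactor, int liveBrokers) {
        return replicationFactor >= 1 && replicationFactor <= liveBrokers;
    }

    public static void main(String[] args) {
        int partitions = 4;        // from the create command
        int replicationFactor = 2; // from the create command
        int liveBrokers = 3;       // assumption: a three-broker cluster

        System.out.println("replicas to place: " + totalReplicas(partitions, replicationFactor));
        System.out.println("feasible: " + isFeasible(replicationFactor, liveBrokers));
    }
}
```

With a single broker the same check fails, which matches the error Kafka reports when the replication factor is larger than the number of available brokers.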
View topic information
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Displays the configuration of the test topic. The first line of the output shows the topic name, partition count (4) and replication factor (2). Each following line describes one partition: its leader broker, replica list and in‑sync replicas (ISR).
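To make the per-partition fields concrete, here is a minimal sketch that parses one partition line and flags the partition as under-replicated when the ISR is smaller than the replica list. The sample line is hypothetical and assumes the tab-separated "Key: value" layout printed by kafka-topics.sh; the class DescribeLineParser is illustrative only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DescribeLineParser {
    /** Splits tab-separated "Key: value" fields into a map. */
    static Map<String, String> parseFields(String line) {
        Map<String, String> fields = new HashMap<>();
        for (String part : line.trim().split("\t")) {
            int colon = part.indexOf(':');
            if (colon > 0) {
                fields.put(part.substring(0, colon).trim(), part.substring(colon + 1).trim());
            }
        }
        return fields;
    }

    /** Turns "1,2" into [1, 2]; an empty string yields an empty list. */
    static List<String> brokerIds(String csv) {
        return csv.isEmpty() ? Collections.<String>emptyList() : Arrays.asList(csv.split(","));
    }

    /** A partition is under-replicated when fewer replicas are in sync than assigned. */
    static boolean isUnderReplicated(String line) {
        Map<String, String> fields = parseFields(line);
        int replicas = brokerIds(fields.getOrDefault("Replicas", "")).size();
        int inSync = brokerIds(fields.getOrDefault("Isr", "")).size();
        return inSync < replicas;
    }

    public static void main(String[] args) {
        // Hypothetical output line, not captured from a real cluster.
        String line = "\tTopic: test\tPartition: 0\tLeader: 1\tReplicas: 1,2\tIsr: 1";
        System.out.println(parseFields(line));
        System.out.println("under-replicated: " + isUnderReplicated(line));
    }
}
```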
List all topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
Alter the number of partitions
bin/kafka-topics.sh --alter --zookeeper localhost:2182 --topic test --partitions 6
Note: The number of partitions can only be increased, not decreased.
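Increasing the partition count also changes where keyed messages land, because the target partition is derived from the key's hash modulo the number of partitions. The sketch below illustrates this under a simplifying assumption: it uses String.hashCode() as a stand-in hash, whereas Kafka's default partitioner applies murmur2 to the serialized key bytes, so the actual partition numbers will differ. PartitionRemapDemo and its methods are hypothetical names.

```java
class PartitionRemapDemo {
    /** Stand-in for key-based partitioning: hash modulo partition count (not Kafka's murmur2). */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Counts how many of the keys key-0..key-(n-1) map to a different partition after resizing. */
    static int countMoved(int keyCount, int oldPartitions, int newPartitions) {
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "key-" + i;
            if (partitionFor(key, oldPartitions) != partitionFor(key, newPartitions)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // 4 -> 6 mirrors the alter command above.
        System.out.println("keys remapped out of 1000: " + countMoved(1000, 4, 6));
    }
}
```

Existing messages stay in the partition they were written to, so per-key ordering across the resize is not guaranteed for keys that move.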
Delete a topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Produce messages
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
Consume messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group g1
--from-beginning starts consumption from the earliest offset when the group has no committed offset yet; --group specifies the consumer group name.
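The interaction between --group and --from-beginning can be summarized as a small decision rule: a committed offset for the group wins, and the earliest/latest choice only applies when none exists. The following is a sketch of that rule, not Kafka's implementation; StartOffsetDemo and its parameters are illustrative names.

```java
import java.util.OptionalLong;

class StartOffsetDemo {
    /**
     * Picks the offset a consumer starts from for one partition.
     * committed     offset stored for the group, if any
     * fromBeginning whether --from-beginning (earliest) was requested
     * earliest      first offset still present in the log
     * latest        offset of the next message to be written
     */
    static long startOffset(OptionalLong committed, boolean fromBeginning, long earliest, long latest) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // an established position takes precedence
        }
        return fromBeginning ? earliest : latest;
    }

    public static void main(String[] args) {
        System.out.println(startOffset(OptionalLong.empty(), true, 0L, 42L));
        System.out.println(startOffset(OptionalLong.of(17L), true, 0L, 42L));
    }
}
```

This is why rerunning the console consumer with the same group g1 does not replay the topic, while a fresh group name does.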
View consumer group information
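bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1 --members
The first command lists all consumer groups. The second lists the members of group g1 (consumer ID, host, client ID and number of assigned partitions). Omit --members to see each partition's current offset, log-end offset and lag for the group instead.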
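When the group is described per partition, the lag column is the distance between the log-end offset and the group's current (committed) offset. A minimal sketch of that arithmetic, with made-up offsets and a hypothetical LagDemo class:

```java
class LagDemo {
    /** Lag for one partition: messages written but not yet consumed by the group. */
    static long lag(long logEndOffset, long currentOffset) {
        return Math.max(0L, logEndOffset - currentOffset);
    }

    /** Sums the lag over all partitions; both arrays are indexed by partition number. */
    static long totalLag(long[] logEndOffsets, long[] currentOffsets) {
        if (logEndOffsets.length != currentOffsets.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        long total = 0L;
        for (int i = 0; i < logEndOffsets.length; i++) {
            total += lag(logEndOffsets[i], currentOffsets[i]);
        }
        return total;
    }

    public static void main(String[] args) {
        // Illustrative offsets for a 4-partition topic, not real measurements.
        long[] logEnd = {10L, 12L, 9L, 11L};
        long[] current = {10L, 7L, 9L, 8L};
        System.out.println("total lag: " + totalLag(logEnd, current));
    }
}
```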
View log directories
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list test
Dump a log file
bin/kafka-dump-log.sh --files /root/sf/datas/kafka/9094/test-0/00000000000000000000.log --print-data-log
Java code test for producing and consuming
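pom.xml dependency (spring-kafka pulls in kafka-clients transitively; no version is given, which assumes it is managed by a parent such as Spring Boot's dependency management, otherwise add a <version> element):
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Producer example: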
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        // group.id and the auto-commit settings are consumer-only options, so they are omitted here.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // try-with-resources closes the producer, which flushes any buffered records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("test", String.valueOf(i), "msg + " + i);
                // send() is asynchronous; the callback reports delivery failures.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
}
Consumer example:
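import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
        // Commit consumed offsets automatically every 2 seconds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest offset when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            }
        }
    }
}
Important note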
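If Kafka and the test program run on different machines, edit config/server.properties to enable remote access: set listeners to an address the broker binds to, set advertised.listeners to a host name or IP that clients can reach (for example the server's 10.100.101.105 address rather than localhost), and restart the broker.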
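When a remote client cannot connect, a useful first step is to check whether the broker ports are reachable over TCP at all. The sketch below does that with plain JDK sockets; BrokerReachability is a hypothetical helper, and the addresses are the ones used in the Java examples above. A successful connect only shows that the port is open: the client must also be able to reach whatever address the broker advertises in its metadata.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerReachability {
    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "10.100.101.105"; // broker address from the examples above
        int[] ports = {9092, 9093, 9094};
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
        }
    }
}
```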
The next article will cover Spring Boot integration.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
