Master Kafka Topic Management and Message Production on CentOS
Learn how to set up Kafka on CentOS, create and manage topics, adjust partitions, produce and consume messages using command‑line tools and Java code, and explore related utilities such as consumer groups, log inspection, and configuration tips for remote access.
Environment: CentOS 8.1 with Zookeeper 3.6.2, Kafka 2.7.0 (Scala 2.13) and JDK 8.
Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 4 --replication-factor 2
This creates a topic named test with 4 partitions and a replication factor of 2, so each partition has two replicas: one leader and one follower. The cluster therefore needs at least two brokers.
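To make the partition and replica arithmetic concrete, here is a minimal Java sketch that spreads 4 partitions with replication factor 2 over 3 brokers. The broker ids 0 to 2 and the plain round-robin rule are assumptions made for illustration; Kafka's own assignment also randomizes the starting broker and can take racks into account, so real output from --describe will usually differ.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration only, not Kafka's actual assignment algorithm.
final class ReplicaPlacementSketch {

    // Returns, for each partition, the (hypothetical) broker ids holding its replicas.
    // The first id in each list plays the role of the preferred leader.
    static List<List<Integer>> place(int partitions, int replicationFactor, int brokers) {
        if (replicationFactor > brokers) {
            throw new IllegalArgumentException("replication factor cannot exceed broker count");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Consecutive brokers, wrapping around, so replicas of one partition never share a broker.
                replicas.add((p + r) % brokers);
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = place(4, 2, 3);
        for (int p = 0; p < assignment.size(); p++) {
            List<Integer> replicas = assignment.get(p);
            System.out.println("partition " + p + " leader=" + replicas.get(0) + " replicas=" + replicas);
        }
    }
}
```

With 4 partitions and 2 replicas each, the cluster stores 8 partition replicas in total, which is why the replication factor cannot exceed the number of brokers.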
View topic information
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
The command displays details such as partition count, replication factor, leader, replicas, and ISR (in-sync replicas) for each partition.
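One way to read the replicas and ISR columns: a partition is under-replicated when its ISR list is shorter than its replica list. The sketch below expresses that check in Java; the broker ids are hypothetical values standing in for what the describe output might show, not output captured from a real cluster.

```java
import java.util.Arrays;
import java.util.List;

final class IsrCheckSketch {

    // A partition is under-replicated when fewer replicas are in sync than are assigned.
    static boolean isUnderReplicated(List<Integer> replicas, List<Integer> isr) {
        return isr.size() < replicas.size();
    }

    public static void main(String[] args) {
        // Hypothetical values, as they might be read off the Replicas and Isr columns.
        List<Integer> replicas = Arrays.asList(1, 2);
        List<Integer> healthyIsr = Arrays.asList(1, 2);
        List<Integer> degradedIsr = Arrays.asList(1);
        System.out.println("healthy under-replicated? " + isUnderReplicated(replicas, healthyIsr));
        System.out.println("degraded under-replicated? " + isUnderReplicated(replicas, degradedIsr));
    }
}
```

The same filter is available directly from the CLI: adding --under-replicated-partitions to the describe command lists only the affected partitions.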
List all topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
Increase partition count
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 6
The partition count can only be increased, never decreased.
Check partition offsets
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
With --time -1 the tool reports the latest offset of each partition; --time -2 reports the earliest.
Delete a topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Produce messages (CLI)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
Consume messages (CLI)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group g1
Options: --from-beginning starts consumption from the earliest offset when the group has no committed offset yet; --group specifies the consumer group.
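Consumers that share a group id divide the topic's partitions among themselves. The following Java sketch shows roughly how a range-style assignment splits one topic's partitions across group members. It is a simplified model: the real range assignor orders members by their ids and works per topic, and the member indexes used here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of range-style assignment for a single topic.
final class RangeAssignmentSketch {

    // Returns the partitions owned by each consumer, indexed by the consumer's position in the group.
    static List<List<Integer>> assign(int partitions, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // the first 'extra' consumers get one more
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("4 partitions, 3 consumers: " + assign(4, 3));
        System.out.println("4 partitions, 5 consumers: " + assign(4, 5));
    }
}
```

The second call illustrates a practical consequence: once a group has more consumers than the topic has partitions, the surplus consumers receive nothing and sit idle.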
List consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Describe a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1 --members
Inspect log directories
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list test
Dump a log file
bin/kafka-dump-log.sh --files /root/sf/datas/kafka/9094/test-0/00000000000000000000.log --print-data-log
Java producer example
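import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // try-with-resources closes the producer, which flushes any records still buffered
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send ten records with keys "1".."10" and values "msg 1".."msg 10"
            for (int i = 1; i <= 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("test", String.valueOf(i), "msg " + i);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // exception is non-null only when the send failed
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    }
                });
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
}
Java consumer example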
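import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
        // Commit consumed offsets automatically every 2 seconds
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest offset when the group has no committed offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));
            // Poll forever; stop the demo by terminating the process
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            }
        }
    }
}
Remote access configuration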
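When Kafka runs on a different machine from your client, edit config/server.properties to enable remote connections. The two relevant settings are listeners, the address and port the broker binds to, and advertised.listeners, the address the broker hands back to clients, which must be resolvable and reachable from the client machine.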
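Before debugging client code, it can help to confirm that the broker ports are reachable from the client machine at all. The sketch below is a plain TCP check in Java using only the standard library; the host and ports are taken from the Java examples in this article, and the 2-second timeout is an arbitrary choice. Treat it as a first diagnostic rather than a full health check.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and ports as used in this article's examples; pass another host as the first argument.
        String host = args.length > 0 ? args[0] : "10.100.101.105";
        int[] ports = {9092, 9093, 9094};
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 2000));
        }
    }
}
```

A successful connection only shows that the port is open. After the initial bootstrap, clients connect to the addresses each broker advertises, so a client can still fail if advertised.listeners points at a name or address it cannot reach.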
Future work will cover Spring Boot integration.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.