
Master Kafka Streams in Spring Boot: Real‑Time Data Processing with Code Samples

This guide walks through setting up Kafka Streams with Spring Boot 2.3, covering environment configuration, core concepts, topology design, and multiple practical examples—including message sending, listening, transformations, aggregations, filtering, branching, and multi‑field grouping—complete with full code snippets and execution results.

Spring Full-Stack Practical Cases

Environment

Spring Boot 2.3.12.RELEASE, Kafka 2.7.0 (the kafka_2.13-2.7.0 distribution), and ZooKeeper 3.6.2.

Kafka Streams Overview

Kafka introduced the Streams API in version 0.10, enabling real‑time processing and analysis of data stored in Kafka.

Stream processing works with unbounded data, continuously ingesting input and producing incremental results.

Kafka Streams is a lightweight client library that distinguishes event time from processing time and supports windowing, fault-tolerant state management, and interactive queries.

Key features include:

Simple, lightweight Java client library.

No extra dependencies beyond Kafka; leverages Kafka partitioning for scalability and ordering.

Fault‑tolerant state stores for efficient windowed joins and aggregations.

Exactly‑once semantics.

Record‑level processing with millisecond latency.

High‑level DSL and low‑level Processor API.

Stream Processing Topology

A stream represents an infinite, ordered, replayable and fault‑tolerant sequence of immutable records (key‑value pairs).

A Stream Processing Application uses Kafka Streams to define processor topologies, where each processor node transforms incoming records and may emit one or more output records.

Special processors:

Source Processor reads from one or more Kafka topics and creates the input stream.

Sink Processor writes records received from upstream processors to a specified Kafka topic.
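The two special processor types can also be wired explicitly with the low-level Processor API. The sketch below is illustrative, not from the original article: it builds a topology with one source reading the article's "test" topic, one hypothetical pass-through processor node, and one sink writing to "demo", using the Kafka 2.7-era API.

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class TopologySketch {
  public static Topology build() {
    // Illustrative processor node: forwards each record unchanged downstream.
    ProcessorSupplier<byte[], byte[]> passThrough = () -> new AbstractProcessor<byte[], byte[]>() {
      @Override
      public void process(byte[] key, byte[] value) {
        context().forward(key, value);
      }
    };
    Topology topology = new Topology();
    topology.addSource("Source", "test");                // source processor: creates the input stream from topic "test"
    topology.addProcessor("PassThrough", passThrough, "Source");
    topology.addSink("Sink", "demo", "PassThrough");     // sink processor: writes records from upstream to topic "demo"
    return topology;
  }
}
```

Calling `topology.describe()` prints the resulting node graph without needing a running broker, which is a convenient way to sanity-check a topology.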

Dependencies

<code>&lt;dependency&gt;
  &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
  &lt;artifactId&gt;spring-boot-starter-web&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
  &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
  &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
  &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
  &lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
&lt;/dependency&gt;
</code>

Configuration

<code>server:
  port: 9090
spring:
  application:
    name: kafka-demo
  kafka:
    streams:
      application-id: ${spring.application.name}
      properties:
        spring.json.trusted.packages: '*'
    bootstrap-servers:
      - localhost:9092
      - localhost:9093
      - localhost:9094
    producer:
      acks: 1
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.trusted.packages: '*'
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      enable-auto-commit: false
      group-id: ConsumerTest
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 12000
        heartbeat.interval.ms: 3000
        max.poll.records: 100
        spring.json.trusted.packages: '*'
    listener:
      ack-mode: manual-immediate
      type: batch
      concurrency: 8
    properties:
      max.poll.interval.ms: 300000
</code>
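Two things the article assumes but does not show: the StreamsBuilder injected into the KStream beans below is only configured when a configuration class carries @EnableKafkaStreams, and the examples serialize a Message object as JSON. A minimal hypothetical POJO, consistent with the getters used later (title, orgCode, sex, time), might look like:

```java
// Hypothetical POJO matching the fields the stream examples read and write.
public class Message {
  private String title;
  private String message;
  private String orgCode;
  private String sex;
  private String time;

  public String getTitle() { return title; }
  public void setTitle(String title) { this.title = title; }
  public String getMessage() { return message; }
  public void setMessage(String message) { this.message = message; }
  public String getOrgCode() { return orgCode; }
  public void setOrgCode(String orgCode) { this.orgCode = orgCode; }
  public String getSex() { return sex; }
  public void setSex(String sex) { this.sex = sex; }
  public String getTime() { return time; }
  public void setTime(String time) { this.time = time; }

  @Override
  public String toString() {
    return "Message{title='" + title + "', orgCode='" + orgCode
        + "', sex='" + sex + "', time='" + time + "'}";
  }
}
```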

Message Sending

<code>@Service
public class MessageSend {
  @Resource
  private KafkaTemplate<String, Message> kafkaTemplate;
  public void sendMessage2(Message message) {
    kafkaTemplate.send(new ProducerRecord<String, Message>("test", message))
      .addCallback(result -> System.out.println("send succeeded... " + Thread.currentThread().getName()),
                   ex -> { System.out.println("send failed"); ex.printStackTrace(); });
  }
}
</code>

Message Listening

<code>@KafkaListener(topics = {"test"})
public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {
  for (ConsumerRecord<String, Message> record : records) {
    System.out.println(this.getClass().hashCode() + ", Thread " + Thread.currentThread().getName()
      + ", key: " + record.key() + ", received message: " + record.value()
      + ", partition: " + record.partition() + ", offset: " + record.offset());
  }
  ack.acknowledge();
}
@KafkaListener(topics = {"demo"})
public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {
  for (ConsumerRecord<String, Message> record : records) {
    System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread " + Thread.currentThread().getName()
      + ", key: " + record.key() + ", received message: " + record.value()
      + ", partition: " + record.partition() + ", offset: " + record.offset());
  }
  ack.acknowledge();
}
</code>

Kafka Streams Processing Examples

Message transformation and forwarding

<code>@Bean
public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
  KStream<Object, Object> stream = streamsBuilder.stream("test");
  stream.map((key, value) -> {
    System.out.println("original message: " + new String((byte[]) value, StandardCharsets.UTF_8));
    return new KeyValue<>(key, "{\"title\": \"123123\", \"message\": \"redefined content\"}".getBytes(StandardCharsets.UTF_8));
  }).to("demo");
  return stream;
}
</code>

Stream object processing

<code>@Bean
public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) {
  JsonSerde<Message> jsonSerde = new JsonSerde<>();
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
  descri.addTrustedPackages("*");
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
  stream.map((key, value) -> { value.setTitle("XXXXXXX"); return new KeyValue<>(key, value); })
        .to("demo", Produced.with(Serdes.String(), jsonSerde));
  return stream;
}
</code>

Grouped processing

<code>@Bean
public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) {
  JsonSerde<Message> jsonSerde = new JsonSerde<>();
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
  descri.addTrustedPackages("*");
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
  stream.selectKey((key, value) -> value.getOrgCode())
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .count()
        .toStream().print(Printed.toSysOut());
  return stream;
}
</code>
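Conceptually, selectKey + groupByKey + count is a keyed, incrementally updated version of a familiar in-memory grouping. The plain-Java analogy below (the org codes are made up, and a real Kafka Streams count emits a running total per key rather than a final map) may make the result easier to picture:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CountAnalogy {
  // Group records by their (re-selected) key and count occurrences per key.
  public static Map<String, Long> countByOrgCode(List<String> orgCodes) {
    return orgCodes.stream()
        .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  }
}
```

For example, countByOrgCode(List.of("1", "2", "1", "3", "1")) maps "1" to 3 and both "2" and "3" to 1, mirroring what the KTable produced by count() would hold after those five records.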

Aggregation

<code>@Bean
public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) {
  JsonSerde<Message> jsonSerde = new JsonSerde<>();
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
  descri.addTrustedPackages("*");
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
  stream.selectKey((key, value) -> value.getOrgCode())
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .aggregate(() -> 0L,
          (key, value, agg) -> { System.out.println("key = " + key + ", value = " + value + ", agg = " + agg); return agg + 1; },
          Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long()))
        .toStream().print(Printed.toSysOut());
  return stream;
}
</code>

Filter

<code>@Bean
public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) {
  JsonSerde<Message> jsonSerde = new JsonSerde<>();
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
  descri.addTrustedPackages("*");
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
  stream.selectKey((key, value) -> value.getOrgCode())
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .aggregate(() -> 0L,
          (key, value, agg) -> { System.out.println("key = " + key + ", value = " + value + ", agg = " + agg); return agg + 1; },
          Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long()))
        .toStream()
        .filter((key, value) -> !"2".equals(key))
        .print(Printed.toSysOut());
  return stream;
}
</code>

Branching (multiple streams)

<code>@Bean
public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) {
  JsonSerde<Message> jsonSerde = new JsonSerde<>();
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
  descri.addTrustedPackages("*");
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
  KStream<String, Message>[] arrStream = stream.branch(
      (key, value) -> "男".equals(value.getSex()),   // "男" = male
      (key, value) -> "女".equals(value.getSex()));  // "女" = female
  Stream.of(arrStream).forEach(as -> as.foreach((key, message) ->
      System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message)));
  return stream;
}
</code>
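branch routes each record to the first predicate it matches, and drops records that match none. With exactly two complementary predicates, as above, the effect resembles partitioningBy. A plain-Java analogy with invented sample values:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BranchAnalogy {
  // true bucket: "男" (male); false bucket: everything else, here "女" (female).
  public static Map<Boolean, List<String>> splitBySex(List<String> sexes) {
    return sexes.stream().collect(Collectors.partitioningBy("男"::equals));
  }
}
```

One difference worth keeping in mind: partitioningBy always keeps every element in one of the two buckets, whereas branch silently discards records that satisfy none of its predicates.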

Multi‑field grouping (selectKey takes a single key, so multiple fields are concatenated into one composite key)

<code>@Bean
public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) {
  JsonSerde<Message> jsonSerde = new JsonSerde<>();
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
  descri.addTrustedPackages("*");
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
  stream.selectKey((key, value) -> value.getTime() + " | " + value.getOrgCode())
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .count()
        .toStream().print(Printed.toSysOut());
  return stream;
}
</code>
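One caveat with composite string keys: the delimiter must not occur in the field values themselves, or distinct field combinations can collide under grouping. A small hypothetical helper that mirrors the concatenation above:

```java
public class CompositeKey {
  // Join grouping fields with a delimiter that cannot appear in the data.
  public static String of(String... fields) {
    return String.join(" | ", fields);
  }
}
```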

This completes the demonstration of Kafka Streams capabilities in a Spring Boot project.

Written by

Spring Full-Stack Practical Cases

Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
