Mastering Spring Cloud Stream: Build Event‑Driven Microservices with Ease
This article explains how Spring Cloud Stream leverages Spring Messaging and Spring Integration to create highly scalable, event‑driven microservices, covering core concepts, message channels, binders, code examples, and the underlying processing flow for both producers and consumers.
Spring Cloud Stream
Spring Cloud Stream is used within the Spring Cloud ecosystem to build highly scalable, event‑driven microservices and simplifies messaging development in Spring Cloud applications.
Spring Messaging
Spring Messaging is a Spring Framework module that provides a unified programming model for messages, defining a Message interface with a payload and headers, and a MessageChannel for sending messages.
package org.springframework.messaging;
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}Message channels are represented by MessageChannel with a send method.
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message<?> message, long timeout);
}Subscribable channels ( SubscribableChannel) allow registration of MessageHandler implementations to consume messages.
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
} MessageHandlerprocesses messages:
@FunctionalInterface
public interface MessageHandler {
void handleMessage(Message<?> message) throws MessagingException;
}Spring Integration
Spring Integration extends Spring Messaging to support Enterprise Integration Patterns, introducing concepts such as MessageRoute, MessageDispatcher, Filter, Transformer, Aggregator, and Splitter, as well as concrete channel implementations like DirectChannel, ExecutorChannel, and PublishSubscribeChannel.
Simple Spring Integration Example
SubscribableChannel messageChannel = new DirectChannel();
messageChannel.subscribe(msg -> {
System.out.println("receive: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());The example demonstrates constructing a subscribable channel, registering a MessageHandler, and sending a message that is consumed by the handler.
When using DirectChannel, the internal UnicastingDispatcher delivers each message to a single handler based on a round‑robin load‑balancing strategy. Using PublishSubscribeChannel replaces the dispatcher with a BroadcastingDispatcher, delivering each message to all registered handlers.
Spring Cloud Stream Architecture
SCS builds on Spring Integration and introduces concepts such as Binder, Binding, @EnableBinding, and @StreamListener. It integrates with Spring Boot Actuator ( /bindings, /channels) and externalized configuration ( BindingProperties, BinderProperties).
The Binder abstracts the underlying message middleware (e.g., RabbitMQ, Kafka, RocketMQ) and provides bindConsumer and bindProducer methods.
RocketMQ Binder Example
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class SendAndReceiveApplication {
public static void main(String[] args) {
SpringApplication.run(SendAndReceiveApplication.class, args);
}
@Bean
public CustomRunner customRunner() {
return new CustomRunner();
}
public static class CustomRunner implements CommandLineRunner {
@Autowired
private Source source;
@Override
public void run(String... args) throws Exception {
int count = 5;
for (int i = 1; i <= count; i++) {
source.output().send(MessageBuilder.withPayload("msg-" + i).build());
}
}
}
} @Service
public class StreamListenerReceiveService {
@StreamListener(Sink.INPUT)
public void receiveByStreamListener1(String receiveMsg) {
System.out.println("receiveByStreamListener: " + receiveMsg);
}
}Configuration binds the logical names output and input to physical destinations (e.g., test-topic) and specifies content type and group settings.
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1The @EnableBinding annotation creates proxy factories that expose MessageChannel beans. Sending uses the DirectChannel which is processed by AbstractMessageChannelBinder implementations that translate Spring messages to the target middleware format.
Receiving uses @StreamListener, which subscribes to the input channel via a generated StreamListenerMessageHandler. The binder converts inbound middleware messages back to Spring Message objects before invoking the listener method.
Advanced @StreamListener Usage
@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message msg) {
System.out.println("receive by headers['index']=='1': " + msg);
}
@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {
System.out.println("receive Person: " + person);
}
@StreamListener(Sink.INPUT)
public void receiveAllMsg(String msg) {
System.out.println("receive allMsg by StreamListener. content: " + msg);
}
@StreamListener(Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message msg) {
System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}These examples show how message headers and payloads can be filtered and bound to method parameters, similar to Spring MVC controller method handling.
Overall, Spring Cloud Stream provides a unified API for event‑driven communication, abstracting away the specifics of underlying message brokers while leveraging the powerful abstractions of Spring Messaging and Spring Integration.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
