Mastering Spring Cloud Stream: Build Event‑Driven Microservices with Ease

This article explains how Spring Cloud Stream leverages Spring Messaging and Spring Integration to create highly scalable, event‑driven microservices, covering core concepts, message channels, binders, code examples, and the underlying processing flow for both producers and consumers.

Programmer DD

Spring Cloud Stream

Spring Cloud Stream is the Spring Cloud framework for building highly scalable, event‑driven microservices. It simplifies messaging development in Spring Cloud applications.

Spring Messaging

Spring Messaging is a Spring Framework module that provides a unified programming model for messages, defining a Message interface with a payload and headers, and a MessageChannel for sending messages.

package org.springframework.messaging;
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

A message channel is represented by the MessageChannel interface, whose send method hands a message to the channel, optionally with a timeout.

@FunctionalInterface
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1;
    default boolean send(Message<?> message) {
        return send(message, INDEFINITE_TIMEOUT);
    }
    boolean send(Message<?> message, long timeout);
}

Subscribable channels (SubscribableChannel) allow registration of MessageHandler implementations to consume messages.

public interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}

A MessageHandler processes a single message:

@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}
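
To see how these interfaces fit together without pulling in Spring, here is a minimal plain-Java sketch. The nested types Msg, Handler, and InMemoryChannel are hypothetical stand-ins for Message, MessageHandler, and SubscribableChannel, not the real Spring classes. Headers, timeouts, and error handling are left out, and returning false when there are no subscribers is a simplification made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ChannelSketch {

    // Stand-in for Message<T>: payload only, headers omitted for brevity.
    static final class Msg<T> {
        private final T payload;
        Msg(T payload) { this.payload = payload; }
        T getPayload() { return payload; }
    }

    // Stand-in for MessageHandler.
    interface Handler {
        void handleMessage(Msg<?> message);
    }

    // Stand-in for a SubscribableChannel implementation.
    static class InMemoryChannel {
        private final List<Handler> handlers = new CopyOnWriteArrayList<>();

        boolean subscribe(Handler handler) { return handlers.add(handler); }

        boolean unsubscribe(Handler handler) { return handlers.remove(handler); }

        // Delivers synchronously, on the caller's thread, to every subscriber.
        boolean send(Msg<?> message) {
            if (handlers.isEmpty()) {
                return false; // simplification: nobody is listening
            }
            for (Handler handler : handlers) {
                handler.handleMessage(message);
            }
            return true;
        }
    }

    // Subscribes one handler, sends one message, returns what the handler recorded.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        InMemoryChannel channel = new InMemoryChannel();
        channel.subscribe(message -> received.add("receive: " + message.getPayload()));
        channel.send(new Msg<>("hello"));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is the division of labor: the sender only knows the channel, the handler only knows the message, and the channel decides how delivery happens.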

Spring Integration

Spring Integration extends Spring Messaging to support Enterprise Integration Patterns. It introduces concepts such as MessageRouter, MessageDispatcher, Filter, Transformer, Aggregator, and Splitter, as well as concrete channel implementations like DirectChannel, ExecutorChannel, and PublishSubscribeChannel.

Simple Spring Integration Example

SubscribableChannel messageChannel = new DirectChannel();
messageChannel.subscribe(msg -> {
    System.out.println("receive: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

The example demonstrates constructing a subscribable channel, registering a MessageHandler, and sending a message that is consumed by the handler.

When using DirectChannel, the internal UnicastingDispatcher delivers each message to a single handler based on a round‑robin load‑balancing strategy. Using PublishSubscribeChannel replaces the dispatcher with a BroadcastingDispatcher, delivering each message to all registered handlers.
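
The difference between the two dispatch strategies can be illustrated with a small plain-Java model. This is only a sketch: RoundRobinDispatcher and BroadcastDispatcher are hypothetical names, handlers are modeled as Consumer<String>, and failover, ordering options, and threading in the real Spring Integration dispatchers are not reproduced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DispatchSketch {

    // Unicast: each message goes to exactly one handler, chosen in rotation.
    static class RoundRobinDispatcher {
        private final List<Consumer<String>> handlers = new ArrayList<>();
        private int next = 0;

        void add(Consumer<String> handler) { handlers.add(handler); }

        void dispatch(String payload) {
            if (handlers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            Consumer<String> target = handlers.get(next % handlers.size());
            next++;
            target.accept(payload);
        }
    }

    // Broadcast: every handler receives every message.
    static class BroadcastDispatcher {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        void add(Consumer<String> handler) { handlers.add(handler); }

        void dispatch(String payload) {
            for (Consumer<String> handler : handlers) {
                handler.accept(payload);
            }
        }
    }

    // Three messages, two handlers: A and B take turns.
    static List<String> unicastDemo() {
        List<String> log = new ArrayList<>();
        RoundRobinDispatcher dispatcher = new RoundRobinDispatcher();
        dispatcher.add(p -> log.add("A:" + p));
        dispatcher.add(p -> log.add("B:" + p));
        dispatcher.dispatch("m1");
        dispatcher.dispatch("m2");
        dispatcher.dispatch("m3");
        return log;
    }

    // One message, two handlers: both receive it.
    static List<String> broadcastDemo() {
        List<String> log = new ArrayList<>();
        BroadcastDispatcher dispatcher = new BroadcastDispatcher();
        dispatcher.add(p -> log.add("A:" + p));
        dispatcher.add(p -> log.add("B:" + p));
        dispatcher.dispatch("m1");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(unicastDemo());   // [A:m1, B:m2, A:m3]
        System.out.println(broadcastDemo()); // [A:m1, B:m1]
    }
}
```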

Spring Cloud Stream Architecture

Spring Cloud Stream (SCS) builds on Spring Integration and introduces concepts such as Binder, Binding, @EnableBinding, and @StreamListener. It integrates with Spring Boot Actuator (the /bindings and /channels endpoints) and with externalized configuration (BindingProperties, BinderProperties).

The Binder abstracts the underlying message middleware (e.g., RabbitMQ, Kafka, RocketMQ) and provides bindConsumer and bindProducer methods.
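
The binder idea can be sketched in plain Java as follows. This is a heavily simplified assumption, not the real contract: SimpleBinder and InMemoryBinder are hypothetical names, the real bindProducer and bindConsumer methods take channels plus producer or consumer properties and return Binding handles, and the "middleware" here is just an in-memory map of topics that carries raw bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class BinderSketch {

    // Hypothetical, simplified binder contract.
    interface SimpleBinder {
        // Returns the function the application calls to publish to the destination.
        Consumer<String> bindProducer(String destination);

        // Registers an application callback for messages arriving on the destination.
        void bindConsumer(String destination, String group, Consumer<String> listener);
    }

    // Stand-in middleware: each topic holds subscribers that accept raw bytes.
    static class InMemoryBinder implements SimpleBinder {
        private final Map<String, List<Consumer<byte[]>>> topics = new HashMap<>();

        @Override
        public Consumer<String> bindProducer(String destination) {
            return payload -> {
                // Application format -> wire format.
                byte[] wire = payload.getBytes(StandardCharsets.UTF_8);
                List<Consumer<byte[]>> subscribers = topics.get(destination);
                if (subscribers == null) {
                    return; // nothing bound yet; this sketch simply drops the message
                }
                for (Consumer<byte[]> subscriber : subscribers) {
                    subscriber.accept(wire);
                }
            };
        }

        @Override
        public void bindConsumer(String destination, String group, Consumer<String> listener) {
            // The group is ignored in this sketch.
            // Wire format -> application format before the listener is called.
            topics.computeIfAbsent(destination, key -> new ArrayList<>())
                  .add(wire -> listener.accept(new String(wire, StandardCharsets.UTF_8)));
        }
    }

    // Binds one consumer and one producer to the same illustrative topic.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        SimpleBinder binder = new InMemoryBinder();
        binder.bindConsumer("test-topic", "test-group1", received::add);
        Consumer<String> output = binder.bindProducer("test-topic");
        output.accept("msg-1");
        output.accept("msg-2");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [msg-1, msg-2]
    }
}
```

Because application code only talks to the SimpleBinder interface, swapping the in-memory implementation for one backed by a real broker would not change the sending or receiving code, which is the property the Binder abstraction is meant to provide.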

RocketMQ Binder Example

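// SendAndReceiveApplication.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class SendAndReceiveApplication {
    public static void main(String[] args) {
        SpringApplication.run(SendAndReceiveApplication.class, args);
    }
    @Bean
    public CustomRunner customRunner() {
        return new CustomRunner();
    }
    // Sends five messages to the "output" binding once the application has started.
    public static class CustomRunner implements CommandLineRunner {
        @Autowired
        private Source source;
        @Override
        public void run(String... args) throws Exception {
            int count = 5;
            for (int i = 1; i <= count; i++) {
                source.output().send(MessageBuilder.withPayload("msg-" + i).build());
            }
        }
    }
}

// StreamListenerReceiveService.java
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
public class StreamListenerReceiveService {
    // Invoked for every message that arrives on the "input" binding.
    @StreamListener(Sink.INPUT)
    public void receiveByStreamListener1(String receiveMsg) {
        System.out.println("receiveByStreamListener: " + receiveMsg);
    }
}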

Configuration binds the logical names output and input to physical destinations (e.g., test-topic) and specifies content type and group settings.

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1
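
To make the naming scheme concrete, the following plain-Java sketch reads keys of the form spring.cloud.stream.bindings.<bindingName>.destination and builds a map from logical binding name to physical destination. It is only an illustration of how the keys are structured. The class BindingConfigSketch and its methods are hypothetical, and Spring itself resolves these properties through its own configuration binding, not through code like this.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class BindingConfigSketch {
    private static final String PREFIX = "spring.cloud.stream.bindings.";
    private static final String SUFFIX = ".destination";

    // Collects bindingName -> destination; other keys (content-type, group) are skipped.
    static Map<String, String> destinations(Properties props) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            boolean longEnough = key.length() > PREFIX.length() + SUFFIX.length();
            if (longEnough && key.startsWith(PREFIX) && key.endsWith(SUFFIX)) {
                String name = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                result.put(name, props.getProperty(key));
            }
        }
        return result;
    }

    // Parses a subset of the configuration shown above.
    static Map<String, String> demo() {
        String config =
                "spring.cloud.stream.bindings.output.destination=test-topic\n"
              + "spring.cloud.stream.bindings.output.content-type=text/plain\n"
              + "spring.cloud.stream.bindings.input.destination=test-topic\n"
              + "spring.cloud.stream.bindings.input.group=test-group1\n";
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return destinations(props);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {input=test-topic, output=test-topic}
    }
}
```

Both logical names resolve to the same topic, which is why the messages sent through output in the example arrive at the listener bound to input.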

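The @EnableBinding annotation registers a proxy factory for each binding interface, and the proxy exposes the declared MessageChannel beans (here, output and input). The output channel is a DirectChannel. When a message is sent to it, a handler installed by the binder's AbstractMessageChannelBinder implementation receives the message and translates it from a Spring message into the target middleware's format before publishing it.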

On the receiving side, each @StreamListener method is wrapped in a generated StreamListenerMessageHandler that is subscribed to the input channel. The binder converts inbound middleware messages back into Spring Message objects and sends them to that channel, which in turn invokes the listener method.

Advanced @StreamListener Usage

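// Invoked only when the "index" header equals "1"; receives the whole message.
@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message<?> msg) {
    System.out.println("receive by headers['index']=='1': " + msg);
}

// Invoked only when the "index" header equals "9999"; the payload is converted to a Person.
@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {
    System.out.println("receive Person: " + person);
}

// No condition: invoked for every message, with the payload as a String.
@StreamListener(Sink.INPUT)
public void receiveAllMsg(String msg) {
    System.out.println("receive allMsg by StreamListener. content: " + msg);
}

// No condition: binds the "index" header and the whole message.
// @Header is required by default, so the header must be present.
@StreamListener(Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message<?> msg) {
    System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}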

These examples show how message headers and payloads can be filtered and bound to method parameters, similar to Spring MVC controller method handling.
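
The condition-based routing in these listeners can be modeled with a short plain-Java sketch. It assumes that every listener whose condition matches is invoked, and it uses ordinary predicates in place of SpEL expressions. The class ConditionDispatchSketch is hypothetical, and the registered names simply mirror the four listener methods above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ConditionDispatchSketch {
    // Listener name -> condition on the message headers (registration order is kept).
    private final Map<String, Predicate<Map<String, String>>> listeners = new LinkedHashMap<>();

    void register(String name, Predicate<Map<String, String>> condition) {
        listeners.put(name, condition);
    }

    // Returns the names of all listeners whose condition accepts the headers.
    List<String> matching(Map<String, String> headers) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, String>>> entry : listeners.entrySet()) {
            if (entry.getValue().test(headers)) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }

    // Mirrors the four listeners: two with header conditions, two without.
    static ConditionDispatchSketch sample() {
        ConditionDispatchSketch dispatcher = new ConditionDispatchSketch();
        dispatcher.register("receiveByHeader", h -> "1".equals(h.get("index")));
        dispatcher.register("receivePerson", h -> "9999".equals(h.get("index")));
        dispatcher.register("receiveAllMsg", h -> true);
        dispatcher.register("receiveHeaderAndMsg", h -> true);
        return dispatcher;
    }

    // Builds headers with the given index value and lists the listeners that would run.
    static List<String> demo(String index) {
        Map<String, String> headers = new HashMap<>();
        headers.put("index", index);
        return sample().matching(headers);
    }

    public static void main(String[] args) {
        System.out.println(demo("1"));    // [receiveByHeader, receiveAllMsg, receiveHeaderAndMsg]
        System.out.println(demo("9999")); // [receivePerson, receiveAllMsg, receiveHeaderAndMsg]
    }
}
```

A message whose index header is neither "1" nor "9999" reaches only the two unconditional listeners in this model.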

Overall, Spring Cloud Stream provides a unified API for event‑driven communication, abstracting away the specifics of underlying message brokers while leveraging the powerful abstractions of Spring Messaging and Spring Integration.

Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"
