Cloud Native 5 min read

Simplify Message Routing with @StreamListener Conditions in Spring Cloud Stream

Using Spring Cloud Stream’s @StreamListener condition attribute, this guide shows how to route messages by header values, providing a full example that includes a producer sending version‑tagged messages, a consumer with two conditional listeners, required configuration, and sample log output demonstrating the simplified processing flow.

Programmer DD
Programmer DD
Programmer DD
Simplify Message Routing with @StreamListener Conditions in Spring Cloud Stream

When processing messages on the same channel, developers often use header or payload checks to branch logic, which can become messy. Spring Cloud Stream’s @StreamListener annotation provides a condition attribute that lets you filter messages based on SpEL expressions, simplifying the code.

Below is a complete example that demonstrates both message production and consumption. The producer sends two messages to /sendMessage, each with a different version header ("1.0" and "2.0"). The consumer defines two listener methods, each annotated with @StreamListener and a distinct condition that matches the corresponding header value.

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {
        @Autowired
        private TestTopic testTopic;

        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message)
                .setHeader("version", "1.0").build());
            testTopic.output().send(MessageBuilder.withPayload(message)
                .setHeader("version", "2.0").build());
            return "ok";
        }
    }

    @Slf4j
    @Component
    static class TestListener {
        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
        public void receiveV1(String payload, @Header("version") String version) {
            log.info("Received v1 : " + payload + ", " + version);
        }

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
        public void receiveV2(String payload, @Header("version") String version) {
            log.info("Received v2 : " + payload + ", " + version);
        }
    }

    interface TestTopic {
        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();
    }

Configure the physical destinations for the input and output bindings, for example:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

After starting the application and invoking http://localhost:8080/sendMessage?message=hello, the logs show two entries, each processed by the appropriate listener based on the version header.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaConditionMessage RoutingSpring Cloud Stream@StreamListener
Programmer DD
Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.