Simplify Message Routing with @StreamListener Conditions in Spring Cloud Stream
This guide shows how to route messages by header value using the condition attribute of Spring Cloud Stream's @StreamListener annotation. It walks through a full example: a producer that sends version-tagged messages, a consumer with two conditional listeners, the required binding configuration, and the log output to expect.
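When several kinds of message arrive on the same channel, developers often branch on header or payload values inside a single handler, and that logic quickly becomes messy. The condition attribute of @StreamListener takes a SpEL expression and invokes the method only for messages that match it, so each case gets its own method. To be eligible for conditional dispatch, a listener method must handle individual messages and must not return a value. Note that @StreamListener belongs to the annotation-based programming model, which was deprecated in Spring Cloud Stream 3.1 and removed in 4.0, so this technique applies to earlier versions.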
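For contrast, the manual branching described above might look like the following. This is a plain-Java sketch with no Spring dependency; the class name ManualBranchingSketch and the method handle are hypothetical, and the headers are modeled as a simple map rather than a real message object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of branching on a header inside one handler.
class ManualBranchingSketch {

    // Every version-specific case lives in the same method, so the
    // if/else chain grows with each new version.
    static String handle(String payload, Map<String, Object> headers) {
        Object version = headers.get("version");
        if ("1.0".equals(version)) {
            return "Received v1 : " + payload + ", " + version;
        } else if ("2.0".equals(version)) {
            return "Received v2 : " + payload + ", " + version;
        }
        return "Unhandled version : " + version;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("version", "2.0");
        System.out.println(handle("hello", headers));
    }
}
```

Each new version adds another branch to the same method, which is the growth that per-method conditions are meant to avoid.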
Below is a complete example that covers both producing and consuming messages. The producer exposes a /sendMessage endpoint; each request sends the same payload twice, once with the version header set to "1.0" and once with "2.0". The consumer defines two listener methods, each annotated with @StreamListener and a condition that matches one of those header values.
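import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    // Producer: sends the same payload twice, tagged with different versions.
    @RestController
    static class TestController {
        @Autowired
        private TestTopic testTopic;

        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message)
                    .setHeader("version", "1.0").build());
            testTopic.output().send(MessageBuilder.withPayload(message)
                    .setHeader("version", "2.0").build());
            return "ok";
        }
    }

    // Consumer: each method is invoked only when its SpEL condition matches.
    @Slf4j
    @Component
    static class TestListener {
        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
        public void receiveV1(String payload, @Header("version") String version) {
            log.info("Received v1 : {}, {}", payload, version);
        }

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
        public void receiveV2(String payload, @Header("version") String version) {
            log.info("Received v2 : {}, {}", payload, version);
        }
    }

    // Binding interface declaring the logical input and output channels.
    interface TestTopic {
        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();
    }
}

Configure the physical destinations for the input and output bindings. Both point at the same destination here, so the application consumes the messages it produces: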
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

After starting the application and requesting http://localhost:8080/sendMessage?message=hello, the log shows two entries, one from each listener: receiveV1 logs "Received v1 : hello, 1.0" and receiveV2 logs "Received v2 : hello, 2.0". Each message reaches only the listener whose condition matches its version header.
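One way to picture what the two conditions do is as a list of (condition, handler) pairs checked against each message's headers. The sketch below is plain Java with no Spring dependency. ConditionalDispatchSketch, register, dispatch, and versionHeader are hypothetical names used only for illustration, and the code is a mental model under those assumptions, not how Spring Cloud Stream implements dispatch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical plain-Java model of condition-based dispatch.
class ConditionalDispatchSketch {

    // Pairs a label with the condition that decides whether it runs.
    static final class Listener {
        final String label;
        final Predicate<Map<String, Object>> condition;

        Listener(String label, Predicate<Map<String, Object>> condition) {
            this.label = label;
            this.condition = condition;
        }
    }

    private final List<Listener> listeners = new ArrayList<>();

    void register(String label, Predicate<Map<String, Object>> condition) {
        listeners.add(new Listener(label, condition));
    }

    // Returns one log-style line per listener whose condition matches.
    List<String> dispatch(String payload, Map<String, Object> headers) {
        List<String> lines = new ArrayList<>();
        for (Listener listener : listeners) {
            if (listener.condition.test(headers)) {
                lines.add(listener.label + " : " + payload + ", " + headers.get("version"));
            }
        }
        return lines;
    }

    // Mirrors the two conditions used in the article's listeners.
    static ConditionalDispatchSketch versionRouter() {
        ConditionalDispatchSketch router = new ConditionalDispatchSketch();
        router.register("Received v1", h -> "1.0".equals(h.get("version")));
        router.register("Received v2", h -> "2.0".equals(h.get("version")));
        return router;
    }

    // Builds a header map carrying only the version header.
    static Map<String, Object> versionHeader(String version) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("version", version);
        return headers;
    }

    public static void main(String[] args) {
        ConditionalDispatchSketch router = versionRouter();
        System.out.println(router.dispatch("hello", versionHeader("1.0")));
        System.out.println(router.dispatch("hello", versionHeader("2.0")));
        System.out.println(router.dispatch("hello", versionHeader("3.0")));
    }
}
```

In this model a message whose version matches neither condition produces an empty list. How the real framework treats such a message is outside what this sketch shows.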
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"