Implementing Custom Error Handling in Spring Cloud Stream
This tutorial demonstrates how to create a custom error‑handling flow for Spring Cloud Stream by building a simple producer‑consumer example, configuring input/output bindings, using @StreamListener to simulate a failure, and routing the exception to a @ServiceActivator fallback method, while also discussing version‑specific limitations.
Application Scenario
The previous article covered automatic retry for Spring Cloud Stream, which works for transient issues but not for logical errors that always fail. This article introduces a custom error‑handling approach to deal with such unrecoverable failures.
Hands‑on Example
Prepare a failing consumer example using the same project as before. The following code defines a Spring Boot application with a REST controller that sends a message and a listener that deliberately throws an exception.
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
}
@Component
@Slf4j
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
}Run the application and call http://localhost:8080/sendMessage?message=hello. The listener logs the payload and then throws an exception, which triggers the error‑handling flow.
Configuration
Before starting the app, configure the physical targets (exchange or topic) and the consumer group:
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1Custom Error Handling
Use @ServiceActivator to bind a dedicated error‑handling method to the error channel generated for the failing consumer.
@Component
@Slf4j
static class TestListener {
// same listener as above
}
@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
public void error(Message<?> message) {
log.info("Message consumer failed, call fallback!");
// additional fallback logic can be added here
}The inputChannel name follows the pattern {destination}.{group}.errors. When the listener throws an exception, the message is routed to this method, where you can implement any fallback processing such as persisting the payload, sending alerts, or invoking compensating actions.
Note: In Spring Cloud Stream 2.0.x this approach has a known issue where some error messages may not be processed correctly. The problem is tracked in Issue #1357 and is expected to be fixed in version 2.1.0.
This technique is suitable when you have a clear, business‑specific fallback strategy. Otherwise, you may still need to rely on logging and manual post‑processing.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
