How to Let a Spring Cloud Stream Service Consume Its Own Messages (Avoid Bean Duplication Errors)
This guide explains why using identical @Input and @Output channel names in Spring Cloud Stream causes bean definition conflicts, and shows how to rename channels and configure them to share the same destination topic so a microservice can safely produce and consume its own messages.
In the previous article we solved duplicate consumption when deploying multiple instances; this continuation addresses the common question of how a microservice can also consume the messages it produces.
Common Mistake
A typical error (shown with Spring Boot 2.0.5 and Spring Cloud Finchley SR1) occurs when the same name is used for both @Output and @Input channels, leading to a bean definition conflict:
org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists
Channel Definition
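Define an interface in which the input and output channels share the same name (this is the mistake that produces the error above):
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface TestTopic {
    // Both constants resolve to the same name, so the two channels collide
    String OUTPUT = "example-topic";
    String INPUT = "example-topic";
    @Output(OUTPUT)
    MessageChannel output();
    @Input(INPUT)
    SubscribableChannel input();
}
HTTP Producer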
Create a REST controller that sends a message through the output channel:
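import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class TestController {
    @Autowired
    private TestTopic testTopic;
    // Publishes the request parameter as a message on the output channel
    @GetMapping("/sendMessage")
    public String messageWithMQ(@RequestParam String message) {
        testTopic.output().send(MessageBuilder.withPayload(message).build());
        return "ok";
    }
}
Message Listener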
Implement a listener that receives messages from the input channel:
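import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TestListener {
    // Logs every payload that arrives on the input channel
    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: {}", payload);
    }
}
Application Entry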
Enable binding and start the Spring Boot application:
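import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(TestTopic.class)
@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
}
Root Cause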
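The error arises because Spring Cloud Stream registers every channel declared in the binding interface as a bean named after that channel. When @Output and @Input use the same name, two bean definitions called 'example-topic' are registered, and the second one is rejected at startup.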
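The naming rule behind the failure can be illustrated with a small sketch. It does not use Spring at all: ToyBeanRegistry and BeanNameCollisionDemo are hypothetical names invented for this example, and the registry only mimics the one rule that matters here (a bean name may be defined once). The actual check happens inside Spring's bean factory.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy registry, NOT Spring's bean factory: it only mimics the "one definition per name" rule.
class ToyBeanRegistry {
    private final Map<String, String> definitions = new LinkedHashMap<>();

    // Registers a definition and rejects a second one under an existing name.
    void register(String beanName, String description) {
        if (definitions.containsKey(beanName)) {
            throw new IllegalStateException(
                    "bean definition with name '" + beanName + "' already exists");
        }
        definitions.put(beanName, description);
    }
}

class BeanNameCollisionDemo {
    // Returns true when registering an output and an input channel under these names fails.
    static boolean collides(String outputName, String inputName) {
        ToyBeanRegistry registry = new ToyBeanRegistry();
        try {
            registry.register(outputName, "output channel");
            registry.register(inputName, "input channel");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Same name for both channels: the second registration is rejected.
        System.out.println(collides("example-topic", "example-topic"));
        // Distinct names: both registrations succeed.
        System.out.println(collides("example-topic-output", "example-topic-input"));
    }
}
```

Under these assumptions the first call reports a collision and the second does not, which is why giving the two channels distinct names is enough to get past startup.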
Correct Approach
Step 1: Use Different Channel Names
public interface TestTopic {
    String OUTPUT = "example-topic-output";
    String INPUT = "example-topic-input";
    @Output(OUTPUT)
    MessageChannel output();
    @Input(INPUT)
    SubscribableChannel input();
}
Step 2: Map Both Channels to the Same Topic in Configuration
spring.cloud.stream.bindings.example-topic-input.destination=aaa-topic
spring.cloud.stream.bindings.example-topic-output.destination=aaa-topic
After these changes the application starts without errors, and requesting http://localhost:8080/sendMessage?message=hello-didi produces log output showing the message being sent and received:
INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory : Created new connection ...
INFO 32039 --- [IafxrhkFBFI1A-1] com.didispace.stream.TestListener : Received: hello-didi
The microservice has now consumed its own message.
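To make the role of the two destination properties concrete, here is a toy model of the round trip. It is a sketch under stated assumptions, not how Spring Cloud Stream or RabbitMQ is implemented: ToyBroker and SelfConsumeDemo are hypothetical names, the "broker" is an in-memory map, and delivery is synchronous. It shows only that a message comes back to the service when both channel names resolve to the same destination.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory broker keyed by destination name; a stand-in for the real message broker.
class ToyBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String destination, Consumer<String> handler) {
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }

    void publish(String destination, String payload) {
        for (Consumer<String> handler : subscribers.getOrDefault(destination, new ArrayList<>())) {
            handler.accept(payload);
        }
    }
}

class SelfConsumeDemo {
    // Sends one payload through the output channel and returns what the input channel received.
    static List<String> roundTrip(String outputDestination, String inputDestination, String payload) {
        // Channel name -> destination, mirroring the two binding properties.
        Map<String, String> bindings = new HashMap<>();
        bindings.put("example-topic-output", outputDestination);
        bindings.put("example-topic-input", inputDestination);

        ToyBroker broker = new ToyBroker();
        List<String> received = new ArrayList<>();
        broker.subscribe(bindings.get("example-topic-input"), received::add);
        broker.publish(bindings.get("example-topic-output"), payload);
        return received;
    }

    public static void main(String[] args) {
        // Both channels mapped to aaa-topic: the listener sees the message.
        System.out.println(roundTrip("aaa-topic", "aaa-topic", "hello-didi"));
        // Destinations left different (here, equal to the channel names): nothing arrives.
        System.out.println(roundTrip("example-topic-output", "example-topic-input", "hello-didi"));
    }
}
```

The second call illustrates why renaming the channels alone is not sufficient: without the destination properties, the producer and the listener would be talking to different topics.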
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
