Build a Dynamic Microservice Flow Orchestrator with Nacos, Kafka, and Spring Boot
This article explains how to build a lightweight, plug‑in‑style microservice flow orchestration system using a Docker‑installed Nacos for configuration, three Spring Boot services integrated with Kafka, and dynamic topic management to enable hot‑swappable data pipelines without heavy frameworks.
Preface
Recently I have been developing microservices and needed a simple flow‑orchestration solution that lets each service be pluggable, with its input and output topics changeable without downtime.
Preparation
Nacos Installation
Install Nacos via Docker:
```shell
docker pull nacos/nacos-server
docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server
```

Access the console at http://&lt;host&gt;:8848/nacos (username: nacos, password: nacos).
Three Spring Boot Services with Nacos and Kafka
Each service uses the following Maven setup:
```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>0.2.1</version>
</dependency>
```

Configuration file (application.yml):
```yaml
spring:
  kafka:
    bootstrap-servers: kafka-server:9092
    producer:
      acks: all
    consumer:
      group-id: node1-group
      enable-auto-commit: false

nacos:
  config:
    server-addr: nacos-server:8848
```

Business Interpretation
We need three services (node1, node2, node3) that can be dynamically linked. Each service has an input topic and a sink topic. By changing the Nacos configuration, the pipeline can be re‑wired without restarting services.
- node1 listens to the upstream service’s output topic.
- node2 listens to node1’s output; node3 listens to node2’s output.
- Removing node2 only requires changing node1’s sink to node3’s input, achieving plug‑and‑play.
- If a node fails, the flow can be redirected to a temporary storage service to avoid Kafka backlog.
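The re‑wiring rules above can be modeled with a small, pure‑JDK sketch. The class and topic names here are illustrative, not from the actual services; it shows only the bookkeeping, that removing a node points every upstream sink at the removed node's sink, so data skips it.

```java
import java.util.LinkedHashMap;

// Hypothetical model of the pipeline wiring held in Nacos: each node maps
// to its [input topic, sink topic]. Removing a node re-points the upstream
// sink at the removed node's sink, so the data flow bypasses it.
public class PipelineWiring {
    // node name -> {inputTopic, sinkTopic}, in pipeline order
    private final LinkedHashMap<String, String[]> nodes = new LinkedHashMap<>();

    public void add(String node, String input, String sink) {
        nodes.put(node, new String[]{input, sink});
    }

    // Unplug a node: every node that was writing to the removed node's
    // input topic now writes straight to the removed node's sink topic.
    public void remove(String node) {
        String[] removed = nodes.remove(node);
        for (String[] topics : nodes.values()) {
            if (topics[1].equals(removed[0])) {
                topics[1] = removed[1];
            }
        }
    }

    public String sinkOf(String node) {
        return nodes.get(node)[1];
    }

    public static void main(String[] args) {
        PipelineWiring p = new PipelineWiring();
        p.add("node1", "t-in", "t1");  // node1 reads t-in, writes t1
        p.add("node2", "t1", "t2");    // node2 reads t1, writes t2
        p.add("node3", "t2", "t-out"); // node3 reads t2, writes t-out
        p.remove("node2");             // node1 now writes directly to node3's input
        System.out.println(p.sinkOf("node1")); // t2
    }
}
```

In the real system this "remove" is just two Nacos config updates (node1's sink, plus deleting node2's entries); no service restarts.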
Nacos Configuration
1. Create configuration entries in Nacos for each service’s input and sink topics. Use the service name as groupId and a descriptive name as dataId.
2. Read configuration in code:
```java
@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {

    @NacosValue(value = "${input:}", autoRefreshed = true)
    private String input;

    @NacosValue(value = "${sink:}", autoRefreshed = true)
    private String sink;

    public String getInput() { return input; }

    public String getSink() { return sink; }
}
```

3. Listen for configuration changes and rebuild the consumer when the input topic changes:
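The rebuild pattern at the heart of this step — interrupt the old poll loop, then start a fresh one bound to the new topic — can be isolated as a pure‑JDK sketch. No Kafka or Nacos here: `Thread.sleep` stands in for `consumer.poll`, and all names are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the swap pattern: each config change shuts down the previous
// poll loop and starts a new single-thread loop bound to the new topic.
public class SwapSketch {
    private ExecutorService executor;
    private final AtomicReference<String> activeTopic = new AtomicReference<>();

    // Analogue of the Nacos listener callback.
    public synchronized void onInputChanged(String newTopic) throws InterruptedException {
        if (executor != null) {
            executor.shutdownNow();                        // interrupt the old loop
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        executor.execute(() -> {
            activeTopic.set(newTopic);                     // "subscribe" to the new topic
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(10);                      // stands in for consumer.poll()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();    // exit the loop on shutdownNow()
                }
            }
        });
        started.await();                                   // block until the new loop is live
    }

    public String activeTopic() { return activeTopic.get(); }

    public void shutdown() { if (executor != null) executor.shutdownNow(); }

    public static void main(String[] args) throws Exception {
        SwapSketch sketch = new SwapSketch();
        sketch.onInputChanged("topicA");
        System.out.println(sketch.activeTopic()); // topicA
        sketch.onInputChanged("topicB");          // old loop stopped, new one live
        System.out.println(sketch.activeTopic()); // topicB
        sketch.shutdown();
    }
}
```

The real listener works the same way, except the loop owns a `KafkaConsumer` (which is not thread‑safe, hence one thread per loop) and the interrupt surfaces as Kafka's `InterruptException` out of `poll()`.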
```java
@Component
public class ConsumerManager {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Autowired
    private NacosConfig nacosConfig;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private String topic;
    private ExecutorService executorService;

    // Fires whenever the "input" config in group "node1-server" changes
    // (matching the dataId/groupId declared on NacosConfig).
    @NacosConfigListener(dataId = "input", groupId = "node1-server")
    public void inputListener(String input) {
        String inputTopic = nacosConfig.getInput();
        // Interrupt the old poll loop before subscribing to the new topic.
        if (topic != null) {
            executorService.shutdownNow();
        }
        topic = inputTopic;
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat(topic + "-pool-%d").build();
        executorService = new ThreadPoolExecutor(1, 1, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2), threadFactory);
        executorService.execute(() -> consumer(topic));
    }

    public void consumer(String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("enable.auto.commit", enableAutoCommit);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    String handleMessage = handle(message); // business logic omitted
                    kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                }
                // Auto-commit is disabled, so commit each processed batch manually.
                consumer.commitAsync();
            }
        } catch (Exception e) {
            // shutdownNow() interrupts poll(); log other errors here
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}
```

Conclusion
The idea behind flow orchestration is to make the direction of data flow adjustable at runtime. By leveraging the lightweight APIs of Nacos and Kafka, we can implement a dynamic pipeline that demonstrates the core principles of flow‑coding without heavyweight frameworks.
This approach improves service pluggability and flexibility in microservice projects, even when organizational constraints limit the adoption of newer frameworks.
Java High-Performance Architecture
Sharing Java development articles and resources, including SSM architecture and the Spring ecosystem (Spring Boot, Spring Cloud, MyBatis, Dubbo, Docker), Zookeeper, Redis, architecture design, microservices, message queues, Git, etc.