How to Build Dynamic Microservice Flow Orchestration with Nacos, SpringBoot, and Kafka
This article explains how to create a plug‑and‑play microservice flow orchestration using Docker‑based Nacos, three SpringBoot services, and Kafka, covering installation, configuration, dynamic topic management, and runtime consumer updates for flexible data pipelines.
Preface
The author has been developing microservices and needs a lightweight flow‑orchestration solution that allows services to be plugged in and reconfigured without downtime.
Preparation
Install Nacos via Docker:
docker pull nacos/nacos-server
docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server

Access the UI at http://<host>:8848/nacos (username/password: nacos).
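The services below also expect a Kafka broker reachable at kafka-server:9092 (a placeholder hostname; substitute your own broker address). If you do not already have one, a minimal single-node broker for local testing can be started with the official Apache image; this is an assumption added for convenience, not part of the original setup:

# Assumption: the apache/kafka image; any single-node broker listening on 9092 works just as well.
docker pull apache/kafka
docker run --name kafka -d -p 9092:9092 apache/kafka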
Set Up Three SpringBoot Services with Nacos and Kafka
Add the following Maven dependencies:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>0.2.1</version>
</dependency>

Configuration file (application.yml):
spring:
  kafka:
    bootstrap-servers: kafka-server:9092
    producer:
      acks: all
    consumer:
      group-id: node1-group
      enable-auto-commit: false
nacos:
  config:
    server-addr: nacos-server:8848

Business Interpretation
Each service has an input topic and an output (sink) topic. By changing the Nacos configuration, the data flow can be re-wired: for example, removing node2 simply requires updating node1's sink to point to node3's input. This enables plug-and-play microservice pipelines and dynamic failover.
Nacos Configuration
1. Create Configuration
In Nacos, add two entries for each service: input and sink. Use the service name as groupId and a descriptive name as dataId.
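As an illustration (the topic names below are assumptions, not taken from the original article), the two entries for node1 might be published in properties format under groupId node1-server:

# dataId: input, groupId: node1-server
input=node1-input-topic

# dataId: sink, groupId: node1-server
sink=node2-input-topic

Removing node2 from the pipeline then only requires editing the sink entry so that it reads sink=node3-input-topic; the running node1 service picks the change up automatically.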
2. Read Configuration
@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {

    @NacosValue(value = "${input:}", autoRefreshed = true)
    private String input;

    @NacosValue(value = "${sink:}", autoRefreshed = true)
    private String sink;

    public String getInput() { return input; }

    public String getSink() { return sink; }
}

3. Listen for Configuration Changes
When the input topic changes, recreate the Kafka consumer to listen to the new topic.
@Component
public class ConsumerManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerManager.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Autowired
    private NacosConfig nacosConfig;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private String topic;
    private ExecutorService executorService;

    // Fires whenever the "input" config in group "node1-server" changes in Nacos.
    @NacosConfigListener(dataId = "input", groupId = "node1-server")
    public void inputListener(String input) {
        String inputTopic = nacosConfig.getInput();
        // Stop the consumer polling the old topic before starting one for the new topic.
        if (topic != null) {
            executorService.shutdownNow();
        }
        topic = inputTopic;
        ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build();
        executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2), tf);
        executorService.execute(() -> consumer(topic));
    }

    public void consumer(String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("enable.auto.commit", enableAutoCommit);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            // Poll until shutdownNow() interrupts this thread because the input topic changed.
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    String handleMessage = handle(message); // business logic omitted
                    // Forward the processed message to whatever sink topic Nacos currently points at.
                    kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                }
                consumer.commitAsync();
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

Summary
The flow‑orchestration approach lets you adjust data‑flow direction dynamically, using Nacos for configuration, SpringBoot for services, and Kafka for messaging. This pattern demonstrates a lightweight, plug‑and‑play microservice architecture that can be re‑wired at runtime without heavy frameworks.
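As a sketch of what re-wiring looks like in practice (assuming the standard Nacos 1.x Open API and the hypothetical topic name node3-input-topic used earlier), node1 can be re-pointed at node3 with a single HTTP call; no restart is needed because the sink value is re-read on every send:

curl -X POST "http://nacos-server:8848/nacos/v1/cs/configs" \
  -d "dataId=sink" \
  -d "group=node1-server" \
  -d "content=sink=node3-input-topic"

The same change can equally be made through the Nacos console UI.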