Implementing Dynamic Flow Orchestration with Nacos, SpringBoot, and Kafka
This article explains how to build a lightweight, plug‑and‑play microservice flow orchestration system using Docker‑installed Nacos, SpringBoot services with Kafka integration, and dynamic configuration listeners to adjust data pipelines without downtime.
Microservice development often requires separate data‑processing modules, each packaged as an independent service to simplify scaling and flow orchestration.
After evaluating Spring Cloud Data Flow and finding it too heavyweight, the author proposes a simple orchestration approach based on the existing tech stack, allowing services to be inserted, removed, or re‑wired without stopping the system.
Preparation – Nacos installation
Use Docker to quickly set up Nacos:
docker pull nacos/nacos-server
docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server
After the container starts, access the console at http://<host>:8848/nacos (default username/password: nacos).
Creating three SpringBoot services
Add the following dependencies to each project's pom.xml:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>
<dependencies>
    <!-- Kafka client integration; the version is managed by the Spring Boot parent -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Nacos dynamic configuration support -->
    <dependency>
        <groupId>com.alibaba.boot</groupId>
        <artifactId>nacos-config-spring-boot-starter</artifactId>
        <version>0.2.1</version>
    </dependency>
</dependencies>
Configuration file
spring:
  kafka:
    bootstrap-servers: kafka-server:9092
    producer:
      acks: all
    consumer:
      group-id: node1-group
      enable-auto-commit: false
nacos:
  config:
    server-addr: nacos-server:8848
It is recommended to add the host names used in the configuration (e.g., kafka-server and nacos-server) to the local /etc/hosts file so that they can be used instead of IP addresses.
Business interpretation
The three services form a linear data pipeline: node1 consumes from a pre‑processing topic, node2 consumes the output of node1, and node3 consumes the output of node2. By changing the sink configuration of a service, the pipeline can be re‑wired (e.g., removing node2 simply requires pointing node1's sink to node3's input). This enables plug‑and‑play data flows and provides fault tolerance by redirecting traffic when a node fails.
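The re-wiring idea can be checked on paper before touching any broker. The following is a minimal sketch, not the article's implementation: it models each node as an input/sink pair and traces the path a message would take. The class name and all topic names (pre-process, node2-input, node3-input, result) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: node and topic names are hypothetical.
class PipelineRewiring {

    // A node is described by the topic it reads from and the topic it writes to.
    static final class Route {
        final String input;
        final String sink;

        Route(String input, String sink) {
            this.input = input;
            this.sink = sink;
        }
    }

    // Follow sink -> input links from startTopic and return the nodes visited, in order.
    static List<String> trace(Map<String, Route> routes, String startTopic) {
        List<String> path = new ArrayList<>();
        String topic = startTopic;
        boolean advanced = true;
        while (advanced) {
            advanced = false;
            for (Map.Entry<String, Route> e : routes.entrySet()) {
                // Skip nodes already visited so a mis-wired cycle cannot loop forever.
                if (e.getValue().input.equals(topic) && !path.contains(e.getKey())) {
                    path.add(e.getKey());
                    topic = e.getValue().sink;
                    advanced = true;
                    break;
                }
            }
        }
        return path;
    }

    // The linear pipeline: node1 -> node2 -> node3.
    static Map<String, Route> linearPipeline() {
        Map<String, Route> routes = new LinkedHashMap<>();
        routes.put("node1", new Route("pre-process", "node2-input"));
        routes.put("node2", new Route("node2-input", "node3-input"));
        routes.put("node3", new Route("node3-input", "result"));
        return routes;
    }

    public static void main(String[] args) {
        Map<String, Route> routes = linearPipeline();
        System.out.println(trace(routes, "pre-process")); // [node1, node2, node3]

        // Bypass node2 by pointing node1's sink at node3's input.
        routes.put("node1", new Route("pre-process", routes.get("node3").input));
        System.out.println(trace(routes, "pre-process")); // [node1, node3]
    }
}
```

Only node1's sink changes in the second step; node2 keeps its configuration and simply stops receiving traffic, which is what makes the removal reversible.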
Nacos configuration
For each service, create two configuration items in Nacos: one for the input topic and one for the sink (output) topic. Use the service name as groupId and a descriptive name as dataId. The screenshots in the original article illustrate the exact entries.
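Since the screenshots are not reproduced here, the sketch below shows what the two entries for node1 might contain. It assumes each item is stored as a single properties-style line whose key matches its dataId, which is consistent with the ${input:} and ${sink:} placeholders used in the code; the topic names and the class name are hypothetical. It uses only java.util.Properties and does not talk to Nacos.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: mimics how a properties-style config item resolves to a value.
class NacosEntrySketch {

    // Parse the text body of one configuration item as Java properties.
    static Properties parse(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Mimic the "${key:}" placeholder: the value, or an empty string when the key is absent.
    static String resolve(Properties props, String key) {
        return props.getProperty(key, "");
    }

    public static void main(String[] args) {
        // Hypothetical content of dataId "input", groupId "node1-server".
        Properties input = parse("input=pre-process");
        // Hypothetical content of dataId "sink", groupId "node1-server".
        Properties sink = parse("sink=node2-input");

        System.out.println(resolve(input, "input"));          // pre-process
        System.out.println(resolve(sink, "sink"));            // node2-input
        System.out.println(resolve(sink, "input").isEmpty()); // true
    }
}
```

The empty default matters: a missing entry yields an empty topic name rather than a startup failure, so the consuming code should treat an empty value as "not configured yet".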
Reading configuration in code
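import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.alibaba.nacos.spring.context.annotation.config.NacosPropertySource;
import org.springframework.context.annotation.Configuration;

// Exposes the input and sink topics stored in Nacos; both values refresh automatically.
@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {
    @NacosValue(value = "${input:}", autoRefreshed = true)
    private String input;
    @NacosValue(value = "${sink:}", autoRefreshed = true)
    private String sink;

    public String getInput() { return input; }
    public String getSink() { return sink; }
}
Listening to configuration changes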
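import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.nacos.api.config.annotation.NacosConfigListener;
import com.google.common.util.concurrent.ThreadFactoryBuilder; // requires Guava on the classpath
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ConsumerManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerManager.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Autowired
    private NacosConfig nacosConfig;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    private String topic;
    private ExecutorService executorService;

    // Fires when the "input" item in group "node1-server" changes (same IDs as in NacosConfig).
    @NacosConfigListener(dataId = "input", groupId = "node1-server")
    public void inputListener(String input) {
        // The raw payload is not parsed here; the refreshed topic is read from NacosConfig.
        String inputTopic = nacosConfig.getInput();
        if (executorService != null) {
            // Interrupt the consumer thread bound to the previous topic.
            executorService.shutdownNow();
            executorService = null;
        }
        topic = inputTopic;
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat(topic + "-pool-%d").build();
        executorService = new ThreadPoolExecutor(1, 1, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2), threadFactory);
        executorService.execute(() -> consumer(inputTopic));
    }

    public void consumer(String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("enable.auto.commit", enableAutoCommit);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    String handleMessage = handle(message);
                    // The sink is read per record, so a sink change takes effect immediately.
                    kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                }
                // Commit after each batch; auto-commit is disabled in the configuration.
                consumer.commitAsync();
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            // Final synchronous commit before closing, as in the original design.
            try { consumer.commitSync(); } finally { consumer.close(); }
        }
    }

    // Placeholder for the business logic omitted in the article.
    private String handle(String message) {
        return message;
    }
}
Summary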
The overall idea of flow orchestration is to make the direction of data streams adjustable at runtime; by leveraging lightweight frameworks and dynamic configuration, developers can build flexible microservice pipelines that are easy to understand, modify, and extend.
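The runtime adjustability described here rests on one mechanism: when the input changes, the old consumer thread is interrupted and a new one is started. The following broker-free sketch isolates that pattern so it can be run and tested without Kafka or Nacos. It is an illustration under stated assumptions, not the article's code: the class and method names are hypothetical, Thread.sleep stands in for consumer.poll, and, unlike ConsumerManager, it waits for the old worker to finish before starting the next one.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: swaps a single worker thread whenever the input topic changes.
class WorkerSwapSketch {
    private ExecutorService executor;
    // Records start/stop events so the ordering can be inspected.
    final Queue<String> log = new ConcurrentLinkedQueue<>();

    // Hypothetical hook, called whenever the configured input topic changes.
    synchronized void onInputChanged(String newTopic) {
        stop();
        executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> work(newTopic));
    }

    // Interrupt the current worker, if any, and wait until it has finished.
    synchronized void stop() {
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor = null;
    }

    private void work(String topic) {
        log.add("start " + topic);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(10); // stand-in for consumer.poll(...)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and fall through
        } finally {
            log.add("stop " + topic);
        }
    }

    public static void main(String[] args) {
        WorkerSwapSketch sketch = new WorkerSwapSketch();
        sketch.onInputChanged("topic-a");
        sketch.onInputChanged("topic-b");
        sketch.stop();
        System.out.println(sketch.log);
        // [start topic-a, stop topic-a, start topic-b, stop topic-b]
    }
}
```

Waiting for termination is a deliberate design choice in this sketch: it guarantees that two workers never run at the same time, at the cost of a short pause during the switch.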
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
