Dynamic Flow Orchestration with Nacos, Kafka, and SpringBoot
This article demonstrates how to build a lightweight, plug-and-play microservice flow orchestration solution using Docker-deployed Nacos for configuration, Kafka for streaming, and SpringBoot services. It covers environment setup, service configuration, dynamic topic handling, and listening for runtime configuration changes.
In recent microservice projects we often need independent services that can be dynamically re‑wired; heavy frameworks like Spring Cloud Data Flow can be overkill, so we implement a simple flow orchestration based on our existing stack.
Preparation: install Nacos quickly with Docker.
docker pull nacos/nacos-server
docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server
Access the console at http://<host>:8848/nacos using the default credentials (user: nacos, password: nacos).
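For local experiments you may also want Kafka next to Nacos. A minimal docker-compose sketch follows; the image choices, the ZooKeeper wiring, and the kafka-server hostname (matching the application.yml below) are assumptions for a local setup, not part of the original article:

```yaml
version: "3"
services:
  nacos-server:
    image: nacos/nacos-server
    environment:
      - MODE=standalone        # single-node Nacos, as in the docker run above
    ports:
      - "8848:8848"
  zookeeper:
    image: zookeeper:3.5       # assumed tag
    ports:
      - "2181:2181"
  kafka-server:
    image: wurstmeister/kafka  # assumed image; any Kafka image works
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-server:9092
    ports:
      - "9092:9092"
```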
Service Setup: create three SpringBoot services, each importing the Nacos and Kafka dependencies.
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.1</version>
</dependency>The application.yml contains Kafka and Nacos settings:
spring:
kafka:
bootstrap-servers: kafka-server:9092
producer:
acks: all
consumer:
group-id: node1-group
enable-auto-commit: false
nacos:
config:
server-addr: nacos-server:8848
Nacos Configuration: each service defines two topics, input and sink. In Nacos we create a configuration item for each, with the groupId set to the service name (e.g. node1-server) and the dataId set to the topic role (input or sink); the configuration content carries the actual Kafka topic name.
Example configuration screenshot is omitted here.
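To make the model concrete: each node's two Nacos entries boil down to an (input, sink) topic pair, and a flow exists wherever one node's sink matches another node's input. The following toy sketch (class and topic names are illustrative, not from the original code) traces such a chain:

```java
import java.util.Map;

public class FlowWiring {
    // Each node's Nacos entries reduce to an input topic and a sink topic.
    public static final class Node {
        public final String input;
        public final String sink;
        public Node(String input, String sink) { this.input = input; this.sink = sink; }
    }

    // Follow the chain from startTopic: find the node consuming the current topic,
    // append it and its sink, and continue until no node consumes the topic.
    // (Assumes the wiring is acyclic; a cycle would loop forever.)
    public static String tracePath(Map<String, Node> nodes, String startTopic) {
        StringBuilder path = new StringBuilder(startTopic);
        String topic = startTopic;
        boolean moved = true;
        while (moved) {
            moved = false;
            for (Map.Entry<String, Node> e : nodes.entrySet()) {
                if (e.getValue().input.equals(topic)) {
                    path.append(" -> ").append(e.getKey())
                        .append(" -> ").append(e.getValue().sink);
                    topic = e.getValue().sink;
                    moved = true;
                    break;
                }
            }
        }
        return path.toString();
    }
}
```

With node1 wired topic-a → topic-b and node2 wired topic-b → topic-c, tracePath(nodes, "topic-a") returns "topic-a -> node1 -> topic-b -> node2 -> topic-c"; re-pointing a node's entries in Nacos changes the traced path without touching code.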
Reading Configuration: a Java class annotated with @NacosPropertySource loads the topics:
@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {
@NacosValue(value = "${input:}", autoRefreshed = true)
private String input;
@NacosValue(value = "${sink:}", autoRefreshed = true)
private String sink;
public String getInput() { return input; }
public String getSink() { return sink; }
}
Listening for Configuration Changes: a component watches the input dataId and recreates the Kafka consumer whenever the topic changes.
@Component
public class ConsumerManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerManager.class); // slf4j
@Value("${spring.kafka.bootstrap-servers}") private String servers;
@Value("${spring.kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit;
@Value("${spring.kafka.consumer.group-id}") private String groupId;
@Autowired private NacosConfig nacosConfig;
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
private String topic;
private ExecutorService executorService;
@NacosConfigListener(dataId = "input", groupId = "node1-server")
public void inputListener(String input) {
String inputTopic = nacosConfig.getInput();
if (topic != null) { executorService.shutdownNow(); }
topic = inputTopic;
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build();
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(2), tf);
executorService.execute(() -> consumer(topic));
}
public void consumer(String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", servers);
props.put("enable.auto.commit", enableAutoCommit);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
String message = record.value();
String handleMessage = handle(message); // business logic omitted
kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
}
consumer.commitAsync(); // commit after each processed batch (auto-commit is disabled)
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
try { consumer.commitSync(); } finally { consumer.close(); }
}
}
private String handle(String msg) { return msg; } // placeholder
}
Summary: by treating each microservice as a plug-in with configurable input and output topics, we achieve dynamic flow control without heavyweight orchestration tools. Adjusting or removing a node only requires updating its Nacos entries; the services automatically reconfigure their consumers, enabling flexible, resilient data pipelines.
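One caveat with the listener above: shutdownNow() only interrupts the old consumer thread. Without waiting for it to actually exit, the old and new consumers can briefly coexist in the same group and trigger an extra rebalance. A minimal sketch of a safer restart follows (pure Java; the SafeRestart class, the stand-in Runnable, and the 10-second timeout are illustrative choices, not part of the original code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SafeRestart {
    private ExecutorService executor;

    // Stop the previous worker and wait for it to exit before starting the new one,
    // so two consumers never poll the same group at once.
    public synchronized void restart(Runnable worker) throws InterruptedException {
        if (executor != null) {
            executor.shutdownNow();                          // interrupt the old consumer loop
            executor.awaitTermination(10, TimeUnit.SECONDS); // wait for it to drain and close
        }
        executor = Executors.newSingleThreadExecutor();
        executor.execute(worker);
    }

    public synchronized void stop() throws InterruptedException {
        if (executor != null) {
            executor.shutdownNow();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

In inputListener this would replace the bare executorService.shutdownNow() call, and the consumer loop's finally block (commitSync then close) runs before awaitTermination returns.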
