Dynamic Flow Orchestration with Nacos, Docker, and SpringBoot Microservices
This article demonstrates how to build a lightweight, plug‑and‑play flow‑orchestration system for microservices by installing Nacos with Docker, configuring SpringBoot services with Kafka and Nacos, and using dynamic Nacos listeners to adjust Kafka topics at runtime without redeployment.
The author explains the need for a simple flow‑orchestration solution for microservices, preferring a lightweight approach over heavyweight frameworks like Spring Cloud Data Flow.
Preparation : Install Nacos using Docker.
docker pull nacos/nacos-server docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-serverAfter the container starts, access the UI at http://<i>host</i>:8848/nacos with username and password both set to nacos.
Set up three SpringBoot services that depend on Kafka and Nacos. Add the following dependencies to each service’s pom.xml:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.1</version>
</dependency>Configure each service with Kafka and Nacos settings in application.yml:
spring:
kafka:
bootstrap-servers: kafka-server:9092
producer:
acks: all
consumer:
group-id: node1-group
enable-auto-commit: false
nacos:
config:
server-addr: nacos-server:8848In Nacos, create configuration items for each service: an input topic and a sink topic. Use the service name as groupId and the configuration name as dataId.
Read the configuration in code using Nacos annotations:
@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {
@NacosValue(value = "${input:}", autoRefreshed = true)
private String input;
@NacosValue(value = "${sink:}", autoRefreshed = true)
private String sink;
public String getInput() { return input; }
public String getSink() { return sink; }
}Listen for configuration changes and rebuild Kafka consumers dynamically . The ConsumerManager component watches the input configuration, creates or shuts down a consumer thread when the topic changes, and forwards processed messages to the configured sink topic.
@Component
public class ConsumerManager {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Autowired
private NacosConfig nacosConfig;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private String topic;
private ExecutorService executorService;
@NacosConfigListener(dataId = "node1-server", groupId = "input")
public void inputListener(String input) {
String inputTopic = nacosConfig.getInput();
if (topic != null) {
executorService.shutdownNow();
}
topic = inputTopic;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(topic + "-pool-%d").build();
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2), threadFactory);
executorService.execute(() -> consumer(topic));
}
public void consumer(String topic) {
Properties properties = new Properties();
properties.put("bootstrap.servers", servers);
properties.put("enable.auto.commit", enableAutoCommit);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", groupId);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));
try {
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
String message = record.value();
String handleMessage = handle(message); // business logic omitted
kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
}
}
consumer.commitAsync();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
try { consumer.commitSync(); } finally { consumer.close(); }
}
}
}Conclusion : By leveraging Nacos for dynamic configuration and rebuilding Kafka consumers on the fly, services become plug‑and‑play; topics can be added, removed, or redirected without stopping the system, improving flexibility and fault tolerance in a microservice architecture.
IT Xianyu
We share common IT technologies (Java, Web, SQL, etc.) and practical applications of emerging software development techniques. New articles are posted daily. Follow IT Xianyu to stay ahead in tech. The IT Xianyu series is being regularly updated.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
