Dynamic Flow Orchestration with Nacos, Docker, and SpringBoot Microservices

This article demonstrates how to build a lightweight, plug‑and‑play flow‑orchestration system for microservices by installing Nacos with Docker, configuring SpringBoot services with Kafka and Nacos, and using dynamic Nacos listeners to adjust Kafka topics at runtime without redeployment.

IT Xianyu

The author explains the need for a simple flow‑orchestration solution for microservices, preferring a lightweight approach over heavyweight frameworks like Spring Cloud Data Flow.

Preparation: Install Nacos using Docker.

docker pull nacos/nacos-server
docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server

After the container starts, access the UI at http://&lt;host&gt;:8848/nacos with username and password both set to nacos.

Set up three SpringBoot services that depend on Kafka and Nacos. Add the following dependencies to each service’s pom.xml:

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.1.0.RELEASE</version>
</parent>

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
  <groupId>com.alibaba.boot</groupId>
  <artifactId>nacos-config-spring-boot-starter</artifactId>
  <version>0.2.1</version>
</dependency>

Configure each service with Kafka and Nacos settings in application.yml:

spring:
  kafka:
    bootstrap-servers: kafka-server:9092
    producer:
      acks: all
    consumer:
      group-id: node1-group
      enable-auto-commit: false
nacos:
  config:
    server-addr: nacos-server:8848

In Nacos, create configuration items for each service: an input topic and a sink topic. Use the service name as groupId and the configuration name as dataId.
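Besides the console UI, the same two items can be created from the command line with the Nacos v1 Open API. A sketch for node1-server, assuming the nacos-server:8848 address from application.yml and placeholder topic names (topic-in, topic-out):

```shell
# Assumed address matching application.yml; adjust to your deployment.
NACOS=http://nacos-server:8848

# Publish both items via the Nacos v1 Open API:
# service name as group, configuration name as dataId, topic name as content.
curl -X POST "$NACOS/nacos/v1/cs/configs" \
     -d "dataId=input" -d "group=node1-server" -d "content=topic-in"
curl -X POST "$NACOS/nacos/v1/cs/configs" \
     -d "dataId=sink" -d "group=node1-server" -d "content=topic-out"
```

Repeat with the matching group for each of the three services; publishing to an existing dataId overwrites its content, which is what triggers the listeners described next.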

Read the configuration in code using Nacos annotations:

@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {
    @NacosValue(value = "${input:}", autoRefreshed = true)
    private String input;
    @NacosValue(value = "${sink:}", autoRefreshed = true)
    private String sink;
    public String getInput() { return input; }
    public String getSink() { return sink; }
}
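Note the empty defaults in `${input:}` and `${sink:}`: if the config item does not exist yet, the field resolves to an empty string instead of failing startup. A tiny stand-in resolver (plain Java, no Spring; `PlaceholderDemo` and the topic names are illustrative) shows the semantics:

```java
import java.util.Map;

public class PlaceholderDemo {
    // Minimal stand-in for Spring-style "${key:default}" placeholder resolution.
    static String resolve(String placeholder, Map<String, String> props) {
        String body = placeholder.substring(2, placeholder.length() - 1); // strip ${ and }
        int colon = body.indexOf(':');
        String key = colon >= 0 ? body.substring(0, colon) : body;
        String def = colon >= 0 ? body.substring(colon + 1) : null;      // "" for "${key:}"
        String value = props.get(key);
        return value != null ? value : def;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("input", "topic-in"); // "sink" not published yet
        System.out.println(resolve("${input:}", props));         // topic-in
        System.out.println(resolve("${sink:}", props).isEmpty()); // true
    }
}
```

Because of this, code that starts a consumer should treat an empty topic as "not configured yet" rather than subscribing to it.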

Listen for configuration changes and rebuild Kafka consumers dynamically. The ConsumerManager component watches the input configuration, creates or shuts down a consumer thread when the topic changes, and forwards processed messages to the configured sink topic.

@Component
public class ConsumerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerManager.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Autowired
    private NacosConfig nacosConfig;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    private String topic;
    private ExecutorService executorService;

    @NacosConfigListener(dataId = "input", groupId = "node1-server")
    public void inputListener(String input) {
        String inputTopic = input; // use the pushed value; @NacosValue may not have refreshed yet
        if (topic != null) {
            executorService.shutdownNow();
        }
        topic = inputTopic;
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat(topic + "-pool-%d").build();
        executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2), threadFactory);
        executorService.execute(() -> consumer(topic));
    }

    public void consumer(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", servers);
        properties.put("enable.auto.commit", enableAutoCommit);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    String handleMessage = handle(message); // business logic omitted
                    kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                }
                // auto-commit is disabled, so commit offsets after each processed batch
                consumer.commitAsync();
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            try { consumer.commitSync(); } finally { consumer.close(); }
        }
    }
}
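The shutdown-and-restart pattern in inputListener can be exercised in isolation with a plain-Java sketch: no Kafka involved, the sleeping loop stands in for the poll loop, and the class and topic names are illustrative. It shows why the interrupt flag must be restored after InterruptedException so the loop actually exits:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TopicSwitchDemo {
    static ExecutorService pool;
    static final AtomicReference<String> lastTopic = new AtomicReference<>();

    // Stand-in for ConsumerManager.inputListener: stop the old worker, start a new one.
    static void switchTopic(String topic) throws InterruptedException {
        if (pool != null) {
            pool.shutdownNow();                         // interrupts the polling loop
            pool.awaitTermination(1, TimeUnit.SECONDS); // wait for the old worker to exit
        }
        pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            lastTopic.set(topic);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50);                   // stand-in for consumer.poll(...)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag so the loop exits
                }
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        switchTopic("topic-a");
        Thread.sleep(100);
        switchTopic("topic-b"); // the old worker is interrupted before the new one starts
        Thread.sleep(100);
        System.out.println(lastTopic.get()); // prints topic-b
        pool.shutdownNow();
    }
}
```

Waiting via awaitTermination before starting the replacement matters in the real component too: two consumers in the same group.id briefly running side by side would trigger an unnecessary rebalance.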

Conclusion: By leveraging Nacos for dynamic configuration and rebuilding Kafka consumers on the fly, services become plug‑and‑play; topics can be added, removed, or redirected without stopping the system, improving flexibility and fault tolerance in a microservice architecture.

Tags: Java, Docker, Microservices, Kafka, Nacos, SpringBoot, Flow Orchestration