Implementing Dynamic Flow Orchestration with Nacos, SpringBoot, and Kafka

This article explains how to build a lightweight, plug‑and‑play microservice flow orchestration system using Docker‑installed Nacos, SpringBoot services with Kafka integration, and dynamic configuration listeners to adjust data pipelines without downtime.

Microservice development often requires separate data‑processing modules, each packaged as an independent service to simplify scaling and flow orchestration.

After evaluating Spring Cloud Data Flow and finding it too heavyweight, the author proposes a simple orchestration approach based on the existing tech stack, allowing services to be inserted, removed, or re‑wired without stopping the system.

Preparation – Nacos installation

Use Docker to quickly set up Nacos:

docker pull nacos/nacos-server
docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server

After the container starts, access the console at http://<host>:8848/nacos (default username/password: nacos).

Creating three SpringBoot services

Add the following dependencies to each project's pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>0.2.1</version>
</dependency>

Configuration file (application.yml)

spring:
  kafka:
    bootstrap-servers: kafka-server:9092
    producer:
      acks: all
    consumer:
      group-id: node1-group
      enable-auto-commit: false

nacos:
  config:
    server-addr: nacos-server:8848

It is recommended to map each host name used above (e.g., node1-server, kafka-server, nacos-server) to its IP address in the local /etc/hosts file, so that configuration files can refer to stable host names instead of hard-coded IPs.
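For example (the IP addresses below are illustrative, not from the original article), the /etc/hosts entries on each machine might look like:

```
192.168.1.10  nacos-server
192.168.1.11  kafka-server
192.168.1.21  node1-server
192.168.1.22  node2-server
192.168.1.23  node3-server
```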

Business interpretation

The three services form a linear data pipeline: node1 consumes from a pre‑processing topic, node2 consumes the output of node1, and node3 consumes the output of node2. By changing the sink configuration of a service, the pipeline can be re‑wired; for example, removing node2 simply requires pointing node1's sink to node3's input. This enables plug‑and‑play data flows and provides fault tolerance by redirecting traffic when a node fails.
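As a concrete sketch (the topic names here are assumed for illustration; the original article shows them in screenshots), removing node2 amounts to editing two Nacos values while everything else keeps running:

```
# Before: node1 -> node2 -> node3
node1-server/sink  = node2-input
node2-server/input = node2-input
node2-server/sink  = node3-input
node3-server/input = node3-input

# After: node1 -> node3 (node2 unplugged)
node1-server/sink  = node3-input
node3-server/input = node3-input
```

Because each service re-reads its input and sink at runtime, the change takes effect without restarting any node.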

Nacos configuration

For each service, create two configuration items in Nacos: one for the input topic and one for the sink (output) topic. Use the service name as groupId and a descriptive name as dataId. The screenshots in the original article illustrate the exact entries.
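Since the screenshots are not reproduced here, the listing below shows one plausible set of entries (topic names are illustrative, not from the original article):

```
# groupId       dataId   content (topic name)
node1-server    input    pre-process-topic
node1-server    sink     node2-input
node2-server    input    node2-input
node2-server    sink     node3-input
node3-server    input    node3-input
node3-server    sink     result-topic
```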

Reading configuration in code

@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {
    @NacosValue(value = "${input:}", autoRefreshed = true)
    private String input;

    @NacosValue(value = "${sink:}", autoRefreshed = true)
    private String sink;

    public String getInput() { return input; }
    public String getSink() { return sink; }
}

Listening to configuration changes

@Component
public class ConsumerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerManager.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Autowired
    private NacosConfig nacosConfig;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private String topic;
    private ExecutorService executorService;

    @NacosConfigListener(dataId = "input", groupId = "node1-server")
    public void inputListener(String input) {
        String inputTopic = nacosConfig.getInput();
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
        topic = inputTopic;
        // ThreadFactoryBuilder comes from Guava (com.google.common.util.concurrent)
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat(topic + "-pool-%d").build();
        executorService = new ThreadPoolExecutor(1, 1, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2), threadFactory);
        executorService.execute(() -> consumer(topic));
    }

    public void consumer(String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("enable.auto.commit", enableAutoCommit);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    String handleMessage = handle(message); // business logic omitted
                    kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                }
                // auto-commit is disabled, so commit each polled batch explicitly
                consumer.commitAsync();
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            try { consumer.commitSync(); } finally { consumer.close(); }
        }
    }
}
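The interaction between the listener and the polling loop — shutdownNow() interrupts the old thread, the isInterrupted() check ends its loop, and a fresh executor takes over the new topic — can be exercised without Kafka or Nacos. The sketch below (class and method names are my own, not from the article) reproduces just that restart pattern with plain java.util.concurrent:

```java
import java.util.concurrent.*;

// Self-contained sketch of the restart pattern in inputListener: on each topic
// change, interrupt the old polling thread via shutdownNow() and start a fresh
// single-thread executor for the new topic.
public class ConsumerRestartSketch {
    private ExecutorService executorService;
    // records when a polling thread started and stopped for each topic
    final CopyOnWriteArrayList<String> events = new CopyOnWriteArrayList<>();

    public synchronized void onTopicChange(String newTopic) {
        if (executorService != null) {
            executorService.shutdownNow(); // interrupts the running poll loop
        }
        executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> poll(newTopic));
    }

    private void poll(String topic) {
        events.add("start:" + topic);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(50); // stands in for consumer.poll(...)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore flag so the loop exits
            }
        }
        events.add("stop:" + topic);
    }
}
```

In the real service, poll(...) holds the KafkaConsumer loop shown above; the key property is that the blocking call reacts to the interrupt promptly, and re-setting the interrupt flag is what lets the while condition terminate the old consumer before the new one takes over.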

Summary

The overall idea of flow orchestration is to make the direction of data streams adjustable at runtime; by leveraging lightweight frameworks and dynamic configuration, developers can build flexible microservice pipelines that are easy to understand, modify, and extend.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Tags: microservices, Dynamic Configuration, Kafka, Nacos, SpringBoot, Flow Orchestration
Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn