Cloud Native 10 min read

Build Dynamic Fan‑Out/Fan‑In DAG Workflows with Argo on ACK One

This guide explains how to use Argo Workflow on Alibaba Cloud ACK One to implement dynamic fan‑out/fan‑in DAGs, splitting large log files, running parallel map tasks, and aggregating results with a reduce step, including full YAML definitions and execution steps.

Alibaba Cloud Native
Alibaba Cloud Native
Alibaba Cloud Native
Build Dynamic Fan‑Out/Fan‑In DAG Workflows with Argo on ACK One

Fan‑out/fan‑in is a workflow pattern that splits a large task into many small parallel tasks and then aggregates their results. In a static DAG the sub‑tasks are fixed, while a dynamic DAG determines sub‑tasks at runtime, effectively acting like a MapReduce job.

Dynamic DAG use case

Large‑scale workloads such as autonomous‑driving simulation or massive data processing may require thousands of parallel sub‑tasks and tens of thousands of CPU cores. A cloud‑native workflow engine that supports dynamic DAGs can schedule these workloads efficiently.

Argo Workflow on ACK One

Alibaba Cloud ACK One offers a managed Argo Workflow service. Argo Workflow is an open‑source CNCF project that uses Kubernetes CRDs to define DAGs and runs each task in a Kubernetes Pod. The service provides on‑demand CPU scaling and automatic resource cleanup.

Step‑by‑Step Workflow Construction

Create an ACK One Argo Workflow cluster.

Mount an Alibaba Cloud OSS volume so the workflow can read and write files directly from OSS.

Define the workflow in a YAML file. The example below reads a large log file from OSS, splits it into several smaller files, runs a map task for each part to count keywords, and finally runs a reduce task to merge the counts.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dynamic-dag-map-reduce-
spec:
  entrypoint: main
  volumes:
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  arguments:
    parameters:
      - name: numParts
        value: "5"
  templates:
    - name: main
      dag:
        tasks:
          - name: split
            template: split
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            withParam: '{{tasks.split.outputs.result}}'
          - name: reduce
            template: reduce
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
            depends: "map"
    - name: split
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["split.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    - name: map
      inputs:
        parameters:
          - name: partId
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["count.py"]
        env:
        - name: PART_ID
          value: "{{inputs.parameters.partId}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    - name: reduce
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["merge.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
        outputs:
          artifacts:
          - name: result
            path: /mnt/vol/result.json

The split task outputs a JSON array of part IDs, e.g. ["0", "1", "2", "3", "4"]. The map task uses withParam to iterate over this array, launching one map pod per part. The reduce task aggregates the intermediate results into a final JSON file ( result.json).

After submitting the workflow, you can monitor the DAG and execution status through the ACK One console, view intermediate files in OSS, and retrieve the final aggregated result.

Argo workflow DAG view
Argo workflow DAG view
OSS file layout
OSS file layout

References

Argo Workflow documentation: https://argo-workflows.readthedocs.io/en/latest/

ACK One user guide: https://help.aliyun.com/zh/ack/overview-12

Example repository (log‑count): https://github.com/AliyunContainerService/argo-workflow-examples/tree/main/log-count

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

KubernetesWorkflow OrchestrationArgo WorkflowDynamic DAGFan-out Fan-in
Alibaba Cloud Native
Written by

Alibaba Cloud Native

We publish cloud-native tech news, curate in-depth content, host regular events and live streams, and share Alibaba product and user case studies. Join us to explore and share the cloud-native insights you need.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.