
Simplifying Argo Workflows with the Hera Python SDK on ACK Serverless Clusters

This article introduces Argo Workflows, explains the challenges of YAML‑based definitions, and demonstrates how the Hera Python SDK can streamline workflow creation, testing, and submission on Alibaba Cloud ACK Serverless clusters with practical DAG‑diamond and Map‑Reduce examples.

Alibaba Cloud Infrastructure

Argo Workflows is an open‑source workflow engine built for Kubernetes that enables users to define, schedule, and orchestrate complex task pipelines such as cron jobs, machine‑learning training, ETL, and CI/CD.

While powerful, Argo traditionally relies on YAML for workflow specifications, which can be cumbersome due to strict indentation and verbose syntax, especially for large or intricate pipelines.

The Hera Python SDK addresses these pain points by providing a Pythonic interface for constructing and submitting Argo workflows, making the process more concise, testable, and integrated with the broader Python ecosystem.

Key advantages of Hera:

Concise, readable code that speeds up development.

Support for complex workflows without YAML syntax errors.

Seamless integration with Python libraries: each function decorated with @script becomes a workflow template.

Enhanced testability using standard Python testing frameworks.
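The testability point can be illustrated without a cluster. The following is a minimal sketch, assuming the task logic is kept in a plain Python function. The name `double_foo` is hypothetical and mirrors the map step of the Map-Reduce example later in this article; nothing here calls Hera itself.

```python
import unittest


def double_foo(part: dict) -> dict:
    """Hypothetical task logic: read 'foo' and emit it doubled as 'bar'."""
    return {"bar": part["foo"] * 2}


class DoubleFooTest(unittest.TestCase):
    def test_doubles_value(self):
        self.assertEqual(double_foo({"foo": 3}), {"bar": 6})

    def test_missing_key_raises(self):
        # A part file without 'foo' should fail loudly rather than silently.
        with self.assertRaises(KeyError):
            double_foo({})


# Run the tests explicitly so the script does not exit the interpreter.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(DoubleFooTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Because the logic is ordinary Python, the same checks could run in any standard test runner before the workflow is ever submitted.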

To get started, create an ACK One Serverless Argo cluster and obtain its access token, following the corresponding Alibaba Cloud guides. Once the cluster is ready, install Hera with a single command:

pip install hera-workflows
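Both examples that follow hard-code the host and token in the script. As one possible alternative, the connection settings could be read from environment variables. This is only a sketch: the variable names `ARGO_HOST`, `ARGO_TOKEN`, and `ARGO_VERIFY_SSL`, the `ArgoSettings` class, and the `load_settings` helper are all assumptions made for illustration and are not part of Hera or ACK.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ArgoSettings:
    host: str
    token: str
    verify_ssl: bool


def load_settings(env: Mapping[str, str] = os.environ) -> ArgoSettings:
    """Read connection settings from (hypothetical) environment variables."""
    host = env.get("ARGO_HOST")
    token = env.get("ARGO_TOKEN")
    if not host or not token:
        raise RuntimeError("ARGO_HOST and ARGO_TOKEN must be set")
    # Anything other than 0/false/no keeps certificate verification on.
    flag = env.get("ARGO_VERIFY_SSL", "true").strip().lower()
    return ArgoSettings(host=host, token=token, verify_ssl=flag not in {"0", "false", "no"})


# Demonstration with an explicit mapping instead of the real environment.
settings = load_settings(
    {
        "ARGO_HOST": "https://argo.example.invalid:2746",
        "ARGO_TOKEN": "dummy-token",
        "ARGO_VERIFY_SSL": "false",
    }
)
print(settings.host, settings.verify_ssl)

# The values could then be assigned to Hera's global_config, for example:
#   global_config.host = settings.host
#   global_config.token = settings.token
#   global_config.verify_ssl = settings.verify_ssl
```

Keeping the token out of the source file also makes it easier to share the workflow script without leaking credentials.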

Below are two illustrative examples.

a. Simple DAG Diamond

# Import packages
from hera.workflows import DAG, Workflow, script
from hera.shared import global_config
import urllib3

urllib3.disable_warnings()

# Configure host and token
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx"  # replace with your token
global_config.verify_ssl = False  # skip TLS certificate verification

@script()
def echo(message: str):
    print(message)

with Workflow(generate_name="dag-diamond-", entrypoint="diamond") as w:
    with DAG(name="diamond"):
        A = echo(name="A", arguments={"message": "A"})
        B = echo(name="B", arguments={"message": "B"})
        C = echo(name="C", arguments={"message": "C"})
        D = echo(name="D", arguments={"message": "D"})
        A >> [B, C] >> D  # B and C depend on A; D depends on B and C
w.create()  # submit the workflow to the cluster

Save the script as simpleDAG.py and run it with:

python simpleDAG.py

The script submits the workflow to the cluster and returns; task progress and results are then visible in the Argo UI.
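The ordering implied by `A >> [B, C] >> D` can also be reasoned about locally. The sketch below does not use Hera; it restates the diamond's dependencies as a plain dictionary and uses the standard library's `graphlib` (Python 3.9+) to group tasks into waves that could run in parallel. The `execution_waves` helper is a hypothetical name introduced for illustration.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (the diamond shape).
dependencies = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}


def execution_waves(deps):
    """Group tasks into waves; tasks in one wave have no pending dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
print(waves)  # [['A'], ['B', 'C'], ['D']]
```

B and C land in the same wave, which matches the diamond: both wait only on A, and D waits on both of them.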

b. Map‑Reduce Workflow

from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
from hera.shared import global_config
import urllib3

urllib3.disable_warnings()

global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx"
global_config.verify_ssl = False  # skip TLS certificate verification

@script()
def echo(message: str):
    print(message)

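# Upload everything written to /mnt/out so each map task can fetch its part from OSS
@script(
    outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts")
)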
def split(num_parts: int) -> None:
    import json, os, sys
    os.mkdir("/mnt/out")
    part_ids = list(map(str, range(num_parts)))
    for i, part_id in enumerate(part_ids, start=1):
        with open(f"/mnt/out/{part_id}.json", "w") as f:
            json.dump({"foo": i}, f)
    json.dump(part_ids, sys.stdout)

@script(
    image="python:alpine3.6",
    inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json")],
    outputs=OSSArtifact(name="part", path="/mnt/out/part.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json")
)
def map_() -> None:
    import json, os
    os.mkdir("/mnt/out")
    with open("/mnt/in/part.json") as f:
        part = json.load(f)
    with open("/mnt/out/part.json", "w") as f:
        json.dump({"bar": part["foo"] * 2}, f)

@script(
    image="python:alpine3.6",
    inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
    outputs=OSSArtifact(name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json")
)
def reduce() -> None:
    import json, os
    os.mkdir("/mnt/out")
    total = 0
    for name in os.listdir("/mnt/in"):
        with open(f"/mnt/in/{name}") as f:  # close each result file after reading
            result = json.load(f)
        total += result["bar"]
    with open("/mnt/out/total.json", "w") as f:
        json.dump({"total": total}, f)

with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="default", arguments=Parameter(name="num_parts", value="4")) as w:
    with DAG(name="main"):
        s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}"))
        m = map_(with_param=s.result, arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json")])
        s >> m >> reduce()
w.create()  # submit the workflow to the cluster

Save the script as map-reduce.py and submit the workflow with:

python map-reduce.py

Both examples can be monitored via the Argo UI, where successful task completion is displayed.
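To make the data flow of the Map-Reduce example easier to follow, the same split, map, and reduce logic can be traced on a local machine. This is a rough stand-in under stated assumptions, not the workflow itself: temporary directories replace the OSS bucket, the map step runs in a loop rather than as parallel pods, and the helper names `map_part`, `reduce_results`, and `run_local` are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def split(num_parts: int, out_dir: Path) -> list:
    """Write one JSON part per id and return the ids, as the split task does."""
    part_ids = [str(i) for i in range(num_parts)]
    for i, part_id in enumerate(part_ids, start=1):
        (out_dir / f"{part_id}.json").write_text(json.dumps({"foo": i}))
    return part_ids


def map_part(in_file: Path, out_file: Path) -> None:
    """Double the 'foo' value of one part and store it as 'bar'."""
    part = json.loads(in_file.read_text())
    out_file.write_text(json.dumps({"bar": part["foo"] * 2}))


def reduce_results(in_dir: Path) -> int:
    """Sum the 'bar' values of all mapped results."""
    return sum(json.loads(p.read_text())["bar"] for p in in_dir.glob("*.json"))


def run_local(num_parts: int) -> int:
    with tempfile.TemporaryDirectory() as tmp:
        parts_dir, results_dir = Path(tmp, "parts"), Path(tmp, "results")
        parts_dir.mkdir()
        results_dir.mkdir()
        # In the real workflow each part is handled by its own map task.
        for part_id in split(num_parts, parts_dir):
            map_part(parts_dir / f"{part_id}.json", results_dir / f"{part_id}.json")
        return reduce_results(results_dir)


total = run_local(4)
print(total)  # 20
```

With four parts, the split step writes foo values 1 through 4, the map step doubles them to 2, 4, 6, and 8, and the reduce step sums them to 20, which is the value the workflow's total.json would be expected to hold for the same input.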

The article concludes with a comparison table highlighting the simplicity, testability, and Python ecosystem integration of Hera versus raw YAML definitions, and encourages readers to join the ACK One Serverless Argo community for further best‑practice sharing.
