Simplifying Argo Workflows with the Hera Python SDK on ACK Serverless Clusters
This article introduces Argo Workflows, explains the challenges of YAML‑based definitions, and demonstrates how the Hera Python SDK can streamline workflow creation, testing, and submission on Alibaba Cloud ACK Serverless clusters with practical DAG‑diamond and Map‑Reduce examples.
Argo Workflows is an open‑source workflow engine built for Kubernetes that enables users to define, schedule, and orchestrate complex task pipelines such as cron jobs, machine‑learning training, ETL, and CI/CD.
While powerful, Argo traditionally relies on YAML for workflow specifications, which can be cumbersome due to strict indentation and verbose syntax, especially for large or intricate pipelines.
The Hera Python SDK addresses these pain points by providing a Pythonic interface for constructing and submitting Argo workflows, making the process more concise, testable, and integrated with the broader Python ecosystem.
Key advantages of Hera:
Concise, readable code that speeds up development.
Support for complex workflows without YAML syntax errors.
Seamless integration with Python libraries—each function becomes a template.
Enhanced testability using standard Python testing frameworks.
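That last point is worth dwelling on: a Hera template body is an ordinary Python function, so its logic can be unit-tested without ever touching a cluster. A minimal sketch (the `build_greeting` helper below is hypothetical, not part of the examples that follow):

```python
# A function destined to become a Hera template. Decorated with @script() it
# would become a workflow step; undecorated, it is plain, testable Python.
def build_greeting(message: str) -> str:
    return f"hello {message}"

# Runs as-is under pytest/unittest, no Kubernetes required.
assert build_greeting("argo") == "hello argo"
print(build_greeting("argo"))  # → hello argo
```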
To start, an ACK One Serverless Argo cluster must be created and the access token obtained (see the referenced Alibaba Cloud guides). After the cluster is ready, install Hera with a single command:
pip install hera-workflows

Below are two illustrative examples.
a. Simple DAG Diamond
# Import packages
from hera.workflows import DAG, Workflow, script
from hera.shared import global_config
import urllib3
urllib3.disable_warnings()
# Configure host and token
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx"  # replace with your token
global_config.verify_ssl = False
@script()
def echo(message: str):
    print(message)
with Workflow(generate_name="dag-diamond-", entrypoint="diamond") as w:
    with DAG(name="diamond"):
        A = echo(name="A", arguments={"message": "A"})
        B = echo(name="B", arguments={"message": "B"})
        C = echo(name="C", arguments={"message": "C"})
        D = echo(name="D", arguments={"message": "D"})
        A >> [B, C] >> D  # B and C depend on A; D depends on B and C
w.create()

Run the workflow with:

python simpleDAG.py

The console will show successful task execution.
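The single line `A >> [B, C] >> D` encodes the whole diamond: B and C each depend on A, and D depends on both B and C. The same dependency graph can be sanity-checked locally with the standard library's `graphlib`, independently of Hera (a sketch, not part of the workflow itself):

```python
from graphlib import TopologicalSorter

# Each key lists the tasks it must wait for, mirroring A >> [B, C] >> D.
deps = {"B": {"A"}, "C": {"A"}, "D": {"B", "C"}}
order = list(TopologicalSorter(deps).static_order())
# "A" comes first and "D" last; "B" and "C" fall in between
# (on Argo they run in parallel).
print(order)
```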
b. Map‑Reduce Workflow
from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
from hera.shared import global_config
import urllib3
urllib3.disable_warnings()
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx"  # replace with your token
global_config.verify_ssl = False
@script()
def echo(message: str):
    print(message)

@script()
def split(num_parts: int) -> None:
    import json, os, sys

    os.mkdir("/mnt/out")
    part_ids = list(map(str, range(num_parts)))
    for i, part_id in enumerate(part_ids, start=1):
        with open(f"/mnt/out/{part_id}.json", "w") as f:
            json.dump({"foo": i}, f)
    json.dump(part_ids, sys.stdout)
@script(
    image="python:alpine3.6",
    inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json")],
    outputs=OSSArtifact(name="part", path="/mnt/out/part.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json"),
)
def map_() -> None:
    import json, os

    os.mkdir("/mnt/out")
    with open("/mnt/in/part.json") as f:
        part = json.load(f)
    with open("/mnt/out/part.json", "w") as f:
        json.dump({"bar": part["foo"] * 2}, f)
@script(
    image="python:alpine3.6",
    inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
    outputs=OSSArtifact(name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"),
)
def reduce() -> None:
    import json, os

    os.mkdir("/mnt/out")
    total = 0
    for file_name in os.listdir("/mnt/in"):
        with open(f"/mnt/in/{file_name}") as f:
            total += json.load(f)["bar"]
    with open("/mnt/out/total.json", "w") as f:
        json.dump({"total": total}, f)
with Workflow(
    generate_name="map-reduce-",
    entrypoint="main",
    namespace="default",
    arguments=Parameter(name="num_parts", value="4"),
) as w:
    with DAG(name="main"):
        s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}"))
        m = map_(
            with_param=s.result,
            arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json")],
        )
        s >> m >> reduce()

w.create()

Submit the workflow with:

python map-reduce.py

Both examples can be monitored via the Argo UI, where successful task completion is displayed.
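Because each template body is plain Python, the data flow of the Map-Reduce example can also be exercised locally before submission. The sketch below mirrors the logic of `split`, `map_`, and `reduce` with a temporary directory standing in for the OSS-backed `/mnt` volumes (the `*_local` functions are local stand-ins for illustration, not Hera templates):

```python
import json
import pathlib
import tempfile

def split_local(num_parts: int, out_dir: pathlib.Path) -> list:
    """Mirror split: write one {part_id}.json per part, return the id list."""
    part_ids = [str(i) for i in range(num_parts)]
    for i, part_id in enumerate(part_ids, start=1):
        (out_dir / f"{part_id}.json").write_text(json.dumps({"foo": i}))
    return part_ids

def map_local(in_file: pathlib.Path, out_file: pathlib.Path) -> None:
    """Mirror map_: {"foo": n} -> {"bar": 2n}."""
    part = json.loads(in_file.read_text())
    out_file.write_text(json.dumps({"bar": part["foo"] * 2}))

def reduce_local(in_dir: pathlib.Path) -> int:
    """Mirror reduce: sum the "bar" values across all mapped parts."""
    return sum(json.loads(p.read_text())["bar"] for p in in_dir.iterdir())

with tempfile.TemporaryDirectory() as d:
    parts_dir = pathlib.Path(d, "parts")
    results_dir = pathlib.Path(d, "results")
    parts_dir.mkdir()
    results_dir.mkdir()
    for part_id in split_local(4, parts_dir):
        map_local(parts_dir / f"{part_id}.json", results_dir / f"{part_id}.json")
    total = reduce_local(results_dir)
    print(total)  # foo values 1..4, doubled and summed: 2+4+6+8 = 20
```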
The article concludes with a comparison table highlighting the simplicity, testability, and Python ecosystem integration of Hera versus raw YAML definitions, and encourages readers to join the ACK One Serverless Argo community for further best‑practice sharing.