Simplify Argo Workflows with Hera: Python SDK Guide for Kubernetes
This tutorial explains how to use the Hera Python SDK to create, submit, and manage Argo Workflows on an ACK One Serverless Argo cluster, covering installation, DAG diamond and MapReduce examples, and practical commands for token generation and workflow execution.
Argo Workflows is an open‑source workflow engine built for Kubernetes, enabling users to define complex task sequences with dependencies for use cases such as scheduled jobs, machine learning, scientific computing, ETL, and CI/CD.
Because Argo Workflows relies on YAML for workflow definitions, newcomers often struggle with indentation rules and verbose syntax. Hera is a Python SDK that abstracts away YAML, letting data scientists and engineers author workflows directly in Python.
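To show what Hera abstracts away, the sketch below writes out, as a Python dictionary, roughly the manifest that the diamond DAG in Example 1 corresponds to. Kubernetes also accepts manifests as JSON, so the dictionary can be printed with json. This is a hand-written approximation, not Hera's actual output: the depends expressions, the image name, and the simplified script source are assumptions. Calling w.to_yaml() on a Hera Workflow should show the real manifest. The helper execution_batches is hypothetical. It reads the depends fields with the standard-library graphlib module to show which tasks can run at the same time.

```python
import json
from graphlib import TopologicalSorter  # Python 3.9+


def echo_task(name, depends=None):
    """Build one DAG task that calls the 'echo' template with message=name."""
    task = {
        "name": name,
        "template": "echo",
        "arguments": {"parameters": [{"name": "message", "value": name}]},
    }
    if depends:
        task["depends"] = depends  # assumed: Hera expresses >> edges via 'depends'
    return task


# Hand-written approximation of the diamond workflow manifest (assumptions noted above)
manifest = {
    "apiVersion": "argoproj.io/v1alpha1",
    "kind": "Workflow",
    "metadata": {"generateName": "dag-diamond-"},
    "spec": {
        "entrypoint": "diamond",
        "templates": [
            {
                "name": "diamond",
                "dag": {
                    "tasks": [
                        echo_task("A"),
                        echo_task("B", "A"),
                        echo_task("C", "A"),
                        echo_task("D", "B && C"),
                    ]
                },
            },
            {
                "name": "echo",
                "inputs": {"parameters": [{"name": "message"}]},
                "script": {
                    "image": "python:3.9",  # assumed image
                    "command": ["python"],
                    # simplified; Hera generates its own wrapper code around the function body
                    "source": "print('{{inputs.parameters.message}}')",
                },
            },
        ],
    },
}


def execution_batches(manifest, dag_name):
    """Group DAG tasks into batches whose members can run concurrently.

    Only handles 'depends' expressions made of task names joined by '&&'.
    """
    template = next(t for t in manifest["spec"]["templates"] if t["name"] == dag_name)
    graph = {}
    for task in template["dag"]["tasks"]:
        deps = task.get("depends", "")
        graph[task["name"]] = {d.strip() for d in deps.split("&&") if d.strip()}
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


print(json.dumps(manifest, indent=2))
print(execution_batches(manifest, "diamond"))
```

The nesting depth of the dictionary is the same nesting that has to be indented correctly in YAML. The batch output makes the diamond shape explicit: A alone, then B and C together, then D.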
Why Use Hera?
Simplicity: Concise, readable Python code speeds up development.
Complex workflows: Avoid YAML syntax errors when building intricate pipelines.
Python ecosystem integration: Each decorated function becomes a workflow template, and its body can use existing Python libraries.
Testability: Workflows can be exercised with standard Python testing frameworks.
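As a minimal sketch of the testability point, the logic inside the MapReduce example's split, map_, and reduce functions is plain Python, so it can be pulled into pure functions and checked locally before anything is submitted. The helpers split_parts, map_part, reduce_parts, and run_locally are hypothetical; they mirror the example's script bodies with file I/O replaced by return values. The sketch tests only that logic, not the Hera workflow objects or Argo's scheduling.

```python
import json


def split_parts(num_parts):
    """Mirror of split: return (stdout, parts), i.e. the JSON list of part IDs and each part's payload."""
    part_ids = [str(i) for i in range(num_parts)]
    parts = {part_id: {"foo": i} for i, part_id in enumerate(part_ids, start=1)}
    return json.dumps(part_ids), parts


def map_part(part):
    """Mirror of map_: double the 'foo' value of one part."""
    return {"bar": part["foo"] * 2}


def reduce_parts(results):
    """Mirror of reduce: sum the 'bar' values of all map results."""
    return {"total": sum(r["bar"] for r in results)}


def run_locally(num_parts):
    stdout, parts = split_parts(num_parts)
    # In the workflow, with_param=s.result fans out one map_ task per item in split's JSON output
    results = [map_part(parts[item]) for item in json.loads(stdout)]
    return reduce_parts(results)


# pytest-style tests: plain functions with bare asserts
def test_split_emits_json_ids():
    stdout, _ = split_parts(2)
    assert json.loads(stdout) == ["0", "1"]


def test_map_doubles_foo():
    assert map_part({"foo": 3}) == {"bar": 6}


def test_four_parts_total():
    # foo = 1..4, bar = 2, 4, 6, 8, total = 20
    assert run_locally(4) == {"total": 20}
```

Saved under a name pytest collects (for example test_mapreduce.py, a hypothetical filename), the three test functions run with pytest. The last test also gives the value that total.json should hold for the default num_parts of 4.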
Preparing the ACK One Serverless Argo Cluster
First, create an Argo Workflows cluster, enable the Argo Server, optionally enable public access to it, and then generate a token for the default service account in the default namespace:
kubectl create token default -n default
Installing Hera
Installation requires a single command:
pip install hera-workflows
Example 1: Simple DAG Diamond
This example builds a "diamond" DAG: task A fans out to two parallel tasks, B and C, and both of them feed into a final task, D. The Python code uses Hera's @script decorator to turn a function into a reusable script template, then builds the workflow:
# Import packages
from hera.workflows import DAG, Workflow, script
from hera.shared import global_config
import urllib3

# Silence the InsecureRequestWarning caused by disabling certificate verification below
urllib3.disable_warnings()

# Configure the Argo Server endpoint and the token created with kubectl
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx"
global_config.verify_ssl = False

@script()
def echo(message: str):
    print(message)

# A runs first, B and C run in parallel after A, and D runs after both B and C
with Workflow(generate_name="dag-diamond-", entrypoint="diamond") as w:
    with DAG(name="diamond"):
        A = echo(name="A", arguments={"message": "A"})
        B = echo(name="B", arguments={"message": "B"})
        C = echo(name="C", arguments={"message": "C"})
        D = echo(name="D", arguments={"message": "D"})
        A >> [B, C] >> D

# Submit the workflow to the Argo Server
w.create()
Run the workflow with:
python simpleDAG.py
Example 2: Map‑Reduce Workflow
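The following script demonstrates a MapReduce‑style pipeline using Hera. It defines three script‑decorated functions: split (creates the input parts), map_ (processes one part), and reduce (aggregates the results). Intermediate files are passed between steps as OSS artifacts. The workflow connects the functions with a DAG in which split's output drives a fan-out of one map_ task per part, and reduce runs after all map_ tasks finish.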
from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
from hera.shared import global_config
import urllib3

# Silence the InsecureRequestWarning caused by disabling certificate verification below
urllib3.disable_warnings()

global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx"
global_config.verify_ssl = False

# split: write num_parts JSON files to OSS and print the list of part IDs to stdout
@script(
    image="python:alpine3.6",
    inputs=Parameter(name="num_parts"),
    outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
)
def split(num_parts: int):
    import json, os, sys
    os.mkdir("/mnt/out")
    part_ids = list(map(str, range(num_parts)))
    for i, part_id in enumerate(part_ids, start=1):
        with open(f"/mnt/out/{part_id}.json", "w") as f:
            json.dump({"foo": i}, f)
    # The JSON list printed here becomes s.result, which drives the map_ fan-out
    json.dump(part_ids, sys.stdout)

# map_: read one part, double its "foo" value, and upload the result to OSS
@script(
    image="python:alpine3.6",
    inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json")],
    outputs=OSSArtifact(name="part", path="/mnt/out/part.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json"),
)
def map_():
    import json, os
    os.mkdir("/mnt/out")
    with open("/mnt/in/part.json") as f:
        part = json.load(f)
    with open("/mnt/out/part.json", "w") as f:
        json.dump({"bar": part["foo"] * 2}, f)

# reduce: download every result file and sum the "bar" values
@script(
    image="python:alpine3.6",
    inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
    outputs=OSSArtifact(name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"),
)
def reduce():
    import json, os
    os.mkdir("/mnt/out")
    total = 0
    for name in os.listdir("/mnt/in"):
        with open(f"/mnt/in/{name}") as f:
            total += json.load(f)["bar"]
    with open("/mnt/out/total.json", "w") as f:
        json.dump({"total": total}, f)

with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="default", arguments=Parameter(name="num_parts", value="4")) as w:
    with DAG(name="main"):
        s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}"))
        # Run one map_ task per part ID emitted by split
        m = map_(
            with_param=s.result,
            arguments=[
                Parameter(name="part_id", value="{{item}}"),
                OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),
            ],
        )
        s >> m >> reduce()

# Submit the workflow to the Argo Server
w.create()
Execute the workflow with:
python map-reduce.py
Viewing Results
After submission, the Argo console displays the status of each task, confirming successful execution for both examples.
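Besides the console, the same status can be read programmatically. The sketch below uses only the standard library and assumes the Argo Server REST endpoint GET /api/v1/workflows/{namespace}/{name}, the host and token from the examples sent as a Bearer token, and the status.phase and status.nodes fields of the returned workflow. The function names are hypothetical, and the workflow name (generated from generate_name) is a placeholder you would copy from the console.

```python
import json
import ssl
import urllib.request


def build_get_request(host, token, namespace, name):
    """Build a GET request for one workflow on the Argo Server REST API."""
    url = f"{host.rstrip('/')}/api/v1/workflows/{namespace}/{name}"
    req = urllib.request.Request(url, method="GET")
    req.add_header("Authorization", f"Bearer {token}")
    return req


def fetch_workflow(req, verify_ssl=True):
    """Send the request and decode the JSON body (requires network access)."""
    ctx = ssl.create_default_context()
    if not verify_ssl:
        # Mirrors global_config.verify_ssl = False in the Hera examples
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    with urllib.request.urlopen(req, context=ctx) as resp:
        return json.load(resp)


def summarize(workflow):
    """Return the overall phase and a {display name: phase} map for Pod nodes."""
    status = workflow.get("status", {})
    nodes = status.get("nodes", {})
    pods = {
        node["displayName"]: node.get("phase")
        for node in nodes.values()
        if node.get("type") == "Pod"
    }
    return status.get("phase"), pods


# Example usage (placeholders, not run here):
# req = build_get_request("https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746",
#                         "abcdefgxxxxxx", "default", "dag-diamond-xxxxx")
# phase, pods = summarize(fetch_workflow(req, verify_ssl=False))
```

Keeping request construction separate from the network call makes the parsing logic testable offline, in the same spirit as the testability point above.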
Conclusion
Hera bridges Python with Argo Workflows, removing the need to write cumbersome YAML files and enabling data engineers to build, test, and iterate machine‑learning pipelines efficiently. By leveraging familiar Python syntax, teams can accelerate workflow development, improve testability, and streamline deployment on Kubernetes‑based cloud platforms.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Alibaba Cloud Native
We publish cloud-native tech news, curate in-depth content, host regular events and live streams, and share Alibaba product and user case studies. Join us to explore and share the cloud-native insights you need.