Building an Open-Source ML Pipeline – Part 1: Data Ingestion & Storage
This article walks through building the first stage of an open‑source MLOps pipeline—data ingestion and storage—by outlining requirements, selecting tools such as Argo Workflows, Minio and Great Expectations, showing how to set up a minikube cluster, and providing Python scripts and an Argo CronWorkflow to extract, transform, and load OpenAQ air‑quality data into Minio.
1. Introduction
This series attempts to assemble a basic ML pipeline that follows modern MLOps practices. The pipeline is expected to automatically retrieve data for model training and inference, validate data before inference, perform hyper‑parameter search, store models and track metrics, continuously deliver trained models, monitor models, and automate model retraining.
2. Tool Selection
Argo Workflows is used as the backbone because it can define directed‑acyclic graphs (DAGs) for data pipelines. Minio provides object storage, Great Expectations validates data, MLflow handles model storage and metric tracking, Feast serves as a feature store, Seldon Core manages model monitoring and continuous delivery, and Argo Events orchestrates automated retraining and other event‑driven dependencies. The whole stack is deployed on a minikube cluster.
3. Environment Setup
Assuming minikube, kubectl, the Argo CLI, and helm are installed, start the cluster with four CPUs and about 8 GiB of memory:
minikube start --cpus 4 --memory 8g
Deploy Minio using the Bitnami Helm chart in the mlops namespace:
kubectl create ns mlops
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install minio bitnami/minio --namespace mlops
Install Argo Workflows in the argo namespace:
kubectl create ns argo
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/quick-start-postgres.yaml
Retrieve the login token for later use:
SECRET=$(kubectl -n argo get sa argo -o=jsonpath='{.secrets[0].name}')
ARGO_TOKEN="Bearer $(kubectl -n argo get secret $SECRET -o=jsonpath='{.data.token}' | base64 --decode)"
echo $ARGO_TOKEN
4. Extract‑Transform‑Load (ETL) Pipeline
The ETL consists of three steps: fetch JSON data from the OpenAQ API, convert it to a Pandas DataFrame and drop missing values, then write the DataFrame to a Parquet file and upload it to a Minio bucket.
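The transform step relies on pandas' `json_normalize` to flatten the nested OpenAQ JSON into a flat table. A quick illustration with fabricated records (the field names are assumptions modelled on the scripts below, not real API output) shows where the dotted column names used later come from:

```python
import pandas as pd

# Fabricated records shaped like OpenAQ /v2/measurements results;
# the field names here are assumptions for illustration only.
sample_results = [
    {"parameter": "pm25", "value": 12.4, "unit": "µg/m³",
     "date": {"utc": "2023-05-01T00:00:00+00:00", "local": "2023-05-01T02:00:00+02:00"}},
    {"parameter": "pm10", "value": None, "unit": "µg/m³",
     "date": {"utc": "2023-05-01T00:00:00+00:00", "local": "2023-05-01T02:00:00+02:00"}},
]

# json_normalize flattens nested keys into dotted columns such as
# "date.utc" and "date.local" — the names transform.py groups on.
df = pd.json_normalize(sample_results)
print(df.columns.tolist())

# dropna() then discards the record with the missing value, as transform.py does.
print(len(df.dropna()))
```

This is why transform.py can group on a column literally named "date.utc": the nested `date` object is flattened into dot-separated column names.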
extract.py fetches historical air‑quality measurements for Vienna from OpenAQ and saves the raw JSON to /results.json:
import requests
from datetime import datetime, timedelta
import json

BASE_URL = "https://u50g7n0cbj.execute-api.us-east-1.amazonaws.com"

def healthy_connection():
    """Ping the API to confirm it is reachable before requesting data."""
    r = requests.get(f"{BASE_URL}/ping")
    return r.status_code == 200

def get_historical_data(start, end, data_category="pm25", all=False, limit=10000):
    """Fetch measurements for Vienna; with all=True, every parameter is returned."""
    if all:
        params = {"country": "AT", "city": "Wien", "date_from": start, "date_to": end, "limit": limit}
    else:
        params = {"country": "AT", "city": "Wien", "parameter": data_category, "date_from": start, "date_to": end, "limit": limit}
    r = requests.get(f"{BASE_URL}/v2/measurements", params=params)
    return r.json()["results"]

def main():
    # Request the last 24 hours of measurements.
    current_date = datetime.now()
    previous_date = current_date - timedelta(days=1)
    if healthy_connection():
        r = get_historical_data(previous_date.date(), current_date.date(), all=True, limit=10000)
    else:
        raise Exception("Could not connect to OpenAQ API.")
    with open("/results.json", "w") as f:
        json.dump(r, f)

if __name__ == '__main__':
    main()
transform.py loads the JSON, normalises it into a DataFrame, drops rows with NaN, computes the median per "date.utc" timestamp and parameter, and writes a Parquet file:
import pandas as pd
import json

def main():
    with open('/results.json', 'r') as f:
        response_data = json.load(f)
    # Flatten nested JSON fields into dotted columns such as "date.utc".
    df = pd.json_normalize(response_data)
    df = df.dropna()
    # Median per timestamp and parameter; numeric_only is required on
    # pandas >= 2.0, where non-numeric columns are no longer silently dropped.
    daily_median = df.groupby(by=["date.utc", "parameter"]).median(numeric_only=True)
    daily_median.to_parquet("/data.parquet")

if __name__ == '__main__':
    main()
load.py uploads the Parquet file to Minio using credentials from config.yaml:
import yaml
from minio import Minio
from datetime import datetime

# config.yaml is expected to define MINIO_USER and MINIO_PASSWORD.
with open("config.yaml", "r") as stream:
    config = yaml.safe_load(stream)

def put_file_minio(filename):
    client = Minio(
        "minio.mlops.svc.cluster.local:9000",
        access_key=config["MINIO_USER"],
        secret_key=config["MINIO_PASSWORD"],
        secure=False,
    )
    # Create the target bucket on first run; subsequent runs reuse it.
    if not client.bucket_exists("openaq"):
        client.make_bucket("openaq")
    else:
        print("Bucket 'openaq' already exists")
    client.fput_object("openaq", filename, "/data.parquet", content_type="application/parquet")
    print("File successfully uploaded!")

if __name__ == '__main__':
    # Date-stamped object name, so each daily run uploads a distinct file.
    filename = f"data-{datetime.now().date()}.parquet"
    try:
        put_file_minio(filename)
    except Exception as e:
        print(e)
5. Scheduling with Argo CronWorkflow
The workflow is scheduled to run daily at 01:00 AM. The CronWorkflow definition links the three container templates (extract, transform, load) as DAG tasks with appropriate artifact passing.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  generateName: etl-cron-wf-
spec:
  schedule: "0 1 * * *"
  concurrencyPolicy: "Allow"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: extract-transform-load
    templates:
    - name: extract-transform-load
      dag:
        tasks:
        - name: extract
          template: extract-template
        - name: transform
          dependencies: [extract]
          template: transform-template
          arguments:
            artifacts:
            - name: api-result
              from: "{{tasks.extract.outputs.artifacts.api-result}}"
        - name: load
          dependencies: [transform]
          template: load-template
          arguments:
            artifacts:
            - name: transformed-data
              from: "{{tasks.transform.outputs.artifacts.transformed-data}}"
    - name: extract-template
      container:
        image: lambertsbennett/extract
        command: [python, extract.py]
      outputs:
        artifacts:
        - name: api-result
          path: /results.json
    - name: transform-template
      inputs:
        artifacts:
        - name: api-result
          path: /results.json
      container:
        image: lambertsbennett/transform
        command: [python, transform.py]
      outputs:
        artifacts:
        - name: transformed-data
          path: /data.parquet
    - name: load-template
      inputs:
        artifacts:
        - name: transformed-data
          path: /data.parquet
      container:
        image: lambertsbennett/load
        command: [python, load.py]
Submit the workflow with the Argo CLI:
argo -n argo cron create ETL.yaml
After submission, the Argo UI shows the CronWorkflow execution, and a Parquet file appears in the Minio bucket, confirming that the first stage of the ML pipeline is functional.
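The upload can also be verified from outside the cluster. The sketch below is a convenience, not part of the article's pipeline; it reuses the minio client, assumes a kubectl port-forward to the Minio service is running, and uses placeholder credentials (`<MINIO_USER>`, `<MINIO_PASSWORD>`) that you must replace with your Helm release's values:

```python
def list_uploaded_files(client, bucket="openaq"):
    """Return the object names currently stored in the given bucket."""
    return [obj.object_name for obj in client.list_objects(bucket)]

if __name__ == "__main__":
    try:
        # Assumes `kubectl -n mlops port-forward svc/minio 9000:9000` is
        # running and the placeholder credentials below have been replaced.
        from minio import Minio
        client = Minio("localhost:9000",
                       access_key="<MINIO_USER>",
                       secret_key="<MINIO_PASSWORD>",
                       secure=False)
        print(list_uploaded_files(client))
    except Exception as e:
        print(f"Could not reach Minio: {e}")
```

A successful daily run should show one date-stamped object per day, e.g. data-2023-05-01.parquet, matching the naming scheme in load.py.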