Building an Open-Source ML Pipeline – Part 1: Data Ingestion & Storage

This article walks through building the first stage of an open‑source MLOps pipeline—data ingestion and storage—by outlining requirements, selecting tools such as Argo Workflows, Minio and Great Expectations, showing how to set up a minikube cluster, and providing Python scripts and an Argo CronWorkflow to extract, transform, and load OpenAQ air‑quality data into Minio.

Code DAO
Code DAO
Code DAO
Building an Open-Source ML Pipeline – Part 1: Data Ingestion & Storage

1. Introduction

This series attempts to assemble a basic ML pipeline that follows modern MLOps practices. The pipeline is expected to automatically retrieve data for model training and inference, validate data before inference, perform hyper‑parameter search, store models and track metrics, continuously deliver trained models, monitor models, and automate model retraining.

2. Tool Selection

Argo Workflows is used as the backbone because it can define directed‑acyclic graphs (DAGs) for data pipelines. Minio provides object storage, Great Expectations validates data, MLflow handles model storage and metric tracking, Feast serves as a feature store, Seldon Core manages model monitoring and continuous delivery, and Argo Events orchestrates automated retraining and other event‑driven dependencies. The whole stack is deployed on a minikube cluster.

3. Environment Setup

Assuming minikube, kubectl, the Argo CLI, and helm are installed, start the cluster with four CPUs and about 8 GiB of memory: minikube start --cpus 4 Deploy Minio using the Bitnami Helm chart in the mlops namespace:

kubectl create ns mlops
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install minio bitnami/minio --namespace mlops

Install Argo Workflows in the argo namespace:

kubectl create ns argo
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/quick-start-postgres.yaml

Retrieve the login token for later use:

SECRET=$(kubectl -n argo get sa argo -o=jsonpath='{.secrets[0].name}')
ARGO_TOKEN="Bearer $(kubectl -n argo get secret $SECRET -o=jsonpath='{.data.token}' | base64 --decode)"
echo $ARGO_TOKEN

4. Extract‑Transform‑Load (ETL) Pipeline

The ETL consists of three steps: fetch JSON data from the OpenAQ API, convert it to a Pandas DataFrame and drop missing values, then write the DataFrame to a Parquet file and upload it to a Minio bucket.

extract.py fetches historical air‑quality measurements for Vienna from OpenAQ and saves the raw JSON to /results.json:

import requests
from datetime import datetime, timedelta
import json

BASE_URL = "https://u50g7n0cbj.execute-api.us-east-1.amazonaws.com"

def healthy_connection():
    r = requests.get(f"{BASE_URL}/ping")
    return r.status_code == 200

def get_historical_data(start, end, data_category="pm25", all=False, limit=10000):
    if all:
        params = {"country":"AT", "city":"Wien", "date_from": start, "date_to": end, "limit": limit}
    else:
        params = {"country":"AT", "city":"Wien", "parameter": data_category, "date_from": start, "date_to": end, "limit": limit}
    r = requests.get(f"{BASE_URL}/v2/measurements", params=params)
    return r.json()["results"]

def main():
    current_date = datetime.now()
    previous_date = current_date - timedelta(days=1)
    if healthy_connection():
        r = get_historical_data(previous_date.date(), current_date.date(), all=True, limit=10000)
    else:
        raise Exception("Could not connect to OpenAQ API.")
    with open("/results.json", "w") as f:
        json.dump(r, f)

if __name__ == '__main__':
    main()

transform.py loads the JSON, normalises it into a DataFrame, drops rows with NaN, computes daily medians, and writes a Parquet file:

import pandas as pd
import json

def main():
    with open('/results.json', 'r') as f:
        response_data = json.load(f)
    df = pd.json_normalize(response_data)
    df = df.dropna()
    daily_median = df.groupby(by=["date.utc", "parameter"]).median()
    daily_median.to_parquet("/data.parquet")

if __name__ == '__main__':
    main()

load.py uploads the Parquet file to Minio using credentials from config.yaml:

import yaml
from minio import Minio
from datetime import datetime

with open("config.yaml", "r") as stream:
    config = yaml.safe_load(stream)

def put_file_minio(filename):
    client = Minio(
        "minio.mlops.svc.cluster.local:9000",
        access_key=config["MINIO_USER"],
        secret_key=config["MINIO_PASSWORD"],
        secure=False,
    )
    if not client.bucket_exists("openaq"):
        client.make_bucket("openaq")
    else:
        print("Bucket 'openaq' already exists")
    client.fput_object("openaq", f"{filename}", "/data.parquet", content_type="application/parquet")
    print("File successfully uploaded!")

if __name__ == '__main__':
    filename = f"data-{datetime.now().date()}.parquet"
    try:
        put_file_minio(filename)
    except Exception as e:
        print(e)

5. Scheduling with Argo CronWorkflow

The workflow is scheduled to run daily at 01:00 AM. The CronWorkflow definition links the three container templates (extract, transform, load) as DAG tasks with appropriate artifact passing.

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  generateName: etl-cron-wf-
spec:
  schedule: "0 1 * * *"
  concurrencyPolicy: "Allow"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: extract-transform-load
    templates:
    - name: extract-transform-load
      dag:
        tasks:
        - name: extract
          template: extract-template
        - name: transform
          dependencies: [extract]
          template: transform-template
          arguments:
            artifacts:
            - name: api-result
              from: "{{tasks.extract.outputs.artifacts.api-result}}"
        - name: load
          dependencies: [transform]
          template: load-template
          arguments:
            artifacts:
            - name: transformed-data
              from: "{{tasks.transform.outputs.artifacts.transformed-data}}"
    - name: extract-template
      container:
        image: lambertsbennett/extract
        command: [python, extract.py]
      outputs:
        artifacts:
        - name: api-result
          path: /results.json
    - name: transform-template
      inputs:
        artifacts:
        - name: api-result
          path: /results.json
      container:
        image: lambertsbennett/transform
        command: [python, transform.py]
      outputs:
        artifacts:
        - name: transformed-data
          path: /data.parquet
    - name: load-template
      inputs:
        artifacts:
        - name: transformed-data
          path: /data.parquet
      container:
        image: lambertsbennett/load
        command: [python, load.py]

Submit the workflow with the Argo CLI: argo -n argo cron create ETL.yaml After submission, the Argo UI shows the CronJob execution, and a Parquet file appears in the Minio bucket, confirming that the first step of the ML pipeline is functional.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

PythonKubernetesMLOpsMinioArgo WorkflowsOpenAQ
Code DAO
Written by

Code DAO

We deliver AI algorithm tutorials and the latest news, curated by a team of researchers from Peking University, Shanghai Jiao Tong University, Central South University, and leading AI companies such as Huawei, Kuaishou, and SenseTime. Join us in the AI alchemy—making life better!

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.