
How to Install Apache Airflow and Build a Simple Data Processing Pipeline

This tutorial guides you through installing Apache Airflow, initializing its database, starting the web server and scheduler, creating a Python DAG that reads, cleans, groups, and saves CSV data, configuring the DAG directory, and monitoring the pipeline via the Airflow web UI.

Test Development Learning Exchange

Goal: learn to install and configure Apache Airflow, build a simple data processing pipeline, and schedule and monitor tasks.

Step 1: Install Apache Airflow using pip. (For reproducible installs, the Airflow documentation recommends pinning versions with the official constraints file; a plain pip install is the simplest starting point.)

pip install apache-airflow

Step 2: Initialize the Airflow metadata database and configuration (on first run this creates airflow.cfg and a default SQLite database).

airflow db init

Step 3: Start the Airflow web server (default port 8080).

airflow webserver --port 8080

Step 4: Start the Airflow scheduler in a separate terminal; both the web server and the scheduler must stay running.

airflow scheduler

Step 5: Create a DAG file (simple_data_pipeline.py) that defines a pipeline to read a CSV file, clean the data, group by department to calculate mean sales, and save the results.

from airflow import DAG
from airflow.operators.python import PythonOperator  # the old 'python_operator' module is deprecated
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'simple_data_pipeline',
    default_args=default_args,
    description='A simple data processing pipeline',
    schedule_interval=timedelta(days=1),
)

def read_csv_file():
    """Read data from the CSV file."""
    file_path = '/path/to/sales_data.csv'
    df = pd.read_csv(file_path, encoding='utf-8-sig')
    print(f"Original dataset:\n{df.head()}")
    return df

def clean_data(**context):
    """Clean the data (handle missing values and duplicate rows)."""
    df = context['ti'].xcom_pull(task_ids='read_csv_file')
    missing_values = df.isnull().sum()
    print(f"Missing values per column:\n{missing_values}")
    df_cleaned = df.dropna()
    print(f"Dataset after dropping missing values:\n{df_cleaned.head()}")
    duplicates = df_cleaned.duplicated()
    print(f"Duplicate rows:\n{duplicates}")
    df_no_duplicates = df_cleaned.drop_duplicates()
    print(f"Dataset after dropping duplicate rows:\n{df_no_duplicates.head()}")
    return df_no_duplicates

def group_and_calculate_mean(**context):
    """Group by department ('部门') and compute the mean sales ('总价') per group."""
    df = context['ti'].xcom_pull(task_ids='clean_data')
    grouped_by_department = df.groupby('部门')
    mean_sales_by_department = grouped_by_department['总价'].mean()
    print(f"Mean sales per department:\n{mean_sales_by_department}")
    return mean_sales_by_department

def save_results(**context):
    """Save the results to a new CSV file."""
    mean_sales_by_department = context['ti'].xcom_pull(task_ids='group_and_calculate_mean')
    result_path = '/path/to/mean_sales_by_department.csv'
    mean_sales_by_department.to_csv(result_path, encoding='utf-8-sig')
    print(f"Results saved to {result_path}")

# Note: these tasks pass DataFrames between each other via XCom. In Airflow 2
# the default XCom backend serializes to JSON, so this pattern needs pickling
# enabled (core.enable_xcom_pickling = True) or a custom XCom backend.
# provide_context=True is no longer needed (it was removed in Airflow 2);
# the task context is injected automatically into **kwargs callables.
read_csv_task = PythonOperator(
    task_id='read_csv_file',
    python_callable=read_csv_file,
    dag=dag,
)

clean_data_task = PythonOperator(
    task_id='clean_data',
    python_callable=clean_data,
    dag=dag,
)

group_and_calculate_mean_task = PythonOperator(
    task_id='group_and_calculate_mean',
    python_callable=group_and_calculate_mean,
    dag=dag,
)

save_results_task = PythonOperator(
    task_id='save_results',
    python_callable=save_results,
    dag=dag,
)

read_csv_task >> clean_data_task >> group_and_calculate_mean_task >> save_results_task
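The pandas transformations inside the task callables can be exercised outside Airflow, which makes them easy to test before deploying the DAG. A minimal sketch using made-up in-memory data (the column names mirror the tutorial's CSV columns: '部门' = department, '总价' = total sale amount):

```python
import pandas as pd

# Made-up sample data with one missing value and one duplicate row.
df = pd.DataFrame({
    '部门': ['A', 'A', 'B', 'B', 'B'],
    '总价': [100.0, 100.0, 200.0, 300.0, None],
})

# Clean: drop rows with missing values, then drop exact duplicates.
df_cleaned = df.dropna().drop_duplicates()

# Group by department and compute the mean sales per group.
mean_sales = df_cleaned.groupby('部门')['总价'].mean()
print(mean_sales)
# Department A keeps one row (100.0); B keeps two rows (200.0, 300.0) -> mean 250.0
```

Running the same logic the DAG uses on a tiny fixture like this is a quick sanity check that the cleaning and aggregation steps behave as expected.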

Step 6: Ensure the DAG directory is correctly set in Airflow's configuration so the file is detected.
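By default Airflow scans the dags_folder path defined in airflow.cfg (typically ~/airflow/dags on a fresh install). A sketch of the relevant setting; the actual path is whatever your environment uses:

```
[core]
# Directory the scheduler and webserver scan for DAG files
dags_folder = /home/<user>/airflow/dags
```

Copy simple_data_pipeline.py into that directory, and it should appear in the UI after the scheduler's next parse cycle.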

Step 7: Open the Airflow web UI at http://localhost:8080, locate the simple_data_pipeline DAG, and trigger it manually or let the scheduler run it automatically; monitor task status and logs from the UI.

Conclusion: After completing these steps you should be able to run a basic Airflow pipeline that reads, cleans, aggregates, and saves CSV data, and you can extend this knowledge to more complex workflows.

Tags: Data Pipeline, Python, DAG, Workflow, ETL, Apache Airflow