How to Install Apache Airflow and Build a Simple Data Processing Pipeline
This tutorial guides you through installing Apache Airflow, initializing its database, starting the web server and scheduler, creating a Python DAG that reads, cleans, groups, and saves CSV data, configuring the DAG directory, and monitoring the pipeline via the Airflow web UI.
Goal: learn to install and configure Apache Airflow, build a simple data processing pipeline, and schedule and monitor tasks.
Step 1: Install Apache Airflow using pip.
pip install apache-airflow
Step 2: Initialize the Airflow metadata database and default configuration.
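A plain pip install can pull in incompatible dependency versions; the official Airflow docs recommend installing against a constraints file. A sketch of that pattern follows (the pinned version 2.7.3 is an illustrative assumption, adjust to your target release):

```shell
AIRFLOW_VERSION=2.7.3
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
```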
airflow db init
Step 3: Start the Airflow web server (default port 8080).
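On Airflow 2.x the web UI requires a login, so you may need to create an account before starting the web server. A minimal sketch, with placeholder credentials you should replace:

```shell
airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com
```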
airflow webserver --port 8080
Step 4: Start the Airflow scheduler.
airflow scheduler
Step 5: Create a DAG file (simple_data_pipeline.py) that defines a pipeline to read a CSV file, clean the data, group it by department to calculate mean sales, and save the results.
from airflow import DAG
# airflow.operators.python_operator was removed in Airflow 2; use airflow.operators.python
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'simple_data_pipeline',
    default_args=default_args,
    description='A simple data processing pipeline',
    schedule_interval=timedelta(days=1),
)

def read_csv_file():
    """Read data from the CSV file."""
    file_path = '/path/to/sales_data.csv'
    df = pd.read_csv(file_path, encoding='utf-8-sig')
    print(f"Original dataset:\n{df.head()}")
    # Returning the DataFrame pushes it to XCom. With Airflow 2's default
    # JSON XCom backend this requires enable_xcom_pickling = True in airflow.cfg.
    return df

def clean_data(**context):
    """Clean the data (handle missing values and duplicate rows)."""
    df = context['ti'].xcom_pull(task_ids='read_csv_file')
    missing_values = df.isnull().sum()
    print(f"Missing values per column:\n{missing_values}")
    df_cleaned = df.dropna()
    print(f"Dataset after dropping missing values:\n{df_cleaned.head()}")
    duplicates = df_cleaned.duplicated()
    print(f"Duplicate rows:\n{duplicates}")
    df_no_duplicates = df_cleaned.drop_duplicates()
    print(f"Dataset after dropping duplicate rows:\n{df_no_duplicates.head()}")
    return df_no_duplicates

def group_and_calculate_mean(**context):
    """Group by department and compute the mean sales per group."""
    df = context['ti'].xcom_pull(task_ids='clean_data')
    grouped_by_department = df.groupby('部门')  # '部门' = department column
    mean_sales_by_department = grouped_by_department['总价'].mean()  # '总价' = total price column
    print(f"Mean sales per department:\n{mean_sales_by_department}")
    return mean_sales_by_department

def save_results(**context):
    """Save the results to a new CSV file."""
    mean_sales_by_department = context['ti'].xcom_pull(task_ids='group_and_calculate_mean')
    result_path = '/path/to/mean_sales_by_department.csv'
    mean_sales_by_department.to_csv(result_path, encoding='utf-8-sig')
    print(f"Results saved to {result_path}")

# In Airflow 2 the task context is passed automatically to **context callables,
# so the old provide_context=True argument is no longer needed.
read_csv_task = PythonOperator(
    task_id='read_csv_file',
    python_callable=read_csv_file,
    dag=dag,
)

clean_data_task = PythonOperator(
    task_id='clean_data',
    python_callable=clean_data,
    dag=dag,
)

group_and_calculate_mean_task = PythonOperator(
    task_id='group_and_calculate_mean',
    python_callable=group_and_calculate_mean,
    dag=dag,
)

save_results_task = PythonOperator(
    task_id='save_results',
    python_callable=save_results,
    dag=dag,
)
read_csv_task >> clean_data_task >> group_and_calculate_mean_task >> save_results_task
Step 6: Make sure the file is saved in the directory Airflow scans for DAGs (the dags_folder setting in airflow.cfg) so the scheduler can detect it.
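You can check where Airflow looks for DAG files, copy the file there, and confirm it parsed cleanly from the CLI. A sketch for Airflow 2.x (the `~/airflow/dags` path is an example; use whatever `config get-value` prints on your machine):

```shell
# Print the directory Airflow scans for DAG files
airflow config get-value core dags_folder
# Copy the DAG file there (example path; use the value printed above)
cp simple_data_pipeline.py ~/airflow/dags/
# Confirm the DAG was parsed without import errors
airflow dags list-import-errors
airflow dags list | grep simple_data_pipeline
```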
Step 7: Open the Airflow web UI at http://localhost:8080, locate the simple_data_pipeline DAG, and trigger it manually or let the scheduler run it on schedule; monitor task status and logs from the UI.
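Before relying on the scheduler, it helps to sanity-check the pipeline's pandas transformations in isolation, without a running Airflow instance. A minimal sketch using a small, hypothetical in-memory dataset whose column names mirror the DAG above:

```python
import pandas as pd

# Hypothetical sample data standing in for /path/to/sales_data.csv
df = pd.DataFrame({
    '部门': ['A', 'A', 'B', 'B', 'B', None],          # department
    '总价': [100.0, 100.0, 200.0, 300.0, None, 50.0],  # total price
})

# Clean: drop rows with missing values, then duplicates (as in clean_data)
df_clean = df.dropna().drop_duplicates()

# Group by department and compute mean sales (as in group_and_calculate_mean)
mean_sales = df_clean.groupby('部门')['总价'].mean()
print(mean_sales)
```

Here the two rows with missing values are dropped, the duplicate (A, 100.0) row is removed, and the means come out as A = 100.0 and B = 250.0, which is a quick way to confirm the transformation chain behaves as intended.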
Conclusion: After completing these steps you should be able to run a basic Airflow pipeline that reads, cleans, aggregates, and saves CSV data, and you can extend this knowledge to more complex workflows.