Apache Airflow Overview and Advanced Usage Examples
This article introduces Apache Airflow, explains its core concepts (DAGs, tasks, operators, executors, and the web UI), and provides practical Python code examples covering Bash commands, Python functions, SQL queries, task dependencies, sensors, dynamic DAGs, SubDAGs, XCom, email alerts, and error handling.
Apache Airflow is a popular open‑source workflow management system used to create, schedule, and monitor batch jobs and data pipelines. Users define task dependencies by writing DAGs (Directed Acyclic Graphs) and can extend functionality with a rich plugin ecosystem for jobs such as Spark, Hive, and FTP/SFTP.
Core components: A DAG is the central workflow unit and is composed of Tasks, the smallest executable units; each task is an instance of an operator (e.g., BashOperator, PythonOperator, or a Sensor). Operators are classes that encapsulate a specific piece of logic. Executors determine how tasks run (SequentialExecutor, LocalExecutor, CeleryExecutor, etc.). The Scheduler triggers tasks according to DAG dependencies, and the Web UI provides visual monitoring and management.
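The scheduling idea behind a DAG can be illustrated without Airflow at all: tasks form a directed acyclic graph, and a task becomes runnable only once every upstream task it depends on has finished. The following self-contained sketch (plain Python, no Airflow required; the task names are made up for illustration) computes a valid execution order with Kahn's algorithm, which is essentially what dependency resolution boils down to:

```python
from collections import deque

def topological_order(dependencies):
    """Return a valid execution order for a DAG given as
    {task: [upstream tasks it depends on]}."""
    # Number of unfinished upstream tasks per task
    indegree = {t: len(ups) for t, ups in dependencies.items()}
    # Reverse map: each task -> the tasks that depend on it
    downstream = {t: [] for t in dependencies}
    for task, ups in dependencies.items():
        for up in ups:
            downstream[up].append(task)

    ready = deque(t for t, d in indegree.items() if d == 0)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for dep in downstream[task]:
            indegree[dep] -= 1
            if indegree[dep] == 0:  # all upstreams done -> runnable
                ready.append(dep)
    if len(order) != len(dependencies):
        raise ValueError("cycle detected: not a valid DAG")
    return order

# t3 depends on t1 and t2, mirroring t3.set_upstream([t1, t2]) below
print(topological_order({'t1': [], 't2': [], 't3': ['t1', 't2']}))
# → ['t1', 't2', 't3']
```

Airflow's scheduler additionally handles time-based triggering, retries, and parallelism, but the dependency ordering follows this same principle.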
Executing a Bash command:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator  # 'airflow.operators.bash_operator' is deprecated in Airflow 2
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 27),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG('example_bash_dag', default_args=default_args, schedule_interval=timedelta(hours=1)) as dag:
    t1 = BashOperator(
        task_id='run_command',
        bash_command='echo "Hello World"',
    )
# Running this DAG repeatedly executes the echo command at the defined interval

Calling a Python function:
from datetime import timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

@task
def print_hello():
    print("Hello from PythonOperator!")

with DAG('example_python_dag', schedule_interval=timedelta(hours=1), start_date=days_ago(1)) as dag:
    say_hello = print_hello()
# Executes the custom Python function (the @task decorator wraps it in a PythonOperator)

Scheduled SQL query:
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.models import Variable

def run_sql():
    sql_query = Variable.get('my_sql_query')
    return sql_query

with DAG('example_mysql_dag', ...) as dag:
    execute_query = MySqlOperator(
        task_id='run_mysql_query',
        mysql_conn_id='your_mysql_connection',
        # run_sql() is evaluated when the DAG file is parsed; to defer the Variable
        # lookup to run time, use the template '{{ var.value.my_sql_query }}' instead
        sql=run_sql(),
    )

Task dependencies (multiple upstream tasks):
from airflow.operators.python import PythonOperator  # 'python_operator' path is deprecated

def print_message(message):
    print(message)

with DAG(...) as dag:
    t1 = PythonOperator(task_id='print_a', python_callable=print_message, op_kwargs={'message': 'Task A'})
    t2 = PythonOperator(task_id='print_b', python_callable=print_message, op_kwargs={'message': 'Task B'})
    t3 = PythonOperator(task_id='print_c', python_callable=print_message, op_kwargs={'message': 'Task C'})
    # t3 runs only after t1 and t2 complete; [t1, t2] >> t3 is the equivalent bit-shift form
    t3.set_upstream([t1, t2])

External event sensor (HttpSensor):
from airflow.providers.http.sensors.http import HttpSensor  # 'airflow.sensors.http_sensor' is deprecated

with DAG(...) as dag:
    wait_for_data = HttpSensor(
        task_id='wait_for_http_response',
        http_conn_id='http_default',
        endpoint='/api/data',
        poke_interval=60,
        timeout=600,
        mode='reschedule',
        # HttpSensor has no 'success_states' argument; use response_check instead
        response_check=lambda response: response.status_code == 200,
    )
    process_data = PythonOperator(...)
    # process_data depends on the HttpSensor succeeding
    process_data.set_upstream(wait_for_data)

Dynamic DAG construction:
from airflow.operators.dummy_operator import DummyOperator

def create_dag_structure(dag):
    # Generate five placeholder tasks inside the given DAG
    for i in range(5):
        task_id = f'task_{i}'
        DummyOperator(task_id=task_id, dag=dag)

dag = DAG('dynamic_dag_example', ...)
create_dag_structure(dag)

Cross-DAG (SubDAG) reference:
from airflow.operators.subdag_operator import SubDagOperator

# Note: a subdag's dag_id must follow the '<parent_dag_id>.<task_id>' naming convention;
# SubDAGs are deprecated in newer Airflow releases in favor of TaskGroups.
subdag = DAG(...)
subtask = BashOperator(..., dag=subdag)
main_dag = DAG(...)
main_task = SubDagOperator(subdag=subdag, task_id='subdag_task', dag=main_dag)

Using XCom to pass data:
from airflow.operators.python import PythonOperator

def push_value(ti, **kwargs):
    ti.xcom_push(key='value', value='some data')

def pull_value(**kwargs):
    last_value = kwargs['ti'].xcom_pull(task_ids='push_task', key='value')
    print(f"Pulled value: {last_value}")

with DAG(...) as dag:
    push_task = PythonOperator(task_id='push_task', python_callable=push_value)
    pull_task = PythonOperator(task_id='pull_task', python_callable=pull_value)
    pull_task.set_upstream(push_task)

Email notification:
from airflow.operators.email import EmailOperator  # 'email_operator' path is deprecated

notify_email = "[email protected]"

with DAG(...) as dag:
    email_alert = EmailOperator(
        task_id='send_email',
        to=notify_email,
        subject='Airflow Alert',
        html_content='Email from Airflow',
    )

Error handling and retries:
from datetime import timedelta

from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowSkipException

def potentially_failing_function():
    import random
    if random.randint(0, 10) > 5:
        # AirflowSkipException marks the task as skipped, not failed, so it does
        # NOT trigger retries; raise an ordinary exception to exercise the retry logic
        raise AirflowSkipException("Randomly skipped the task")
    else:
        print("Task executed successfully")

with DAG(...) as dag:
    failing_task = PythonOperator(
        task_id='potentially_failing_task',
        python_callable=potentially_failing_function,
        retries=3,  # number of retries on failure
        retry_delay=timedelta(minutes=5),
    )

Note: When using these examples, ensure your Airflow environment is correctly configured and all required providers (e.g., apache-airflow-providers-mysql) are installed. The snippets omit some imports and full DAG context; adapt them to your specific project requirements.
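As a quick sanity check before deploying, you can verify from plain Python that the modules your DAG files import are actually installed in the environment. This is a minimal sketch (the module names in the loop are examples, not a definitive list):

```python
import importlib.util

def module_available(name: str) -> bool:
    """Return True if the named module can be imported in this environment."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # find_spec raises if a parent package of a dotted name is missing
        return False

# Example: check a few modules your DAGs might import
for mod in ("airflow", "airflow.providers.mysql", "json"):
    print(f"{mod}: {'installed' if module_available(mod) else 'MISSING'}")
```

Running a script like this inside the scheduler's virtual environment catches missing provider packages before the DAG file fails to parse.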
Test Development Learning Exchange