Unlock Enterprise‑Grade Data Pipelines with DMS Airflow: Features, Integration & Code Samples
This article introduces DMS Airflow, an enterprise‑level data workflow orchestration platform built on Apache Airflow, covering its advanced DAG capabilities, deep DMS integration, scheduling, task dependency management, dynamic task generation, resource scaling, security features, and practical code examples for SQL, Spark, DTS, and Notebook tasks.
1. Advanced Orchestration Capabilities of Airflow
1.1 DAG Definition
Airflow’s core concept is a Directed Acyclic Graph (DAG) that defines task dependencies and execution order. DAGs are written in Python, supporting version control, dynamic generation, and Jinja2 templating.
Python code definition: DAGs are defined as Python code, which enables code review and versioning.
Dynamic generation: DAGs can be generated from configuration or data.
Templating: Jinja2 templates allow parameterized configurations.
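To make the idea of a graph that fixes execution order concrete, here is a minimal sketch that uses only Python's standard library rather than Airflow itself. The task names and the execution_order helper are hypothetical; the point is only that a valid run order exists exactly when the dependency graph has no cycle.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def execution_order(graph):
    """Return one valid run order, or raise ValueError if the graph has a cycle."""
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # The second element of args holds the nodes that form the cycle.
        raise ValueError(f"not a DAG, cycle found: {exc.args[1]}") from exc


print(execution_order(dependencies))
```

A graph such as {"a": {"b"}, "b": {"a"}} is rejected, which is the "acyclic" part of the DAG definition.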
1.2 Task Dependency Management
Airflow offers several ways to define execution order: the bitshift operators >> and <<, the task methods set_upstream() and set_downstream(), and the helper functions cross_downstream() and chain().
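The semantics of these operators can be illustrated with a toy model. The Task class below is a hypothetical stand-in that only records dependencies; it is not Airflow's implementation, and its chain() is simplified (adjacent lists are fully cross-linked here, which differs from Airflow's pairwise handling).

```python
class Task:
    """Toy stand-in for an Airflow operator; it only records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        for other in _as_list(others):
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def set_upstream(self, others):
        for other in _as_list(others):
            other.set_downstream(self)

    def __rshift__(self, others):   # self >> others
        self.set_downstream(others)
        return others

    def __lshift__(self, others):   # self << others
        self.set_upstream(others)
        return others

    def __rrshift__(self, others):  # [a, b] >> self
        self.set_upstream(others)
        return self


def _as_list(value):
    return value if isinstance(value, (list, tuple)) else [value]


def chain(*tasks):
    """Link each element to the next one: chain(a, b, c) acts like a >> b >> c."""
    for up, down in zip(tasks, tasks[1:]):
        for task in _as_list(up):
            task.set_downstream(down)


def cross_downstream(from_tasks, to_tasks):
    """Make every task in to_tasks depend on every task in from_tasks."""
    for task in from_tasks:
        task.set_downstream(to_tasks)


extract, clean, load, audit = (Task(n) for n in ("extract", "clean", "load", "audit"))
extract >> clean >> load   # extract runs first, load runs last
audit << extract           # equivalent to extract >> audit
```

Because >> returns its right-hand operand, expressions such as extract >> clean >> load read left to right in execution order.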
1.3 Scheduling and Time Triggers
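Airflow supports cron expressions, preset intervals (@daily, @hourly, @weekly), timedelta intervals, and manual triggers. Template variables such as {{ ds }}, {{ ds_nodash }}, {{ ts }}, {{ yesterday_ds }}, and {{ next_ds }} expose the run's execution context to task parameters. Note that {{ yesterday_ds }} and {{ next_ds }} are deprecated as of Airflow 2.2.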
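As a rough guide to what these variables contain, the sketch below computes comparable values with the standard library for a fixed-interval schedule. The template_values function is hypothetical and only approximates what Airflow renders; in particular, next_ds is assumed here to be the logical date plus one schedule interval.

```python
from datetime import datetime, timedelta, timezone


def template_values(logical_date, interval=timedelta(days=1)):
    """Approximate a few Airflow template variables for one run."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ds_nodash": logical_date.strftime("%Y%m%d"),
        "ts": logical_date.isoformat(),
        "yesterday_ds": (logical_date - timedelta(days=1)).strftime("%Y-%m-%d"),
        # Assumption: a fixed schedule interval, as with @daily.
        "next_ds": (logical_date + interval).strftime("%Y-%m-%d"),
    }


values = template_values(datetime(2024, 1, 1, tzinfo=timezone.utc))
for name, value in values.items():
    print(f"{{{{ {name} }}}} -> {value}")
```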
1.4 Task State Management
Task states include None, Scheduled, Queued, Running, Success, Failed, Skipped, and Up for retry. Retry behavior is configurable per task, including the number of retries, the delay between attempts, and optional exponential backoff.
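The effect of exponential backoff on retry timing can be sketched as follows. This is a simplified model under the assumption that the delay doubles on each attempt and is capped at a maximum; Airflow's own calculation is more involved, and the retry_delays function is hypothetical.

```python
from datetime import timedelta


def retry_delays(retries, base_delay, max_delay=None):
    """Return the wait before each retry, doubling every time up to max_delay."""
    delays = []
    for attempt in range(retries):
        delay = base_delay * (2 ** attempt)
        if max_delay is not None:
            delay = min(delay, max_delay)
        delays.append(delay)
    return delays


for number, delay in enumerate(
    retry_delays(4, timedelta(minutes=5), timedelta(minutes=30)), start=1
):
    print(f"retry {number}: wait {delay}")
```

In Airflow itself, the corresponding task arguments are retries, retry_delay, retry_exponential_backoff, and max_retry_delay.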
1.5 Data‑Aware Scheduling (Dataset)
Airflow 2.4+ introduces the Dataset concept, which allows a DAG to be triggered when a dataset is updated. Producer tasks declare the datasets they update, and consumer DAGs are scheduled on those datasets.
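The triggering rule can be pictured with a small toy model. It assumes the common case where a consumer lists several datasets and runs once each of them has been updated at least once since its previous run. The DatasetConsumer class and the URIs are hypothetical, and this is not how Airflow stores dataset events internally.

```python
class DatasetConsumer:
    """Toy model of a DAG scheduled on a list of datasets."""

    def __init__(self, dag_id, datasets):
        self.dag_id = dag_id
        self.datasets = frozenset(datasets)
        self.pending = set(self.datasets)  # datasets still waiting for an update
        self.runs = 0

    def on_update(self, uri):
        """Record a dataset update; return True if it triggers a run."""
        self.pending.discard(uri)
        if self.pending:
            return False
        self.runs += 1
        self.pending = set(self.datasets)  # start waiting again for the next run
        return True


consumer = DatasetConsumer(
    "build_report",
    ["oss://lake/users", "oss://lake/orders"],  # hypothetical dataset URIs
)
print(consumer.on_update("oss://lake/users"))   # one dataset still pending
print(consumer.on_update("oss://lake/orders"))  # all datasets updated
```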
1.6 Dynamic Task Generation
Tasks can be generated at runtime from configurations, database queries, or file lists, enabling flexible pipelines.
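A minimal sketch of configuration-driven generation, using only the standard library: the loop below builds one extract and one load task per table and records each task's upstream dependencies. The config layout and the build_tasks helper are hypothetical; in a real DAG file the loop body would instantiate operators instead of filling a dictionary.

```python
import json

# Hypothetical configuration; it could equally come from a file or a query.
CONFIG = json.loads("""
{"tables": ["users", "orders"], "steps": ["extract", "load"]}
""")


def build_tasks(config):
    """Return {task_id: set of upstream task_ids} generated from the config."""
    graph = {}
    for table in config["tables"]:
        previous = None
        for step in config["steps"]:
            task_id = f"{step}_{table}"
            graph[task_id] = {previous} if previous else set()
            previous = task_id
    return graph


for task_id, upstream in build_tasks(CONFIG).items():
    print(task_id, "<-", sorted(upstream))
```

Adding a table to the configuration adds its tasks without touching the pipeline code.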
1.7 Task Groups and Sub‑DAGs
TaskGroup and SubDAG help organize complex task structures; for example, an ETL group can contain the extract, transform, and load tasks. SubDAGs are deprecated in Airflow 2.x, so TaskGroup is the usual choice.
1.8 XCom Data Transfer
XCom enables data passing between tasks: an upstream task pushes a value, and a downstream task pulls it and uses it.
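The push and pull pattern can be modeled with a small in-memory store. The XComStore class is a hypothetical illustration rather than Airflow's API; it mirrors the convention that a task's return value is stored under the key return_value and looked up by the producing task's ID.

```python
class XComStore:
    """Toy key-value store scoped by run and task, loosely modeled on XCom."""

    def __init__(self):
        self._values = {}

    def push(self, run_id, task_id, value, key="return_value"):
        self._values[(run_id, task_id, key)] = value

    def pull(self, run_id, task_id, key="return_value", default=None):
        return self._values.get((run_id, task_id, key), default)


def extract():
    # Upstream task: its return value is what gets pushed.
    return {"rows": 120}


def report(upstream_value):
    # Downstream task: consumes the value pulled from the upstream task.
    return f"loaded {upstream_value['rows']} rows"


store = XComStore()
store.push("run_2024_01_01", "extract", extract())
message = report(store.pull("run_2024_01_01", "extract"))
print(message)
```

Values are scoped to a run, so pulling with a different run ID returns the default. XCom is intended for small values such as row counts or file paths, not for bulk data.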
2. DMS Integration Features
2.1 Unified Authentication & Authorization
DMS Airflow uses DmsAuthManager for single sign‑on with DMS UC Center, mapping DMS roles to the Airflow roles (Public, Viewer, User, Op, Admin).
2.2 Integrated Services
DMS Enterprise API (SQL execution, task management)
AnalyticDB API (Spark job submission, resource management)
DTS API (data synchronization)
Notebook API (notebook resource management)
UC Center (user authentication and permission management)
2.3 Enterprise‑Level Notification
Three notification channels are provided:
DMS Notification: integrates with the DMS notification center.
SLS Notification: centralized log management and query.
CloudMonitor Notification: real‑time metrics and custom alert rules.
2.4 Intelligent Resource Management
Automatic scaling adjusts the number of workers based on queue length, with configurable minimum and maximum worker counts, a polling interval, and Kalman filter smoothing. It also supports adjusting Kubernetes replicas.
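How smoothing and bounds interact can be sketched with a one-dimensional Kalman filter over the observed queue length. Everything below is an assumption for illustration: the class and function names, the noise variances, and the tasks_per_worker ratio are hypothetical and are not taken from DMS Airflow.

```python
import math


class QueueSmoother:
    """One-dimensional Kalman filter over the observed queue length."""

    def __init__(self, process_var=1.0, measurement_var=25.0):
        self.process_var = process_var          # how fast the true load may drift
        self.measurement_var = measurement_var  # how noisy a single reading is
        self.estimate = None
        self.variance = 1.0

    def update(self, measurement):
        if self.estimate is None:
            self.estimate = float(measurement)  # first reading seeds the filter
            return self.estimate
        self.variance += self.process_var                       # predict step
        gain = self.variance / (self.variance + self.measurement_var)
        self.estimate += gain * (measurement - self.estimate)   # correct step
        self.variance *= 1.0 - gain
        return self.estimate


def desired_workers(queue_estimate, tasks_per_worker, min_workers, max_workers):
    """Translate a smoothed queue length into a bounded worker count."""
    wanted = math.ceil(queue_estimate / tasks_per_worker)
    return max(min_workers, min(max_workers, wanted))


smoother = QueueSmoother()
for reading in [8, 10, 9, 60, 11, 10]:  # one spike among steady readings
    estimate = smoother.update(reading)
    workers = desired_workers(estimate, tasks_per_worker=4, min_workers=1, max_workers=10)
    print(f"queue={reading:3d} smoothed={estimate:6.2f} workers={workers}")
```

The smoothed estimate moves only part of the way toward the spike, so a single noisy reading does not trigger an immediate jump in worker count.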
2.5 DAG Dynamic Refresh
The dags_refresh_plugin allows API‑triggered DAG reloads without restarting Airflow, supporting batch refresh and POP‑signature authentication.
2.6 Log Optimization
Log stack filtering (no_stack_filter) removes exception stack traces, reducing log size and improving readability.
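One way such a filter could work is sketched below with Python's standard logging module. This is an assumption about the general technique, not the actual no_stack_filter implementation: the NoStackFilter class and the logger name are hypothetical. It drops the traceback but keeps a one-line summary of the exception.

```python
import logging


class NoStackFilter(logging.Filter):
    """Strip the traceback from a record but keep a short exception summary."""

    def filter(self, record):
        if record.exc_info:
            exc_type, exc_value = record.exc_info[:2]
            # Fold the formatted message and exception summary into one line.
            record.msg = f"{record.getMessage()} ({exc_type.__name__}: {exc_value})"
            record.args = ()
            record.exc_info = None  # prevents the formatter from adding the stack
            record.exc_text = None
        return True  # never drop the record itself


logger = logging.getLogger("dms_airflow.demo")  # hypothetical logger name
logger.addFilter(NoStackFilter())
```

With the filter attached, logger.exception("task failed") emits a single line that names the exception type and message instead of a multi-line traceback.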
2.7 Security Features
POP signature authentication for API calls.
Automatic DMS token refresh.
Role‑based fine‑grained permission control.
Encrypted communication for all API interactions.
3. DMS Airflow Usage Examples
3.1 SQL Task Example
from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
from datetime import datetime

dag = DAG(
    'dms_sql_example',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

sql_task = DMSSqlOperator(
    task_id='execute_sql',
    instance='production_db',
    database='analytics',
    sql='''
        SELECT COUNT(*) as total_records
        FROM user_behavior_log
        WHERE date = '{{ ds }}'
    ''',
    polling_interval=10,
    callback=lambda result: print(f"SQL execution completed: {result}"),
    dag=dag
)

3.2 Spark Compute Task Example
from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import (
    DMSAnalyticDBSparkSqlOperator,
    DMSAnalyticDBSparkOperator
)
from datetime import datetime

dag = DAG(
    'spark_analysis_example',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

# Spark SQL (Warehouse mode)
spark_sql_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_sql_analysis',
    cluster_id='adb-cluster-001',
    resource_group='interactive-spark',
    sql='''
        SELECT user_id,
               COUNT(*) as action_count,
               SUM(amount) as total_amount
        FROM user_events
        WHERE date = '{{ ds }}'
        GROUP BY user_id
    ''',
    schema='analytics',
    conf={'spark.sql.shuffle.partitions': 200},
    execute_time_limit_in_seconds=3600,
    dag=dag
)

# Spark Job (traditional mode)
spark_job_task = DMSAnalyticDBSparkOperator(
    task_id='spark_batch_job',
    cluster_id='adb-cluster-001',
    resource_group='batch-job',
    sql='your_spark_sql_here',
    app_type='SQL',
    app_name='daily_etl_job',
    dag=dag
)

3.3 Data Synchronization (DTS) Example
from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
from datetime import datetime

dag = DAG(
    'dts_sync_example',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

dts_task = DTSLakeInjectionOperator(
    task_id='sync_to_data_lake',
    source_instance='source_rds',
    source_database='production_db',
    target_instance='target_oss',
    bucket_name='data-lake-bucket',
    reserve={'table_filter': ['user_*', 'order_*'], 'sync_mode': 'full'},
    db_list={'include': ['analytics', 'reporting']},
    polling_interval=10,
    dag=dag
)

3.4 Notebook Task Example
from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator
from datetime import datetime

dag = DAG(
    'notebook_example',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

notebook_task = DMSNotebookOperator(
    task_id='run_ml_training',
    file_path='notebooks/model_training.ipynb',
    profile_name='ml-profile',
    cluster_name='ml-cluster',
    cluster_type='spark',
    spec='large',
    runtime_name='python3.9',
    run_params={'training_date': '{{ ds }}', 'model_version': 'v2.0'},
    timeout=7200,
    polling_interval=10,
    dag=dag
)

3.5 Notification Example
from airflow import DAG
from airflow.providers.alibaba_dms.cloud.notifications.sls_notification import SLSNotifier
from airflow.providers.alibaba_dms.cloud.notifications.cloudmonitor_notification import CloudMonitorNotifier
from datetime import datetime

def notify_on_failure(context):
    sls_notifier = SLSNotifier(
        sls_conn_id='sls_default',
        project='airflow-logs',
        logstore='task-alerts',
        success=False,
        message=f"Task {context['task_instance'].task_id} failed"
    )
    sls_notifier.notify(context)

    cms_notifier = CloudMonitorNotifier(
        cms_conn_id='cms_default',
        region='cn-hangzhou',
        metric_name='TaskFailure',
        event_name='TaskFailedEvent',
        success=False,
        message=f"Task {context['task_instance'].task_id} failed"
    )
    cms_notifier.notify(context)

dag = DAG(
    'example_with_notifications',
    default_args={
        'start_date': datetime(2024, 1, 1),
        'on_failure_callback': notify_on_failure
    },
    schedule_interval='@daily'
)

3.6 Complete ETL Workflow Example
from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import DMSAnalyticDBSparkSqlOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
from airflow.providers.alibaba_dms.cloud.notifications.sls_notification import SLSNotifier
from datetime import datetime, timedelta

def failure_notify(context):
    SLSNotifier(
        project='airflow-alerts',
        logstore='task-failures',
        success=False,
        message=f"DAG {context['dag'].dag_id} failed"
    ).notify(context)

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': failure_notify
}

dag = DAG(
    'complete_etl_pipeline',
    default_args=default_args,
    description='Complete ETL data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)

# 1. Data sync (RDS → Data Lake)
sync_task = DTSLakeInjectionOperator(
    task_id='sync_source_data',
    source_instance='production_rds',
    source_database='production_db',
    target_instance='data_lake_oss',
    bucket_name='raw-data-bucket',
    reserve={'table_filter': ['user_*', 'order_*'], 'sync_mode': 'incremental'},
    polling_interval=10,
    dag=dag
)

# 2. Validate data with SQL
validate_task = DMSSqlOperator(
    task_id='validate_data',
    instance='analytics_db',
    database='staging',
    sql='''
        SELECT COUNT(*) as total_records,
               COUNT(DISTINCT user_id) as unique_users
        FROM raw_user_data
        WHERE date = '{{ ds }}'
    ''',
    polling_interval=10,
    dag=dag
)

# 3. Spark transformation
spark_transform_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_data_transform',
    cluster_id='adb-cluster-001',
    resource_group='batch-processing',
    sql='''
        INSERT INTO analytics.user_daily_summary
        SELECT user_id, date,
               COUNT(*) as event_count,
               SUM(amount) as total_amount,
               AVG(amount) as avg_amount
        FROM staging.raw_user_data
        WHERE date = '{{ ds }}'
        GROUP BY user_id, date
    ''',
    schema='analytics',
    conf={'spark.sql.shuffle.partitions': 200},
    execute_time_limit_in_seconds=3600,
    dag=dag
)

# 4. Generate report
report_task = DMSSqlOperator(
    task_id='generate_report',
    instance='analytics_db',
    database='analytics',
    sql='''
        INSERT INTO daily_reports
        SELECT date,
               COUNT(DISTINCT user_id) as daily_active_users,
               SUM(total_amount) as daily_revenue
        FROM user_daily_summary
        WHERE date = '{{ ds }}'
        GROUP BY date
    ''',
    polling_interval=10,
    dag=dag
)

# Define dependencies
sync_task >> validate_task >> spark_transform_task >> report_task

4. Summary
DMS Airflow provides a seamless, enterprise‑grade data workflow platform with deep DMS integration, rich task types (SQL, Spark, DTS, Notebook), intelligent resource management, comprehensive monitoring, and robust security, making it ideal for ETL, analytics, machine‑learning pipelines, data synchronization, and scheduled jobs.
Alibaba Cloud Developer
Alibaba's official tech channel, featuring all of its technology innovations.
