
Unlock Enterprise‑Grade Data Pipelines with DMS Airflow: Features, Integration & Code Samples

This article introduces DMS Airflow, an enterprise‑level data workflow orchestration platform built on Apache Airflow, covering its advanced DAG capabilities, deep DMS integration, scheduling, task dependency management, dynamic task generation, resource scaling, security features, and practical code examples for SQL, Spark, DTS, and Notebook tasks.

Alibaba Cloud Developer

1. Advanced Orchestration Capabilities of Airflow

1.1 DAG Definition

Airflow’s core concept is the Directed Acyclic Graph (DAG), which defines task dependencies and execution order. DAGs are written in Python, which supports version control, dynamic generation, and Jinja2 templating.

Python code definition: DAGs are defined as Python code, enabling code review and versioning.

Dynamic generation: DAGs can be generated based on configuration or data.

Templating: Jinja2 templates allow parameterized configurations.
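
To make the "acyclic" requirement concrete, here is a minimal sketch that uses only Python's standard library rather than Airflow itself. The task names and the `execution_order` helper are illustrative assumptions; the point is only that a dependency graph yields a valid execution order as long as it contains no cycle.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks that must finish before it (its upstream tasks).
# Task names are illustrative.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def execution_order(deps):
    """Return one valid execution order, or raise CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)

# Making "extract" depend on "report" closes a loop, so no valid order exists.
cyclic = dict(dependencies, extract={"report"})
try:
    execution_order(cyclic)
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

Airflow performs a comparable check when it parses a DAG file and rejects graphs that contain cycles.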

1.2 Task Dependency Management

Airflow offers several ways to define execution order: the bitshift operators >> and <<, the task methods set_upstream() and set_downstream(), and the helper functions cross_downstream() and chain().
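
The sketch below is a toy model of how such operators can record dependency edges. It is not Airflow's implementation: the `Task` class and the simplified `chain` and `cross_downstream` functions are assumptions written for illustration, and this `chain` accepts single tasks only.

```python
def _as_list(obj):
    """Wrap a single task in a list so callers can treat both cases alike."""
    return list(obj) if isinstance(obj, (list, tuple)) else [obj]


class Task:
    """Toy stand-in for an operator; it only records dependency edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        for task in _as_list(other):
            self.downstream.add(task.task_id)
            task.upstream.add(self.task_id)

    def set_upstream(self, other):
        for task in _as_list(other):
            task.set_downstream(self)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning the right side allows a >> b >> c

    def __lshift__(self, other):   # self << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):  # [a, b] >> self (a list has no >> of its own)
        self.set_upstream(other)
        return self


def chain(*tasks):
    """Link single tasks in sequence: chain(a, b, c) behaves like a >> b >> c."""
    for left, right in zip(tasks, tasks[1:]):
        left.set_downstream(right)


def cross_downstream(from_tasks, to_tasks):
    """Make every task in from_tasks upstream of every task in to_tasks."""
    for task in from_tasks:
        task.set_downstream(to_tasks)


a, b, c, d, e = (Task(name) for name in "abcde")
a >> [b, c]    # a runs before b and c
[b, c] >> d    # b and c both run before d
e << d         # same edge as d >> e

p, q, r = (Task(name) for name in "pqr")
chain(p, q, r)

x1, x2, y1, y2 = (Task(name) for name in ("x1", "x2", "y1", "y2"))
cross_downstream([x1, x2], [y1, y2])
```

Returning the right-hand operand from `__rshift__` is what lets expressions such as `a >> b >> c` read left to right.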

1.3 Scheduling and Time Triggers

Airflow supports cron expressions, preset intervals (@daily, @hourly, @weekly), time deltas, and manual triggers. Template variables like {{ ds }}, {{ ds_nodash }}, {{ ts }}, {{ yesterday_ds }}, and {{ next_ds }} provide execution context.
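
As a rough illustration of what these template variables resolve to, the sketch below computes a few of them for one logical date using only the standard library. The `template_values` helper is hypothetical; in Airflow the values come from the Jinja2 rendering context. {{ next_ds }} is left out because it depends on the DAG's schedule.

```python
from datetime import datetime, timedelta, timezone


def template_values(logical_date):
    """Approximate a few date-based template variables for one run's logical date."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ds_nodash": logical_date.strftime("%Y%m%d"),
        "ts": logical_date.isoformat(),
        "yesterday_ds": (logical_date - timedelta(days=1)).strftime("%Y-%m-%d"),
    }


values = template_values(datetime(2024, 1, 1, tzinfo=timezone.utc))

# Stand-in for Jinja2 rendering: substitute the run date into a query.
sql = "SELECT COUNT(*) FROM user_behavior_log WHERE date = '{ds}'".format(**values)
print(values)
print(sql)
```

Note that recent Airflow 2.x releases mark {{ yesterday_ds }} and {{ next_ds }} as deprecated, so new DAGs may prefer to derive such dates from the data interval instead.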

1.4 Task State Management

Task states include None, Scheduled, Queued, Running, Success, Failed, Skipped, and Up for retry. Retry policies are configurable and can use exponential backoff.
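
The following sketch shows the idea behind exponential backoff with a cap. It is a simplified model, not Airflow's exact formula (Airflow's own calculation also adds jitter). The parameter names mirror the operator arguments `retries`, `retry_delay`, and `max_retry_delay`, while the `retry_delays` function itself is an assumption made for illustration.

```python
from datetime import timedelta


def retry_delays(retries, retry_delay, max_retry_delay=None):
    """Delay before each retry under plain doubling (jitter omitted)."""
    delays = []
    for attempt in range(retries):
        delay = retry_delay * (2 ** attempt)  # 1x, 2x, 4x, ... the base delay
        if max_retry_delay is not None:
            delay = min(delay, max_retry_delay)
        delays.append(delay)
    return delays


delays = retry_delays(4, timedelta(minutes=5), max_retry_delay=timedelta(minutes=30))
print([str(d) for d in delays])
```

With a 5-minute base delay and a 30-minute cap, the fourth retry would wait 40 minutes under plain doubling, so the cap limits it to 30.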

1.5 Data‑Aware Scheduling (Dataset)

Airflow 2.4 introduced the Dataset concept, which lets a DAG be triggered when a dataset is updated: producer tasks declare the datasets they update, and consumer DAGs are scheduled on those datasets.
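
The toy model below sketches the triggering rule in plain Python: a consumer that waits on several datasets runs once all of them have been updated since its last run. The `DatasetScheduler` class, the DAG name, and the dataset URIs are hypothetical; in Airflow itself, producers declare datasets through task `outlets` and consumers list them in the DAG's `schedule`.

```python
class DatasetScheduler:
    """Toy model of dataset-triggered scheduling; not Airflow's implementation."""

    def __init__(self):
        self.consumers = {}   # dag_id -> set of dataset URIs the DAG waits for
        self.pending = {}     # dag_id -> URIs updated since the DAG's last run
        self.triggered = []   # dag_ids in the order they were triggered

    def register(self, dag_id, dataset_uris):
        self.consumers[dag_id] = set(dataset_uris)
        self.pending[dag_id] = set()

    def dataset_updated(self, uri):
        """Record that a producer task finished updating the dataset at `uri`."""
        for dag_id, required in self.consumers.items():
            if uri in required:
                self.pending[dag_id].add(uri)
                if self.pending[dag_id] == required:
                    self.triggered.append(dag_id)
                    self.pending[dag_id] = set()  # start waiting for the next round


scheduler = DatasetScheduler()
scheduler.register("daily_report", ["oss://raw/users", "oss://raw/orders"])

scheduler.dataset_updated("oss://raw/users")
after_first_update = list(scheduler.triggered)   # still waiting for orders

scheduler.dataset_updated("oss://raw/orders")
after_second_update = list(scheduler.triggered)  # both datasets are now fresh
```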

1.6 Dynamic Task Generation

Tasks can be generated at runtime from configurations, database queries, or file lists, enabling flexible pipelines.
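
A minimal sketch of configuration-driven generation follows. The `TABLES` list and the `build_task_specs` helper are hypothetical; the sketch only builds task definitions, which a DAG file could then pass to an operator inside a loop.

```python
# Hypothetical configuration; in practice it could come from a file or a query.
TABLES = [
    {"name": "user_events", "partition_column": "date"},
    {"name": "order_events", "partition_column": "order_date"},
]


def build_task_specs(tables):
    """Create one row-count task spec per table; '{{ ds }}' is left for Airflow to render."""
    specs = []
    for table in tables:
        specs.append({
            "task_id": f"check_{table['name']}",
            "sql": (
                f"SELECT COUNT(*) FROM {table['name']} "
                f"WHERE {table['partition_column']} = '{{{{ ds }}}}'"
            ),
        })
    return specs


specs = build_task_specs(TABLES)
for spec in specs:
    print(spec["task_id"], "->", spec["sql"])
```

Adding a table to the configuration then adds a task without touching the pipeline code.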

1.7 Task Groups and Sub‑DAGs

TaskGroup and SubDAG help organize complex task structures, for example by collecting the extract, transform, and load tasks of a pipeline into a single ETL group. SubDAGs are deprecated in Airflow 2.x in favor of TaskGroup.

1.8 XCom Data Transfer

XCom enables cross‑task data passing: an upstream task pushes a value, and a downstream task pulls it and uses it.
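
The push and pull pattern can be sketched with a toy in-memory store. This is not Airflow's XCom backend; the `XComStore` class and the task functions are assumptions made for illustration. The default key `return_value` matches the key Airflow uses for a task's return value, and values are serialized to JSON here to show why XCom suits small, serializable payloads.

```python
import json


class XComStore:
    """Toy in-memory model of XCom; Airflow keeps these values in its metadata database."""

    def __init__(self):
        self._values = {}

    def push(self, task_id, value, key="return_value"):
        # Serialize on write so that only JSON-compatible values are accepted.
        self._values[(task_id, key)] = json.dumps(value)

    def pull(self, task_ids, key="return_value"):
        raw = self._values.get((task_ids, key))
        return None if raw is None else json.loads(raw)


def extract():
    """Upstream task: its return value is what gets pushed."""
    return {"total_records": 1250, "columns": ("user_id", "amount")}


def report(xcom):
    """Downstream task: pull the upstream result and use it."""
    stats = xcom.pull(task_ids="extract")
    return f"{stats['total_records']} rows, columns={stats['columns']}"


store = XComStore()
store.push("extract", extract())
message = report(store)
print(message)
```

In this toy, the JSON round trip turns the tuple into a list, a reminder that pulled values are copies rather than shared objects.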

2. DMS Integration Features

2.1 Unified Authentication & Authorization

DMS Airflow uses DmsAuthManager for single sign‑on with DMS UC Center, mapping DMS roles to Airflow roles (Public, Viewer, User, Operator, Admin).

2.2 Integrated Services

DMS Enterprise API (SQL execution, task management)

AnalyticDB API (Spark job submission, resource management)

DTS API (data synchronization)

Notebook API (notebook resource management)

UC Center (user authentication and permission management)

2.3 Enterprise‑Level Notification

Three notification channels are provided:

DMS Notification: integrates with DMS notification center.

SLS Notification: centralized log management and query.

CloudMonitor Notification: real‑time metrics and custom alert rules.

2.4 Intelligent Resource Management

Automatic scaling adjusts the number of workers based on queue length, with configurable minimum and maximum worker counts, a configurable polling interval, and Kalman filter smoothing. It also supports adjusting Kubernetes replica counts.
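
To illustrate why smoothing helps, the sketch below runs a one-dimensional Kalman filter over sampled queue lengths and clamps the resulting worker count. It is an assumption-laden illustration, not the DMS Airflow implementation: the class, the function, the variances, and the `tasks_per_worker` ratio are all hypothetical.

```python
import math


class ScalarKalmanFilter:
    """1-D Kalman filter for a slowly changing value observed with noise."""

    def __init__(self, process_variance=1.0, measurement_variance=25.0):
        self.q = process_variance      # how much the true value may drift per step
        self.r = measurement_variance  # how noisy each sample is assumed to be
        self.estimate = None
        self.error = 1.0

    def update(self, measurement):
        if self.estimate is None:      # the first sample initializes the estimate
            self.estimate = float(measurement)
            return self.estimate
        predicted_error = self.error + self.q
        gain = predicted_error / (predicted_error + self.r)
        self.estimate += gain * (measurement - self.estimate)
        self.error = (1.0 - gain) * predicted_error
        return self.estimate


def target_workers(smoothed_queue, tasks_per_worker, min_workers, max_workers):
    """Convert a smoothed queue length into a worker count within the allowed range."""
    wanted = math.ceil(smoothed_queue / tasks_per_worker)
    return max(min_workers, min(max_workers, wanted))


queue_samples = [10, 12, 80, 11, 9]    # one short spike at the third poll
kalman = ScalarKalmanFilter()
smoothed = [kalman.update(sample) for sample in queue_samples]
workers = [target_workers(s, tasks_per_worker=4, min_workers=1, max_workers=10)
           for s in smoothed]
print([round(s, 1) for s in smoothed])
print(workers)
```

Because the filter weights each new sample against its running estimate, a single spike moves the estimate only part of the way, which avoids scaling workers up and straight back down.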

2.5 DAG Dynamic Refresh

The dags_refresh_plugin allows API‑triggered DAG reloads without restarting Airflow, supporting batch refresh and POP‑signature authentication.

2.6 Log Optimization

Log stack filtering (no_stack_filter) removes exception stack traces from logs, reducing log size and improving readability.
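
The general technique can be sketched with Python's standard logging module. How no_stack_filter is actually implemented is not shown in this article, so the `NoStackFilter` class and the logger name below are hypothetical.

```python
import io
import logging


class NoStackFilter(logging.Filter):
    """Drop traceback details from a record while keeping its message."""

    def filter(self, record):
        record.exc_info = None
        record.exc_text = None
        record.stack_info = None
        return True  # keep the record itself


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
handler.addFilter(NoStackFilter())  # runs before the handler formats the record

logger = logging.getLogger("no_stack_demo")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

try:
    1 / 0
except ZeroDivisionError:
    # logger.exception would normally append the full traceback.
    logger.exception("task failed: division by zero")

output = stream.getvalue()
print(output, end="")
```

Attaching the filter to the handler rather than the logger leaves other handlers free to keep full tracebacks, for example for a debug log.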

2.7 Security Features

POP signature authentication for API calls.

Automatic DMS token refresh.

Role‑based fine‑grained permission control.

Encrypted communication for all API interactions.

3. DMS Airflow Usage Examples

3.1 SQL Task Example

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
from datetime import datetime

dag = DAG(
    'dms_sql_example',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

sql_task = DMSSqlOperator(
    task_id='execute_sql',
    instance='production_db',
    database='analytics',
    sql='''
        SELECT COUNT(*) as total_records
        FROM user_behavior_log
        WHERE date = '{{ ds }}'
    ''',
    polling_interval=10,
    callback=lambda result: print(f"SQL execution completed: {result}"),
    dag=dag
)

3.2 Spark Compute Task Example

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import (
    DMSAnalyticDBSparkSqlOperator,
    DMSAnalyticDBSparkOperator
)
from datetime import datetime

dag = DAG(
    'spark_analysis_example',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

# Spark SQL (Warehouse mode)
spark_sql_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_sql_analysis',
    cluster_id='adb-cluster-001',
    resource_group='interactive-spark',
    sql='''
        SELECT user_id,
               COUNT(*) as action_count,
               SUM(amount) as total_amount
        FROM user_events
        WHERE date = '{{ ds }}'
        GROUP BY user_id
    ''',
    schema='analytics',
    conf={'spark.sql.shuffle.partitions': 200},
    execute_time_limit_in_seconds=3600,
    dag=dag
)

# Spark Job (traditional mode)
spark_job_task = DMSAnalyticDBSparkOperator(
    task_id='spark_batch_job',
    cluster_id='adb-cluster-001',
    resource_group='batch-job',
    sql='your_spark_sql_here',
    app_type='SQL',
    app_name='daily_etl_job',
    dag=dag
)

3.3 Data Synchronization (DTS) Example

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
from datetime import datetime

dag = DAG(
    'dts_sync_example',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

dts_task = DTSLakeInjectionOperator(
    task_id='sync_to_data_lake',
    source_instance='source_rds',
    source_database='production_db',
    target_instance='target_oss',
    bucket_name='data-lake-bucket',
    reserve={'table_filter': ['user_*', 'order_*'], 'sync_mode': 'full'},
    db_list={'include': ['analytics', 'reporting']},
    polling_interval=10,
    dag=dag
)

3.4 Notebook Task Example

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator
from datetime import datetime

dag = DAG(
    'notebook_example',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

notebook_task = DMSNotebookOperator(
    task_id='run_ml_training',
    file_path='notebooks/model_training.ipynb',
    profile_name='ml-profile',
    cluster_name='ml-cluster',
    cluster_type='spark',
    spec='large',
    runtime_name='python3.9',
    run_params={'training_date': '{{ ds }}', 'model_version': 'v2.0'},
    timeout=7200,
    polling_interval=10,
    dag=dag
)

3.5 Notification Example

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.notifications.sls_notification import SLSNotifier
from airflow.providers.alibaba_dms.cloud.notifications.cloudmonitor_notification import CloudMonitorNotifier
from datetime import datetime

def notify_on_failure(context):
    sls_notifier = SLSNotifier(
        sls_conn_id='sls_default',
        project='airflow-logs',
        logstore='task-alerts',
        success=False,
        message=f"Task {context['task_instance'].task_id} failed"
    )
    sls_notifier.notify(context)

    cms_notifier = CloudMonitorNotifier(
        cms_conn_id='cms_default',
        region='cn-hangzhou',
        metric_name='TaskFailure',
        event_name='TaskFailedEvent',
        success=False,
        message=f"Task {context['task_instance'].task_id} failed"
    )
    cms_notifier.notify(context)

dag = DAG(
    'example_with_notifications',
    default_args={
        'start_date': datetime(2024, 1, 1),
        'on_failure_callback': notify_on_failure
    },
    schedule_interval='@daily'
)

3.6 Complete ETL Workflow Example

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import DMSAnalyticDBSparkSqlOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
from airflow.providers.alibaba_dms.cloud.notifications.sls_notification import SLSNotifier
from datetime import datetime, timedelta

def failure_notify(context):
    SLSNotifier(
        project='airflow-alerts',
        logstore='task-failures',
        success=False,
        message=f"DAG {context['dag'].dag_id} failed"
    ).notify(context)

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': failure_notify
}

dag = DAG(
    'complete_etl_pipeline',
    default_args=default_args,
    description='Complete ETL data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)

# 1. Data sync (RDS → Data Lake)
sync_task = DTSLakeInjectionOperator(
    task_id='sync_source_data',
    source_instance='production_rds',
    source_database='production_db',
    target_instance='data_lake_oss',
    bucket_name='raw-data-bucket',
    reserve={'table_filter': ['user_*', 'order_*'], 'sync_mode': 'incremental'},
    polling_interval=10,
    dag=dag
)

# 2. Validate data with SQL
validate_task = DMSSqlOperator(
    task_id='validate_data',
    instance='analytics_db',
    database='staging',
    sql='''
        SELECT COUNT(*) as total_records,
               COUNT(DISTINCT user_id) as unique_users
        FROM raw_user_data
        WHERE date = '{{ ds }}'
    ''',
    polling_interval=10,
    dag=dag
)

# 3. Spark transformation
spark_transform_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_data_transform',
    cluster_id='adb-cluster-001',
    resource_group='batch-processing',
    sql='''
        INSERT INTO analytics.user_daily_summary
        SELECT user_id, date,
               COUNT(*) as event_count,
               SUM(amount) as total_amount,
               AVG(amount) as avg_amount
        FROM staging.raw_user_data
        WHERE date = '{{ ds }}'
        GROUP BY user_id, date
    ''',
    schema='analytics',
    conf={'spark.sql.shuffle.partitions': 200},
    execute_time_limit_in_seconds=3600,
    dag=dag
)

# 4. Generate report
report_task = DMSSqlOperator(
    task_id='generate_report',
    instance='analytics_db',
    database='analytics',
    sql='''
        INSERT INTO daily_reports
        SELECT date,
               COUNT(DISTINCT user_id) as daily_active_users,
               SUM(total_amount) as daily_revenue
        FROM user_daily_summary
        WHERE date = '{{ ds }}'
        GROUP BY date
    ''',
    polling_interval=10,
    dag=dag
)

# Define dependencies
sync_task >> validate_task >> spark_transform_task >> report_task

4. Summary

DMS Airflow provides a seamless, enterprise‑grade data workflow platform with deep DMS integration, rich task types (SQL, Spark, DTS, Notebook), intelligent resource management, comprehensive monitoring, and robust security, making it ideal for ETL, analytics, machine‑learning pipelines, data synchronization, and scheduled jobs.
