DMSLindormSparkOperator

更新时间:
复制为 MD 格式

本文为您介绍DMSLindormSparkOperator的配置信息。

功能说明

Lindorm Spark引擎上执行任务,支持SQLJAR两种作业类型。

参数说明

说明

参数instancesqlregionconfigs可以使用Jinja模板

参数

类型

是否必填

说明

instance

string

DMS管理的Lindorm实例的连接(DBLink)名称。

job_type

string

作业类型,取值:

  • sql(默认值):执行Spark SQL语句。

  • jar:提交JAR包任务。

sql

string

需要执行的Spark SQL语句。当job_typesql时必填。

region

string

Region ID,跨地域调用时需指定。

configs

dict

配置对象。当job_typejar时,需要指定以下字段:

  • mainClass:主类名称(必填)。

  • mainResource:JAR文件路径(必填),如oss://path/to/app.jar

  • args:参数列表。

  • appName:应用名称。

  • username/password:Lindorm用户凭证。

  • configs:Spark配置项,如{"spark.executor.memory": "4g"}

polling_interval

int

刷新执行结果的间隔时间。单位为秒,默认值为10。设置为0或负数时,任务仅提交不等待结果。状态查询内置重试机制。

示例

说明

task_iddagAirflow的特定参数,详情请参见Airflow官方文档

SQL类型

from airflow import DAG
from airflow.providers.alibabadms.cloud.operators.dms_lindorm_spark import DMSLindormSparkOperator

with DAG(
    "dms_lindorm_spark_sql",
) as dag:

    lindorm_sql = DMSLindormSparkOperator(
        task_id="lindorm_spark_sql",
        instance="my_lindorm_link",
        job_type="sql",
        sql="SELECT * FROM wide_table WHERE dt = '{{ ds }}' LIMIT 1000;",
        polling_interval=15,
        dag=dag,
    )

JAR类型

from airflow import DAG
from airflow.providers.alibabadms.cloud.operators.dms_lindorm_spark import DMSLindormSparkOperator

with DAG(
    "dms_lindorm_spark_jar",
) as dag:

    lindorm_jar = DMSLindormSparkOperator(
        task_id="lindorm_spark_jar",
        instance="my_lindorm_link",
        job_type="jar",
        configs={
            "mainClass": "com.example.LindormETL",
            "mainResource": "oss://my-bucket/jars/lindorm-etl.jar",
            "args": ["--date", "{{ ds }}"],
            "configs": {
                "spark.executor.memory": "4g",
                "spark.executor.instances": "4",
            },
        },
        dag=dag,
    )
说明

所有DMS Airflow Operator均支持任务取消和自动重试等通用特性,详情请参见Airflow DMS Operator