本文为您介绍DMSLindormSparkOperator的配置信息。
功能说明
在Lindorm Spark引擎上执行任务,支持SQL和JAR两种作业类型。
参数说明
说明
参数instance、sql、region、configs可以使用Jinja模板。
参数 | 类型 | 是否必填 | 说明 |
instance | string | 是 | DMS管理的Lindorm实例的连接(DBLink)名称。 |
job_type | string | 否 | 作业类型,取值:
|
sql | string | 否 | 需要执行的Spark SQL语句。当 |
region | string | 否 | Region ID,跨地域调用时需指定。 |
configs | dict | 否 | 配置对象。当
|
polling_interval | int | 否 | 刷新执行结果的间隔时间。单位为秒,默认值为10。设置为0或负数时,任务仅提交不等待结果。状态查询内置重试机制。 |
示例
说明
task_id和dag是Airflow的特定参数,详情请参见Airflow官方文档。
SQL类型
from airflow import DAG
from airflow.providers.alibabadms.cloud.operators.dms_lindorm_spark import DMSLindormSparkOperator
with DAG(
"dms_lindorm_spark_sql",
) as dag:
lindorm_sql = DMSLindormSparkOperator(
task_id="lindorm_spark_sql",
instance="my_lindorm_link",
job_type="sql",
sql="SELECT * FROM wide_table WHERE dt = '{{ ds }}' LIMIT 1000;",
polling_interval=15,
dag=dag,
)JAR类型
from airflow import DAG
from airflow.providers.alibabadms.cloud.operators.dms_lindorm_spark import DMSLindormSparkOperator
with DAG(
"dms_lindorm_spark_jar",
) as dag:
lindorm_jar = DMSLindormSparkOperator(
task_id="lindorm_spark_jar",
instance="my_lindorm_link",
job_type="jar",
configs={
"mainClass": "com.example.LindormETL",
"mainResource": "oss://my-bucket/jars/lindorm-etl.jar",
"args": ["--date", "{{ ds }}"],
"configs": {
"spark.executor.memory": "4g",
"spark.executor.instances": "4",
},
},
dag=dag,
)说明
所有DMS Airflow Operator均支持任务取消和自动重试等通用特性,详情请参见Airflow DMS Operator。
该文章对您有帮助吗?