HTTP触发器节点
如果您有其他调度系统,希望在调度系统的任务完成后触发DataWorks上的任务运行,您可以使用DataWorks的HTTP触发器节点功能。本文为您介绍外部调度系统触发场景下,使用DataWorks的HTTP触发器节点的流程和注意事项。
前提条件
已开通DataWorks企业版及以上版本。
已创建好业务流程和需要通过HTTP触发器节点触发的计算任务节点。以使用MaxCompute的SQL进行任务计算为例,您可参见开发ODPS SQL任务,创建完成MaxCompute的SQL计算节点。
背景信息
外部调度系统触发任务运行有以下两种典型场景:
HTTP触发器节点无其他上游任务节点
此种场景下,您需要创建HTTP触发节点后,在其他调度系统配置好调度触发,并在DataWorks上配置好各节点的调度和上下游依赖关系,详情可参见创建HTTP触发器节点和其他调度系统的触发配置。
HTTP触发器节点有上游任务节点
此种场景下:
您需要创建HTTP触发节点后,在其他调度系统配置好调度触发,并在DataWorks上配置好各节点的调度和上下游依赖关系,详情可参见创建HTTP触发器节点和其他调度系统的触发配置。
HTTP触发器节点默认上游节点为业务流程的根节点,当上游有其他任务节点时,您需要手动修改为对应的上游任务节点。
当上游任务节点运行完成,且外部调度系统发出调度指令后,HTTP触发节点才会触发下游任务节点运行。
如果外部调度系统提前发出调度指令,但是上游任务节点没有运行完成,HTTP触发节点不会触发下游任务节点。系统会保留外部调度系统的调度指令,待上游任务运行完成后,再通过HTTP触发节点触发下游任务节点运行。
重要外部调度系统的触发指令仅保留24小时。如果24小时内上游任务节点没有运行完成,则触发指令会丢失,外部调度系统本次发出的调度指令失效。
使用限制
HTTP触发器节点功能仅适用DataWorks企业版及以上版本,关于DataWorks版本介绍,详情请参见DataWorks各版本详解。
由于HTTP触发器节点仅支持T+1次生成实例,且补数据操作生成的实例不可被触发,因此HTTP触发器节点需要在任务发布生产环境后第二天才可被外部调度系统触发。
HTTP触发器节点仅作为触发节点,不可以直接写计算运行任务,您需要将待运行的任务节点作为HTTP触发器节点的下游节点。
业务流程创建完成正常运行后,如果您想重跑触发器节点,您同时需要在外部调度系统中重新下发触发指令。
业务流程创建完成正常运行后,如果您想获取触发器节点的下游任务节点的历史时间段的运行结果,您可参见执行补数据并查看补数据实例进行补数据操作。补数据操作时无需外部调度系统下发调度指令,HTTP触发器节点会直接触发下游节点运行,即HTTP触发器节点无法通过补数据的方式,实现外部系统触发DataWorks HTTP触发器的补数据任务执行。
触发说明
触发HTTP触发器节点需要满足以下条件:
HTTP触发器节点已经生成周期实例(在运维中心周期实例面板可以搜到该实例)。该实例在未被RunTriggerNode API成功触发前,将处于等待触发状态,其下游节点将被阻塞直至成功调用RunTriggerNode API触发HTTP触发器节点执行。
HTTP触发器节点所依赖的所有父节点都已经执行成功(实例为成功状态)。
HTTP触发器节点生成的周期实例定时时间已到。
HTTP触发器节点使用的调度资源组,在触发时间点资源充足。
HTTP触发器节点处于非冻结状态。
仅等待触发状态下的HTTP触发器节点才可被触发(已触发成功过的再次触发将不会执行)。
创建HTTP触发器节点
进入数据开发页面。
登录DataWorks控制台,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入数据开发。
在数据开发页面,鼠标悬停至
图标,单击 。
您也可以打开相应的业务流程,右键单击通用,选择
。在新建节点对话框中,输入节点名称,并选择路径。
单击确认。
单击节点编辑区域右侧的调度配置,配置节点的调度属性,详情请参见配置基础属性。
说明HTTP触发器节点默认上游节点为业务流程的根节点,当上游有其他任务节点时,您需要手动修改为对应的上游任务节点。
- 保存并提交节点。重要 您需要设置节点的重跑属性和依赖的上游节点,才可以提交节点。
- 单击工具栏中的
图标,保存节点。
- 单击工具栏中的
图标。
- 在提交新版本对话框中,输入变更描述。
- 单击确认。
如果您使用的是标准模式的工作空间,提交成功后,请单击右上方的发布。具体操作请参见发布任务。 - 单击工具栏中的
- 任务运维,详情请参见周期任务基本运维操作。
其他调度系统的触发配置
在外部调度系统中进行触发配置时,您可以通过以下三种方式:Java方式、Python方式或API调用方式。
Java方式
安装Java SDK,详情可参见开始使用。
其中,DataWorks的SDK请用下面的pom配置。
<dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-dataworks-public</artifactId> <version>3.4.2</version> </dependency>
Python方式
安装Python SDK,详情可参见安装。
其中,DataWorks的SDK请使用下面的命令安装。
pip install aliyun-python-sdk-dataworks-public==2.1.2
代码示例
#!/usr/bin/env python #coding=utf-8 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkcore.auth.credentials import AccessKeyCredential from aliyunsdkcore.auth.credentials import StsTokenCredential from aliyunsdkdataworks_public.request.v20200518.RunTriggerNodeRequest import RunTriggerNodeRequest # 设置RegionId 访问密钥 访问密码 # cn-hangzhou 表示任务所在的地域,即RegionId # Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://help.aliyun.com/document_detail/378657.html credentials = AccessKeyCredential(os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'], os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']) # use STS Token # credentials = StsTokenCredential(os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'], os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'], os.environ['ALIBABA_CLOUD_SECURITY_TOKEN']) client = AcsClient(region_id='cn-shanghai', credential=credentials) request = RunTriggerNodeRequest() request.set_accept_format('json') # 设置NodeId,表示触发式节点的节点ID,节点ID可通过ListNodes API查询获取ID request.set_NodeId(123) # 设置CycleTime,表示触发式节点的任务的运行时间戳,需将HTTP触发节点的调度配置中,节点指定的运行时间,换算为时间戳 # 如果HTTP触发节点所在的地域与调度系统所在的地域不在同个时区,存在时差,这里需配置为触发节点所在时区的时间。 # 例如,HTTP触发节点在北京地域且Cyctime为北京时间18:00,而调度系统在美西地域,此时调度系统配置时,需配置为北京时间18:00的时间戳。 request.set_CycleTime(1606321620000) # 设置BizDate,表示触发式节点实例所在的业务日期时间戳,需将业务日期换算为时间戳。 # 业务日期为运行时间的前一天,且时间精确到日,时分秒均为00000000。以运行日期为2020年11月25日为例,业务时间为2020112400000000,需将这个时间换算为业务日期的时间戳 # 如果HTTP触发节点所在的地域与调度系统所在的地域不在同个时区,存在时差,这里需配置为触发节点所在时区的时间。 request.set_BizDate(1606233600000) # 设置AppId,表示触发式节点所属的Dataworks工作空间ID,工作空间ID可通过ListProjects获取 request.set_AppId(11456) response = client.do_action_with_exception(request) # python2: print(response) print(str(response, encoding='utf-8'))
API调用方式
API调用方式可参见RunTriggerNode。