如果您有其他调度系统,希望在调度系统的任务完成后触发DataWorks上的任务运行,您可以使用DataWorks的HTTP触发器节点功能。本文为您介绍外部调度系统触发场景下,使用DataWorks的HTTP触发器节点的流程和注意事项。

前提条件

  • 已开通DataWorks企业版及以上版本。
  • 已创建好业务流程和需要通过HTTP触发器节点触发的计算任务节点。以使用MaxCompute的SQL进行任务计算为例,您可参见创建ODPS SQL节点,创建完成MaxCompute的SQL计算节点。

背景信息

外部调度系统触发任务运行有以下两种典型场景:
  • HTTP触发器节点无其他上游任务节点
    无上游任务节点此种场景下,您需要创建HTTP触发节点后,在其他调度系统配置好调度触发,并在DataWorks上配置好各节点的调度和上下游依赖关系,详情可参见创建HTTP触发节点其他调度系统的触发配置
  • HTTP触发器节点有上游任务节点
    上游有任务节点此种场景下:
    • 您需要创建HTTP触发节点后,在其他调度系统配置好调度触发,并在DataWorks上配置好各节点的调度和上下游依赖关系,详情可参见创建HTTP触发节点其他调度系统的触发配置
    • HTTP触发器节点默认上游节点为业务流程的根节点,当上游有其他任务节点时,您需要手动修改为对应的上游任务节点。
    • 当上游任务节点运行完成,且外部调度系统发出调度指令后,HTTP触发节点才会触发下游任务节点运行。
      如果外部调度系统提前发出调度指令,但是上游任务节点没有运行完成,HTTP触发节点不会触发下游任务节点。系统会保留外部调度系统的调度指令,待上游任务运行完成后,再通过HTTP触发节点触发下游任务节点运行。
      注意 外部调度系统的触发指令仅保留24小时。如果24小时内上游任务节点没有运行完成,则触发指令会丢失,外部调度系统本次发出的调度指令失效。

使用限制

  • HTTP触发器节点功能仅适用DataWorks企业版及以上版本,且当前HTTP触发器节点功能仅成都地域灰度上线。
  • HTTP触发器节点仅作为触发节点,不可以直接写计算运行任务,您需要将待运行的任务节点作为HTTP触发器节点的下游节点。
  • 业务流程创建完成正常运行后,如果您想重跑触发器节点,您同时需要在外部调度系统中重新下发触发指令。
  • 业务流程创建完成正常运行后,如果您想获取触发器节点的下游任务节点的历史时间段的运行结果,您可参见补数据实例进行补数据操作。补数据操作时无需外部调度系统下发调度指令,HTTP触发器节点会直接触发下游节点运行。

创建HTTP触发节点

  1. 进入数据开发页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 选择工作空间所在地域后,单击相应工作空间后的进入数据开发
  2. 数据开发页面,鼠标悬停至新建图标,单击通用 > HTTP触发器
    您也可以打开相应的业务流程,右键单击通用,选择新建 > HTTP触发器
  3. 新建节点对话框中,输入节点名称,并选择目标文件夹
    说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
  4. 单击提交
  5. 单击节点编辑区域右侧的调度配置,配置节点的调度属性,详情请参见基础属性
    说明 HTTP触发器节点默认上游节点为业务流程的根节点,当上游有其他任务节点时,您需要手动修改为对应的上游任务节点。
  6. 保存并提交节点。
    注意 您需要设置节点的重跑属性依赖的上游节点,才可以提交节点。
    1. 单击工具栏中的保存图标,保存节点。
    2. 单击工具栏中的提交图标。
    3. 提交新版本对话框中,输入变更描述
    4. 单击确认
    如果您使用的是标准模式的工作空间,提交成功后,请单击右上方的发布。具体操作请参见发布任务
  7. 测试节点,详情请参见查看周期任务

其他调度系统的触发配置

在外部调度系统中进行触发配置时,您可以通过以下三种方式:Java方式、Python方式或API调用方式。
  • Java方式
    1. 安装Java SDK,详情可参见快速开始
      其中,DataWorks的SDK请用下面的pom配置。
      <dependency>
       <groupId>com.aliyun</groupId>
       <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
       <version>3.1.1</version>
      </dependency>
    2. 代码示例
      import com.aliyuncs.DefaultAcsClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.exceptions.ClientException;
      import com.aliyuncs.exceptions.ServerException;
      import com.aliyuncs.profile.DefaultProfile;
      import com.google.gson.Gson;
      import java.util.*;
      import com.aliyuncs.dataworks_public.model.v20200518.*;
      
      public class RunTriggerNode {
      
      public static void main(String[] args) {
      
      // 设置RegionId、访问密钥和访问密码。
      // 其中:cn-hangzhou表示任务所在的地域,即RegionId。
      // <accessKeyId>表示访问密钥。
      // <accessSecret>表示访问密码
      DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<accessKeyId>", "<accessSecret>");
      
      IAcsClient client = new DefaultAcsClient(profile);
      
      RunTriggerNodeRequest request = new RunTriggerNodeRequest();
      
      // 设置NodeId,表示触发式节点的节点ID,节点ID可通过ListNodes API查询获取
      request.setNodeId(700003742092L);
      
      
      // 设置NodeId,表示触发式节点的节点ID,节点ID可通过ListNodes API查询获取
      request.setCycleTime(1605629820000L);
      
      
      // 设置BizDate,表示触发式节点实例所在的业务日期时间戳,需将业务日期换算为时间戳。
      // 业务日期为运行时间的前一天,且时间精确到日,时分秒均为00000000。以运行日期为2020年11月25日为例,业务时间为2020112400000000,需将这个时间换算为业务日期的时间戳
      request.setBizDate(1605542400000L);
      
      // 设置AppId,表示触发式节点所属的Dataworks工作空间ID,工作空间ID可通过ListProjects获取
      request.setAppId(123L);
      
      try {
      
      RunTriggerNodeResponse response = client.getAcsResponse(request);
      System.out.println(new Gson().toJson(response));
      } catch (ServerException e) {
      e.printStackTrace();
      } catch (ClientException e) {
      System.out.println("ErrCode:" + e.getErrCode());
      System.out.println("ErrMsg:" + e.getErrMsg());
      System.out.println("RequestId:" + e.getRequestId());
      }
      
      }
      
      }
  • Python方式
    1. 安装Python SDK,详情可参见快速开始
      其中,DataWorks的SDK请使用下面的命令安装。
      pip install aliyun-python-sdk-dataworks-public==2.1.2
    2. 代码示例
      #!/usr/bin/env python
      #coding=utf-8
      
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkdataworks_public.request.v20200518.RunTriggerNodeRequest import RunTriggerNodeRequest
      
      # 设置RegionId 访问密钥 访问密码
      # cn-hangzhou 表示任务所在的地域,即RegionId
      # <accessKeyId> 访问密钥
      # <accessSecret> 访问密码
      client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
      
      request = RunTriggerNodeRequest()
      request.set_accept_format('json')
      # 设置NodeId,表示触发式节点的节点ID,节点ID可通过ListNodes API查询获取ID
      request.set_NodeId(123)
      
      # 设置CycleTime,表示触发式节点的任务的运行时间戳,需将HTTP触发节点的调度配置中,节点指定的运行时间,换算为时间戳
      request.set_CycleTime(1606321620000)
      
      # 设置BizDate,表示触发式节点实例所在的业务日期时间戳,需将业务日期换算为时间戳。
      # 业务日期为运行时间的前一天,且时间精确到日,时分秒均为00000000。以运行日期为2020年11月25日为例,业务时间为2020112400000000,需将这个时间换算为业务日期的时间戳
      request.set_BizDate(1606233600000)
      
      # 设置AppId,表示触发式节点所属的Dataworks工作空间ID,工作空间ID可通过ListProjects获取
      request.set_AppId(11456)
      
      
      response = client.do_action_with_exception(request)
      # python2: print(response)
      print(str(response, encoding='utf-8'))
  • API调用方式

    API调用方式可参见RunTriggerNode