HTTP触发器节点

更新时间:2025-03-03 10:17:48

如果您有其他调度系统,希望在调度系统的任务完成后触发DataWorks上的任务运行,您可以使用DataWorksHTTP触发器节点功能。本文为您介绍外部调度系统触发场景下,使用DataWorksHTTP触发器节点的流程和注意事项。

产品介绍

HTTP触发器节点是一种特殊的虚拟节点,允许你用DataWorks OpenAPI RunTriggerNode 触发本节点及下游节点的调度。

HTTP触发器节点常用于外部系统与DataWorks调度系统的互通。

HTTP触发器可以在调度系统的任务完成后,触发DataWork上的任务,主要有以下两种典型场景:

  • HTTP触发器节点无其他上游任务节点。

    image

    您需要创建HTTP触发节点后,在其他调度系统配置好调度触发,并在DataWorks上配置好各节点的调度和上下游依赖关系。

  • HTTP触发器节点有上游任务节点。

    image

    • 您需要创建HTTP触发节点后,在其他调度系统配置好调度触发,并在DataWorks上配置好各节点的调度和上下游依赖关系。

    • HTTP触发器节点默认上游节点为业务流程的根节点,当上游有其他任务节点时,您需要手动修改为对应的上游任务节点。

    • 当上游任务节点运行完成,且外部调度系统发出调度指令后,HTTP触发节点才会触发下游任务节点运行。

      如果外部调度系统提前发出调度指令,但是上游任务节点没有运行完成,HTTP触发节点不会触发下游任务节点。系统会保留外部调度系统的调度指令,待上游任务运行完成后,再通过HTTP触发节点触发下游任务节点运行。

      说明

      外部调度系统的触发指令仅保留24小时。如果24小时内上游任务节点没有运行完成,则触发指令会丢失,外部调度系统本次发出的调度指令失效。

前提条件

  • RAM账号添加至对应空间(可选)。

    进行任务开发的RAM账号已被添加至对应工作空间中,并具有开发空间管理员(权限较大,谨慎添加)角色权限。添加成员并授权,详情请参见为工作空间添加空间成员

  • 对应空间已绑定Serverless资源组。详情请参见:使用Serverless资源组

  • 进行HTTP触发器节点开发前,需创建对应的HTTP触发器节点,详情请参见:创建周期任务

使用限制

  • HTTP触发器节点功能仅适用DataWorks企业版及以上版本,关于DataWorks版本介绍,详情请参见DataWorks各版本详解

  • 由于HTTP触发器节点仅支持T+1次生成实例,且补数据操作生成的实例不可被触发,因此HTTP触发器节点需要在任务发布到生产环境后第二天才可被外部调度系统触发。

  • HTTP触发器节点仅作为触发节点,不可以直接写计算运行任务,您需要将待运行的任务节点作为HTTP触发器节点的下游节点。

  • 业务流程创建完成并正常运行后,若您想重跑触发器节点,则需重新运行该节点并同时在外部调度系统中下发触发指令。重新运行HTTP触发器节点不会触发已处于运行成功状态的下游节点运行。

  • 业务流程创建完成正常运行后,如果您想获取触发器节点的下游任务节点的历史时间段的运行结果,您可参见执行补数据并查看补数据实例(新版)进行补数据操作。补数据操作时无需外部调度系统下发调度指令,HTTP触发器节点会直接触发下游节点运行,即HTTP触发器节点无法通过补数据的方式,实现外部系统触发DataWorks HTTP触发器的补数据任务执行。

HTTP触发器节点开发

HTTP触发节点触发说明

触发HTTP触发器节点需要满足以下条件:

  • HTTP触发器节点已经生成周期实例(在运维中心周期实例面板可以找到该实例)。该实例在未被RunTriggerNodeAPI成功触发前,将处于等待触发状态,其下游节点将被阻塞直至成功调用RunTriggerNode API触发HTTP触发器节点执行。

  • HTTP触发器节点所依赖的所有父节点都已经执行成功(实例为成功状态)。

  • HTTP触发器节点生成的周期实例定时时间已到。

  • HTTP触发器节点使用的调度资源组,在触发时间点资源充足。

  • HTTP触发器节点处于非冻结状态。

  • 仅在等待触发状态下的HTTP触发器节点才可被触发(已成功触发过的节点再次触发将不会执行)。

HTTP触发节点调度配置

完成HTTP触发节点开发后,需对HTTP触发节点进行调度配置,详情请参见调度配置

说明

HTTP触发器节点默认上游节点为业务流程的根节点,当上游有其他任务节点时,您需要手动修改为对应的上游任务节点。

HTTP触发节点提交并发布

  1. 完成调度配置后,即可对已完成的HTTP触发器节点提交发布至生产环境,详情请参见:节点/工作流发布

  2. 发布完成的任务,将按照您配置的调度进行周期运行,可在运维中心 > 任务运维 > 周期任务运维 > 周期任务中查看已发布的周期任务,并对任务进行运维操作,详情请参见:运维中心入门

其他调度系统的触发配置

在外部调度系统中进行触发配置时,您可以通过以下三种方式:Java方式、Python方式或API调用方式。

Java方式触发
Python方式触发
  1. 安装Java SDK,详情可参见开始使用

    其中,DataWorksSDK请用下面的pom配置:

    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
      <version>3.4.2</version>
    </dependency>
  2. 代码示例

    您可进入RunTriggerNode的调试页面,在SDK示例页签查看完整的Java示例。

    // This file is auto-generated, don't edit it. Thanks.
    package com.aliyun.sample;
    
    import com.aliyun.tea.*;
    
    public class Sample {
    
        /**
         * <b>description</b> :
         * <p>使用AK&amp;SK初始化账号Client</p>
         * @return Client
         * 
         * @throws Exception
         */
        public static com.aliyun.dataworks_public20200518.Client createClient() throws Exception {
            // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
            // 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378657.html。
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                    // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
                    .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                    // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
                    .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // Endpoint 请参考 https://api.aliyun.com/product/dataworks-public
            config.endpoint = "dataworks.cn-shanghai.aliyuncs.com";
            return new com.aliyun.dataworks_public20200518.Client(config);
        }
    
        public static void main(String[] args_) throws Exception {
            java.util.List<String> args = java.util.Arrays.asList(args_);
            com.aliyun.dataworks_public20200518.Client client = Sample.createClient();
            com.aliyun.dataworks_public20200518.models.RunTriggerNodeRequest runTriggerNodeRequest = new com.aliyun.dataworks_public20200518.models.RunTriggerNodeRequest();
            com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
            try {
                // 复制代码运行请自行打印 API 的返回值
                client.runTriggerNodeWithOptions(runTriggerNodeRequest, runtime);
            } catch (TeaException error) {
                // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                // 错误 message
                System.out.println(error.getMessage());
                // 诊断地址
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            } catch (Exception _error) {
                TeaException error = new TeaException(_error.getMessage(), _error);
                // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                // 错误 message
                System.out.println(error.getMessage());
                // 诊断地址
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            }        
        }
    }
    
  1. 安装Python SDK,详情可参见 Python SDK

    其中,DataWorksSDK请使用下面的命令安装:

    pip install aliyun-python-sdk-dataworks-public==2.1.2
  2. 代码示例

    您可进入RunTriggerNode的调试页面,在SDK示例页签查看完整的Python示例。

    # -*- coding: utf-8 -*-
    # This file is auto-generated, don't edit it. Thanks.
    import os
    import sys
    
    from typing import List
    
    from alibabacloud_dataworks_public20200518.client import Client as dataworks_public20200518Client
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_dataworks_public20200518 import models as dataworks_public_20200518_models
    from alibabacloud_tea_util import models as util_models
    from alibabacloud_tea_util.client import Client as UtilClient
    
    
    class Sample:
        def __init__(self):
            pass
    
        @staticmethod
        def create_client() -> dataworks_public20200518Client:
            """
            使用AK&SK初始化账号Client
            @return: Client
            @throws Exception
            """
            # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
            # 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html。
            config = open_api_models.Config(
                # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
                access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
                # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
                access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
            )
            # Endpoint 请参考 https://api.aliyun.com/product/dataworks-public
            config.endpoint = f'dataworks.cn-shanghai.aliyuncs.com'
            return dataworks_public20200518Client(config)
    
        @staticmethod
        def main(
            args: List[str],
        ) -> None:
            client = Sample.create_client()
            run_trigger_node_request = dataworks_public_20200518_models.RunTriggerNodeRequest()
            runtime = util_models.RuntimeOptions()
            try:
                # 复制代码运行请自行打印 API 的返回值
                client.run_trigger_node_with_options(run_trigger_node_request, runtime)
            except Exception as error:
                # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                # 错误 message
                print(error.message)
                # 诊断地址
                print(error.data.get("Recommend"))
                UtilClient.assert_as_string(error.message)
    
        @staticmethod
        async def main_async(
            args: List[str],
        ) -> None:
            client = Sample.create_client()
            run_trigger_node_request = dataworks_public_20200518_models.RunTriggerNodeRequest()
            runtime = util_models.RuntimeOptions()
            try:
                # 复制代码运行请自行打印 API 的返回值
                await client.run_trigger_node_with_options_async(run_trigger_node_request, runtime)
            except Exception as error:
                # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                # 错误 message
                print(error.message)
                # 诊断地址
                print(error.data.get("Recommend"))
                UtilClient.assert_as_string(error.message)
    
    
    if __name__ == '__main__':
        Sample.main(sys.argv[1:])
    

API调用方式可参见RunTriggerNode

  • 本页导读 (1)
  • 产品介绍
  • 前提条件
  • 使用限制
  • HTTP触发器节点开发
  • HTTP触发节点触发说明
  • HTTP触发节点调度配置
  • HTTP触发节点提交并发布
  • 其他调度系统的触发配置
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等