HTTP触发器节点实现跨租户触发节点执行

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

DataWorks提供HTTP触发器节点功能,能够在跨租户的场景中实现任务的触发与执行。本文将逐步详细介绍如何在DataWorks平台内利用HTTP触发器节点来实现跨租户的任务触发。

背景信息

在现代企业中,通常有在多租户或同租户跨地域的环境中进行数据处理和分析业务操作的业务需求。DataWorks提供HTTP触发器节点功能,适用于以下业务场景:

  • 需要在跨租户环境中进行任务依赖和协同的业务场景。

  • 同租户跨地域的不同调度系统之间有业务关联的业务场景(例如,B系统的数据依赖于A系统完成某个节点加工后才能使用)。

下图以跨租户场景为例,为您说明任务触发机制的工作原理:

image

图示说明:将租户B环境中配置有HTTP触发器节点的工作流发布到运维中心,当该工作流到达运行时间时,HTTP节点上游的节点会根据配置的时间依次执行。执行到HTTP触发器节点时,HTTP触发器节点及后续任务将处于等待触发状态。一旦HTTP触发器节点接收到租户A环境发送的触发命令,会进行校验。校验通过后,HTTP触发器节点及后续节点任务将依次执行。

前提条件

  • 拥有两个租户账户(即两个主账户)环境。

  • 已在两个环境分别创建工作空间

  • 已在两个环境的工作空间分别绑定Serverless资源组

  • 已为触发端所在工作空间绑定的Serverless资源组的VPC,配置公网NAT网关EIP。

    重要

    Shell节点触发HTTP触发器节点时,需要通过公网发送触发命令。

  • 已为发送触发命令的Shell节点所在工作空间绑定MaxCompute计算资源,详情请参见计算资源管理

注意事项

  • HTTP触发器节点仅用作触发,无需配置开发节点的内容。

  • 您需要根据已创建的工作空间进行相关案例操作:

    • 参加数据开发(Data Studio)公测的工作空间:使用新版数据开发触发案例

    • 未参加数据开发(Data Studio)公测的工作空间:使用旧版数据开发触发案例

      说明

      未参加数据开发(Data Studio)公测的工作空间,仅DataWorks企业版支持使用HTTP触发器节点。

新版数据开发触发案例

如果您需要从租户A环境触发租户B环境中新版数据开发的节点执行,请参见以下内容。

租户B环境:创建HTTP触发器节点工作流

在租户B环境的工作空间内创建一个需要被远程触发的工作流。该工作流应包含接收触发命令的HTTP触发器节点和下游等待运行的业务节点(本文以Shell节点为例),以便后续验证跨租户触发的效果。

  1. 进入数据开发。

    进入DataWorks工作空间列表页,在顶部切换至目标地域,找到目标工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 新建工作流。

    1. 在左侧导航栏单击image,单击项目目录右侧的image,选择新建工作流,进入新建工作流弹窗。

    2. 新建工作流弹窗中自定义工作流名称(本示例为HTTP_Workflow),单击确认,进入工作流编辑页。

  3. 新建节点。

    1. 在工作流编辑页的添加节点目录栏中,分别拖拽通用 > HTTP触发器节点和通用 > Shell节点到工作流编辑页。

    2. 并将Shell节点添加为HTTP触发器节点的下游节点。

      image

  4. 编辑任务流Shell节点。

    重要

    HTTP触发器节点仅作为触发器使用,无需配置开发节点的内容。

    1. 鼠标悬浮至Shell节点上方,单击弹出的打开节点。在节点编辑页输入以下内容。

      echo "DataWorks";
    2. Shell节点编辑页右侧的调度配置中的调度资源组配置为您工作空间绑定的Serverless资源组。

    3. 单击Shell节点编辑页上方工具栏中的保存

  5. 发布工作流。

    1. 在项目目录中找到您所创建的工作流HTTP_Workflow,在工作流编辑页右侧的调度配置中配置实例生成方式发布后即时生成

    2. 单击工具栏中的image按钮唤起发布看板,单击开始发布生产,任务将按照发布检查流程执行发布操作,更多信息,请参见节点/工作流发布

  6. 记录HTTP实例参数信息。

    由于HTTP触发器节点会立即生成周期实例,您可前往运维中心查看并记录HTTP实例参数信息。

    1. 进入运维中心页面。

      登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 运维中心,在下拉框中选择对应工作空间后单击进入运维中心

    2. 在左侧导航栏单击周期任务运维 > 周期实例,进入周期实例页面。

    3. 在列表中找到您所创建的HTTP触发器节点实例,记录实例的任务ID定时时间

      说明

      鼠标悬浮到HTTP触发器节点实例的名称上方,即可查看实例的任务ID。

本地环境:准备示例代码

  1. 添加pom依赖。

    您可进入TriggerSchedulerTaskInstance调试页面,在SDK示例页签查看完整的SDK 安装信息

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.demo.CrossTenantTriggerSchedulerTaskInstance</mainClass>  <!-- 将这里替换为你的主类 -->
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase> <!-- 在打包阶段执行 -->
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
        <dependencies>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>dataworks_public20240518</artifactId>
                <version>6.2.0</version>
            </dependency>
        </dependencies>
    重要

    在代码开发完成后,您需要将参数 mainClass 替换为您所创建的Java代码主类名称。主类名称的格式是完整的包名加上类名。例如com.example.demo.CrossTenantTriggerSchedulerTaskInstance

  2. 代码开发。

    package com.example.demo;
    import com.aliyun.dataworks_public20240518.Client;
    import com.aliyun.dataworks_public20240518.models.TriggerSchedulerTaskInstanceRequest;
    import com.aliyun.dataworks_public20240518.models.TriggerSchedulerTaskInstanceResponse;
    import com.aliyun.teautil.models.RuntimeOptions;
    
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    
    public class CrossTenantTriggerSchedulerTaskInstance {
        // 创建阿里云DataWorks客户端的方法
        public static Client createClient20240518(String accessId, String accessKey, String endpoint) throws Exception {
            // 初始化OpenAPI配置对象
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
            config.setAccessKeyId(accessId); // 设置访问密钥ID
            config.setAccessKeySecret(accessKey); // 设置访问密钥Secret
            config.setEndpoint(endpoint); // 设置服务端点
            return new Client(config); // 返回初始化后的客户端实例
        }
        // 触发DataWorks节点运行的方法
        public static TriggerSchedulerTaskInstanceResponse runTriggerScheduler(Client client, Long nodeId, String EnvType,Long TriggerTime) throws Exception {
            TriggerSchedulerTaskInstanceRequest request = new TriggerSchedulerTaskInstanceRequest(); // 创建API请求对象
            request.setTaskId(nodeId); // 设置要触发的节点ID
            request.setEnvType(EnvType); // 设置项目环境
            request.setTriggerTime(TriggerTime); //设置定时触发时间(毫秒级时间戳)
            RuntimeOptions runtime = new RuntimeOptions(); // 初始化运行时配置
            return client.triggerSchedulerTaskInstanceWithOptions(request, runtime); // 执行API调用并返回响应
        }
        // 程序入口方法
        public static void main(String[] args) throws Exception {
            // 初始化节点ID(示例值)
            String nodeId1 = "";
            // 初始化项目环境(示例值)
            String EnvTypeStr = "";
            // 初始化定时时间(示例值)
            String cycTimeStr = "";
            // for循环处理命令行参数,分别赋值给nodeId1、cycTimeParam、bizTimeParam
            int i;
            for(i = 0; i < args.length; ++i) {
                if (i == 0) {
                    nodeId1 = args[i];
                } else if (i == 1) {
                    EnvTypeStr = args[i];
                }else if (i == 2) {
                    cycTimeStr = args[i];
                }
            }
            // 将字符串转换为Long类型节点ID
            Long nodeId = Long.parseLong(nodeId1);
            // 输出程序使用说明
            System.out.println("Usage: java -jar test-1.0-SNAPSHOT.jar nodeId EnvTypeStr cycTimeParam");
            // 解析定时时间并转换为时间戳
            SimpleDateFormat sdft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(sdft.parse(cycTimeStr)); // 解析时间字符串
            Long cycTime = calendar.getTimeInMillis(); // 获取毫秒级时间戳
            // 输出调试信息(时间戳)
            System.out.println("定时时间戳:" + cycTime);
            // 配置阿里云服务参数
            String endpoint = "dataworks.cn-hangzhou.aliyuncs.com"; // 服务端点
            String accessId = "xxx"; // 访问密钥ID
            String accessKey = "xxx"; // 访问密钥Secret
            // 创建阿里云客户端实例
            Client client = createClient20240518(accessId, accessKey, endpoint);
            // 执行触发节点操作
            TriggerSchedulerTaskInstanceResponse response = runTriggerScheduler(client, nodeId,EnvTypeStr, cycTime);
            // 输出API响应结果(JSON格式)
            System.out.println(com.aliyun.teautil.Common.toJSONString(com.aliyun.teautil.Common.toMap(response)));
        }
    }

    按照以下参数说明,将上述代码中的endpointaccessIdaccessKey参数替换成业务所需实际值。

    参数

    说明

    endpoint

    目标HTTP触发器节点所在工作空间的服务地址,详情请参见阿里云OpenAPI开发者服务地址

    accessId

    目标HTTP触发器节点所在阿里云账号的AccessKey ID

    您可以登录DataWorks控制台,鼠标悬停至顶部菜单栏右侧的用户头像,进入AccessKey管理,获取目标HTTP触发器节点所在阿里云RAM用户AccessKey IDAccessKey Secret信息。

    警告

    阿里云主账号的AK拥有全部权限,一旦泄露将带来严重安全风险。建议使用仅具备目标工作空间的空间管理员角色权限的RAM用户AK

    accessKey

    目标HTTP触发器节点所在阿里云账号的AccessKey Secret

  3. 将上述代码进行打包,生成以jar-with-dependencies.jar为后缀的JAR包。

租户A环境:配置Shell节点

在租户A所属的工作空间中新建配置Shell节点,用于发送触发命令。

  1. 进入数据开发。

    进入DataWorks工作空间列表页,在顶部切换至目标地域,找到目标工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 上传JAR资源。

    1. 在左侧导航栏单击image图标,进入资源管理。

    2. 资源管理页面,单击新建按钮或image图标,选择新建资源 > MaxCompute Jar

    3. 新建资源和函数弹窗中输入资源名称http_node_work.jar,单击确认。

    4. 在上传资源的详情页面中,单击点击上传按钮,将您在本地环境:准备示例代码步骤中生成的JAR包上传,数据源选择您所绑定的MaxCompute数据源。

  3. 保存发布资源。

    待您所准备的JAR资源上传完成,单击工具栏中的image按钮唤起发布看板,单击开始发布生产,依次完成发布检查流程操作,详情请参见节点/工作流发布

  4. 新建触发端Shell节点。

    1. 在左侧导航栏单击image,单击项目目录右侧的image图标。

    2. 选择通用 > Shell节点,进入新建节点弹窗。

    3. 自定义节点名称,单击确认进入节点编辑页。

  5. 编辑触发端Shell节点。

    1. 单击Shell节点编辑页左侧导航栏的image图标,找到您所上传的JAR资源(http_node_work.jar)。

    2. 右键单击您所上传的JAR资源,选择引用资源

    3. Shell节点中补全触发代码执行参数信息。

    ##@resource_reference{"http_node_work.jar"}
     java -jar http_node_work.jar nodeId "EnvTypeStr" "cycTimeParam"

    参数

    说明

    java -jar

    JAR执行命令。

    http_node_work.jar

    您所引用的资源名称。

    nodeId

    租户B环境:创建HTTP触发器节点工作流步骤中记录的HTTP触发器节点任务ID

    EnvTypeStr

    目标HTTP触发器节点的项目环境。本文执行的是发布到生产环境的HTTP触发器节点,请将参数设置为Prod

    如需执行开发环境的HTTP触发器节点,请将参数设置为Dev

    cycTimeParam

    租户B环境:创建HTTP触发器节点工作流步骤中记录HTTP触发器节点任务的定时时间。时间格式为yyyy-MM-dd HH:mm:ss

  6. 配置触发端Shell节点。

    请在Shell节点的编辑页面右侧找到调度配置中的调度策略 > 调度资源组。并选择工作空间所绑定的Serverless资源组作为调度资源组

运行并查看结果

运行租户A下的Shell节点,用于触发执行租户B中的HTTP触发器节点及其后续节点任务。

  1. 运行触发端Shell节点。

    单击您在租户A环境:配置Shell节点步骤中配置的Shell节点上方工具栏中的运行

  2. 查看执行结果。

    您需前往租户B所在环境,按照以下步骤查看运行结果。

    1. 进入运维中心页面。

      登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 运维中心,在下拉框中选择对应工作空间后单击进入运维中心

    2. 在左侧导航栏单击周期任务运维 > 周期实例,进入周期实例页面。

    3. 找到您所需触发的HTTP触发器节点实例,查看运行结果。

      image

旧版数据开发触发案例

如果您需要从租户A环境触发租户B环境中旧版数据开发的节点执行,请参见以下内容。

租户B环境:创建HTTP触发器节点工作流

在租户B环境的工作空间内创建一个需要被远程触发的工作流。该工作流应包含接收触发命令的HTTP触发器节点和下游等待运行的业务节点(本文以Shell节点为例),以便后续验证跨租户触发的效果。

image

  1. 新建HTTP触发器节点,详情请参见HTTP触发器节点

  2. 新建Shell节点,详情请参见Shell节点

    节点编辑页输入以下内容,并在Shell节点编辑页右侧的调度配置中配置重跑属性调度资源组。并保存节点配置信息。

    echo "DataWorks";
  3. 工作流发布到运维中心。

  4. 记录HTTP实例参数信息。

    由于HTTP触发器节点仅支持T+1生成周期实例,您可在次日单击工作流右上方的image图标,在运维中心周期任务运维 > 周期实例中查看HTTP实例参数信息。

    找到您所创建的HTTP触发器节点实例,记录任务ID定时时间业务时间

    说明

    鼠标悬浮到HTTP触发器节点实例的名称上方,即可查看实例的任务ID。

本地环境:准备代码

  1. 添加pom依赖。

    您可进入RunTriggerNode的调试页面,在SDK示例页签查看完整的SDK 安装信息

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.demo.CrossTenantTriggerNode</mainClass>  <!-- 将这里替换为你的主类 -->
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase> 
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
        <dependencies>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>dataworks_public20200518</artifactId>
                <version>7.0.1</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>tea-openapi</artifactId>
                <version>0.3.8</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>tea-console</artifactId>
                <version>0.0.1</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>tea-util</artifactId>
                <version>0.2.23</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>credentials-java</artifactId>
                <version>1.0.1</version>
            </dependency>
        </dependencies>
    重要

    在代码开发完成后,您需要将参数 mainClass 替换为您所创建的Java代码的主类名称。主类名称的格式是完整的包名加上类名。例如com.example.demo.CrossTenantTriggerNode

  2. 代码开发。

    package com.example.demo;
    
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import com.aliyun.dataworks_public20200518.Client;
    import com.aliyun.dataworks_public20200518.models.RunTriggerNodeRequest;
    import com.aliyun.dataworks_public20200518.models.RunTriggerNodeResponse;
    import com.aliyun.teautil.models.RuntimeOptions;
    
    public class CrossTenantTriggerNode {
        // 创建阿里云DataWorks客户端的方法
        public static Client createClient20200518(String accessId, String accessKey, String endpoint) throws Exception {
            // 初始化OpenAPI配置对象
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
            config.setAccessKeyId(accessId); // 设置访问密钥ID
            config.setAccessKeySecret(accessKey); // 设置访问密钥Secret
            config.setEndpoint(endpoint); // 设置服务端点
            return new Client(config); // 返回初始化后的客户端实例
        }
        // 触发DataWorks节点运行的方法
        public static RunTriggerNodeResponse runTriggerNode(Client client, Long nodeId, Long cycleTime, Long bizDate, Long appId) throws Exception {
            RunTriggerNodeRequest request = new RunTriggerNodeRequest(); // 创建API请求对象
            request.setNodeId(nodeId); // 设置要触发的节点ID
            request.setCycleTime(cycleTime); // 设置周期时间(毫秒级时间戳)
            request.setBizDate(bizDate); // 设置业务日期(毫秒级时间戳)
            request.setAppId(appId); // 设置应用ID
            RuntimeOptions runtime = new RuntimeOptions(); // 初始化运行时配置
            return client.runTriggerNodeWithOptions(request, runtime); // 执行API调用并返回响应
        }
        // 程序入口方法
        public static void main(String[] args) throws Exception {
            // 初始化节点ID(示例值)
            String nodeId1 = "";
            // 初始化定时时间和业务日期(示例值)
            String cycTimeStr = "";
            String bizTimeParam = "";
            // for循环处理命令行参数,分别赋值给nodeId1、cycTimeParam、bizTimeParam
            int i;
            for(i = 0; i < args.length; ++i) {
                if (i == 0) {
                    nodeId1 = args[i];
                } else if (i == 1) {
                    cycTimeStr = args[i];
                }else if (i == 2) {
                    bizTimeParam = args[i];
                }
            }
            // 将字符串转换为Long类型节点ID
            Long nodeId = Long.parseLong(nodeId1);
            // 输出程序使用说明
            System.out.println("Usage: java -jar test-1.0-SNAPSHOT.jar nodeId cycTimeParam bizTimeParam");
            // 解析定时时间并转换为时间戳
            SimpleDateFormat sdft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(sdft.parse(cycTimeStr)); // 解析时间字符串
            Long cycTime = calendar.getTimeInMillis(); // 获取毫秒级时间戳
            // 解析业务日期并转换为时间戳
            SimpleDateFormat sdfti = new SimpleDateFormat("yyyy-MM-dd");
            Calendar calendari = Calendar.getInstance();
            calendari.setTime(sdfti.parse(bizTimeParam));
            Long bizTime = calendari.getTimeInMillis();
            // 输出调试信息(时间戳)
            System.out.println("定时时间戳:" + cycTime);
            System.out.println("业务日期时间戳:" + bizTime);
            // 配置阿里云服务参数
            String endpoint = "dataworks.cn-hangzhou.aliyuncs.com"; // 服务端点
            String accessId = "xxx"; // 访问密钥ID
            String accessKey = "xxx"; // 访问密钥Secret
            Long appId = Long.valueOf(xxx); // 应用ID(需替换为实际值)
            // 创建阿里云客户端实例
            Client client = createClient20200518(accessId, accessKey, endpoint);
            // 执行触发节点操作
            RunTriggerNodeResponse response = runTriggerNode(client, nodeId, cycTime, bizTime, appId);
            // 输出API响应结果(JSON格式)
            System.out.println(com.aliyun.teautil.Common.toJSONString(com.aliyun.teautil.Common.toMap(response)));
        }
    }
    

    将上述代码中的endpointaccessIdaccessKeyappId替换成业务所需真实值。

    参数

    说明

    endpoint

    目标HTTP触发器节点所在工作空间的服务地址,详情请参见阿里云OpenAPI开发者服务地址

    accessId

    目标HTTP触发器节点所在阿里云账号的AccessKey ID

    您可以登录DataWorks控制台,鼠标悬停至顶部菜单栏右侧的用户头像,进入AccessKey管理,获取AccessKey IDAccessKey Secret信息。

    警告

    阿里云主账号的AK拥有全部权限,一旦泄露将带来严重安全风险。建议使用仅具备目标工作空间的空间管理员角色权限的RAM用户AK

    accessKey

    目标HTTP触发器节点所在阿里云账号的AccessKey Secret

    appId

    目标HTTP触发器节点所在工作空间ID。可在管理中心查看工作空间ID。

  3. 将上述代码进行打包,生成以jar-with-dependencies.jar为后缀的JAR包。

租户A环境:配置Shell节点

您可按照以下步骤,在租户A所属的工作空间中,配置Shell节点,用来触发租户B工作空间中的HTTP触发器节点。

  1. 创建并上传Jar资源。

    将您在本地环境:准备代码步骤打包的JAR包创建为MaxCompute资源,详情请参见创建并使用MaxCompute资源

  2. 开发触发端Shell节点。

    创建通用Shell节点并在Shell节点引用MaxCompute资源。在Shell节点中补全触发代码执行参数信息,示例代码如下:

    ##@resource_reference{"http_node_work.jar"}
     java -jar http_node_work.jar nodeId "cycleTime" "bizDate"

    参数

    说明

    http_node_work.jar

    您所引用的资源名称。

    nodeId

    租户B环境:创建HTTP触发器节点工作流步骤中记录的HTTP触发器节点任务ID。

    cycleTime

    租户B环境:创建HTTP触发器节点工作流步骤中记录触发器节点任务的定时时间。时间格式为yyyy-MM-dd HH:mm:ss

    bizDate

    租户B环境:创建HTTP触发器节点工作流步骤中记录触发器节点任务的业务时间。时间格式为yyyy-MM-dd HH:mm:ss

  3. 配置触发端Shell节点。

    Shell节点编辑页面右侧的调度配置中,找到资源属性 > 调度资源组,将其设置为工作空间绑定的Serverless资源组。

运行并查看结果

运行租户A下的Shell节点,用于触发租户B中的HTTP触发器节点及其后续任务。

  1. 运行触发端Shell节点。

    单击您在步骤租户A环境:配置Shell节点中配置的Shell节点上方工具栏中的image图标,运行节点任务。

  2. 查看执行结果。

    您需前往租户B所在环境,按照以下步骤查看运行结果。

    1. 进入运维中心页面。

      登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 运维中心,在下拉框中选择对应工作空间后单击进入运维中心

    2. 在左侧导航栏单击周期任务运维 > 周期实例,进入周期实例页面。

    3. 找到您所需触发的HTTP触发器节点实例,查看运行结果。

      image