DataWorks提供内置的流程检查,如任务发布前代码评审、数据治理中心治理项的内置检查项校验,此外,DataWorks还支持您自定义校验逻辑并接入DataWorks,实现DataWorks流程管控。本文以在运维中心的获取实例状态变更为例,为您介绍如何基于开放消息实现订阅实例状态变更。

背景信息

本实践涉及的开放平台的相关功能介绍与基本概念可参见OpenEvent概述

开启并配置消息订阅(OpenEvent)

开启并配置消息订阅的详细步骤请参见开启消息订阅,以下为本实践中的核心配置流程与注意事项。

  1. EvenBridge控制台,跳过事件源等配置,快速创建一个自定义总线。创建自定义总线
  2. EvenBridge控制台对应的事件总线中,创建事件规则。
    本实践定义该EventBridge自定义总线可接收DataWorks实例状态变更消息,配置demo与核心参数配置如下。
    1. 配置事件模式事件规则3
      {
          "source": [
              "acs.dataworks"
          ],
          "type": [
              "dataworks:InstanceStatusChanges:InstanceStatusChanges"
          ]
      }
      • source:定义事件的产品名称标识,配置为acs.dataworks
      • type:定义产品下事件的类型标识,配置为dataworks:InstanceStatusChanges:InstanceStatusChanges。您可以在下方的事件模式调试中,将source、type取值进行补充修改,然后进行事件测试,测试成功后单击下一步测试3
    2. 配置事件目标中,服务类型选择为HTTPS,并填写合适的URL,其他参数可保持默认。事件目标
  3. DataWorks控制台的开放平台页面,启用上述消息分发通道。启用

代码编写

package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/event")
public class ExtensionsController {

    /**
     * 接收eventBridge推送过来的消息
     * @param jsonParam
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
            JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
            //调度任务实例开始等时间的具体时间
            System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
            //DagId
            System.out.println("dagId: "+ dataParam.getString("dagId"));
            //Dag的类型,取值如下:
            //0:周期调度任务
            //1:手动任务
            //2:冒烟测试
            //3:补数据
            //4:手动业务流程
            //5:临时业务流程
            System.out.println("dagType: "+dataParam.getString("dagType"));
            //任务实例的调度类型,取值如下:
            //NORMAL(0):正常调度任务。该任务被日常调度。
            //MANUAL(1):手动任务。该任务不会被日常调度。
            //PAUSE(2):冻结任务。该任务被日常调度,但启动调度时直接被置为失败状态。
            //SKIP(3):空跑任务。该任务被日常调度,但启动调度时直接被置为成功状态。
            //SKIP_UNCHOOSE(4):临时工作流中未选择的任务,仅存在于临时工作流中,启动调度时直接被置为成功状态。
            //SKIP_CYCLE(5):未到运行周期的周或月任务。该任务被日常调度,但启动调度时直接被置为成功状态。
            //CONDITION_UNCHOOSE(6):上游实例中有分支(IF)节点,但是该下游节点未被分支节点选中,直接置为空跑任务。
            //REALTIME_DEPRECATED(7):实时生成的已经过期的周期实例,该类型的任务直接被置为成功状态。
            System.out.println("taskType: "+dataParam.getString("taskType"));
            //任务实例的修改时间
            System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
            //任务实例的创建时间
            System.out.println("createTime: "+dataParam.getString("createTime"));
            //工作空间的ID。您可以调用ListProjects查看空间ID信息。
            System.out.println("appId: "+dataParam.getString("appId"));
            //调度任务实例所在工作空间的租户ID
            System.out.println("tenantId: "+dataParam.getString("tenantId"));
            //调度任务实例的操作码:该字段可忽略
            System.out.println("opCode: "+dataParam.getString("opCode"));
            //业务流程的ID,周期调度任务实例的业务流程默认为1,手动业务流程和内部工作流调度任务实例为实际的业务流程ID
            System.out.println("flowId: "+dataParam.getString("flowId"));
            //调度任务实例对应的节点ID
            System.out.println("nodeId:"+dataParam.getString("nodeId"));
            //调度任务实例开始等资源的具体时间
            System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
            //调度任务实例ID
            System.out.println("taskId: "+dataParam.getString("taskId"));
            //任务的状态,取值如下:
            //0(未运行)
            //2(等待定时时间dueTime或cycleTime到来)
            //3(等待资源)
            //4(运行中)
            //7(下发给数据质量进行数据校检)
            //8(正在进行分支条件校检)
            //5(执行失败)
            //6(执行成功)
            System.out.println("status: "+dataParam.getString("status"));
        }else{
            System.out.println("未能过滤其他事件,请检查配置步骤");
        }
    }
}
            

本地部署运行

下载工程:下载工程后,进入工程根目录下执行:
mvn clean package -Dmaven.test.skip=true spring-boot:repackage
获得可直接运行的jar后执行:
java -jar target/event-demo-instance-status-1.0.jar
此时会成功启动工程,如下图所示:部署成功在浏览器输入http://localhost:8080/index会得到"hello world!",表示应用成功部署,打通网络后即可订阅EventBridge的消息了。