DataWorks提供内置的流程检查,如任务发布前代码评审、数据治理中心治理项的内置检查项校验,此外,DataWorks还支持您自定义校验逻辑并接入DataWorks,实现DataWorks流程管控。本文以在运维中心的获取实例状态变更为例,为您介绍如何基于开放消息实现订阅实例状态变更。
背景信息
本实践涉及的开放平台的相关功能介绍与基本概念可参见OpenEvent概述。
开启并配置消息订阅(OpenEvent)
开启并配置消息订阅的详细步骤请参见开启消息订阅,以下为本实践中的核心配置流程与注意事项。
- 在EvenBridge控制台,跳过事件源等配置,快速创建一个自定义总线。
- 在EvenBridge控制台对应的事件总线中,创建事件规则。本实践定义该EventBridge自定义总线可接收DataWorks实例状态变更消息,配置demo与核心参数配置如下。
- 配置事件模式。
{ "source": [ "acs.dataworks" ], "type": [ "dataworks:InstanceStatusChanges:InstanceStatusChanges" ] }
- source:定义事件的产品名称标识,配置为acs.dataworks。
- type:定义产品下事件的类型标识,配置为dataworks:InstanceStatusChanges:InstanceStatusChanges。您可以在下方的事件模式调试中,将source、type取值进行补充修改,然后进行事件测试,测试成功后单击下一步。
- 配置事件目标中,服务类型选择为HTTPS,并填写合适的URL,其他参数可保持默认。
- 配置事件模式。
- 在DataWorks控制台的开放平台页面,启用上述消息分发通道。
代码编写
package com.aliyun.dataworks.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author dataworks demo
*/
@RestController
@RequestMapping("/event")
public class ExtensionsController {
/**
* 接收eventBridge推送过来的消息
* @param jsonParam
*/
@PostMapping("/consumer")
public void consumerEventBridge(@RequestBody String jsonParam){
JSONObject jsonObj = JSON.parseObject(jsonParam);
String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
//调度任务实例开始等时间的具体时间
System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
//DagId
System.out.println("dagId: "+ dataParam.getString("dagId"));
//Dag的类型,取值如下:
//0:周期调度任务
//1:手动任务
//2:冒烟测试
//3:补数据
//4:手动业务流程
//5:临时业务流程
System.out.println("dagType: "+dataParam.getString("dagType"));
//任务实例的调度类型,取值如下:
//NORMAL(0):正常调度任务。该任务被日常调度。
//MANUAL(1):手动任务。该任务不会被日常调度。
//PAUSE(2):冻结任务。该任务被日常调度,但启动调度时直接被置为失败状态。
//SKIP(3):空跑任务。该任务被日常调度,但启动调度时直接被置为成功状态。
//SKIP_UNCHOOSE(4):临时工作流中未选择的任务,仅存在于临时工作流中,启动调度时直接被置为成功状态。
//SKIP_CYCLE(5):未到运行周期的周或月任务。该任务被日常调度,但启动调度时直接被置为成功状态。
//CONDITION_UNCHOOSE(6):上游实例中有分支(IF)节点,但是该下游节点未被分支节点选中,直接置为空跑任务。
//REALTIME_DEPRECATED(7):实时生成的已经过期的周期实例,该类型的任务直接被置为成功状态。
System.out.println("taskType: "+dataParam.getString("taskType"));
//任务实例的修改时间
System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
//任务实例的创建时间
System.out.println("createTime: "+dataParam.getString("createTime"));
//工作空间的ID。您可以调用ListProjects查看空间ID信息。
System.out.println("appId: "+dataParam.getString("appId"));
//调度任务实例所在工作空间的租户ID
System.out.println("tenantId: "+dataParam.getString("tenantId"));
//调度任务实例的操作码:该字段可忽略
System.out.println("opCode: "+dataParam.getString("opCode"));
//业务流程的ID,周期调度任务实例的业务流程默认为1,手动业务流程和内部工作流调度任务实例为实际的业务流程ID
System.out.println("flowId: "+dataParam.getString("flowId"));
//调度任务实例对应的节点ID
System.out.println("nodeId:"+dataParam.getString("nodeId"));
//调度任务实例开始等资源的具体时间
System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
//调度任务实例ID
System.out.println("taskId: "+dataParam.getString("taskId"));
//任务的状态,取值如下:
//0(未运行)
//2(等待定时时间dueTime或cycleTime到来)
//3(等待资源)
//4(运行中)
//7(下发给数据质量进行数据校检)
//8(正在进行分支条件校检)
//5(执行失败)
//6(执行成功)
System.out.println("status: "+dataParam.getString("status"));
}else{
System.out.println("未能过滤其他事件,请检查配置步骤");
}
}
}
本地部署运行
下载工程:
- 依赖环境:java8及以上,maven构建工具。
- 工程下载链接:event-demo-instance-status.zip。
mvn clean package -Dmaven.test.skip=true spring-boot:repackage
获得可直接运行的jar后执行:java -jar target/event-demo-instance-status-1.0.jar
此时会成功启动工程,如下图所示:在浏览器输入http://localhost:8080/index
会得到"hello world!"
,表示应用成功部署,打通网络后即可订阅EventBridge的消息了。