DataWorks提供内置的流程检查,如任务发布前代码评审、数据治理中心治理项的内置检查项校验,此外,DataWorks还支持您自定义校验逻辑并接入DataWorks,实现DataWorks流程管控。本文以在提交与发布时校验代码中是否存在MAX_PT函数为例,为您介绍如何基于扩展程序实现工作空间中不允许使用特定函数。
场景说明
本实践示例的场景为校验提交发布的代码中是否有MAX_PT函数,示例的校验流程为:
|
序号 |
核心流程 |
核心功能点 |
|
1 |
将工作空间中的文件提交与发布消息(例如提交、发布节点)通过OpenEvent发布至EventBridge,后续通过EventBridge过滤事件消息,并将消息发送至您的服务。 |
开启消息订阅的时候,通过事件规则指定订阅的事件类型为文件提交与发布(dataworks:FileChange:CommitFile和dataworks:FileChange:DeployFile) 配置详情请参见最佳实践:(高级特性应用)禁止使用MAX_PT函数。 |
|
2 |
本地或在线服务接收消息,通过对扩展程序的设置,实现:
|
配置详情请参见最佳实践:(高级特性应用)禁止使用MAX_PT函数。 |
前提条件
操作步骤
步骤一:配置自定义总线
-
登录事件总线EventBridge控制台,单击左侧导航栏事件总线,进入事件总线创建页面。
-
单击
按钮,创建自定义事件总线。-
在总线模块内配置完自定义事件总线名称后,单击下一步,进入事件源配置。
-
单击跳过,跳过事件源、规则、目标模块的配置。同时填写 描述。
-
-
单击左侧导航栏事件总线进入事件总线创建页面,找到已创建的事件总线后,单击名称,进入事件总线概览页面。
-
单击左侧导航栏事件规则进入事件规则页面后,单击创建规则新建事件规则。
-
本实践定义该EventBridge自定义总线可接收DataWorks文件提交事件消息和文件发布事件消息,配置demo与核心参数如下。
-
配置基本信息:自定义规则名称即可。
-
实践模式内容。
-
事件源类型:选择自定义事件源。
-
事件源:不进行配置。
-
模式内容:以JSON格式编写,配置内容如下。
{ "source": [ "acs.dataworks" ], "type": [ "dataworks:FileChange:CommitFile", "dataworks:FileChange:DeployFile" ] }-
source:定义事件的产品名称标识acs.dataworks。
-
type:定义产品下事件的类型标识,配置为dataworks:FileChange:CommitFile、dataworks:FileChange:DeployFile。
-
-
事件模式调试:将source、type取值进行补充修改,然后进行事件测试,测试成功后单击下一步。单击测试,验证自定义事件能否匹配该模式。测试通过后页面提示匹配通过,事件可正常被触发。
-
-
配置事件目标。
-
服务类型:选择HTTPS或HTTP,更多服务类型可参见管理事件规则。
-
URL:填写接收自定义总线推送信息的URL,例如
https://服务器地址:端口号/extensions/consumer。 -
Body:选择完整事件。
-
网络类型:选择公网。
-
-
-
步骤二:配置事件分发通道
进入开放平台页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的,单击进入开放平台,进入开发者后台页面。
-
在开发者后台页面,单击左侧导航栏OpenEvent,进入页面后,单击添加事件分发通道,在弹窗内进行配置。
-
要分发事件的工作空间:选择已创建空间。
-
指定分发到EventBridge中自定义总线:选择步骤一创建的事件总线。
-
-
保存事件分发通道后,在事件分发通道的操作列,单击启用按钮,启用新建的事件分发通道。
步骤三:注册并配置扩展程序(Extensions)
进入开放平台页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的,单击进入开放平台,进入开发者后台页面。
-
在开发者后台页面,单击左导航栏扩展程序,进入页面后,单击注册扩展程序,在弹窗内进行配置。
-
部署方式选择:选择通过自建服务部署。
-
注册扩展程序
-
扩展程序名称:自定义配置。
-
处理的扩展点:选择文件提交前置事件、文件发布前置事件。
-
测试用工作空间:配置后,可在扩展性程序创建好,但并未提交前针对该空间内进行扩展程序测试。
-
扩展程序参数配置
您可通过本参数的配置来控制扩展程序的生效范围,在指定工作空间下:
-
提交节点(文件提交前置事件)时不触发扩展程序校验、不阻塞提交节点的流程。
-
发布节点(文件发布前置事件)时触发扩展程序校验,如果不满足校验通过条件会阻塞发布节点的流程。
需配置为
extension.project.commit-file.disabled=YourProjectId。其中YourProjectId需替换为扩展程序不生效的扩展点事件所在的工作空间ID。 -
-
扩展程序选项配置:用于设置不满足校验条件后的响应方式,以下示例中指定的相应方式包括告警和禁用。
{ "type":"object", "properties":{ "checkStatus":{ "type":"number", "title":"MAX-PT函数检查方式", "x-decorator":"FormItem", "x-component":"Radio.Group", "x-decorator-props":{ "tooltip":"描述文件" }, "x-component-props":{ "dataSource":[ { "value":"WARN", "label":"告警" }, { "value":"FAIL", "label":"禁用" } ], "mode":"multiple" } } } }
-
-
-
配置完成以上内容后,单击确定保存注册扩展程序。
-
在已创建好的扩展程序的操作列,单击提交,进入审核状态。
说明-
扩展程序审核由DataWorks平台审核,审核事件通常在T+3个工作日完成,请耐心等待。
-
如需要测试创建的扩展程序,需要配置测试用工作空间。
-
-
审核通过后,单击操作列的上线,即可上线使用该扩展程序。
-
单击扩展程序管理按钮,进入页面,选择创建的扩展程序,在启用列启用该扩展程序,并单击设置工作空间对MAX_PT函数使用的管控力度。
开发配置扩展程序
事件总线EventBridge通过HTTP请求获取DataWorks发送的JSON格式事件,解析消息,并推送至目标服务程序,对事件进行处理后,返回结果至DataWorks中。
示例代码
该代码示例是通过调用UpdateIDEEventResultAPI,进行结果的回调。通过API中的messageId参数来获取详细的事件详情后进行逻辑判断是否包含限制函数,判断结果使用UpdateIDEEventResult返回至DataWorks中。详情请参见开发扩展程序。
环境构建:Java8,Maven构建工具。
package com.aliyun.dataworks.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import com.aliyun.dataworks.config.EventCheckEnum;
import com.aliyun.dataworks.config.ExtensionParamProperties;
import com.aliyun.dataworks.services.DataWorksOpenApiClient;
import com.aliyun.dataworks_public20200518.Client;
import com.aliyun.dataworks_public20200518.models.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author dataworks demo
*/
@RestController
@RequestMapping("/extensions")
public class ExtensionsController {
@Autowired(required = false)
private DataWorksOpenApiClient dataWorksOpenApiClient;
@Autowired
private ExtensionParamProperties extensionParamProperties;
/**
* 接收eventBridge推送过来的消息
*
* @param jsonParam
*/
@PostMapping("/consumer")
public void consumerEventBridge(@RequestBody String jsonParam) {
JSONObject jsonObj = JSON.parseObject(jsonParam);
String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
if (Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)) {
//初始化client
Client client = dataWorksOpenApiClient.createClient();
try {
//当前事件参数信息
String messageId = jsonObj.getString("id");
JSONObject data = jsonObj.getObject("data", JSONObject.class);
Long projectId = data.getLong("projectId");
//初始化事件回调
UpdateIDEEventResultRequest updateIDEEventResultRequest = new UpdateIDEEventResultRequest();
updateIDEEventResultRequest.setMessageId(messageId);
updateIDEEventResultRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
//查询触发扩展点事件时的扩展点数据快照
GetIDEEventDetailRequest getIDEEventDetailRequest = new GetIDEEventDetailRequest();
getIDEEventDetailRequest.setMessageId(messageId);
getIDEEventDetailRequest.setProjectId(projectId);
GetIDEEventDetailResponse getIDEEventDetailResponse = client.getIDEEventDetail(getIDEEventDetailRequest);
String content = getIDEEventDetailResponse.getBody().getEventDetail().getCommittedFile().getContent();
//判断代码是否包含限制函数
if (content.contains(Constants.CHECK_CODE)) {
//获取扩展程序选项配置在项目空间下的配置
GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest();
getOptionValueForProjectRequest.setProjectId(String.valueOf(projectId));
getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getOptionValueForProject(getOptionValueForProjectRequest);
JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getBody().getOptionValue());
//注意 这里需根据在DataWorks上实际设置格式来填写
String checkStatus = jsonObject.getString("checkStatus");
updateIDEEventResultRequest.setCheckResult(checkStatus);
updateIDEEventResultRequest.setCheckResultTip("代码中存在限制函数");
} else {//成功回调
updateIDEEventResultRequest.setCheckResult(EventCheckEnum.OK.getCode());
updateIDEEventResultRequest.setCheckResultTip(EventCheckEnum.OK.getName());
}
//回调DataWorks
UpdateIDEEventResultResponse response = client.updateIDEEventResult(updateIDEEventResultRequest);
//请求的唯一标识,用于后续错误排查使用
System.out.println("response:" + response.getBody().getRequestId());
} catch (Exception e) {
//错误描述信息
System.out.println("ErrMsg:" + e.getMessage());
}
} else {
System.out.println("未能过滤其他事件,请检查配置步骤");
}
}
}
示例工程部署
-
准备环境与工程
-
依赖环境:Java8及以上,Maven构建工具。
-
-
部署方式
-
本地部署:将工程文件打成jar包后,在本地环境中已部署Java8和Maven工具的本地服务器上,执行
java -jar yourapp.jar命令,启动服务程序。 -
云平台部署:将工程文件打成jar包后,上传至相应的运行环境例如:Docker容器、云服务器等进行部署。
说明部署完成的服务,需要保证EventBridge可通过公网访问到该服务。
-
-
下载工程后,进入工程根目录下执行打包命令,将工程项目打成jar包。
mvn clean package -Dmaven.test.skip=true spring-boot:repackage -
执行jar包:
java -jar target/extensions-demo-maxpt-1.0.jar此时会成功启动工程。
/\ / /'---'. __ __ ____ /'\'\\ ( ( )\'---'| '--'| '--'\/'--'| '\ \\\\ \/ /__) | |__| | | |__(___| | ) ) ) ) ' '__| | |__| |'| |___| | / / / / =========|_|===============|__/=/=/_/ :: Spring Boot :: (v2.7.2) 2022-08-22 11:17:35.261 INFO 28648 --- [ main] com.aliyun.dataworks.DemoStarter : Starting DemoStarter v1.0 using Java 1.8.0_151 on xxx.local with PID 28648 (/Users/xxx/ts/extensions-demo-maxpt) ns-demo-maxpt/target/extensions-demo-maxpt-1.0.jar started by guangzhen.zk in /Users/xxx/ts/extensions-demo-maxpt) 2022-08-22 11:17:35.264 INFO 28648 --- [ main] com.aliyun.dataworks.DemoStarter : No active profile set, falling back to 1 default profile: "default" 2022-08-22 11:17:35.276 INFO 28648 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http) 2022-08-22 11:17:36.291 INFO 28648 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2022-08-22 11:17:36.291 INFO 28648 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.65] 2022-08-22 11:17:36.419 INFO 28648 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2022-08-22 11:17:36.419 INFO 28648 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1099 ms 2022-08-22 11:17:36.851 INFO 28648 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2022-08-22 11:17:36.860 INFO 28648 --- [ main] com.aliyun.dataworks.DemoStarter : Started DemoStarter in 1.98 seconds (JVM running for 2.449)在浏览器输入
http://localhost:8080/index会得到"hello world!",表示应用成功部署,打通网络后即可订阅EventBridge的消息了。
结果验证
完成代码部署与打通EventBridge的网络后,您可以在开启扩展程序的空间内进行验证。
验证步骤
-
在数据开发页面创建节点,在节点内编辑被禁止的
MAX_PT函数保存并提交。 -
单击发布按钮,进入创建发布包页面对该节点直接进行发布,则会触发扩展程序校验。
说明根据扩展程序配置,并不会对指定工作空间的代码文件提交事件生效,所以在提交包含
MAX_PT函数的节点时不会触发扩展校验程序,而是在对包含MAX_PT函数节点进行发布时,触发扩展程序校验。
您也可以在该扩展程序的参数配置中,配置的提交扩展点黑名单生效项目,进行提交并发布包含MAX_PT函数的节点,观察并验证扩展程序校验流程,以及扩展点黑名单是否生效。触发校验后,弹出操作检查对话框,显示检查项列表:官方检查器/代码评审和冒烟测试状态为通过,自定义扩展程序规则不允许使用max_pt函数状态为检查中(橙色),页面右上角提示文件暂未通过检查,无法创建发布包,表明扩展程序成功拦截了包含 MAX_PT 函数的代码发布。