完成开放消息的配置后,您可以通过本地开发并在DataWorks注册扩展程序的方式,来对指定工作空间下某些页面功能(即进行扩展点事件的相关操作)进行管控。扩展程序订阅Kafka后,DataWorks会将操作产生的事件消息通过Kafka发送至您的扩展程序,并根据扩展程序返回的结果来决定该操作是否生效。
前提条件
本文以具体场景示例扩展程序接收并处理扩展点事件消息,但您需要先了解以下几方面内容:
- 已经完成Kafka相关配置,确保DataWorks事件消息可以正常推送,详情可参考文档:开启消息订阅:DataWorks端。
- 已经根据开发消息订阅:用户端中提供的代码样例,配置好扩展程序订阅消息,确保扩展程序(Consumer)可以正常接收DataWorks推送的消息。
- 已经了解扩展程序接收、处理事件消息,并将处理结果通过回调API返回DataWorks的完整链路,详情可参考文档:开发部署扩展程序。
背景信息
在DataWorks上,扩展点事件可通过扩展程序功能实现流程卡点与事件消息响应,避免因开发人员数据开发专业技能欠缺、数据质量保障意识薄弱,导致代码修改和开发后未进行严格测试便发布生产,或者进行其他高危操作(误删关键表、资源函数等)而出现故障的问题。将人为检视转变为程序审批(开发扩展程序并注册在DataWorks),避免这类问题的发生。
目前支持的扩展点,及扩展程序处理扩展点事件消息的示例如下:
目前支持的扩展点,及扩展程序处理扩展点事件消息的示例如下:
| 支持的扩展点 | 扩展程序处理事件消息示例 |
|---|---|
| 文件提交扩展点 | 文件提交、发布事件消息处理示例 |
| 文件发布扩展点 | |
| 表提交扩展点 | 表提交、发布事件消息处理示例 |
| 表发布扩展点 | |
| 文件删除扩展点 | 文件删除事件消息处理示例 |
| 代码运行扩展点 | 代码运行事件消息处理示例 |
使用限制
调用API的账号需要有AliyunDataWorksFullAccess权限。关于阿里云主账号授予RAM子账号AliyunDataWorksFullAccess权限的操作详情可参考文档:DataWorks权限管控概述。
文件提交、发布事件消息处理示例
示例场景:当进行节点提交、发布操作时,如果节点代码中包含CREATE TABLE关键字,则抛出警告但不阻塞流程;当节点代码中包含DROP TABLE关键字时,则阻塞节点提交、发布操作。
String messageId = ""; // messageId从消息中解析
String extensionCode = ""; // 扩展程序标识符从DataWorks页面查询
Long projectId = 0L; // 项目空间ID从消息中解析
IAcsClient client = null; // 初始化POP客户端
GetIDEEventDetailRequest request = new GetIDEEventDetailRequest();
request.setMessageId(messageId);
request.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(request);
GetIDEEventDetailResponse.EventDetail.CommittedFile dataSnapshot = response.getEventDetail().getCommittedFile();
String checkResult = "OK";
String checkResultTip = "Success";
if (dataSnapshot.getContent().contains("CREATE TABLE")) {
checkResult = "WARN";
checkResultTip = "最好不要包含建表语句";
}
if (dataSnapshot.getContent().contains("DROP TABLE")) {
checkResult = "FAIL";
checkResultTip = "不能包含删表语句";
}
UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);
表提交、发布事件消息处理示例
示例场景:表提交或者发布时必须指定生命周期,否则不允许提交或发布。
String messageId = ""; // messageId从消息中解析
String extensionCode = ""; // 扩展程序标识符从DataWorks页面查询
Long projectId = 0L; // 项目空间ID从消息中解析
IAcsClient client = null; // 初始化POP客户端
GetIDEEventDetailRequest request = new GetIDEEventDetailRequest();
request.setMessageId(messageId);
request.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(request);
GetIDEEventDetailResponse.EventDetail.TableModel dataSnapshot = response.getEventDetail().getTableModel();
String checkResult = "OK";
String checkResultTip = "Success";
if (dataSnapshot.getLifeCycle() == null || dataSnapshot.getLifeCycle() == 0) {
checkResult = "FAIL";
checkResultTip = "表必须指定生命周期";
}
UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);文件删除事件消息处理示例
示例场景:工作空间下不允许任何文件(资源、函数、节点)被删除,当指定工作空间下进行文件删除操作时将会被阻塞。
String messageId = ""; // messageId从消息中解析
String extensionCode = ""; // 扩展程序标识符从DataWorks页面查询
Long projectId = 0L; // 项目空间ID从消息中解析
IAcsClient client = null; // 初始化POP客户端
String checkResult = "FAIL";
String checkResultTip = "不允许删除文件";
UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);代码运行事件消息处理示例
场景示例:当节点代码中包含CREATE TABLE关键字时,节点运行前将抛出警告;当节点代码中包含DROP TABLE关键字时,将阻塞节点运行操作。
String messageId = ""; // messageId从消息中解析
String extensionCode = ""; // 扩展程序标识符从DataWorks页面查询
Long projectId = 0L; // 项目空间ID从消息中解析
IAcsClient client = null; // 初始化POP客户端
GetIDEEventDetailRequest request = new GetIDEEventDetailRequest();
request.setMessageId(messageId);
request.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(request);
GetIDEEventDetailResponse.EventDetail.FileExecutionCommand dataSnapshot = response.getEventDetail().getFileExecutionCommand();
String checkResult = "OK";
String checkResultTip = "Success";
if (dataSnapshot.getContent().contains("CREATE TABLE")) {
checkResult = "WARN";
checkResultTip = "最好不要包含建表语句";
}
if (dataSnapshot.getContent().contains("DROP TABLE")) {
checkResult = "FAIL";
checkResultTip = "不能包含删表语句";
}
UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);