完成开放消息的配置后,您可以通过本地开发并在DataWorks注册扩展程序的方式,来对指定工作空间下某些页面功能(即进行扩展点事件的相关操作)进行管控。扩展程序订阅Kafka后,DataWorks会将操作产生的事件消息通过Kafka发送至您的扩展程序,并根据扩展程序返回的结果来决定该操作是否生效。

前提条件

本文以具体场景示例扩展程序接收并处理扩展点事件消息,但您需要先了解以下几方面内容:

  • 已经完成Kafka相关配置,确保DataWorks事件消息可以正常推送,详情可参考文档:开启消息订阅:DataWorks
  • 已经根据开发消息订阅:用户端中提供的代码样例,配置好扩展程序订阅消息,确保扩展程序(Consumer)可以正常接收DataWorks推送的消息。
  • 已经了解扩展程序接收、处理事件消息,并将处理结果通过回调API返回DataWorks的完整链路,详情可参考文档:开发部署扩展程序

背景信息

在DataWorks上,扩展点事件可通过扩展程序功能实现流程卡点与事件消息响应,避免因开发人员数据开发专业技能欠缺、数据质量保障意识薄弱,导致代码修改和开发后未进行严格测试便发布生产,或者进行其他高危操作(误删关键表、资源函数等)而出现故障的问题。将人为检视转变为程序审批(开发扩展程序并注册在DataWorks),避免这类问题的发生。扩展程序接收并处理扩展点事件目前支持的扩展点,及扩展程序处理扩展点事件消息的示例如下:
支持的扩展点 扩展程序处理事件消息示例
文件提交扩展点 文件提交、发布事件消息处理示例
文件发布扩展点
表提交扩展点 表提交、发布事件消息处理示例
表发布扩展点
文件删除扩展点 文件删除事件消息处理示例
代码运行扩展点 代码运行事件消息处理示例

使用限制

调用API的账号需要有AliyunDataWorksFullAccess权限。关于阿里云主账号授予RAM子账号AliyunDataWorksFullAccess权限的操作详情可参考文档:DataWorks权限管控概述

文件提交、发布事件消息处理示例

示例场景:当进行节点提交、发布操作时,如果节点代码中包含CREATE TABLE关键字,则抛出警告但不阻塞流程;当节点代码中包含DROP TABLE关键字时,则阻塞节点提交、发布操作。
String messageId = ""; // messageId从消息中解析
String extensionCode = ""; // 扩展程序标识符从DataWorks页面查询
Long projectId = 0L;  // 项目空间ID从消息中解析
IAcsClient client = null; // 初始化POP客户端

GetIDEEventDetailRequest request = new GetIDEEventDetailRequest();
request.setMessageId(messageId);
request.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(request);
GetIDEEventDetailResponse.EventDetail.CommittedFile dataSnapshot = response.getEventDetail().getCommittedFile();
String checkResult = "OK";
String checkResultTip = "Success";
if (dataSnapshot.getContent().contains("CREATE TABLE")) {
    checkResult = "WARN";
    checkResultTip = "最好不要包含建表语句";
}
if (dataSnapshot.getContent().contains("DROP TABLE")) {
    checkResult = "FAIL";
    checkResultTip = "不能包含删表语句";
}

UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);
                

表提交、发布事件消息处理示例

示例场景:表提交或者发布时必须指定生命周期,否则不允许提交或发布。
String messageId = ""; // messageId从消息中解析
String extensionCode = ""; // 扩展程序标识符从DataWorks页面查询
Long projectId = 0L;  // 项目空间ID从消息中解析
IAcsClient client = null; // 初始化POP客户端

GetIDEEventDetailRequest request = new GetIDEEventDetailRequest();
request.setMessageId(messageId);
request.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(request);
GetIDEEventDetailResponse.EventDetail.TableModel dataSnapshot = response.getEventDetail().getTableModel();
String checkResult = "OK";
String checkResultTip = "Success";
if (dataSnapshot.getLifeCycle() == null || dataSnapshot.getLifeCycle() == 0) {
    checkResult = "FAIL";
    checkResultTip = "表必须指定生命周期";
}

UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);

文件删除事件消息处理示例

示例场景:工作空间下不允许任何文件(资源、函数、节点)被删除,当指定工作空间下进行文件删除操作时将会被阻塞。
String messageId = ""; // messageId从消息中解析
String extensionCode = ""; // 扩展程序标识符从DataWorks页面查询
Long projectId = 0L;  // 项目空间ID从消息中解析
IAcsClient client = null; // 初始化POP客户端

String checkResult = "FAIL";
String checkResultTip = "不允许删除文件";

UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);

代码运行事件消息处理示例

场景示例:当节点代码中包含CREATE TABLE关键字时,节点运行前将抛出警告;当节点代码中包含DROP TABLE关键字时,将阻塞节点运行操作。
String messageId = ""; // messageId从消息中解析
String extensionCode = ""; // 扩展程序标识符从DataWorks页面查询
Long projectId = 0L;  // 项目空间ID从消息中解析
IAcsClient client = null; // 初始化POP客户端

GetIDEEventDetailRequest request = new GetIDEEventDetailRequest();
request.setMessageId(messageId);
request.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(request);
GetIDEEventDetailResponse.EventDetail.FileExecutionCommand dataSnapshot = response.getEventDetail().getFileExecutionCommand();
String checkResult = "OK";
String checkResultTip = "Success";
if (dataSnapshot.getContent().contains("CREATE TABLE")) {
    checkResult = "WARN";
    checkResultTip = "最好不要包含建表语句";
}
if (dataSnapshot.getContent().contains("DROP TABLE")) {
    checkResult = "FAIL";
    checkResultTip = "不能包含删表语句";
}

UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);