开发部署扩展程序:函数计算方式

在DataWorks扩展程序中,您可以自定义逻辑以监管用户的操作行为,例如拦截和阻断不当行为,通过扩展程序对特定事件进行消息通知与流程管控。本文为您介绍如何通过函数计算方式开发部署扩展程序。

背景信息

函数计算是事件驱动的全托管计算服务,您可以将扩展程序部署在函数计算执行环境中,DataWorks 会将扩展点事件消息推送至 ExtensionRequest 类。在您的扩展程序代码中,通过实现 PojoRequestHandler 接口的 handleRequest 方法,您可以构造 ExtensionRequest 对象,以接收 DataWorks 推送的扩展点事件消息及上下文(Context),您可以定义扩展程序的处理逻辑,并通过 ExtensionResponse 类返回扩展程序处理结果。DataWorks 将自动从 ExtensionResponse 中获取处理结果来决定本次扩展点操作流程是否阻塞。

使用限制

  • 仅支持DataWorks企业版。

  • 支持地域:华北2(北京)、华东1(杭州)、华东2(上海)、华北3(张家口)、华南1(深圳)、西南1(成都)、美国(硅谷)、美国(弗吉尼亚)、德国(法兰克福)、日本(东京)、中国(香港)、新加坡

注意事项

  • 权限控制:仅开放平台管理员租户管理员、阿里云主账号或者拥有AliyunDataWorksFullAccess权限的RAM用户拥有开发者后台的读写权限,权限控制详情请参见全局级模块权限控制产品及控制台权限控制详情:RAM Policy

  • 版本限制:如果您使用的企业版DataWorks到期,所有扩展程序将会失效,无法再触发事件检查。已触发且未达终态的检查会自动通过。

  • 节点限制:包含内部节点的组合类节点(例如:机器学习(PAI)节点do-while节点for-each节点)触发检查时,需内部节点均检查通过才可继续进行后续操作。

  • 触发说明:多个扩展程序可关联同一个扩展点事件,即同一个事件支持触发多个扩展程序。

  • 事件限制:通过函数计算方式部署的扩展程序目前仅支持事件为:数据下载前置事件资产上架\下架前置事件数据上传前置事件

处理流程

以下为通过函数计算开发部署的扩展程序处理扩展点事件运行的基本流程。

image
说明

扩展点事件触发后,在等待扩展程序回调API消息结果时,触发扩展事件的流程会变成检查中状态,直至扩展程序将处理结果返回给DataWorks,DataWorks会根据反馈结果状态决定是否阻塞本次流程。

用户侧

在进行函数计算部署扩展程序前,需先完成扩展程序的开发,使用fc-java-core库运行请求处理程序,示例代码为fc_dataworks_demo01-1.0-SNAPSHOT.jar,操作详情请参见事件请求处理程序(Event Handler)

步骤一:配置扩展程序依赖

在开发扩展程序时,需要添加以下依赖进pom.xml文件。

DataWorks依赖库

<dependency>
 <groupId>com.aliyun</groupId>
 <artifactId>dataworks_public20200518</artifactId>
 <version>5.6.0</version>
</dependency>

函数计算依赖库

<!-- https://mvnrepository.com/artifact/com.aliyun.fc.runtime/fc-java-core -->
<dependency>
    <groupId>com.aliyun.fc.runtime</groupId>
    <artifactId>fc-java-core</artifactId>
    <version>1.4.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.aliyun.fc.runtime/fc-java-event -->
<dependency>
    <groupId>com.aliyun.fc.runtime</groupId>
    <artifactId>fc-java-event</artifactId>
    <version>1.2.0</version>
</dependency>

打包依赖库

<build>
        <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-shade-plugin</artifactId>
                  <version>3.2.1</version>
                  <executions>
                    <execution>
                      <phase>package</phase>
                      <goals>
                        <goal>shade</goal>
                      </goals>
                      <configuration>
                        <filters>
                          <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                              <exclude>META-INF/*.SF</exclude>
                              <exclude>META-INF/*.DSA</exclude>
                              <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                          </filter>
                        </filters>
                      </configuration>
                    </execution>
                  </executions>
              </plugin>
        </plugins>
</build>

您可以使用Apache Maven Shade插件或Apache Maven Assembly插件,以上示例仅以Apache Maven Shade插件为例。

步骤二:开发扩展程序代码

在函数计算上开发扩展程序,需要使用接口PojoRequestHandler,来实现方法handleRequest接收ExtensionRequest 类的请求和Context上下文并定义扩展程序接收处理逻辑,最后通过ExtensionResponse 类返回扩展程序处理结果。

  1. 开发程序代码注意事项。

    解析消息内容

    DataWorks推送的事件消息格式可参考开发参考:事件列表与消息格式。在消息格式中messageBody为具体的消息内容,在实际开发时,可通过消息格式中的messageBody.eventCode字段确认消息类型,并通过messageId字段获取消息详情。

    编写处理逻辑

    请根据业务场景需要,针对该类型消息进行逻辑处理。在扩展程序开发过程中,您可通过以下方式提高开发效率和应用效果。

    • 使用高级应用:扩展程序参数配置,例如extension.project.disabled,使扩展程序对某个工作空间不生效。

    • 在处理数据开发DataStudio模块相关扩展点时,调用GetIDEEventDetail接口,根据MessageId获取触发扩展点事件时的数据快照。

    说明

    MessageId对应消息中的id字段,详情可参考附录:DataWorks发送给EventBridge的消息格式

    返回处理结果至DataWorks

    在封装返回结果ExtensionResponse时,需要指定消息处理结果CheckResult。DataWorks会自动读取最终的处理结果CheckResult,判断流程是否失败。

    • OK:扩展程序对本次扩展点事件检查通过。

    • FAIL:扩展程序对本次扩展点事件检查不通过。您需要查看并及时处理报错,以免影响后续程序的正常执行。

    • WARN:扩展程序对本次扩展点事件检查通过,但存在警告。

    您可下载此示例代码fc_dataworks_demo01-1.0-SNAPSHOT.jar上传至函数计算用于验证测试,代码详情如下:

    代码详情

    APP

    该代码实现了使用函数计算接口PojoRequestHandler来实现方法 handleRequest 接收 ExtensionRequest 类型的请求和 Context上下文并定义扩展程序接收处理逻辑,最后通过 ExtensionResponse 类型返回扩展程序处理结果。

    该代码示例为禁止手动往ADS表上传数据。

    package com.aliyun.example;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.dataworks.ExtensionRequest;
    import com.aliyun.dataworks.ExtensionResponse;
    import com.aliyun.fc.runtime.Context;
    import com.aliyun.fc.runtime.PojoRequestHandler;
    
    
    public class App implements PojoRequestHandler<ExtensionRequest, ExtensionResponse> {
    
        public ExtensionResponse handleRequest(ExtensionRequest extensionRequest, Context context) {
            // 打印请求内容,用于调试
             System.out.println(JSON.toJSONString(extensionRequest));
            // 创建响应对象
            ExtensionResponse extensionResponse = new ExtensionResponse();
            // 判断 eventType 是否为 'upload-data-to-table'
            if ("upload-data-to-table".equals(extensionRequest.getEventType())) {
                try {
                    // 将 messageBody 转换为字符串然后解析为 JSONObject
                    String messageBodyStr = JSON.toJSONString(extensionRequest.getMessageBody());
                    JSONObject messageBody = JSON.parseObject(messageBodyStr);
                    String tableGuid = messageBody.getString("tableGuid");
                    // 判断 tableGuid 是否包含 'ads'
                    if (tableGuid != null && tableGuid.contains("ads")) {
                        extensionResponse.setCheckResult("FAIL");
                    } else {
                        extensionResponse.setCheckResult("OK");
                    }
                } catch (Exception e) {
                    extensionResponse.setCheckResult("FAIL");
                    extensionResponse.setErrorMessage("Error processing request: " + e.getMessage());
                    return extensionResponse;
                }
            } else {
                extensionResponse.setCheckResult("FAIL");
            }
    
            // 设置错误消息,作为反馈信息
            extensionResponse.setErrorMessage("This is a test!");
    
            // 返回响应对象
            return extensionResponse;
        }
    }
    
    

    ExtensionRequest

    定义扩展程序请求结构,用于封装DataWorks发送的事件消息。

    public class ExtensionRequest {
        private Object messageBody;
        private String messageId;
        private String extensionBizId;
        private String extensionBizName;
        private String eventType;
        private String eventCategoryType;
        private Boolean blockBusiness;
    
        public ExtensionRequest() {
        }
    
        public Object getMessageBody() {
            return this.messageBody;
        }
    
        public void setMessageBody(Object messageBody) {
            this.messageBody = messageBody;
        }
    
        public String getMessageId() {
            return this.messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
        public String getExtensionBizId() {
            return this.extensionBizId;
        }
    
        public void setExtensionBizId(String extensionBizId) {
            this.extensionBizId = extensionBizId;
        }
    
        public String getExtensionBizName() {
            return this.extensionBizName;
        }
    
        public void setExtensionBizName(String extensionBizName) {
            this.extensionBizName = extensionBizName;
        }
    
        public String getEventType() {
            return this.eventType;
        }
    
        public void setEventType(String eventType) {
            this.eventType = eventType;
        }
    
        public String getEventCategoryType() {
            return this.eventCategoryType;
        }
    
        public void setEventCategoryType(String eventCategoryType) {
            this.eventCategoryType = eventCategoryType;
        }
    
        public Boolean getBlockBusiness() {
            return this.blockBusiness;
        }
    
        public void setBlockBusiness(Boolean blockBusiness) {
            this.blockBusiness = blockBusiness;
        }
    }

    ExtensionResponse

    定义扩展程序响应结构,用于封装扩展程序的处理结果。

    public class ExtensionResponse {
        private String checkResult;
        private String errorMessage;
    
        public ExtensionResponse() {
        }
    
        public String getCheckResult() {
            return this.checkResult;
        }
    
        public void setCheckResult(String checkResult) {
            this.checkResult = checkResult;
        }
    
        public String getErrorMessage() {
            return this.errorMessage;
        }
    
        public void setErrorMessage(String errorMessage) {
            this.errorMessage = errorMessage;
        }
    }
  2. 代码开发完成后,需将程序打包为可运行的.jar包或.zip包,以供后续配置函数计算使用。

    打开代码开发编辑器的命令行窗口,切换至根目录,执行mvn clean package命令进行打包。

    • 如果显示编译失败,请根据输出的编译错误信息调整代码。

    • 如果编译成功,编译后的JAR包位于项目文件夹内的target目录,并根据pom.xml内的artifactIdversion字段命名为java-example-1.0-SNAPSHOT.jar

    说明

    macOS与Linux操作系统下,打包前请确保代码文件具有可读可执行权限。

函数计算侧

步骤一:部署扩展程序

  1. 登录函数计算控制台2.0,在左侧导航栏单击任务,切换至任务页面。

  2. 单击创建函数创建函数计算服务及所需函数,进而部署扩展程序。开发的扩展程序后续会将特定事件消息直接下发至该服务。操作请参见创建服务创建函数。详情配置如下:image

    • 运行环境需根据您的代码选择,步骤一提供的示例包(fc_dataworks_demo01-1.0-SNAPSHOT.jar)对应Java8环境。您需将fc_dataworks_demo01-1.0-SNAPSHOT.jar下载至本地并上传至代码包中。实际使用时,请根据需要配置。

    • 更多函数的相关操作,请参见管理函数

  3. 修改函数入口

    在函数计算控制台可配置请求处理程序(函数入口),对于Java语言的函数,您的请求处理程序需配置为[包名].[类名]::[方法名],例如:

    • 包名为example

    • 类型为HelloFC

    • 方法名为handleRequest

    请求处理程序可配置为example.HelloFC::handleRequest

    说明

    函数计算默认程序入口为example.App::handleRequest,您需要根据您实际代码情况修改函数入口,更多关于函数入口的说明,详情请参见:请求处理程序

步骤二:测试函数

完成注册后,可在函数详情页面切换至函数代码页签单击测试函数,对已创建的函数进行测试,当返回执行成功后,即可回到DataWorks侧注册扩展程序。

DataWorks侧

步骤一:注册扩展程序

扩展程序在函数计算FC上部署完成后,您需在DataWorks注册扩展程序,操作步骤如下。

  1. 进入开放平台页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 开放平台,进入开放平台的开发者后台页面。

  2. 注册扩展程序。

    1. 在左侧导航栏单击扩展程序,进入扩展程序页面。

    2. 单击扩展程序列表 > 注册扩展程序,选择通过函数计算部署并配置扩展程序信息。

    您可根据需要配置所需参数,参数说明如下。

    参数说明

    参数

    如何配置

    扩展程序名称

    自定义扩展程序的名称,用于标识扩展程序。

    函数计算服务及函数

    选择扩展程序需部署在函数计算的哪个服务及函数上。后续开发的扩展程序会将特定事件消息直接下发至该服务。

    处理的扩展点

    目前仅支持处理数据下载前置事件资产上架\下架前置事件数据上传前置事件触发的消息。

    说明

    选择完成后,配置界面会自动根据选择结果匹配所属事件适用模块,无需手动配置。

    负责人

    扩展程序的负责人,方便扩展程序使用者遇到问题时能及时联系到负责人。

    测试用工作空间

    选择测试扩展程序的工作空间。扩展程序无需进行上线操作,即可在测试工作空间中生效。

    扩展程序上线前,开发人员可在测试空间进行完整链路的测试验证,通过触发事件,测试DataWorks通过EventBridge发送消息、扩展程序接收消息并进行消息审核与回调。

    说明

    处理的扩展点选择租户级扩展点事件,无需配置测试用工作空间

    扩展程序详情地址

    输入介绍扩展程序详情的地址,帮助扩展程序使用者更好地理解和使用此扩展程序。

    您可在开发部署扩展程序时,开发一个扩展程序的详情展示页面,将页面地址配置在此处,以便使用者在触发扩展程序校验时,可通过链接查看完整的校验过程。例如,此次扩展程序检查链路和阻塞的原因。

    扩展程序文档地址

    输入扩展程序的帮助文档地址,供扩展程序的使用者阅读。

    您可在开发部署扩展程序时,开发一个扩展程序的帮助文档页面,将页面地址配置在此处,以便使用者学习了解扩展程序的校验逻辑与属性。

    扩展程序参数配置

    DataWorks支持在扩展程序开发过程中使用参数来提高扩展程序开发和应用效率,在扩展程序代码开发时,将需要应用的参数添加至此处即可。

    您可直接使用DataWorks提供的典型应用场景的内置参数,也可自定义参数。

    支持添加多个参数,一行一个参数,参数格式为key=value。更多参数的使用,请参见高级应用:扩展程序参数配置

    扩展程序选项配置

    输入提供给扩展程序使用者使用的功能配置项,可实现该扩展程序在不同工作空间进行个性化管控。扩展程序开发者需在此界面通过JSON字符串定义选项。

    例如,可通过选项配置让扩展程序使用者自行管控SQL长度。JSON格式可参考高级应用:扩展程序选项配置

  3. 完成扩展程序注册。

    单击确定,完成扩展程序注册。注册完成后,在扩展程序列表,查看此注册成功的扩展程序。

步骤二:上线扩展程序

扩展程序开发、部署并在DataWorks上注册完成后,您还需通过测试、审批、上线流程,之后扩展程序责任人之外的其他管理者可在管理中心中启用该扩展程序,操作详情请参见:应用扩展程序

附录:DataWorks发送给函数计算的消息格式

以下为函数计算方式各类型消息通用格式,其中messageBody包含DataWorks事件消息详细信息,消息类型不同,消息内容也不同。

{
	"blockBusiness": true,
	"eventCategoryType": "resources-download",//事件类别
	"eventType": "upload-data-to-table",//事件类型
	"extensionBizId": "job_6603643923728775070",
	"messageBody": {
             //消息类型不同,消息内容也不同。以下为消息中固定的两个字段。
             "tenantId": 28378****10656,//租户id,DataWorks每个阿里云主账号对应一个租户,每个租户有自己的租户id,该值您可在DataWorks数据开发右上角用户信息查看。
             "eventCode": "xxxx"//
	},
	"messageId": "52d44ee7-b51f-4d4d-afeb-*******"//事件ID。标识事件的唯一值。
}
说明

messageBody字段下因为消息类型不同,消息内容也不同,各事件消息请参见:开发参考:事件列表与消息格式

相关文档