DataWorks提供内置的流程检查,如任务发布前代码评审、数据治理中心治理项的内置检查项校验,此外,DataWorks还支持您自定义校验逻辑并接入DataWorks,实现DataWorks流程管控。本文以在提交与发布时校验代码中是否存在MAX_PT函数为例,为您介绍如何基于扩展程序实现工作空间中不允许使用特定函数。

场景说明

本实践示例的场景为校验提交发布的代码中是否有MAX_PT函数,示例的校验流程为:
序号 核心流程 核心功能点
1 将工作空间中的文件提交与发布消息(例如提交、发布节点)通过OpenEvent发布至EventBridge,后续通过EventBridge过滤事件消息,并将消息发送至您的服务。 开启消息订阅的时候,通过事件规则指定订阅的事件类型为文件提交与发布(dataworks:FileChange:CommitFiledataworks:FileChange:DeployFile

配置详情请参见开启并配置消息订阅(OpenEvent)

2 本地或在线服务接收消息,通过对扩展程序的设置,实现:
  • 对于某个指定工作空间的提交文件的消息:不触发校验,流程直接通过。
  • 对于某个指定工作空间的发布文件的消息:触发校验,判断代码中是否使用MAX_PT函数并给出响应。

    若存在MAX_PT函数,则通过回调API阻塞DataWorks上的提交或发布流程,或者发送报警信息。

  • 注册扩展程序时,通过扩展程序的参数配置,指定不生效的工作空间和事件。本示例指定提交文件事件不生效(不触发扩展程序校验)。
  • 注册扩展程序时,通过扩展程序选项配置,指定扩展程序对于不满足校验条件的事件的响应选项,后续在启用扩展程序时选择具体的响应方式。本示例可选的响应方式有告警和禁用。
配置详情请参见注册并配置扩展程序(Extensions)

开启并配置消息订阅(OpenEvent)

开启并配置消息订阅的详细步骤请参见开启消息订阅,以下为本实践中的核心配置流程与注意事项。
  1. EvenBridge控制台,跳过事件源等配置,快速创建一个自定义总线。创建自定义总线
  2. EvenBridge控制台对应的事件总线中,创建事件规则。
    本实践定义该EventBridge自定义总线可接收DataWorks文件提交事件消息和文件发布事件消息,配置demo与核心参数配置如下。
    1. 配置事件模式创建规则
      {
          "source": [
              "acs.dataworks"
          ],
          "type": [
              "dataworks:FileChange:CommitFile",
              "dataworks:FileChange:DeployFile"
          ]
      }
      • source:定义事件的产品名称标识,配置为acs.dataworks
      • type:定义产品下事件的类型标识,配置为dataworks:FileChange:CommitFiledataworks:FileChange:DeployFile。您可以在下方的事件模式调试中,将source、type取值进行补充修改,然后进行事件测试,测试成功后单击下一步测试
    2. 配置事件目标中,服务类型选择为HTTPS,并填写合适的URL,其他参数可保持默认。事件目标
  3. 进入DataWorks控制台的开放平台页面,在OpenEvent页签打开启用消息订阅开关,并添加事件分发通道。分发通道
  4. DataWorks控制台的开放平台页面,启用上述消息分发通道。启用

注册并配置扩展程序(Extensions)

注册并配置扩展程序的详细步骤请参见准备工作,以下为本实践中的核心配置流程与注意事项。

  1. 注册扩展程序。

    进入DataWorks控制台的开放平台页面,在Extensions页签单击注册扩展程序

    本实践以在数据开发页面提交节点的事件为示例,注册扩展程序时,核心参数配置说明如下,其他参数可根据界面提示进行配置。
    核心参数 配置说明
    处理的扩展点 需选择为文件提交前置事件文件发布前置事件
    扩展程序参数配置 您可通过本参数的配置来控制扩展程序的生效范围,在指定工作空间下:
    • 提交节点(文件提交前置事件)时不触发扩展程序校验、不阻塞提交节点的流程。
    • 发布节点(文件发布前置事件)时触发扩展程序校验,如果不满足校验通过条件会阻塞发布节点的流程。

    需配置为extension.project.commit-file.disabled=YourProjectId。其中YourProjectId需替换为扩展程序不生效的扩展点事件所在的工作空间ID。

    扩展程序选项配置 需配置为以下示例内容,用于后续设置不满足校验条件后的响应方式,以下示例中指定的响应方式包括告警和禁用。
    {
      "type":"object",
      "properties":{
        "checkStatus":{
          "type":"number",
          "title":"MAX-PT函数检查方式",
          "x-decorator":"FormItem",
          "x-component":"Radio.Group",
          "x-decorator-props":{
            "tooltip":"描述文件"
          },
          "x-component-props":{
            "dataSource":[
              {
                "value":"WARN",
                "label":"告警"
              },
              {
                "value":"FAIL",
                "label":"禁用"
              }
            ],
            "mode":"multiple"
          }
        }
      }
    }
  2. 开发部署扩展程序。
    配置DataWorks的SDK,配置详情请参见安装Java SDK,本实践的示例代码如下。
    package com.aliyun.dataworks.demo;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.dataworks.config.Constants;
    import com.aliyun.dataworks.config.EventCheckEnum;
    import com.aliyun.dataworks.config.ExtensionParamProperties;
    import com.aliyun.dataworks.services.DataWorksOpenApiClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.dataworks_public.model.v20200518.*;
    import com.aliyuncs.exceptions.ClientException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    /**
     * @author dataworks demo
     */
    @RestController
    @RequestMapping("/extensions")
    public class ExtensionsController {
    
        @Autowired(required = false)
        private DataWorksOpenApiClient dataWorksOpenApiClient;
    
        @Autowired
        private ExtensionParamProperties extensionParamProperties;
    
        /**
         * 接收eventBridge推送过来的消息
         * @param jsonParam
         */
        @PostMapping("/consumer")
        public void consumerEventBridge(@RequestBody String jsonParam){
            JSONObject jsonObj = JSON.parseObject(jsonParam);
            String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
            if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)){
                //初始化client
                IAcsClient client = dataWorksOpenApiClient.createClient();
                try {
                    //当前事件参数信息
                    String messageId = jsonObj.getString("id");
                    JSONObject data = jsonObj.getObject("data", JSONObject.class);
                    Long projectId = data.getLong("projectId");
    
                    //初始化事件回调
                    UpdateIDEEventResultRequest updateIDEEventResultRequest = new UpdateIDEEventResultRequest();
                    updateIDEEventResultRequest.setMessageId(messageId);
                    updateIDEEventResultRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
    
                    //查询触发扩展点事件时的扩展点数据快照
                    GetIDEEventDetailRequest getIDEEventDetailRequest = new GetIDEEventDetailRequest();
                    getIDEEventDetailRequest.setMessageId(messageId);
                    getIDEEventDetailRequest.setProjectId(projectId);
                    GetIDEEventDetailResponse getIDEEventDetailResponse = client.getAcsResponse(getIDEEventDetailRequest);
                    String content = getIDEEventDetailResponse.getEventDetail().getCommittedFile().getContent();
    
                    //判断代码是否包含限制函数
                    if(content.contains(Constants.CHECK_CODE)){
                        //获取扩展程序选项配置在项目空间下的配置
                        GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest();
                        getOptionValueForProjectRequest.setProjectId(String.valueOf(projectId));
                        getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                        GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getAcsResponse(getOptionValueForProjectRequest);
                        JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getOptionValue());
                        //注意:这里需根据在DataWorks上实际设置格式来填写
                        String checkStatus = jsonObject.getString("checkStatus");
                        updateIDEEventResultRequest.setCheckResult(checkStatus);
                        updateIDEEventResultRequest.setCheckResultTip("代码中存在限制函数");
                    }else{//成功回调
                        updateIDEEventResultRequest.setCheckResult(EventCheckEnum.OK.getCode());
                        updateIDEEventResultRequest.setCheckResultTip(EventCheckEnum.OK.getName());
                    }
                    //回调DataWorks
                    UpdateIDEEventResultResponse acsResponse = client.getAcsResponse(updateIDEEventResultRequest);
                    //请求的唯一标识,用于后续错误排查使用
                    System.out.println("acsResponse:" + acsResponse.getRequestId());
                } catch (ClientException e) {
                    //请求的唯一标识,用于后续错误排查使用
                    System.out.println("RequestId:" + e.getRequestId());
                    //错误状态码
                    System.out.println("ErrCode:" + e.getErrCode());
                    //错误描述信息
                    System.out.println("ErrMsg:" + e.getErrMsg());
                }
            }else{
                System.out.println("未能过滤其他事件,请检查配置步骤");
            }
        }
    }
  3. 启用扩展程序。
    扩展程序注册完成后,单击扩展程序管理,进入扩展程序配置页面,打开上述注册的扩展程序的启动开关,根据界面提示完成授权启用扩展程序,完成后单击操作列的设置,设置工作空间对MAX_PT函数使用的管控力度。启用扩展程序02
  4. 本地部署运行。
    下载工程: 下载工程后,进入工程根目录下执行:
    mvn clean package -Dmaven.test.skip=true spring-boot:repackage
    获得可直接运行的jar后执行:
    java -jar target/extensions-demo-maxpt-1.0.jar
    此时会成功启动工程,如下图所示:maxpt在浏览器输入http://localhost:8080/index会得到"hello world!",表示应用成功部署,打通网络后即可订阅EventBridge的消息了。

结果验证

完成上述实践配置后,您可以在工作空间进行验证。本示例中,文件提交事件(如提交节点)与文件发布事件(如发布节点)会触发扩展程序校验,但通过扩展程序的参数配置,对指定的工作空间的文件提交事件不生效,所以在指定工作空间内提交包含MAX_PT函数的节点时,不会触发扩展程序校验,当发布该包含MAX_PT函数的节点时,将触发该扩展程序校验。

下图以在该扩展程序的参数配置中,配置的提交扩展点黑名单生效项目,进行提交并发布包含MAX_PT函数的节点,观察并验证扩展程序校验流程,以及扩展点黑名单是否生效。结果验证