DataWorks的开发平台提供了OpenAPI、OpenEvent、Extensions功能,支持您通过这三个开放性功能,实现对指定流程的自定义管控与响应。本文以一个任务发布封网管控的业务场景,如在数据开发页面提交发布节点事件为例,为您演示如何进行开放平台的相关配置。

背景信息

本实践涉及的开放平台的相关功能介绍与基本概念可参见OpenEvent概述扩展程序概述

开启并配置消息订阅(OpenEvent)

开启并配置消息订阅的详细步骤请参见开启消息订阅,以下为本实践中的核心配置流程与注意事项。
  1. EvenBridge控制台,跳过事件源等配置,快速创建一个自定义总线。创建自定义总线
  2. EvenBridge控制台对应的事件总线中,创建事件规则。
    本实践以标准模式的工作空间内,数据开发页面提交发布节点的场景为例,配置demo与核心参数配置如下。
    1. 配置事件模式创建规则
      {
          "source": [
              "acs.dataworks"
          ],
          "type": [
              "dataworks:FileChange:CommitFile"
          ]
      }
      • source:定义事件的产品名称标识,配置为acs.dataworks
      • type:定义产品下事件的类型标识,配置为dataworks:FileChange:CommitFile
        说明 如果是简单模式的工作空间,可配置为dataworks:FileChange:DeployFile
        您可以在下方的事件模式调试中,将source、type取值进行补充修改,然后进行事件测试,测试成功后单击下一步测试
    2. 配置事件目标中,服务类型选择为HTTPS,并填写合适的URL,其他参数可保持默认。事件目标
  3. 进入DataWorks控制台的开放平台页面,在OpenEvent页签添加事件分发通道。分发通道
  4. DataWorks控制台的开放平台页面,启用上述消息分发通道。启用

注册并配置扩展程序(Extensions)

注册并配置扩展程序的详细步骤请参见准备工作,以下为本实践中的核心配置流程与注意事项。

  1. 注册扩展程序。
    进入DataWorks控制台的开放平台页面,在扩展程序页签单击注册扩展程序注册扩展程序本实践以在数据开发页面提交节点的事件为示例,注册扩展程序时,处理的扩展点需选择为文件提交前置事件,其他参数可根据界面提示进行配置。
  2. 开发部署扩展程序。
    配置DataWorks的SDK,配置详情请参见安装Java SDK,本实践的示例代码如下。
    package com.aliyun.dataworks.demo;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.dataworks.config.Constants;
    import com.aliyun.dataworks.config.EventCheckEnum;
    import com.aliyun.dataworks.config.ExtensionParamProperties;
    import com.aliyun.dataworks.services.DataWorksOpenApiClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.dataworks_public.model.v20200518.UpdateIDEEventResultRequest;
    import com.aliyuncs.dataworks_public.model.v20200518.UpdateIDEEventResultResponse;
    import com.aliyuncs.exceptions.ClientException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * @author dataworks demo
     */
    @RestController
    @RequestMapping("/extensions")
    public class ExtensionsController {
    
        @Autowired(required = false)
        private DataWorksOpenApiClient dataWorksOpenApiClient;
    
        @Autowired
        private ExtensionParamProperties extensionParamProperties;
    
        /**
         * 接收eventBridge推送过来的消息
         * @param jsonParam
         */
        @PostMapping("/consumer")
        public void consumerEventBridge(@RequestBody String jsonParam){
            JSONObject jsonObj = JSON.parseObject(jsonParam);
            String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
            if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode)){
                //初始化client
                IAcsClient client = dataWorksOpenApiClient.createClient();
                //获取当前时间
                SimpleDateFormat sdf = new SimpleDateFormat(Constants.YYYY_MM_DD);
                String now = sdf.format(System.currentTimeMillis());
                //获取2022年节假日及其封网管控日期
                List<String> holidayList = Arrays.asList(extensionParamProperties.getHolidayList().split(","));
                //判断当前时间是否是管控日
                boolean isExists = holidayList.stream().anyMatch(day -> day.equals(now));
                //回调方法
                UpdateIDEEventResultRequest updateIDEEventResultRequest = new UpdateIDEEventResultRequest();
                updateIDEEventResultRequest.setMessageId(jsonObj.getString("id"));
                updateIDEEventResultRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                //是管控日 检查不通过
                if(isExists){
                    updateIDEEventResultRequest.setCheckResult(EventCheckEnum.FAIL.getCode());
                    updateIDEEventResultRequest.setCheckResultTip("管控日不可作提交文件操作");
                }else{
                    //非管控日 检查通过
                    updateIDEEventResultRequest.setCheckResult(EventCheckEnum.OK.getCode());
                    updateIDEEventResultRequest.setCheckResultTip(EventCheckEnum.OK.getName());
                }
                try {
                    //回调DataWorks
                    UpdateIDEEventResultResponse acsResponse = client.getAcsResponse(updateIDEEventResultRequest);
                    //请求的唯一标识,用于后续错误排查使用
                    System.out.println("acsResponse:" + acsResponse.getRequestId());
                } catch (ClientException e) {
                    //请求的唯一标识,用于后续错误排查使用
                    System.out.println("RequestId:" + e.getRequestId());
                    //错误状态码
                    System.out.println("ErrCode:" + e.getErrCode());
                    //错误描述信息
                    System.out.println("ErrMsg:" + e.getErrMsg());
                }
            }else{
                System.out.println("未能过滤其他事件,请检查配置步骤");
            }
        }
    }
                            
  3. 启用扩展程序。
    扩展程序注册完成后,单击扩展程序管理,进入管理中心,在扩展程序页面打开上述注册的扩展程序的启动开关,根据界面提示完成授权即可启用扩展程序。启用扩展程序
  4. 本地部署运行。
    下载工程:下载工程后,进入工程根目录下执行:
    mvn clean package -Dmaven.test.skip=true spring-boot:repackage
    获得可直接运行的jar后执行:
    java -jar target/extension-demo-deploycontroller-1.0.jar
    此时会成功启动工程,如下图所示:启动工程在浏览器输入http://localhost:8080/index会得到"hello world!",表示应用成功部署,打通网络后即可订阅EventBridge的消息了。

结果验证

完成上述实践配置后,您在数据开发页面提交节点时,会触发扩展点事件检查。文件提交