阶段三:数据安全持续运营

在该阶段,DataWorks为您提供了数据违规下载实时阻断及审批、数据违规流转准实时告警等场景的最佳实践,帮助企业做好数据安全的持续运营。

DataWorks可基于用户行为实时事件、实时审计日志进行分析,帮助您实时发现风险行为并及时响应。具体应用场景请参见下文。

image.png

场景一:数据违规下载实时阻断及审批

数据下载是企业风险治理的重中之重。通常,企业数据开发人员、分析人员只允许在数据平台上浏览及使用数据,不允许将明细数据下载至本地进行分析。数据导出到本地后将无法审计其使用行为,若使用不当或遇到别有用心者,将导致数据被滥用、泄露,严重则可能产生数安事件及风险舆情。

不同企业对下载行为风控规则的定义存在差异,本文将以“实时阻断或审批超过1000条数据的下载行为”为例进行展示。

实现原理

DataWorks的OpenEvent为您提供消息推送订阅的能力,同时,您可将服务程序注册为DataWorks的扩展程序,通过扩展程序来卡点并响应订阅的事件消息,实现通过扩展程序对特定事件进行消息通知与流程管控。详情请参见扩展程序概述

操作步骤

  1. 开启并配置消息订阅(OpenEvent)

    • 由于“查询结果下载”为非工作空间内的操作,因此本案例将使用Default总线来承接操作事件消息。

      说明

      如需通过RAM子账号/RAM角色身份读取Default总线中的事件,请提前进行授权

      image.png

    • 查询结果下载的事件名称为“dataworks:ResourcesDownload:DownloadResources”,可参考下图查看详情。

      image.png

      事件详情及关键参数含义。

      {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "1107550004253538",
        "aliyunpublishtime": "2023-12-05T07:25:31.708Z",
        "data": {
          "eventCode": "download-resources",
          "extensionBizId": "audit_4d7ebb42b805428483148295a97a8404",
          "extensionBizName": "DataWorks_IDE_Query_20231205152530.csv",
          "requestId": "77cac0c2fc12cecbf1d289128897cc7b@@ac15054317017611303051804e8b43",
          "appId": 3159,
          "tenantId": 524257424564736,
          "blockBusiness": true,
          "eventBody": {
            "sqlText": "SELECT * FROM table_1",   // 查询SQL
            "queryDwProjectId": "3159",    // 查询数据源所在的工作空间ID
            "moduleType": "develop_query",  // 下载来源:develop_query(数据开发查询)/sqlx_query(数据分析查询)/dw_excel(数据分析电子表格)
            "operatorBaseId": "1107550004253538", //操作者的UID
            "datasourceId": "18889", //查询的数据源ID
            "queryDwProjectName": "yongxunQA_emr_chengdu1", // 查询数据源所在的工作空间名称
            "dataRowSize": 4577,  // 下载的数据量
            "datasourceName": "odps_source",  // 查询的数据源名称
            "operatorUid": "1107550004253538"
          },
          "operator": "1107550004253538"
        },
        "aliyunoriginalaccountid": "1107550004253538",
        "specversion": "1.0",
        "aliyuneventbusname": "default",
        "id": "169d171c-d523-4370-a874-bb0fa083194d",
        "source": "acs.dataworks",
        "time": "2023-12-05T15:25:31.588Z",
        "aliyunregionid": "cn-chengdu",
        "type": "dataworks:ResourcesDownload:DownloadResources"
      }
  2. 开发部署线扩展程序(Extensions)。

    说明

    实际使用时,请结合上一步消息事件内容进行风险判断。

    处理的扩展点选择“数据下载前置事件”。image.png

    开发扩展程序可参考的示例代码。

    package com.aliyun.dataworks.demo;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.dataworks.config.Constants;
    import com.aliyun.dataworks.config.EventCheckEnum;
    import com.aliyun.dataworks.config.ExtensionParamProperties;
    import com.aliyun.dataworks.services.DataWorksOpenApiClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.dataworks_public.model.v20200518.*;
    import com.aliyuncs.exceptions.ClientException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    /**
     * @author dataworks demo
     */
    @RestController
    @RequestMapping("/extensions")
    public class ExtensionsController {
    
        @Autowired(required = false)
        private DataWorksOpenApiClient dataWorksOpenApiClient;
    
        @Autowired
        private ExtensionParamProperties extensionParamProperties;
    
        /**
         * 接收eventBridge推送过来的消息
         * @param jsonParam
         */
        @PostMapping("/consumer")
        public void consumerEventBridge(@RequestBody String jsonParam){
            JSONObject jsonObj = JSON.parseObject(jsonParam);
            String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
            if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)){
                //初始化client
                IAcsClient client = dataWorksOpenApiClient.createClient();
                try {
                    //当前事件参数信息
                    String messageId = jsonObj.getString("id");
                    JSONObject data = jsonObj.getObject("data", JSONObject.class);
                   // Long projectId = data.getLong("appId");
    
                    //初始化事件回调
                    CallbackExtensionRequest callbackExtensionRequest = new CallbackExtensionRequest();
                    callbackExtensionRequest.setMessageId(messageId);
                    callbackExtensionRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                    JSONObject eventBody = data.getJSONObject("eventBody");
                    Long dataRowSize = eventBody.getLong("dataRowSize");
                    //获取扩展程序选项配置在项目空间下的配置
                    GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest();
                    //全局扩展点事件的配置信息所属projectId默认为-1
                    getOptionValueForProjectRequest.setProjectId("-1");
                    getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                    GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getAcsResponse(getOptionValueForProjectRequest);
                    JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getOptionValue());
                    //这里需根据在DataWorks上实际设置格式来填写
                    Long maxDataRowSize = jsonObject.getLong("dataRowSize");
                    //判断代码是否包含限制函数
                    if(dataRowSize > 1000){
    
                        callbackExtensionRequest.setCheckResult(EventCheckEnum.FAIL.getCode());
                        callbackExtensionRequest.setCheckMessage("下载的行数超过限制数");
                    }else{//成功回调
                        callbackExtensionRequest.setCheckResult(EventCheckEnum.OK.getCode());
                    }
                    //回调DataWorks
                    CallbackExtensionResponse acsResponse = client.getAcsResponse(callbackExtensionRequest);
                    //请求的唯一标识,用于后续错误排查使用
                    System.out.println("acsResponse:" + acsResponse.getRequestId());
                } catch (ClientException e) {
                    //请求的唯一标识,用于后续错误排查使用
                    System.out.println("RequestId:" + e.getRequestId());
                    //错误状态码
                    System.out.println("ErrCode:" + e.getErrCode());
                    //错误描述信息
                    System.out.println("ErrMsg:" + e.getErrMsg());
                }
            }else{
                System.out.println("未能过滤其他事件,请检查配置步骤");
            }
        }
    }
  3. 配置风险响应规则。

    进入安全中心 > 风险识别规则,为上述已发布上线的扩展程序配置风险响应,如需审批,则可添加已创建的审批流程。

    说明

    在响应配置时:

    • 如需实现“审批”,则需保证扩展程序在识别到用户风险行为时callbackExtensionRequest.setCheckResult()返回“WARN”。

    • 如需实现“阻断”,则callbackExtensionRequest.setCheckResult()应返回“FAIL”。

    image.png

  4. 开启扩展程序。image.png

结果验证

  1. 在数据开发、数据分析模块单击下载数据,将跳转至数据下载页面进行风险检测。image.png

  2. 根据检测结果进行后续处理。

    • 若检测通过,则可继续下载。

    image.png

    • 若检测不通过,则下载被阻断,或告知用户需申请权限。

      • 下载被阻断。

        image.png

      • 提示用户申请权限。image.png

场景二:数据违规流转准实时告警

开发者在开发环境中使用数仓数据构建应用时,可能存在将ODS层数据同步至分析层项目空间将分析层数据同步至数据仓库之外的存储介质(业务数据库或对象存储)等情况,此类行为极易造成数仓分析层数据可用性、完整性受损,或明细数据泄露。管理员可通过监控DataWorks操作审计日志及时发现此类行为,步骤如下:

  1. 登录至操作行为审计ActionTrail控制台

  2. 查看运行数据集成任务ExecuteFile事件的审计报文。

    image.pngimage.png

    审计报文详情。

    报文中,Reader为odps,但Write为Mysql,表示该同步任务的源端为MaxCompute,目的端为Mysql

    {
      "eventId": "0bc1746317041210600372496e6191",
      "eventVersion": 1,
      "eventSource": "dataworks.aliyuncs.com",
      "requestParameters": {
        "codeContent": {
          "transform": false,
          "type": "job",
          "version": "2.0",
          "steps": [
            {
              "stepType": "odps",
              "copies": 1,
              "parameter": {
                "partition": [
                  "pt=${bizdate}"
                ],
                "envType": 0,
                "datasource": "0_odps_xc_DPE_E2_engine",
                "tunnelQuota": "default",
                "isSupportThreeModel": false,
                "column": [
                  "spcode",
                  "hjstatus",
                  "sncode",
                  "spname",
                  "id",
                  "spcard",
                  "spkhrq",
                  "spperm",
                  "spgz",
                  "spmfact",
                  "spmfactzg",
                  "spjym",
                  "dwbfye",
                  "grbfye",
                  "spmend",
                  "bchjny"
                ],
                "tableComment": "智慧城市人口财产主题分析-公积金信息数据",
                "table": "ods_t_ss_persons_delta_d"
              },
              "name": "Reader",
              "gui": {
                "x": 100,
                "y": 100
              },
              "category": "reader"
            },
            {
              "stepType": "mysql",
              "copies": 1,
              "parameter": {
                "postSql": [],
                "envType": 0,
                "datasource": "smartcity",
                "column": [
                  "spcode",
                  "hjstatus",
                  "sncode",
                  "spname",
                  "id",
                  "spcard",
                  "spkhrq",
                  "spperm",
                  "spgz",
                  "spmfact",
                  "spmfactzg",
                  "spjym",
                  "dwbfye",
                  "grbfye",
                  "spmend",
                  "bchjny"
                ],
                "tableComment": "",
                "writeMode": "insert",
                "batchSize": 256,
                "table": "t_ss_persons_delta",
                "preSql": []
              },
              "name": "Writer",
              "gui": {
                "x": 100,
                "y": 200
              },
              "category": "writer"
            },
            {
              "copies": 1,
              "parameter": {
                "nodes": [],
                "edges": [],
                "groups": [],
                "version": "2.0"
              },
              "name": "Processor",
              "gui": {
                "x": 100,
                "y": 300
              },
              "category": "processor"
            }
          ],
          "order": {
            "hops": [
              {
                "from": "Reader",
                "gui": {
                  "sourceAnchor": 1,
                  "targetAnchor": 0
                },
                "to": "Writer"
              }
            ]
          },
          "setting": {
            "errorLimit": {
              "record": ""
            },
            "speed": {
              "throttle": false,
              "concurrent": 2
            }
          }
        }
      },
      "sourceIpAddress": "58.100.XXX.XXX, 11.193.XXX.XX",
      "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.X.X Safari/537.36",
      "eventRW": "Write",
      "eventType": "ConsoleOperation",
      "referencedResources": {
        "ACS::DataWorks::File": [
          "502990524"
        ],
        "ACS::DataWorks::Project": [
          "94864"
        ]
      },
      "userIdentity": {
        "sessionContext": {
          "attributes": {
            "mfaAuthenticated": "false",
            "creationDate": "1704119017735"
          }
        },
        "accountId": "1912232488744735",
        "principalId": "1912232488744735",
        "type": "root-account",
        "userName": "root"
      },
      "serviceName": "DataWorks",
      "additionalEventData": {
        "CallerBid": "26842"
      },
      "requestId": "0bc1746317041210600372496e6191",
      "eventTime": "2024-01-01T14:57:42Z",
      "isGlobal": false,
      "acsRegion": "cn-shanghai",
      "eventName": "ExecuteFile"
    }
  3. 配置告警规则。

    • 方式一:通过日志服务进行告警。

      通过特定语法配置告警规则“当Reader为odps且Write不为odps时则告警”,实现事后告警。配置详情请参见创建自定义告警规则

      语法示例参考如下。

      event.eventName: ExecuteFile and event.serviceName: DataWorks | select * from (select json_extract_scalar(json_parse("event.requestParameterJson"), '$.codeContent.steps.0.stepType') as src, json_extract_scalar(json_parse("event.requestParameterJson"), '$.codeContent.steps.1.stepType') as dst from log) where src='odps' and dst != 'odps'
    • 方式二:通过自建告警系统进行告警。

      通过ActionTrail OpenAPI(LookupEvents)将审计事件明细内容拉取至本地解析,若审计报文内容符合“当Reader为odps且Write不为odps时则告警”规则,则可通过自建告警系统发出告警。

其他典型风险运营场景示例

DataWorks上可能出现的其他风险操作、风险场景、建议响应方式请参见下表。

说明

相关操作如需实现审批、事中拦截,请提交工单、反馈客户经理或通过钉钉群联系产品侧进行评估。

模块

操作名称

风险场景示例

响应方式

实现方式

数据集成

保存数据集成任务

禁止将高安全级别区域的数据源置为源端,将低安全级别的数据源置为目的端。

禁止将贴源层数据源置为源端,将非贴源层数据源置为目的端。

禁止将境内数据源置为源端,将境外数据源置为目的端。

(1)事后告警

(1)事后告警:通过解析操作审计事件DIUpdateDataxJob、DISaveSolution来识别风险。

提交数据集成任务

禁止将高安全级别区域的数据源置为源端,将低安全级别的数据源置为目的端。

禁止将贴源层数据源置为源端,将非贴源层数据源置为目的端。

禁止将境内数据源置为源端,将境外数据源置为目的端。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件SubmitFile,并结合OPEN API"GetFile"获取节点信息来识别风险。

(2)事中拦截:构建“文件提交”扩展程序,并结合对应开放事件及OPEN API"GetFile"获取节点信息来识别风险、拦截操作。

运行数据集成任务

不允许将高安全级别区域的数据源置为源端,将低安全级别的数据源置为目的端。

不允许将贴源层数据源置为源端,将费源层数据源置为目的端。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件ExecuteFile来识别风险。

(2)事中拦截:构建“节点运行”扩展程序,并结合对应开放事件及OPEN API"GetFile"获取节点信息来识别风险、拦截操作。

数据开发

提交文件

不允许提交包含Select * 语句的任务。

不允许提交无Insert关键字的Select语句任务 。

禁止提交DROP、TRUNCATE、PURGE相关的DDL语句。

禁止提交直接输出(敏感)字段的语句。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件SubmitFile,并结合OPEN API"GetFile"获取节点信息来识别风险。

(2)事中拦截:构建“文件提交”扩展程序,并结合对应开放事件及"GetFile"OPEN API获取节点信息来识别风险、拦截操作。

发布文件

不允许发布包含Select * 语句的任务。

不允发布交无Insert关键字的Select语句任务 。

不允许同时拥有开发、运维角色人员将自己开发的任务直接发布到生产环境。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件DeployFile,并结合OPEN API"GetFile"获取节点信息来识别风险。

(2)事中拦截:构建“文件发布”扩展程序,并结合对应开放事件及OPEN API"GetFile/ListProjectMembers"获取节点与成员信息来识别风险、拦截操作。

运行代码

不允许直接输出查询敏感级别大于X级别的明细数据。

禁止对生成环境执行DROP、TRUNCATE、PURGE相关的DDL语句。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件ExecuteFile来识别风险。

(2)事中拦截:构建“节点运行”扩展程序,并结合对应开放事件及OPEN API"GetFile"获取节点信息来识别风险、拦截操作。

删除文件

不允许删除以“X”字符串开通的任务。

不允许删除核心业务流程中的节点。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件DeleteFile,并结合OPEN API"GetFile"获取节点信息来识别风险。

(2)事中拦截:构建“文件删除”扩展程序,并结合对应开放事件及OPEN API"GetFile"获取节点信息来识别风险、拦截操作。

表提交

不允许执行“提交表”操作来修改表结构。

(1)事后告警

(2)事中拦截

(1)事后告警:通过开启并消费“表提交”开放事件来识别风险。

(2)事中拦截:构建“表提交”扩展程序,并结合对应开放事件来识别风险、拦截操作。

表发布

不允许直接将表发布至数仓ADS层。

(1)事后告警

(2)事中拦截

(1)事后告警:通过开启并消费“表发布”开放事件来识别风险。

(2)事中拦截:构建“表发布”扩展程序,并结合对应开放事件来识别风险、拦截操作。

偷锁

不允许开发人员偷锁他人任务的锁,恶意修改他人代码。

(1)事后告警

(1)事后告警:通过解析操作审计事件LockFile来识别风险。

查看数据(查询结果)

不允许查看明细敏感数据。

(1)事后告警

(1)事后告警:通过解析操作审计事件ReadExecutionResults来识别风险。

复制数据(查询结果)

不允许复制明细敏感数据。

(1)事后告警

(1)事后告警:通过解析操作审计事件CopyTaskResult来识别风险。

下载数据(查询结果)

不允许一次性下载超过1000条数据。

按人员所属部门(工作空间)判断其是否允许下载数据。

当查询SQL中包含敏感字段时阻断下载。

分段风控,当下载条数超过2W条时需要审批,超过5W时条则阻断。

针对空间角色定义下载条数。例如,开发角色允许下载N条,超过则阻断;分析师角色允许下载M条,超过则阻断。

需针对数据开发、数据分析场景分别设置不同的下载条数策略。

(1)事后告警

(2)事中拦截或审批

(1)事后告警:通过解析操作审计事件DownloadExecutionResult来识别风险。

(2)事中拦截或审批:构建“数据下载”扩展程序,并结合对应开放事件及OPEN API"ListProjectMembers"获取节点信息来识别风险、拦截&审批操作。

运维中心

暂停&冻结&下线任务

不允许暂停&冻结&下线某条核心业务线的任务。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件OfflineNode、StopInstance、SuspendInstance来识别风险。

(2)事中拦截:构建“节点冻结、节点下线”扩展程序,并结合对应开放事件来识别风险、拦截操作。

补数据

不允许一次性补数据超过N个节点。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件RunCycleDagNodes来识别风险。

(2)事中拦截:构建“补数据”扩展程序,并结合对应开放事件来识别风险、拦截操作。

触发式任务运行

所运行的任务不允许包含Select * 语句的任务;

所运行的任务不允许包含Insert关键字的Select语句任务 。

(1)事后告警

(1)事后告警:通过解析操作审计事件RunTriggerNode,并结合OPEN API"GetFile"获取节点信息来识别风险。

基线增删改

不允许直接增删改基线优先级,避免高优先级任务无法准时产出。

(1)事后告警

(1)事后告警:通过解析操作审计事件UpdateBaseline、DeleteBaseline,并结合OPEN API"GetBaseline"获取基线信息来识别风险。

数据质量

设为弱规则

不允许直接将与某些核心任务挂钩的数据质量规则设置为弱规则,避免脏数据蔓延而无法直接阻止。

(1)事后告警

(1)事后告警:通过解析操作审计事件UpdateQualityRule,并结合OPEN API"GetQualityRule"获取质量规则信息来识别风险。

删除规则

不允许直接删除与某些核心任务挂钩的数据质量规则,避免脏数据蔓延而无法直接阻止。

(1)事后告警

(1)事后告警:通过解析操作审计事件DeleteQualityRule,并结合OPEN API"GetQualityRule"获取质量规则信息来识别风险。

停止规则

不允许直接停止与某些核心任务挂钩的数据质量规则,避免脏数据蔓延而无法直接阻止。

(1)事后告警

(1)事后告警:通过解析操作审计事件UpdateQualityRule,并结合OPEN API"GetQualityRule"获取质量规则信息来识别风险。

数据分析

SQL查询

不允许直接输出查询敏感级别大于X级别的明细数据。

禁止对生成环境执行DROP、TRUNCATE、PURGE相关的DDL语句。

(1)事后告警

(1)事后告警:通过解析操作审计事件RunTask来识别风险。

查看数据(查询结果)

不允许查看明细敏感数据。

(1)事后告警

(1)事后告警:通过解析操作审计事件ReadExecutionResults来识别风险。

复制数据(查询结果/电子表格)

不允许复制明细敏感数据。

(1)事后告警

(1)事后告警:通过解析操作审计事件CopyTaskResult/CopySheetContent来识别风险。

下载数据(查询结果/电子表格)

不允许一次性下载超过1000条数据。

按人员所属部门(工作空间)判断其是否允许下载数据。

当查询SQL中包含敏感字段时阻断下载。

分段风控,当下载条数超过2W条时需要审批,超过5W时条则阻断。

针对空间角色定义下载条数。例如,开发角色允许下载N条,超过则阻断;分析师角色允许下载M条,超过则阻断。

需针对数据开发、数据分析场景分别设置不同的下载条数策略。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件DownloadSqlResult/DownloadSheet来识别风险。

(2)事中拦截:构建“数据下载”扩展程序,并结合对应开放事件及OPEN API"ListProjectMembers"获取节点信息来识别风险、拦截&审批操作。

管控台

删除项目空间

不允许直接删除DataWorks项目空间。

(1)事后告警

(2)事中拦截

(1)事后告警:通过解析操作审计事件DeleteProject,并结合OPEN API"GetProject"获取工作空间详情信息来识别风险。

(2)事中拦截:构建“删除项目空间”扩展程序,并结合对应开放事件及OPEN API"GetProject"获取工作空间详情信息来识别风险、拦截操作。

数据源

删除数据源

不允许直接删除与核心调度任务关联的数据源。

(1)事后告警

(2)事中拦截

(1)事后告警:通过开启并消费“删除数据源”开放事件来识别风险。

(2)事中拦截:构建“删除数据源”扩展程序,并结合对应开放事件来识别风险、拦截操作。

数据地图

修改MC字段Label值

不允许直接修改MaxCompute表字段的Label值。

(1)事后告警

(1)事后告警:通过解析操作审计事件updateTableColumnSecurityLabels来识别风险。

数据服务

发布数据服务API

不允许直接发布回参包含敏感字段为N级的数服务API。

(1)事后告警

(1)事后告警:通过解析操作审计事件PublishApi来识别风险。