应用示例:数据集成触发事件检查

DataWorks开放平台中,成功注册、上线并启用扩展程序后,在该扩展程序所生效的工作空间中,使用页面部分功能(即触发扩展点事件)时,DataWorks会将事件消息发送至您的扩展程序,您可以查看扩展程序返回的结果。本文将为您介绍数据集成支持的扩展点事件,以及使用扩展点触发扩展程序校验的操作过程。

前提条件

背景信息

  • 启动扩展程序:您可以在工作空间页面查看扩展程序(Extensions),并控制是否需要在当前工作空间启用该扩展程序校验。

  • 数据集成支持的扩展点事件

使用限制

启用扩展程序后,数据集成任务触发扩展事件时,消息通知与响应存在以下限制。

  • 仅支持对数据集成解决方案启动阶段生效。

  • 启用后,仅对启动数据集成解决方案被触发时,流程会被阻断,进入操作检查,通过内置扩展程序对对应的逻辑进行校验并返回校验结果。针对解决方案的其他操作,比如删除,修改不会生效。

  • 开放平台使用EventBridge与客户进行交互,消息体的大小需要控制在1MB以内。由于数据集成解决方案最多可同步5000张表,且表的字段数量不限制,消息体极有可能超过1MB。因此,采取了以下限制:

    • 传递的消息在500张表以内时,不包括目标列名,只包含源端表名。

    • 传递的消息在500张表以上时,不包含源端表名,只有任务id,客户需要通过OpenAPI获取详情。

能力概述

数据集成模块支持您的本地服务接收以下扩展点事件的消息。此外,还支持将本地程序注册为DataWorks扩展程序,通过扩展程序接收扩展点事件消息。实现扩展程序对扩展点事件的自定义逻辑处理,并通过回调CallbackExtension将处理结果返回至平台,实现DataWorks上的流程管控。数据集成支持的扩展点如下:

事件名称

事件内容

开启任务

启动数据集成任务前置事件。

批量开启任务

批量启动数据集成任务前置事件。

开启扩展程序

开发并发布扩展程序后,进入目标空间的DataWorks管理中心中,在租户配置 > 扩展程序页面为当前空间启用数据集成触发事件的扩展程序。

image

支持的扩展点

开启数据集成任务拦截

完成数据集成任务配置后,单击操作列的启动按钮,即可启动数据集成任务。但由于已开启扩展程序,任务不会正常启动,而是进入操作检查中,需通过CallbackExtension返回参数至DataWorks后,DataWorks才根据返回结果是否通过启动数据集成任务的请求。

image

批量开启数据集成任务拦截

完成多个数据集成任务配置后,在任务列表左侧选择多个数据集成任务,单击下边栏上的启动按钮,批量启动数据集成任务。但由于已开启扩展程序,任务不会正常启动,而是进入操作检查中,需通过CallbackExtension返回参数至DataWorks后,DataWorks才根据返回结果是否通过启动数据集成任务的请求。

image

操作检查

在个人开发程序中,使用CallbackExtension,需将以下参数返回至DataWorks,以便判断操作是否通过检查。

  • OK:扩展程序对本次扩展点事件检查通过

    image

  • FAIL:扩展程序对本次扩展点事件检查不通过。您需要查看并及时处理报错,以免影响后续程序的正常执行。

    image

  • WARN:扩展程序对本次扩展点事件检查通过,但存在警告

    image

相关文档

附录:特殊说明

针对同步500张表以上的任务,返回的消息内容包含showTableMapping字段。关于数据集成的事件更多消息格式以及字段说明可参见:数据集成事件列表及消息格式

  • showTableMapping字段为true时返回tableMapping的详情。

    ## message v2任务启动,表没有超过500张
    {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "111",
        "aliyunpublishtime": "2024-09-12T08:52:35.919Z",
        "data": {
            "extensionBizId": "xxx",
            "extensionBizName": "sync******20240101",
            "blockBusiness": true,
            "operator": "111",
            "setting": {
                "lastStartPosition": "2024-09-11 12:00:00"
            },
            "eventCode": "start-diJob",
            "jobId": 11,
            "forceRun": false,
            "appId": 1,
            "showTableMapping": true,
            "tenantId": 1,
            "startAsV2": false,
            "tableMapping": [
                {
                    "srcTable": "test_001",
                    "dstDatasourceName": "odps_first",
                    "srcDatabaseName": "db_001",
                    "srcDatasourceName": "datasource_001",
                    "dstTable": "ods_test_001"
                },
                {
                    "srcTable": "test_002",
                    "dstDatasourceName": "odps_first",
                    "srcDatabaseName": "db_001",
                    "srcDatasourceName": "datasource_001",
                    "dstTable": "ods_test_002"
                },
                {
                    "srcTable": "test_003",
                    "dstDatasourceName": "odps_first",
                    "srcDatabaseName": "db_001",
                    "srcDatasourceName": "datasource_001",
                    "dstTable": "ods_test_003"
                }
            ]
        },
        "aliyunoriginalaccountid": "111",
        "specversion": "1.0",
        "aliyuneventbusname": "xxx",
        "id": "xxx",
        "source": "acs.dataworks",
        "time": "2024-09-12T16:52:35.874Z",
        "aliyunregionid": "cn-shanghai",
        "type": "dataworks:NodeChange:StartDiJob"
    }
  • showTableMapping字段为false时无法返回。

    ##message v2任务启动,表超过500张 ,"showTableMapping": false
    {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "11*****3538",
        "aliyunpublishtime": "2024-09-10T14:50:14.553Z",
        "data": {
            "eventCode": "start-diJob",
            "jobId": 11,
            "forceRun": false,
            "extensionBizId": "xxx",
            "extensionBizName": "sync_******_20240101",
            "appId": 11,
            "showTableMapping": false,
            "tenantId": 524*****64736,
            "blockBusiness": true,
            "startAsV2": false,
            "operator": "111",
            "setting": {
                "lastStartPosition": "2024-04-12 22:07:02",
                "startDateTime": "2024-09-10 17:00:00",
                "timeZone": "Asia/Shanghai"
            }
        },
        "aliyunoriginalaccountid": "111",
        "specversion": "1.0",
        "aliyuneventbusname": "cbytest",
        "id": "xxx",
        "source": "acs.dataworks",
        "time": "2024-09-10T22:50:14.534Z",
        "aliyunregionid": "cn-shanghai",
        "type": "dataworks:NodeChange:StartDiJob"
    }
    
    {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "15******516",
        "aliyunpublishtime": "2024-09-12T09:34:09.297Z",
        "data": {
            "eventCode": "start-diJob",
            "jobId": 1,
            "forceRun": false,
            "extensionBizId": "a5de8a*******f75ba",
            "extensionBizName": "sync_*******_20240912_170517",
            "appId": 11,
            "showTableMapping": true,
            "tenantId": 1,
            "blockBusiness": true,
            "startAsV2": false,
            "tableMapping": [
                {
                    "srcTable": "test.*",
                    "dstDatasourceName": "test_db",
                    "srcDatabaseName": ".*",
                    "srcDatasourceName": "mysql_test",
                    "dstTable": "aaa"
                },
                {
                    "srcTable": "shard.*",
                    "dstDatasourceName": "test_db",
                    "srcDatabaseName": ".*",
                    "srcDatasourceName": "mysql_test",
                    "dstTable": "vvv"
                }
            ],
            "operator": "111"
        },
        "aliyunoriginalaccountid": "111",
        "specversion": "1.0",
        "aliyuneventbusname": "xia****1prj",
        "id": "99d59*******c95a9cacc",
        "source": "acs.dataworks",
        "time": "2024-09-12T17:34:09.248Z",
        "aliyunregionid": "cn-shanghai",
        "type": "dataworks:NodeChange:StartDiJob"
    }