在DataWorks开放平台中,成功注册、上线并启用扩展程序后,在该扩展程序所生效的工作空间中,使用页面部分功能(即触发扩展点事件)时,DataWorks会将事件消息发送至您的扩展程序,您可以查看扩展程序返回的结果。本文将为您介绍数据集成支持的扩展点事件,以及使用扩展点触发扩展程序校验的操作过程。
前提条件
已开启消息订阅操作,详情请参见开启消息订阅。
已完成扩展程序的开发部署操作,详情请参见开发部署扩展程序:函数计算方式。
背景信息
启动扩展程序:您可以在工作空间页面查看扩展程序(Extensions),并控制是否需要在当前工作空间启用该扩展程序校验。
数据集成支持的扩展点事件:
DataWorks的数据集成模块,支持扩展点事件包括任务启动前置事件、任务批量启动前置事件等。扩展点事件的概念和详细列表请参见扩展程序(Extensions)。
DataWorks支持对扩展点事件进行消息通知,通过扩展程序来自动化校验并响应,扩展点触发过程详情请参见应用示例:数据开发过程触发事件检查。
使用限制
启用扩展程序后,数据集成任务触发扩展事件时,消息通知与响应存在以下限制。
仅支持对数据集成解决方案启动阶段生效。
启用后,仅对启动数据集成解决方案被触发时,流程会被阻断,进入操作检查,通过内置扩展程序对对应的逻辑进行校验并返回校验结果。针对解决方案的其他操作,比如删除,修改不会生效。
开放平台使用EventBridge与客户进行交互,消息体的大小需要控制在1MB以内。由于数据集成解决方案最多可同步5000张表,且表的字段数量不限制,消息体极有可能超过1MB。因此,采取了以下限制:
传递的消息在500张表以内时,不包括目标列名,只包含源端表名。
传递的消息在500张表以上时,不包含源端表名,只有任务id,客户需要通过OpenAPI获取详情。
能力概述
数据集成模块支持您的本地服务接收以下扩展点事件的消息。此外,还支持将本地程序注册为DataWorks扩展程序,通过扩展程序接收扩展点事件消息。实现扩展程序对扩展点事件的自定义逻辑处理,并通过回调CallbackExtension将处理结果返回至平台,实现DataWorks上的流程管控。数据集成支持的扩展点如下:
事件名称 | 事件内容 |
开启任务 | 启动数据集成任务前置事件。 |
批量开启任务 | 批量启动数据集成任务前置事件。 |
开启扩展程序
开发并发布扩展程序后,进入目标空间的DataWorks管理中心中,在 页面为当前空间启用数据集成触发事件的扩展程序。
支持的扩展点
开启数据集成任务拦截
完成数据集成任务配置后,单击操作列的启动按钮,即可启动数据集成任务。但由于已开启扩展程序,任务不会正常启动,而是进入操作检查中,需通过CallbackExtension返回参数至DataWorks后,DataWorks才根据返回结果是否通过启动数据集成任务的请求。
批量开启数据集成任务拦截
完成多个数据集成任务配置后,在任务列表左侧选择多个数据集成任务,单击下边栏上的启动按钮,批量启动数据集成任务。但由于已开启扩展程序,任务不会正常启动,而是进入操作检查中,需通过CallbackExtension返回参数至DataWorks后,DataWorks才根据返回结果是否通过启动数据集成任务的请求。
操作检查
在个人开发程序中,使用CallbackExtension,需将以下参数返回至DataWorks,以便判断操作是否通过检查。
OK:扩展程序对本次扩展点事件检查通过。
FAIL:扩展程序对本次扩展点事件检查不通过。您需要查看并及时处理报错,以免影响后续程序的正常执行。
WARN:扩展程序对本次扩展点事件检查通过,但存在警告。
相关文档
附录:特殊说明
针对同步500张表以上的任务,返回的消息内容包含showTableMapping字段。关于数据集成的事件更多消息格式以及字段说明可参见:数据集成事件列表及消息格式。
showTableMapping字段为true时返回tableMapping的详情。
## message v2任务启动,表没有超过500张 { "datacontenttype": "application/json;charset=utf-8", "aliyunaccountid": "111", "aliyunpublishtime": "2024-09-12T08:52:35.919Z", "data": { "extensionBizId": "xxx", "extensionBizName": "sync******20240101", "blockBusiness": true, "operator": "111", "setting": { "lastStartPosition": "2024-09-11 12:00:00" }, "eventCode": "start-diJob", "jobId": 11, "forceRun": false, "appId": 1, "showTableMapping": true, "tenantId": 1, "startAsV2": false, "tableMapping": [ { "srcTable": "test_001", "dstDatasourceName": "odps_first", "srcDatabaseName": "db_001", "srcDatasourceName": "datasource_001", "dstTable": "ods_test_001" }, { "srcTable": "test_002", "dstDatasourceName": "odps_first", "srcDatabaseName": "db_001", "srcDatasourceName": "datasource_001", "dstTable": "ods_test_002" }, { "srcTable": "test_003", "dstDatasourceName": "odps_first", "srcDatabaseName": "db_001", "srcDatasourceName": "datasource_001", "dstTable": "ods_test_003" } ] }, "aliyunoriginalaccountid": "111", "specversion": "1.0", "aliyuneventbusname": "xxx", "id": "xxx", "source": "acs.dataworks", "time": "2024-09-12T16:52:35.874Z", "aliyunregionid": "cn-shanghai", "type": "dataworks:NodeChange:StartDiJob" }
showTableMapping字段为false时无法返回。
##message v2任务启动,表超过500张 ,"showTableMapping": false { "datacontenttype": "application/json;charset=utf-8", "aliyunaccountid": "11*****3538", "aliyunpublishtime": "2024-09-10T14:50:14.553Z", "data": { "eventCode": "start-diJob", "jobId": 11, "forceRun": false, "extensionBizId": "xxx", "extensionBizName": "sync_******_20240101", "appId": 11, "showTableMapping": false, "tenantId": 524*****64736, "blockBusiness": true, "startAsV2": false, "operator": "111", "setting": { "lastStartPosition": "2024-04-12 22:07:02", "startDateTime": "2024-09-10 17:00:00", "timeZone": "Asia/Shanghai" } }, "aliyunoriginalaccountid": "111", "specversion": "1.0", "aliyuneventbusname": "cbytest", "id": "xxx", "source": "acs.dataworks", "time": "2024-09-10T22:50:14.534Z", "aliyunregionid": "cn-shanghai", "type": "dataworks:NodeChange:StartDiJob" } { "datacontenttype": "application/json;charset=utf-8", "aliyunaccountid": "15******516", "aliyunpublishtime": "2024-09-12T09:34:09.297Z", "data": { "eventCode": "start-diJob", "jobId": 1, "forceRun": false, "extensionBizId": "a5de8a*******f75ba", "extensionBizName": "sync_*******_20240912_170517", "appId": 11, "showTableMapping": true, "tenantId": 1, "blockBusiness": true, "startAsV2": false, "tableMapping": [ { "srcTable": "test.*", "dstDatasourceName": "test_db", "srcDatabaseName": ".*", "srcDatasourceName": "mysql_test", "dstTable": "aaa" }, { "srcTable": "shard.*", "dstDatasourceName": "test_db", "srcDatabaseName": ".*", "srcDatasourceName": "mysql_test", "dstTable": "vvv" } ], "operator": "111" }, "aliyunoriginalaccountid": "111", "specversion": "1.0", "aliyuneventbusname": "xia****1prj", "id": "99d59*******c95a9cacc", "source": "acs.dataworks", "time": "2024-09-12T17:34:09.248Z", "aliyunregionid": "cn-shanghai", "type": "dataworks:NodeChange:StartDiJob" }