本文介绍DataWorks开放消息的消息结构、不同类型事件的消息格式及各字段含义,帮助您快速获取和感知事件消息的状态变更信息。
开放消息的总体结构
完整的事件消息示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"beginWaitTimeTime": 165270057****,
"dagId": 44655****,
"dagType": 0,
"taskType": 0,
"modifyTime": 165270057****,
"createTime": 165254323****,
"appId": 3*****2,
"tenantId": 235454***432001,
"opCode": 31,
"flowId": 1,
"nodeId": 100269****,
"beginWaitResTime": 165270057****,
"taskId": 453268****,
"status": 3
},
"id": "539fd8f4-4ea1-4625-aa8b-6c906674****",
"source": "acs.dataworks",
"specversion": "1.0",
"subject": "",
"time": "2020-11-19T21:04:41+08:00",
"type": "dataworks:InstanceStatusChanges:InstanceStatusChanges",
"aliyunaccountid": "123456789098****",
"aliyunpublishtime": "2020-11-19T21:04:42.179PRC",
"aliyuneventbusname": "default",
"aliyunregionid": "cn-hangzhou",
"aliyunpublishaddr": "172.25.XX.XX"
}
一个完整的事件消息包括消息的实体格式,以及消息的ID、来源、产生时间等基础信息。其中,重要字段的详细说明如下表。
字段名称 | 字段类型 | 说明 |
---|
data | object | 消息实体格式。不同类型事件的消息格式及其包含的字段含义存在差异,详情请参见: |
id | String | 事件消息的唯一ID。用于定位事件消息。 |
type | String | 事件类型。用于描述事件源相关的事件类型。取值示例如下: dataworks:FileChange:CommitFile :表示提交文件。dataworks:FileChange:DeployFile :表示发布文件。
事件类型的全量取值及说明,请参见 消息事件类型列表。 |
消息事件类型列表
事件类型为事件消息中
type
的取值,不同类型的事件对应的息实体格式存在差异。事件类型列表如下。
事件类型 | 事件描述 | 对应的消息实体格式(data) |
---|
dataworks:InstanceStatusChanges:InstanceStatusChanges | 调度任务状态变更 | 消息实体格式:调度任务状态变更事件 |
dataworks:InstanceChange:UnfreezeInstance | 解冻实例 | 消息实体格式:实例操作事件(冻结、解冻、终止、重跑、置成功) |
dataworks:InstanceChange:FreezeInstance | 冻结实例 |
dataworks:InstanceChange:KillInstance | 终止实例 |
dataworks:InstanceChange:RerunInstance | 重跑实例 |
dataworks:InstanceChange:SetInstanceSuccess | 置成功实例 |
dataworks:FileChange:CommitFile | 提交文件 | 消息实体格式:文件变更事件(提交、发布、运行、删除) |
dataworks:FileChange:DeployFile | 发布文件 |
dataworks:FileChange:RunFile | 运行文件 |
dataworks:FileChange:DeleteFile | 删除文件 |
dataworks:TableChange:CommitTable | 提交表至开发环境 | 消息实体格式:表变更事件(提交表至开发环境、发布表至生产环境) |
dataworks:TableChange:DeployTable | 发布表至生产环境 |
dataworks:NodeChange:NodeChangeCreated | 新增节点 | 消息实体格式:节点变更事件(新增、修改、删除、冻结、解冻、下线) |
dataworks:NodeChange:NodeChangeUpdated | 修改节点 |
dataworks:NodeChange:NodeChangeDeleted | 删除节点 |
dataworks:NodeChange:FreezeNode | 冻结节点 |
dataworks:NodeChange:UnFreezeNode | 解冻节点 |
dataworks:NodeChange:UndeployNode | 下线节点 |
dataworks:BackfillDataOperate:BackfillData | 补数据 | 消息实体格式:补数据操作事件 |
dataworks:ApprovalChange:ApprovalChangeCreated | 创建申请单 | 消息实体格式:审批中心(创建、完成审批单) |
dataworks:ApprovalChange:ApprovalChangeFinished | 完成申请单 |
dataworks:MonitorAlert:WorkbenchMonitorAlert | 运维中心告警 | 消息实体格式:运维中心告警(基线、事件、任务规则、资源组规则) |
dataworks:DagStatusChanges:DagStatusChanges | 工作流状态变更 | 消息实体格式:工作流状态变更事件 |
dataworks:DqcCheck:DqcCheckFeedbackEvent | 数据质量校验结果反馈事件 | 消息实体格式:数据质量校验结果反馈事件 |
dataworks:DqcCheck:DqcCheckFinishedEvent | 数据质量校验完成事件 | 消息实体格式:数据质量校验完成事件 |
消息实体格式:调度任务状态变更事件
调度任务状态变更事件的消息实体格式(即事件消息中data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"beginWaitTimeTime": 1652700576000,
"dagId": 446555330,
"dagType": 0,
"taskType": 0,
"modifyTime": 1652700577000,
"createTime": 1652543233000,
"appId": 3*****2,
"tenantId": 235454***432001,
"opCode": 31,
"flowId": 1,
"nodeId": 1002698219,
"beginWaitResTime": 1652700577000,
"taskId": 4532688169,
"status": 3
}
}
重要字段说明如下表。
说明 上述data
内容仅为示例,下表所列参数为调度任务状态变更事件中data
的常用参数介绍。
字段名称 | 字段类型 | 说明 |
---|
finishTime | Long | 调度任务实例运行完成的具体时间。 |
beginWaitTimeTime | Long | 调度任务实例开始等待运行的具体时间。 |
beginRunningTime | Long | 调度任务实例开始运行的具体时间。 |
dagId | Long | 根据DagId 可获取Dag详情。 |
dagType | Integer | Dag的类型,取值如下:- 0:周期调度任务
- 1:手动任务
- 2:冒烟测试
- 3:补数据
- 4:临时业务流程
- 5:手动业务流程
|
taskType | Integer | 任务实例的调度类型,取值如下:- 0:NORMAL,正常调度任务。该任务会被日常调度。
- 1:MANUAL,手动任务。该任务不会被日常调度。
- 2:PAUSE,冻结任务。该任务被日常调度,但启动调度时直接被置为失败状态。
- 3:SKIP,空跑任务。该任务被日常调度,但启动调度时直接被置为成功状态。
- 4:SKIP_UNCHOOSE,临时工作流中未选择的任务,仅存在于临时工作流中,启动调度时直接被置为成功状态。
- 5:SKIP_CYCLE,未到运行周期的周或月任务。该任务被日常调度,但启动调度时直接被置为成功状态。
- 6:CONDITION_UNCHOOSE,上游实例中有分支(IF)节点,但是该下游节点未被分支节点选中,直接置为空跑任务。
- 7:REALTIME_DEPRECATED,实时生成的已经过期的周期实例,该类型任务直接被置为成功状态。
|
modifyTime | Long | 任务实例最近一次的修改时间。 |
createTime | Long | 任务实例的创建时间。 |
appId | Long | 工作空间的ID。您可以调用ListProjects查看空间ID信息。 |
tenantId | Long | 调度任务实例所在工作空间的租户ID。 |
opCode | Integer | 调度任务实例的操作码。该字段可忽略。 |
flowId | Long | 业务流程的ID。- 周期调度任务实例:业务流程ID默认为1。
- 手动业务流程和内部工作流的调度任务实例:为实际的业务流程ID。
|
nodeId | Long | 调度任务实例对应的节点ID。 |
beginWaitResTime | Long | 调度任务实例开始等待资源的具体时间。 |
taskId | Long | 调度任务实例的ID。 |
status | Integer | 任务的状态,取值如下:- 0:未运行。
- 2:等待定时时间dueTime或cycleTime到来。
- 3:等待资源。
- 4:运行中。
- 7:下发给数据质量进行数据校检。
- 8:正在进行分支条件校检。
- 5:执行失败。
- 6:执行成功。
|
消息实体格式:实例操作事件(冻结、解冻、终止、重跑、置成功)
实例操作事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"operator": "182293110403****",
"projectId": 123488,
"projectType": "PROD",
"taskIds":
[
523536569736
],
"tenantId": 280749521950784
}
字段说明如下表。
字段名称 | 字段类型 | 说明 |
---|
operator | String | 操作实例(冻结、解冻、终止、重跑、置成功等操作)用户的UID。 |
projectType | String | 运行环境。 |
taskIds | List<Long> | 实例的ID集合。 |
projectId | Long | DataWorks项目空间ID。 |
tenantId | Long | 实例所在工作空间的租户ID。 |
消息实体格式:文件变更事件(提交、发布、运行、删除)
- 文件提交、发布事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"operator": "**************", //本次操作人
"projectId": 12*****56,
"tenantId": 12*****56,
"nodeId": 123456, //节点ID
"fileType": 123456, //文件类型
"fileName": "********",
"fileOwner": "****************", //文件责任人
"extensionBizId": "***************", //
"changeType": "UPDATE", //NEW UPDATE DELETE
"fileCreateTime": "2021-01-15 14:03:02",
"fileId": 123456,
"fileVersion": 3,
"dataSourceName":"odps_first"
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
operator | String | 提交或发布文件的用户UID。 |
projectId | Long | 文件所属项目空间的ID。 |
tenantId | Long | 租户ID。 |
nodeId | Long | 调度节点ID。 |
fileType | Long | 文件的代码类型,常用的代码类型包括:- 6(Shell)
- 10(ODPS SQL)
- 11(ODPS MR)
- 23(数据集成)
- 24(ODPS Script)
- 99(虚拟节点)
- 221(PyODPS 2)
- 225(ODPS Spark)
- 227(EMR Hive)
- 228(EMR Spark)
- 229(EMR Spark SQL)
- 230(EMR MR)
- 239(OSS对象检查)
- 257(EMR Shell)
- 258(EMR Spark Shell)
- 259(EMR Presto)
- 260(EMR Impala)
- 900(实时同步)
- 1089(跨租户节点)
- 1091(Hologres开发)
- 1093(Hologres SQL)
- 1100(赋值节点)
- 1221(PyODPS 3)
|
fileName | String | 文件名称。 |
fileOwner | String | 文件所有者。 |
extensionBizId | String | 扩展程序卡点流程ID。 |
changeType | String | 文件的变更类型:- UPDATE:更新文件。
- DELETE:删除文件。
- CREATE:创建文件。
|
fileCreateTime | String | 文件的创建时间,格式为yyyy-MM-dd HH:mm:ss 。 |
fileId | Long | 文件ID。 |
fileVersion | Long | 文件版本。 |
dataSourceName | String | 数据源名称。 |
- 文件删除、运行事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"operator": "**************", //本次操作人
"projectId": 123456,
"tenantId": 123456,
"nodeId": 123456, //节点ID
"fileType": 123456, //文件类型
"fileName": "********",
"fileOwner": "****************", //文件责任人
"extensionBizId": "***************", //
"fileCreateTime": "2021-01-15 14:03:02",
"fileId": 123456
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
operator | String | 删除或运行文件的用户UID。 |
projectId | Long | 文件所属项目空间的ID。 |
tenantId | Long | 租户ID。 |
nodeId | Long | 调度节点ID。 |
fileType | Long | 文件的代码类型。常用的代码类型包括:- 6(Shell)
- 10(ODPS SQL)
- 11(ODPS MR)
- 23(数据集成)
- 24(ODPS Script)
- 99(虚拟节点)
- 221(PyODPS 2)
- 225(ODPS Spark)
- 227(EMR Hive)
- 228(EMR Spark)
- 229(EMR Spark SQL)
- 230(EMR MR)
- 239(OSS对象检查)
- 257(EMR Shell)
- 258(EMR Spark Shell)
- 259(EMR Presto)
- 260(EMR Impala)
- 900(实时同步)
- 1089(跨租户节点)
- 1091(Hologres开发)
- 1093(Hologres SQL)
- 1100(赋值节点)
- 1221(PyODPS 3)
|
fileName | String | 文件名称。 |
fileOwner | String | 文件所有者。 |
extensionBizId | String | 扩展程序卡点流程ID。 |
fileCreateTime | String | 文件的创建时间,格式为yyyy-MM-dd HH:mm:ss 。 |
fileId | Long | 文件ID。 |
消息实体格式:表变更事件(提交表至开发环境、发布表至生产环境)
表变更事件的消息实体格式(即事件消息中data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"operator": "**************",
"projectId": 12*****56,
"tenantId": 12******56,
"extensionBizId": "123456",
"tableName":"table1",
"tableType":"ODPS", // ODPS、EMR
"maxComputeProject":"project1"
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
operator | String | 提交或发布表的用户UID。 |
projectId | Long | 项目空间ID。 |
tenantId | Long | 租户ID。 |
extensionBizId | String | 扩展程序卡点流程ID。 |
tableName | String | 表名称。 |
tableType | String | 表类型,取值如下: |
maxComputeProject | String | 对应MaxCompute的项目名称。 |
消息实体格式:节点变更事件(新增、修改、删除、冻结、解冻、下线)
- 节点新增、修改、删除事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"nodeName": "",
"programType": "ODPS_SQL",
"cronExpress": "00 20 00 * * ?",
"schedulerType": "NORMAL",
"ownerId": "1912232488744735",
"priority": 1,
"baselineId": 117801853,
"repeatability": true,
"modifyTime": 1646364549642,
"createTime": 1646364549642,
"datasource": "odps_first",
"tenantId": 28378****10656,
"nodeId": 100***150,
"projectId": 30**95,
"operator": "**************" //本次操作人
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
nodeName | String | 节点名称。 |
programType | String | 文件的代码类型。常用的代码类型包括:- 6(Shell)
- 10(ODPS SQL)
- 11(ODPS MR)
- 24(ODPS Script)
- 99(虚拟节点)
- 221(PyODPS 2)
- 225(ODPS Spark)
- 227(EMR Hive)
- 228(EMR Spark)
- 229(EMR Spark SQL)
- 230(EMR MR)
- 239(OSS对象检查)
- 257(EMR Shell)
- 258(EMR Spark Shell)
- 259(EMR Presto)
- 260(EMR Impala)
- 900(实时同步)
- 1089(跨租户节点)
- 1091(Hologres开发)
- 1093(Hologres SQL)
- 1100(赋值节点)
- 1221(PyODPS 3)
您可以调用ListFileType接口,查询文件的代码类型。 |
cronExpress | String | 周期调度的cron表达式。 该参数与DataWorks控制台中,数据开发任务的对应。 配置完调度周期及定时调度时间后,DataWorks会自动生成相应cron表达式。示例如下:- 每天凌晨5点30分定时调度:
00 30 05 * * ? - 每个小时的第15分钟定时调度:
00 15 * * * ? - 每隔十分钟调度一次:
00 00/10 * * * ? - 每天8点到17点,每隔十分钟调度一次:
00 00-59/10 8-17 * * * ? - 每月的1日0点20分自动调度:
00 20 00 1 * ? - 从1月1日0点10分开始,每过3个月调度一次:
00 10 00 1 1-12/3 ? - 每周二、周五的0点5分自动调度:
00 05 00 * * 2,5
说明 cron表达式的限制如下: - 最短调度间隔时间为5分钟。
- 每天最早调度时间为0点5分。
|
schedulerType | String | 任务实例的调度类型,取值如下:- 0:NORMAL,正常调度任务。该任务会被日常调度。
- 1:MANUAL,手动任务。该任务不会被日常调度。
- 2:PAUSE,冻结任务。该任务被日常调度,但启动调度时直接被置为失败状态。
- 3:SKIP,空跑任务。该任务被日常调度,但启动调度时直接被置为成功状态。
- 4:SKIP_UNCHOOSE,临时工作流中未选择的任务,仅存在于临时工作流中,启动调度时直接被置为成功状态。
- 5:SKIP_CYCLE,未到运行周期的周或月任务。该任务被日常调度,但启动调度时直接被置为成功状态。
- 6:CONDITION_UNCHOOSE,上游实例中有分支(IF)节点,但是该下游节点未被分支节点选中,直接置为空跑任务。
- 7:REALTIME_DEPRECATED,实时生成的已经过期的周期实例,该类型任务直接被置为成功状态。
|
ownerId | String | 节点责任人的阿里云用户ID。如果该参数为空,则默认使用调用者的阿里云用户ID。 |
priority | Integer | 任务优先级,取值为1、3、5、7、8。数值越大,优先级越高。 |
baselineId | Long | 基线ID。 |
repeatability | Boolean | 节点是否可以重复运行:- true:可以重复运行。
- false:不可以重复运行。
|
modifyTime | Long | 节点最近一次的修改时间。 |
createTime | Long | 节点的创建时间。 |
nodeId | Long | 节点ID。 |
projectId | Long | 节点所在的项目空间ID。 |
tenantId | Long | 节点所属的租户ID。 |
operator | String | 新增、修改或删除节点的用户UID。 |
- 节点冻结、解冻、下线事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"operator": "**************",
"projectId": 123456,
"tenantId": 123456,
"nodeIds":[1,2,3],
"extensionBizId": "123456"
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
operator | String | 冻结、解冻、下线节点的用户UID。 |
projectId | Long | 节点所在项目空间ID。 |
tenantId | Long | 租户ID。 |
nodeIds | Array | 被操作的节点ID列表。 |
extensionBizId | String | 扩展程序卡点流程ID 。 |
消息实体格式:补数据操作事件
补数据操作事件的消息实体格式(即事件消息中data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"excludeNodeIds":[],
"rootNodeId": 10000838271,
"startFutureInstanceImmediately": false,
"useMultipleTimePeriods": true,
"operator": "1320876710332592",
"multipleTimePeriods": "[{\"bizBeginTime\":\"2022-04-17\",\"bizEndTime\":\"2022-04-17\"}]",
"parallelGroup": 1,
"rootNodeProjectId": 12*****8,
"isParallel": false,
"name": "P_fff_20220418_215404",
"tenantId": 16935*****3377,
"includeNodeIds":
[
10000838271
],
"projectId": 12958,
"order": "asc",
"extensionBizId": "123456"
}
重要字段说明如下表。
说明 上述data
内容仅为示例,下表所列参数为data
中常用的参数介绍。
字段名称 | 字段类型 | 说明 |
---|
name | String | 补数据工作流的名称。 |
rootNodeId | Long | 补数据工作流根节点ID。 |
rootNodeProjectId | Long | 补数据工作流根节点所在的项目空间ID。 |
includeNodeIds | Array | 补数据的节点ID列表。 |
excludeNodeIds | Array | 无需补数据的节点ID列表。该列表中的节点会生成空跑实例,空跑实例被调度后直接运行成功,不会执行脚本内容。 |
bizBeginTime | String | 任务的开始时间。仅小时调度任务需要设置该参数,格式为HH:mm:ss ,取值范围为 00:00:00~23:59:59。 |
bizEndTime | String | 任务的结束时间。仅小时调度任务需要设置该参数,格式为HH:mm:ss ,取值范围为 00:00:00~23:59:59。 |
isParallel | Boolean | 补数据操作是否可以并行运行: |
parallelGroup | Integer | 并行运行的分组数,1表示不分组。 |
startFutureInstanceImmediately | Boolean | 是否跳过调度时间,立即调度运行未来时间的实例:- true:跳过调度时间,立即执行未来实例。
- false:不跳过调度时间,不会立即执行未来实例。
|
order | String | 根据业务日期制定补数据运行的顺序:- acs:表示按照业务日期正序运行。
- desc:表示按照业务日期倒序运行。
|
multipleTimePeriods | String | 分段选取业务日期。示例,[{\"bizBeginTime\":\"2022-04-17\",\"bizEndTime\":\"2022-04-17\"}] 。 |
tenantId | Long | 租户ID。 |
projectId | Long | 补数据操作所在的项目ID。 |
operator | String | 执行补数据操作的用户。 |
extensionBizId | String | 扩展程序卡点流程ID。 |
消息实体格式:审批中心(创建、完成审批单)
- 创建审批单事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"appId":194209,
"assignee":"286098539641742899",
"assigneeName":"yupeng.sunyp",
"createTime":1652094363000,
"eventType":"approval",
"process":{
"applicant":"286098539641742899",
"applicantName":"yupeng.sunyp",
"approvalContent":{
"applyPeriod":"2997964800000",
"applyReason":"测试",
"arrayData":[
{
"ownerBaseId":"1822931104031845",
"columns":[
{
"columnType":"string",
"columnGuid":"odps.b_mc1.loghub_070103.id",
"columnComment":"",
"columnName":"id"
},
{
"columnType":"string",
"columnGuid":"odps.b_mc1.loghub_070103.name",
"columnComment":"",
"columnName":"name"
},
{
"columnType":"string",
"columnGuid":"odps.b_mc1.loghub_070103.times",
"columnComment":"",
"columnName":"times"
}
],
"columnMetaList":[
{
"columnType":"string",
"columnGuid":"odps.b_mc1.loghub_070103.id",
"columnComment":"",
"columnName":"id"
},
{
"columnType":"string",
"columnGuid":"odps.b_mc1.loghub_070103.name",
"columnComment":"",
"columnName":"name"
},
{
"columnType":"string",
"columnGuid":"odps.b_mc1.loghub_070103.times",
"columnComment":"",
"columnName":"times"
}
],
"objectType":"TABLE",
"odpsTable":"loghub_070103",
"envType":1,
"projectGuid":"odps.b_mc1",
"objectGuid":"odps.b_mc1.loghub_070103",
"tenantId":0,
"objectName":"loghub_070103",
"ownerAccountName":"ALIYUN$dataworks_3h1_1(dataworks_3h1_1)",
"odpsProject":"B_MC1",
"projectName":"B_MC1",
"actions":[
"Select",
"Describe"
],
"projectId":194209,
"workspaceId":"194209"
}
],
"contentType":"application/json",
"granteeAccounts":[
{
"granteeId":"286098539641742899",
"granteeTypeSub":103,
"granteeType":1,
"granteeName":"RAM$dataworks_3h1_1:yupeng.sunyp"
},
{
"granteeId":"237857631119109360",
"granteeTypeSub":105,
"granteeType":1,
"granteeName":"RAM$dataworks_3h1_1:dev"
}
],
"odpsProjectName":"B_MC1",
"projectEnv":"1",
"resourceSummary":"loghub_070103",
"tenantId":280749521950784,
"workspaceId":194209
},
"assignmentCategory":"MaxCompute",
"createTime":1652094363000,
"processDefinitionId":"definition-3dcc9ce7-d29d-435d-a908-60d4355ff5e2",
"processId":"528535869984706",
"status":"Pending",
"title":"MaxComputeTable",
"updateTime":1652094363000
},
"processId":"528535869984706",
"status":"Submit",
"taskId":"528535870015424",
"tenantId":280749521950784,
"updateTime":1652094364000
}
重要字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
appId | Long | 工作空间ID。 |
assignee | String | 审批单处理人的BaseId。 |
assigneeName | String | 审批单处理人的名称。 |
comments | String | 备注信息。 |
createTime | Long | 审批单的创建时间戳。 |
processId | String | 审批单的ID。 |
status | String | 审批单的状态。 |
taskId | String | 审批任务的ID。 |
tenantId | String | 租户ID。 |
updateTime | String | 审批单更新的时间戳。 |
eventType | String | 事件类型。 |
process | Object | 审批任务对象。 |
applicant | String | 申请BaseID。 |
applicantName | String | 申请人名称。 |
assignmentCategory | String | 申请内容类型。 |
createTime | String | 审批单创建的时间戳。 |
processDefinitionId | String | 工作流定义ID。 |
processId | String | 工作流ID。 |
status | String | 工作流状态。 |
title | String | 工作流标题。 |
updateTime | Long | 审批单更新的时间戳。 |
approvalContent | Object | 审批内容对象。 |
applyPeriod | String | 审批单的申请时长。 |
applyReason | String | 审批单的申请原因。 |
contentType | String | 审批内容类型。 |
odpsProjectName | String | 审批项目名称。 |
resourceSummary | String | 资源描述。 |
tenantId | Long | 租户ID。 |
workspaceId | Long | 工作空间ID。 |
projectEnv | String | 审批项目所属环境。 |
granteeAccounts | Array | 授权列表。 |
.granteeId | String | 授权主体ID。 |
granteeType | String | 授权类型。 |
granteeTypeSub | String | 授权主体子类型。 |
granteeName | String | 授权主体名称。 |
arrayData | Array | 授权内容列表,详情请参见附录一:授权内容列表(MaxCompute)。 |
- 完成审批单事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"appId":227859,
"assignee":"286098539641742899",
"assigneeName":"yupeng.sunyp",
"comments":"ces",
"createTime":1652095981000,
"eventType":"approval",
"process":{
"applicant":"286098539641742899",
"applicantName":"yupeng.sunyp",
"approvalContent":{
"applyPeriod":"2997964800000",
"applyReason":"测试",
"arrayData":[
{
"ownerBaseId":"238266539737108884",
"columns":[
{
"columnType":"string",
"columnGuid":"odps.da_simple_202112.cdd.id",
"columnComment":"",
"columnName":"id"
}
],
"columnMetaList":[
{
"columnType":"string",
"columnGuid":"odps.da_simple_202112.cdd.id",
"columnComment":"",
"columnName":"id"
}
],
"objectType":"TABLE",
"odpsTable":"cdd",
"objectNameCn":"树大根深的",
"envType":1,
"projectGuid":"odps.da_simple_202112",
"objectGuid":"odps.da_simple_202112.cdd",
"tenantId":0,
"objectName":"cdd",
"ownerAccountName":"RAM$dataworks_3h1_1:wb-wjh926877(wb-wjh926877)",
"odpsProject":"da_simple_202112",
"projectName":"da_simple_202112",
"actions":[
"Select",
"Describe"
],
"projectId":227859,
"workspaceId":"227859"
}
],
"contentType":"application/json",
"granteeAccounts":[
{
"granteeId":"286098539641742899",
"granteeTypeSub":103,
"granteeType":1,
"granteeName":"RAM$dataworks_3h1_1:yupeng.sunyp"
}
],
"odpsProjectName":"da_simple_202112",
"projectEnv":"1",
"resourceSummary":"cdd",
"tenantId":280749521950784,
"workspaceId":227859
},
"assignmentCategory":"MaxCompute",
"createTime":1652095981000,
"processDefinitionId":"definition-6e6418e6-c65f-4f26-a673-88576b1c1e4a",
"processId":"528539182432192",
"status":"Pending",
"title":"MaxComputeTable",
"updateTime":1652095981000
},
"processId":"528539182432192",
"status":"Completed",
"taskId":"528539182780353",
"tenantId":280749521950784,
"updateTime":1652096017000
}
重要字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
appId | Long | 工作空间ID。 |
assignee | String | 审批单处理人BaseId。 |
assigneeName | String | 审批单处理人名称。 |
comments | String | 备注信息。 |
createTime | Long | 审批单的创建时间戳。 |
processId | String | 审批单ID。 |
status | String | 审批单状态。 |
taskId。 | String | 审批任务ID。 |
tenantId | String | 租户ID。 |
updateTime | String | 审批单最近一次更新的时间戳。 |
eventType | String | 事件类型。 |
process | Object | 审批任务对象。 |
applicant | String | 申请BaseID。 |
applicantName | String | 申请人名称。 |
assignmentCategory | String | 申请内容类型。 |
createTime | String | 审批单创建的时间戳。 |
processDefinitionId | String | 工作流定义ID。 |
processId | String | 工作流ID。 |
status | String | 工作流状态。 |
title | String | 工作流标题。 |
updateTime | Long | 审批单更新的时间戳。 |
approvalContent | Object | 审批内容对象。 |
applyPeriod | String | 申请时长。 |
applyReason | String | 申请原因。 |
contentType | String | 内容类型。 |
odpsProjectName | String | 项目名称。 |
resourceSummary | String | 资源描述。 |
tenantId | Long | 租户ID。 |
workspaceId | Long | 工作空间ID。 |
projectEnv | String | 所属环境。 |
granteeAccounts | Array | 授权列表。 |
granteeId | String | 授权主体ID。 |
granteeType | String | 授权类型。 |
granteeTypeSub | String | 授权主体子类型。即授权账号类型,具体如下:- 生产云账号(生产调度使用的账号):ACCOUNT_PRD(101)
- 应用云账号:ACCOUNT_APP(102)
- 个人云账号:ACCOUNT_USER(103)
- 部门云账号:ACCOUNT_DEPT(104)
- MOCK账号:ACCOUNT_MOCK(106)
- 他人云账号:ACCOUNT_OTHER_USER(105)
|
granteeName | String | 授权主体名称。 |
arrayData | Array | 授权内容列表,详情请参见附录二:授权内容列表(DataService)。 |
消息实体格式:运维中心告警(基线、事件、任务规则、资源组规则)
- 基线告警的消息实体格式(即消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"alarmType": "SLA_ALERT",
"baselineId": 137000723,
"baselineName": "ods层check任务专用 -- 小时级",
"baselineStatus": 3,
"bizDate": 1654444800000,
"inGroupId": 14,
"nodeId": 10000398734,
"projectId": 7634,
"taskId": 3072893778,
"tenantId": 260953503125729
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
alarmType | String | 告警类型。取值如下:- SLA_ALERT
- REMIND_ALERT
- TOPIC_ALERT
|
baselineId | Long | 基线ID。 |
baselineName | String | 基线名称。 |
baselineStatus | Integer | 基线的状态。取值如下: |
bizDate | Long | 业务日期时间戳。 |
inGroupId | Integer | 基线实例的周期号。天基线为1 ,小时基线的取值范围为[1,24] 。 |
nodeId | Long | 导致基线异常的节点ID。 |
projectId | Long | 基线所属项目空间ID。 |
taskId | Long | 导致基线异常的实例ID。 |
tenantId | Long | 租户ID。 |
- 事件告警的消息实体格式(即消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"alarmType": "TOPIC_ALERT",
"nodeId": 10000316315,
"projectId": 9109,
"taskId": 3072750357,
"taskStatus": 5,
"tenantId": 280749521950784,
"topicId": 1084769
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
alarmType | String | 告警类型。取值如下:- SLA_ALERT
- REMIND_ALERT
- TOPIC_ALERT
|
topicId | Long | 事件的ID。 |
taskStatus | String | 触发事件的节点实例状态。 |
nodeId | Integer | 触发事件的节点ID。 |
projectId | Long | 触发事件的节点所属项目空间ID。 |
taskId | Long | 触发事件的节点实例ID。 |
tenantId | Long | 租户ID。 |
- 任务规则告警的消息实体格式(即消息中
data
字段的内容)示例如下。
说明 通常,规则对象为任务节点、基线、工作空间、业务流程等。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"alarmType": "REMIND_ALERT",
"nodeIds": "10000405472,10000405473,10000405474",
"projectId": 9876,
"remindId": 7605,
"remindName": "出错报警",
"remindType": "ERROR",
"remindUnit": "NODE",
"taskIds": "3072900896,3072900870,3072900855",
"tenantId": 259025816027648
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
alarmType | String | 告警类型。取值如下:- SLA_ALERT
- REMIND_ALERT
- TOPIC_ALERT
|
nodeIds | String | 触发规则告警的节点列表。 |
remindId | Long | 规则ID。 |
remindType | Stirng | 规则触发条件。取值如下:- FINISHED:完成。
- UNFINISHED:未完成。
- ERROR:运行出错。
- CYCLE_UNFINISHED:周期未完成。
- TIMEOUT:运行超时。
|
projectId | Long | 触发规则的节点所属项目空间ID。 |
remindUnit | String | 触发规则的对象类型。取值如下:- NODE:任务节点。
- GATEWAY_RES:独享调度资源组。
- DI_RES:数据集成资源组。
|
tenantId | Long | 租户ID。 |
taskId | String | 触发规则告警的实例列表。 |
remindName | String | 规则名称。 |
- 资源组规则告警的消息实体格式(即消息中
data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"alarmType": "REMIND_ALERT",
"projectId": 2991,
"remindId": 200000186,
"remindName": "独享资源组告警",
"remindType": "RES_GROUP_THRESHOLD",
"remindUnit": "GATEWAY_RES",
"resourceGroupIdentifier": "S_res_group_195820716552192_1650965857744",
"resourceGroupName": "emr_exclusive_scheduld",
"resourceGroupType": "GATEWAY",
"tenantId": 195820716552192
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
alarmType | String | 告警类型。取值如下:- SLA_ALERT
- REMIND_ALERT
- TOPIC_ALERT
|
remindId | Long | 规则ID。 |
remindType | Stirng | 规则触发条件。取值如下:- FINISHED:完成。
- UNFINISHED:未完成。
- ERROR:运行出错。
- CYCLE_UNFINISHED:周期未完成。
- TIMEOUT:运行超时。
- RES_GROUP_THRESHOL:资源组利用率。
- RES_GROUP_WAIT_AMOUNT:资源组等待资源实例数。
|
projectId | Long | 触发规则的节点所属项目空间ID。 |
remindUnit | String | 触发规则的对象类型。取值如下:- NODE:任务节点。
- GATEWAY_RES:独享调度资源组。
- DI_RES:数据集成资源组。
|
tenantId | Long | 租户ID。 |
remindName | String | 规则名称。 |
resourceGroupIdentifier | String | 资源组的唯一标识符。 |
resourceGroupName | String | 资源组名称。 |
resourceGroupType | String | 资源组类型。取值如下:- GATEWAY:调度资源组。
- DI:数据集成资源组。
|
消息实体格式:工作流状态变更事件
工作流状态变更事件的消息实体格式(即事件消息中data
字段的内容)示例如下。{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"bizDate": "2022-11-07 00:00:00",
"createTime": "2022-11-08 10:56:32",
"dagId": 500358972116,
"dagName": "P_test_spark_true_copy_20221108_105631",
"dagType": 3,
"flowId": 1,
"flowName": "ATCLOUD_FLOW",
"operator": "1107550004253538",
"projectEnv": "PROD",
"projectId": 258463,
"status": 6,
"tenantId": 524257424564736
}
字段说明如下表。字段名称 | 字段类型 | 说明 |
---|
bizDate | String | 工作流的业务日期。格式为yyyy-mm-dd hh24:mi:ss 。 |
createTime | String | 工作流的创建时间。格式为yyyy-mm-dd hh24:mi:ss 。 |
dagId | Long | DagId,根据DagId可获取Dag详情。 |
dagName | String | 工作流的名称。 |
dagType | Integer | Dag的类型,取值如下:- 0:周期调度任务。
- 1:手动任务。
- 2:冒烟测试。
- 3:补数据。
- 4:临时业务流程。
- 5:手动业务流程。
|
flowId | Integer | 工作流对应的业务流程ID。 |
flowName | String | 工作流对应的业务流程名称。 |
operator | String | 创建工作流的用户UID。 |
projectEnv | String | 工作流所属环境,取值如下: |
tenantId | Long | 调度任务实例所在工作空间的租户ID。 |
projectId | Long | 工作空间ID。 |
status | Integer | 工作流中任务的状态,取值如下:- 0:未运行。
- 4:运行中。
- 5:执行失败。
- 6:执行成功。
|
消息实体格式:数据质量校验结果反馈事件
数据质量校验结果反馈事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"ruleCheckId": 521771452,
"feedbackContent": "跳过",
"ruleId": 28610334,
"createUser": "110755000425****",
"taskId": "1676448144650974278b99ee64ca9a26ecf4063a88797",
"beginTime": "1676448145000",
"envType": "ODPS",
"projectName": "test_mc_2303_kongjian",
"projectId": 242601,
"tenantId": 524257424564736
}
字段名称 | 字段类型 | 说明 |
---|
ruleId | Long | 数据质量规则ID。 |
ruleCheckId | Long | 校验结果自增ID。 |
feedbackContent | String | 反馈内容。 |
createUser | String | 反馈人用户ID。 |
taskid | String | 数据质量任务的ID。 |
beginTime | String | 反馈时间。 |
envType | String | 规则关联的表所属数据源类型,包括:ODPS、EMR、HOLO。 |
projectName | String | 规则关联的表所属的数据源唯一标识。 |
projectId | Long | DataWorks项目空间ID。 |
tenantId | Long | DataWorks租户ID。 |
消息实体格式:数据质量校验完成事件
数据质量校验完成事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"projectId": 242601,
"tenantId": 52425742456****,
"id": 52177****,
"taskId": "1676463288449d1b2102ac40048aeb7a628753363****",
"entityId": 1562***,
"ruleId": 28610334,
"property": "-",
"bizdate": "2023-02-09 00:00:00",
"dateType": "YMD",
"actualExpression": "ds\u003d20230210",
"matchExpression": "ds\u003d$[yyyymmdd]",
"blockType": 1,
"checkResult": 0,
"checkResultStatus": 0,
"methodName": "table_count",
"beginTime": "2023-02-15 20:14:48",
"endTime": "2023-02-15 20:14:55",
"timeConsuming": "7s",
"externalType": "CWF2",
"externalId": "triggerByManual",
"discrete": false,
"fixedCheck": true,
"referenceValue": [
{
"bizDate": "3000-12-31 00:00:00",
"discreteProperty": "表行数,1天差值",
"value": 0.0,
"singleCheckResult": 0
}
],
"sampleValue": [
{
"bizDate": "2023-02-09 00:00:00",
"value": 3.0
}
],
"trend": "\u003e\u003d",
"expectValue": 0.0,
"op": "\u003e\u003d",
"projectName": "test_mc_2303_kongjian",
"tableName": "sx_dim_1209_001",
"templateId": 47,
"checkerType": 0,
"ruleName": "前一天差值",
"isPrediction": false,
"feedbackStatus": 0,
"whetherToFilterDirtyData": false
}
字段名称 | 字段类型 | 说明 |
---|
id | Long | 本次校验流程的主键ID。每触发一次规则校验会新增一条主键ID记录。 |
projectId | Long | DataWorks项目空间ID |
tenantId | Long | DataWorks租户ID |
taskId | String | 校验任务的ID。 |
entityId | Long | 分区表达式ID。 |
ruleId | Long | 规则ID。 |
property | String | 规则属性的字段,即被校验数据源表的column名称。 |
bizdate | Long | 业务日期。如果被校验的业务主体为离线数据,则业务日期通常为执行校验操作的前一天。 |
dateType | String | 调度周期的类型。通常为YMD,即年任务、月任务、天任务。 |
actualExpression | String | 被校验的数据源表的实际分区。 |
matchExpression | String | 分区表达式。 |
blockType | Integer | 校验规则的强弱。强弱表示规则的重要程度。取值如下:您可以根据实际需求设置重要的规则为强规则。如果使用强规则并触发了红色告警,则会阻塞调度任务。 |
checkResult | Integer | 校验结果状态。取值如下:- -1:校验异常
- 0:校验通过
- 1:触发橙色阈值
- 2:触发红色阈值
|
methodName | String | 采集样本数据的方法。包括:avg、count、sum、min、max、count_distinct、user_defined、table_count、table_size、table_dt_load_count、table_dt_refuseload_count、null_value、null_value/table_count、(table_count-count_distinct)/table_count、table_count-count_distinct等。 |
beginTime | Long | 执行校验操作的开始时间。 |
endTime | Long | 查询校验结果的截止时间。 |
timeConsuming | String | 执行校验任务花费的时间。 |
externalType | String | 调度系统的类型。目前仅支持CWF2。如果externalType为空则表示是手动试跑任务。 |
externalId | String | - externalType为CWF2时,表示调度任务的节点ID。
- externalType为空时,取值为triggerByManual,表示为手动试跑任务。
|
discrete | Boolean | 是否为离散校验。取值如下: |
fixedCheck | Boolean | 是否为固定值校验。取值如下:- true:是固定值校验。
- false:非固定值校验。
|
referenceValue | 历史样本值。 |
| bizDate | Long | 业务日期。如果被校验的业务主体为离线数据,则业务日期通常为执行校验操作的前一天。 |
discreteProperty | String | 通过group by分组后的样本字段取值。例如group by性别字段,则DiscreteProperty为男生、女生和null。 |
value | Decimal | 样本值。 |
singleCheckResult | Integer | 校验结果的字符串。 |
sampleValue | 当前使用的样本。 |
| bizDate | Long | 业务日期。如果被校验的业务主体为离线数据,则业务日期通常为执行校验操作的前一天。 |
value | Decimal | 样本值。 |
trend | String | 校验结果的趋势。 |
expectValue | Double | 期望值。 |
op | String | 比较符。 |
projectName | String | 需要进行数据质量校验的引擎或者数据源名称。 |
tableName | String | 进行校验的表名称。 |
templateId | Integer | 使用的校验模板的ID。 |
checkerType | Integer | 校验器的类型。 |
ruleName | String | 规则的名称。 |
isPrediction | Boolean | 是否为预测的结果。取值如下:- true:是预测的结果。
- false:不是预测的结果。
|
comment | String | 校验规则的描述。 |
附录一:授权内容列表(MaxCompute)
当
assignmentCategory
类型为MaxCompute时,
arrayData
的数据内容示例如下。
[
{
"ownerBaseId":"1822931104031845",
"columns":[
{
"columnType":"decimal(38,18)",
"columnGuid":"odps.px_wokebench_1.oracle_dss_a1214_1136_1_invalid_column.i__d",
"columnComment":"",
"columnName":"i__d"
}
],
"columnMetaList":[
{
"columnType":"decimal(38,18)",
"columnGuid":"odps.px_wokebench_1.oracle_dss_a1214_1136_1_invalid_column.i__d",
"columnComment":"",
"columnName":"i__d"
}
],
"objectType":"TABLE",
"odpsTable":"oracle_dss_a1214_1136_1_invalid_column",
"envType":1,
"projectGuid":"odps.px_wokebench_1",
"objectGuid":"odps.px_wokebench_1.oracle_dss_a1214_1136_1_invalid_column",
"tenantId":0,
"objectName":"oracle_dss_a1214_1136_1_invalid_column",
"ownerAccountName":"ALIYUN$dataworks_3h1_1(dataworks_3h1_1)",
"odpsProject":"dataworks_pref_test1113",
"projectName":"dataworks_pref_test1113",
"actions":[
"Select",
"Describe"
],
"projectId":107100,
"workspaceId":"107100"
}
]
重要字段说明如下表。
字段名称 | 字段类型 | 说明 |
---|
ownerBaseId | String | 表所有者的BaseID。 |
ownerAccountName | String | 表所有者的名称。 |
objectType | String | 对象类型。 |
odpsTable | String | 表名称。 |
envType | String | 表所属环境。 |
projectGuid | String | 项目Guid。 |
objectGuid | String | 对象Guid。 |
objectName | String | 对象名称。 |
odpsProject | String | ODPS项目名称。 |
projectName | String | 项目代码。 |
projectId | Long | 项目ID。 |
workspaceId | String | 工作空间ID。 |
actions | Array | 操作类型列表。 |
columns | Array | 列对应的列表。 |
columnType | String | 列类型。 |
columnGuid | String | 列Guid。 |
columnComment | String | 列备注。 |
columnName | String | 列名称。 |
columnMetaList | Array | 列元数据列表。 |
附录二:授权内容列表(DataService)
当
assignmentCategory
类型为DataService时,
arrayData
的数据内容示例如下。
[
{
"resourceId":"DsApiDeploy/1822931104031845/workspaceId/227859/dsDeployId/1|417381955947846|1|280749521950784|227859",
"ownerName":"ysf01115618",
"resourceVersion":1,
"name":"api_api",
"dsDeployId":"1|417381955947846|1|280749521950784|227859",
"workspaceName":"da_simple_202112",
"id":"417381955947846",
"type":1,
"ownerId":"258882139554032242",
"url":"https://ds-cn-shanghai.data.aliyun.com/?projectId=227859&type=api&id=417381955947846&version=1&defaultProjectId=227859",
"workspaceId":"227859"
}
]
字段说明如下表。
字段名称 | 字段类型 | 说明 |
---|
resourceId | String | 资源ID。 |
ownerName | String | 资源所有者的名称。 |
resourceVersion | Long | 资源版本。 |
name | String | 资源名称。 |
dsDeployId | String | 数据服务发布ID。 |
workspaceName | String | 工作空间名称。 |
id | String | 资源唯一ID。 |
type | String | 资源类型,取值如下: |
ownerId | String | 资源责任人BaseID。 |
url | String | 数据服务链接地址。 |
workspaceId | String | 工作空间ID。 |