本文介绍DataWorks开放消息的消息结构、不同事件类型事件的消息格式及各字段含义,帮助您快速获取和感知事件消息的状态变更信息。

消息总体格式

一个完整的消息由消息实体、消息类型和消息ID组成,示例如下:

{
    "messageId": "d62427a0-eb1d-4ec6-926e-8665068f7cbc",
    "messageType": "SCHEDULE_TASK",
    "messageBody": OBJECT
}

上述示例中,各字段详细说明请参见下表:

字段名称 字段类型 说明
messageId 文本 用来定位问题的唯一ID。
messageType 文本

消息类型,取值如下:

  • META_TABLE:对应表变更事件
  • SCHEDULE_TASK:对应调度任务变更事件
  • FILE_SUBMIT_EXTENSION:对应文件发布检查事件
messageBody 对象 事件类型的消息实体格式,具体请参见消息实体格式

消息实体格式

目前,消息类型主要包括任务变更事件、文件发布检查事件和表变更事件,不同地域支持的消息类型有所不同,具体请以产品界面为准。更多事件类型正在逐步丰富中。

  • 任务状态变更事件示例如下:
     {
            "finishTime": 1619148187000,
            "beginWaitTimeTime": 1619148161000,
            "beginRunningTime": 1619148163000,
            "dagId": *****,
            "dagType": 0,
            "taskType": 0,
            "modifyTime": 1619148187000,
            "createTime": 1619027108000,
            "appId": *****,
            "tenantId": *****,
            "opCode": 61,
            "flowId": *****,
            "nodeId": *****,
            "beginWaitResTime": 1619148161000,
            "taskId": *****,
            "status": 6
        }
    字段名称 字段类型 说明
    finishTime Long 调度任务实例运行完成的具体时间
    beginWaitTimeTime Long 调度任务实例开始等待的具体时间
    beginRunningTime Long 调度任务实例开始运行的具体时间
    dagId Long DagId
    dagType Integer
    Dag的类型,取值如下:
    • 0:周期调度任务
    • 1:手动任务
    • 2:冒烟测试
    • 3:补数据
    • 4:手动业务流程
    • 5:临时业务流程
    taskType Integer
    任务实例的调度类型,取值如下
    • NORMAL(0):正常调度任务。该任务被日常调度。
    • MANUAL(1):手动任务。该任务不会被日常调度。
    • PAUSE(2):冻结任务。该任务被日常调度,但启动调度时直接被置为失败状态。
    • SKIP(3):空跑任务。该任务被日常调度,但启动调度时直接被置为成功状态。
    • SKIP_UNCHOOSE(4):临时工作流中未选择的任务,仅存在于临时工作流中,启动调度时直接被置为成功状态。
    • SKIP_CYCLE(5):未到运行周期的周或月任务。该任务被日常调度,但启动调度时直接被置为成功状态。
    • CONDITION_UNCHOOSE(6):上游实例中有分支(IF)节点,但是该下游节点未被分支节点选中,直接置为空跑任务。
    • REALTIME_DEPRECATED(7):实时生成的已经过期的周期实例,该类型的任务直接被置为成功状态。
    modifyTime Long 任务实例的修改时间。
    createTime Long 任务实例的创建时间。
    appId Long 工作空间的ID。
    tenantId Long 调度任务实例的租户ID。
    opCode Integer 调度任务实例的操作码:该字段可忽略。
    flowId Long 业务流程的ID,周期调度任务实例的业务流程默认为1,手动业务流程和内部工作流调度任务实例为实际的业务流程ID
    nodeId Long 调度任务实例对应的节点ID。
    beginWaitResTime Long 调度任务实例开始等资源的具体时间。
    taskId Long 调度任务实例ID。
    status Integer
    任务的状态,取值如下:
    • 0:未运行
    • 2:等待定时时间dueTime或cycleTime到来
    • 3:等待资源
    • 4:运行中
    • 5:执行失败
    • 6:执行成功
    • 7:下发给数据质量进行数据校检
    • 8:正在进行分支条件校检
  • 文件发布检查事件示例如下:
    {
        "isContentChange": true,                         
        "baseId": "**************",                     
        "appId": 123456,                             
        "tenantId": 123456,                     
        "cloudUuid": 123456,                    
        "type": 123456,                             
        "fileName": "********",                         
        "owner": "****************",                 
        "projectOwner": "*************",                 
        "projectName": "***********",                     
        "checkerIdentify": "************",                 
        "checkerInstanceId": "***************",            
        "changeType": "UPDATE",                    
        "useType": 0,                                
        "fileCreateTime": "2021-01-15 14:03:02",            
        "fileId": 123456,                            
        "connName": "*******",                        
        "fileVersion": 3                                
    }
    字段名称 字段类型 说明
    isContentChange Boolean 相较于上一次文件内容是否有变动
    baseId String 发布人用户UID
    appId Long 项目空间ID
    tenantId Long 租户ID
    cloudUuid Long 调度节点ID
    type Long 文件的代码类型,常见的包括:6(Shell) 、 10(ODPS SQL)、11(ODPS MR) 、 23(数据集成)、24(ODPS Script) 、 99(虚拟节点)、221(PyODPS 2) 、 225(ODPS Spark)、227(EMR Hive) 、 228(EMR Spark)、229(EMR Spark SQL) 、 230(EMR MR)、239(OSS对象检查) 、 257(EMR Shell)、258(EMR Spark Shell) 、 259(EMR Presto)、260(EMR Impala) 、 900(实时同步)、1089(跨租户节点) 、 1091(Hologres开发)、1093(Hologres SQL) 、 1100(赋值节点)、1221(PyODPS 3)
    fileName String 文件名称
    owner String ower
    projectOwner String 项目空间ower
    projectName String 项目空间名称
    checkerIdentify String 检查器唯一标识
    checkerInstanceId String 检查器实例唯一标识
    changeType String
    变更类型,取值如下:
    • UPDATE
    • DELETE
    • CREATE
    useType Long
    文件所属的功能模块,取值如下:
    • 0:NORMAL 数据开发
    • 1:MANUAL 手动任务
    • 2:MANUAL_BIZ 手动工作流
    • 3:SKIP 数据开发的空跑调度
    • 10:ADHOCQUERY 临时查询
    • 30:COMPONENT 组件管理
    fileCreateTime String 文件创建时间,格式为yyyy-MM-dd HH:mm:ss
    fileId Long 文件ID
    connName String 文件关联的数据源标识
    fileVersion Long 文件版本
  • 表/分区变更事件示例如下:
    // 新增分区
    {
      "appName": "di_fvt_after_3in1_02",
      "eventType": "ADD_PARTITION",
      "gmtCreate": 1619590083495,
      "level": "TABLE",
      "owner": "ALIYUN$di-test-05@datax.io",
      "properties": {
        "partitions": [
          "ds=20210428"
        ]
      },
      "tableGuid": "odps.di_fvt_after_3in1_02.sd_pt_sample",
      "tableName": "sd_pt_sample",
      "timestamp": 1619590081000
    }
    
    // 修改分区
    {
      "appName": "di_fvt_after_3in1_02",
      "eventType": "ALTER_PARTITION",
      "gmtCreate": 1619591301617,
      "level": "TABLE",
      "owner": "ALIYUN$di-test-05@datax.io",
      "properties": {
        "partitions": [
          "ds=20210428/hr=12"
        ]
      },
      "tableGuid": "odps.di_fvt_after_3in1_02.sample_ppt",
      "tableName": "sample_ppt",
      "timestamp": 1619591299000
    }
    
    // 删除分区
    {
      "appName": "di_fvt_after_3in1_02",
      "eventType": "DROP_PARTITION",
      "gmtCreate": 1619590287062,
      "level": "TABLE",
      "owner": "ALIYUN$di-test-05@datax.io",
      "properties": {
        "partitions": [
          "ds=20210428"
        ]
      },
      "tableGuid": "odps.di_fvt_after_3in1_02.sd_pt_sample",
      "tableName": "sd_pt_sample",
      "timestamp": 1619590285000
    }
    
    
    // 创建表
    {
      "appName": "di_fvt_after_3in1_02",
      "eventType": "CREATE_TABLE",
      "gmtCreate": 1619589912720,
      "level": "TABLE",
      "owner": "ALIYUN$di-test-05@datax.io",
      "properties": {},
      "tableGuid": "odps.di_fvt_after_3in1_02.sd_pt_sample",
      "tableName": "sd_pt_sample",
      "timestamp": 1619589909000
    }
    
    // 修改表
    {
      "appName": "di_fvt_after_3in1_02",
      "eventType": "ALTER_TABLE",
      "gmtCreate": 1619590339220,
      "level": "TABLE",
      "owner": "ALIYUN$di-test-05@datax.io",
      "properties": {},
      "tableGuid": "odps.di_fvt_after_3in1_02.sd_pt_sample",
      "tableName": "sd_pt_sample",
      "timestamp": 1619590339000
    }
    
    // 修改表(重命名操作)
    {
      "appName": "di_fvt_after_3in1_02",
      "eventType": "ALTER_TABLE",
      "gmtCreate": 1619590448417,
      "level": "TABLE",
      "owner": "ALIYUN$di-test-05@datax.io",
      "properties": {
        "oldTable": "sd_pt_sample"
      },
      "tableGuid": "odps.di_fvt_after_3in1_02.sd_pt_sample_rnd",
      "tableName": "sd_pt_sample_rnd",
      "timestamp": 1619590446000
    }
    
    // 删除表
    {
      "appName": "di_fvt_after_3in1_02",
      "eventType": "DROP_TABLE",
      "gmtCreate": 1619590379490,
      "level": "TABLE",
      "owner": "ALIYUN$di-test-05@datax.io",
      "properties": {},
      "tableGuid": "odps.di_fvt_after_3in1_02.sd_pt_sample",
      "tableName": "sd_pt_sample",
      "timestamp": 1619590378000
    }
    字段名称 字段类型 说明
    appName String 事件操作对应的项目名称
    tableName String 事件所发生的表名称
    tableGuid String 表唯一标识
    timestamp Long 事件发生的时间戳
    level String

    事件级别,取值如下:

    • 分区级别
    • 表级别
    gmtCreate Long 消息发生时间戳(10位)
    Properties Struct
    扩展属性,表重命名或分区操作时有取值,其他场景下取值为空:
    • 表重命名时,取值为被重命名前的表名称。
    • 分区操作时,取值包含完整的分区信息例如ds=20210428/hr=12
    eventType String 事件类型,不同事件类型,消息主体信息也不同,主要包括:
    • CREATE_TABLE
    • ALTER_TABLE
    • DROP_TABLE
    • ADD_PARTITION
    • ALTER_PARTITION
    • DROP_PARTITION
    owner String 触发者云账号,包含AccountProvider信息