消息队列中的数据存储格式

数据传输服务DTS(Data Transmission Service)支持选择数据同步或迁移到消息队列(例如KafkaRocketMQ)时所采用的存储格式,本文为您介绍数据格式的定义说明,方便您根据定义解析数据。

数据存储格式

DTS支持将写入至消息队列的数据存储为如下三种格式:

  • DTS Avro:一种数据序列化格式,可以将数据结构或对象转化成便于存储或传输的格式。

  • Shareplex Json:数据复制软件SharePlex读取源库中的数据,将数据写入至消息队列时,数据存储格式为Shareplex Json。

  • Canal JsonCanal解析数据库增量日志,并将增量数据传输至消息队列,数据存储格式为Canal Json。

DTS Avro

您需要根据DTS AvroSchema定义进行数据解析,详情请参见DTS AvroSchema定义DTS Avro的反序列化示例

说明

DTS Avro格式中的DDL语句为String类型。

Shareplex Json

参数说明

参数

说明

time

数据库中事务的提交时间,格式为yyyy-MM-ddTHH:mm:ssZ(UTC时间)。

userid

提交事务的用户ID。

op

数据操作类型,包括INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER。

scn

系统变化编号SCN(System Change Number),用以标识数据库在某个确切时刻提交事务的版本。每个已提交的事务分配一个唯一的SCN。

rowid

用于定位数据库中一条记录的一个相对唯一地址值。

trans

事务ID。

seq

事务内部的操作序号,从1开始记录。

size

事务内部的操作总数。

table

表名。

idx

事务内部操作的索引,格式为seq/size,例如1/11表示,在操作总数为11的事务内部,该操作的序号为1。

posttime

事务提交至目标库的时间。

示例

插入数据

{
    "meta": {
        "time": "2017-06-16T14:24:34", 
        "userid": 84,                                    
        "op": "ins",                                   
          "scn": "14589063118712",                  
          "rowid": "AAATGpAAIAAItcIAAA",      
        "trans": "7.0.411499",                 
        "seq": 1,                                          
        "size": 11,                                         
        "table": "CL_BIZ1.MIO_LOG",       
          "idx": "1/11",                                       
        "posttime": "2017-06-16T14:33:52"
    },
    "data": {
        "MIO_LOG_ID": "32539737"
     }
}

更新数据

{
    "meta": {
        "time": "2017-06-16T15:38:13",
        "userid": 84,
        "op": "upd",                             
        "table": "CL_BIZ1.MIO_LOG"
        ….
    },
    "data": {                                          
        "CNTR_NO": "1171201606"
    },
    "key": {                                            
        "MIO_LOG_ID": "32537893",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CNTR_NO": "1171201606syui26"
    }
}

删除数据

{
    "meta": {
        "time": "2017-06-16T15:51:35",
        "userid": 84,
        "op": "del",                      
     },
    "data": {                                    
        "MIO_LOG_ID": "32539739",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CG_NO": null
     }
}

Canal Json

参数说明

参数

说明

database

数据库名称。

es

操作在源库的执行时间,13Unix时间戳,单位为毫秒。

说明

Unix时间戳转换工具可通过搜索引擎获取。

id

操作的序列号。

isDdl

是否是DDL操作。

  • true:是。

  • false:否。

mysqlType

字段的数据类型。

说明

不支持精度等数据类型的参数信息。

olddata

变更前或变更后的数据。

说明

2022320日之前创建的同步或迁移实例,old的值是变更后的数据(默认包含所有列的数据,而不是只包含被修改列的数据),data的值是变更前的数据。为了和开源社区保持一致,2022320日起创建或重启的同步或迁移实例,data的值是变更后的数据,old的值是变更前的数据。

pkNames

主键名称。

sql

SQL语句。

sqlType

经转换处理后的字段类型,取值与dataTypeNumber的数值相同。更多信息,请参见字段类型与dataTypeNumber数值的对应关系

table

表名。

ts

操作开始写入到目标库的时间,13Unix时间戳,单位为毫秒。

说明

Unix时间戳转换工具可通过搜索引擎获取。

type

操作的类型,比如DELETE、UPDATE、INSERT。

说明

全量任务阶段固定为INIT

gtid

全局事务标识GTID(Global Transaction Identifier),具有全局唯一性,一个事务对应一个GTID。

示例

更新数据

说明

2022320日之前创建的同步或迁移实例,源表的DELETE语句同步或迁移到Kafka,其中old的值是数据,data的值是null。为了和开源社区保持一致,2022320日起创建或重启的同步或迁移实例,data的值是数据,old的值是null。

2022320日之前创建的同步或迁移实例

{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}

2022320日起创建或重启的同步或迁移实例

{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

DDL操作

{
    "database":"dbname",表示同步或迁移的数据库名称
    "es":1600161894000,表示源库数据写入到binlog的时间
    "id":58,DTS缓存的偏移量
    "isDdl":true,是否同步或迁移DDL
    "sql":"eg:createxxx",Binlog的DDL语句
    "table":"tablename",同步或迁移的表名
    "ts":1600161894771,DTS将数据写入目标的时间
    "type":"DDL"
}