数据传输服务DTS(Data Transmission Service)支持选择数据同步或迁移到消息队列(例如Kafka或RocketMQ)时所采用的存储格式,本文为您介绍数据格式的定义说明,方便您根据定义解析数据。
数据存储格式
DTS支持将写入至消息队列的数据存储为如下三种格式:
DTS Avro:一种数据序列化格式,可以将数据结构或对象转化成便于存储或传输的格式。
Shareplex Json:数据复制软件SharePlex读取源库中的数据,将数据写入至消息队列时,数据存储格式为Shareplex Json。
Canal Json:Canal解析数据库增量日志,并将增量数据传输至消息队列,数据存储格式为Canal Json。
DTS Avro
您需要根据DTS Avro的Schema定义进行数据解析,详情请参见DTS Avro的Schema定义和DTS Avro的反序列化示例。
DTS Avro格式中的DDL语句为String类型。
Shareplex Json
参数说明
参数 | 说明 |
| 数据库中事务的提交时间,格式为yyyy-MM-ddTHH:mm:ssZ(UTC时间)。 |
| 提交事务的用户ID。 |
| 数据操作类型,包括INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER。 |
| 系统变化编号SCN(System Change Number),用以标识数据库在某个确切时刻提交事务的版本。每个已提交的事务分配一个唯一的SCN。 |
| 用于定位数据库中一条记录的一个相对唯一地址值。 |
| 事务ID。 |
| 事务内部的操作序号,从1开始记录。 |
| 事务内部的操作总数。 |
| 表名。 |
| 事务内部操作的索引,格式为 |
| 事务提交至目标库的时间。 |
示例
插入数据
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84,
"op": "ins",
"scn": "14589063118712",
"rowid": "AAATGpAAIAAItcIAAA",
"trans": "7.0.411499",
"seq": 1,
"size": 11,
"table": "CL_BIZ1.MIO_LOG",
"idx": "1/11",
"posttime": "2017-06-16T14:33:52"
},
"data": {
"MIO_LOG_ID": "32539737"
}
}
更新数据
{
"meta": {
"time": "2017-06-16T15:38:13",
"userid": 84,
"op": "upd",
"table": "CL_BIZ1.MIO_LOG"
….
},
"data": {
"CNTR_NO": "1171201606"
},
"key": {
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}
删除数据
{
"meta": {
"time": "2017-06-16T15:51:35",
"userid": 84,
"op": "del",
},
"data": {
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CG_NO": null
}
}
Canal Json
参数说明
参数 | 说明 |
| 数据库名称。 |
| 操作在源库的执行时间,13位Unix时间戳,单位为毫秒。 说明 Unix时间戳转换工具可通过搜索引擎获取。 |
| 操作的序列号。 |
| 是否是DDL操作。
|
| 字段的数据类型。 说明 不支持精度等数据类型的参数信息。 |
| 变更前或变更后的数据。 说明 2022年3月20日之前创建的同步或迁移实例, |
| 主键名称。 |
| SQL语句。 |
| 经转换处理后的字段类型,取值与dataTypeNumber的数值相同。更多信息,请参见字段类型与dataTypeNumber数值的对应关系。 |
| 表名。 |
| 操作开始写入到目标库的时间,13位Unix时间戳,单位为毫秒。 说明 Unix时间戳转换工具可通过搜索引擎获取。 |
| 操作的类型,比如DELETE、UPDATE、INSERT。 说明 全量任务阶段固定为INIT。 |
| 全局事务标识GTID(Global Transaction Identifier),具有全局唯一性,一个事务对应一个GTID。 |
示例
更新数据
2022年3月20日之前创建的同步或迁移实例,源表的DELETE
语句同步或迁移到Kafka,其中old
的值是数据,data
的值是null。为了和开源社区保持一致,2022年3月20日起创建或重启的同步或迁移实例,data
的值是数据,old
的值是null。
2022年3月20日之前创建的同步或迁移实例
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
2022年3月20日起创建或重启的同步或迁移实例
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL操作
{
"database":"dbname",表示同步或迁移的数据库名称
"es":1600161894000,表示源库数据写入到binlog的时间
"id":58,DTS缓存的偏移量
"isDdl":true,是否同步或迁移DDL
"sql":"eg:createxxx",Binlog的DDL语句
"table":"tablename",同步或迁移的表名
"ts":1600161894771,DTS将数据写入目标的时间
"type":"DDL"
}