DTS支持选择迁移或同步到Kafka集群的数据存储格式,本文为您介绍数据格式的定义说明,方便您根据定义解析数据。

数据存储格式

DTS支持将写入至Kafka集群的数据存储为如下三种格式:

  • DTS Avro:一种数据序列化格式,可以将数据结构或对象转化成便于存储或传输的格式。
  • Shareplex Json:数据复制软件Shareplex读取源库中的数据,将数据写入至Kafka集群时,数据存储格式为Shareplex Json。
  • Canal JsonCanal解析数据库增量日志,并将增量数据传输至Kafka集群,数据存储格式为Canal Json。

DTS Avro

DTS Avro为默认存储格式。您需要根据DTS Avro的schema定义进行数据解析,schema定义详情请参见迁移或同步至Kafka集群中的数据均DTS Avro的schema定义
说明 DTS Avro格式中的DDL语句为String类型。

Shareplex Json

表 1. 参数说明
参数 说明
time 数据库中事务的提交时间,格式为yyyy-MM-ddTHH:mm:ssZ(UTC时间)。
userid 提交事务的用户ID。
op 数据操作类型,包括INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, UPDATE AFTER。
scn 系统变化编号SCN(System Change Number),用以标识数据库在某个确切时刻提交事务的版本。每个已提交的事务分配一个唯一的SCN。
rowid 用于定位数据库中一条记录的一个相对唯一地址值。
trans 事务ID。
seq 事务内部的操作序号,从1开始记录。
size 事务内部的操作总数。
table 表名。
idx 事务内部操作的索引,格式为seq/size,例如1/11表示,在操作总数为11的事务内部,该操作的序号为1。
posttime 事务提交至目标库的时间。

示例如下:

  • 插入数据:
    {
        "meta": {
            "time": "2017-06-16T14:24:34", 
            "userid": 84,                                    
            "op": "ins",                                   
              "scn": "14589063118712",                  
              "rowid": "AAATGpAAIAAItcIAAA",      
            "trans": "7.0.411499",                 
            "seq": 1,                                          
            "size": 11,                                         
            "table": "CL_BIZ1.MIO_LOG",       
              "idx": "1/11",                                       
            "posttime": "2017-06-16T14:33:52"
        },
        "data": {
            "MIO_LOG_ID": "32539737"
         }
    }
  • 更新数据:
    {
        "meta": {
            "time": "2017-06-16T15:38:13",
            "userid": 84,
            "op": "upd",                             
            "table": "CL_BIZ1.MIO_LOG"
            ….
        },
        "data": {                                          
            "CNTR_NO": "1171201606"
        },
        "key": {                                            
            "MIO_LOG_ID": "32537893",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CNTR_NO": "1171201606syui26"
        }
    }
  • 删除数据:
    {
        "meta": {
            "time": "2017-06-16T15:51:35",
            "userid": 84,
            "op": "del",                      
         },
        "data": {                                    
            "MIO_LOG_ID": "32539739",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CG_NO": null
         }
    }

Canal Json

表 2. 参数说明
参数 说明
database 数据库名称。
es 操作在源库的执行时间,13位Unix时间戳,单位为毫秒。
说明 Unix时间戳转换工具可用搜索引擎获取。
id 操作的序列号。
isDdl 是否是DDL操作。
  • true:是。
  • false:否。
mysqlType 字段的数据类型。
olddata 变更前的数据。
说明 2022年3月20日之前创建的DTS订阅实例,old的值是数据,data的值是null。为了和开源社区保持一致,2022年3月20日起创建或重启的DTS订阅实例,data的值是数据,old的值是null。
pkNames 主键名称。
sql SQL语句。
sqlType 经转换处理后的字段类型,如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal。
table 表名。
ts 操作写入到目标库的时间,13位Unix时间戳,单位为毫秒。
说明 Unix时间戳转换工具可用搜索引擎获取。
type 操作的类型,比如DELETE、UPDATE、INSERT。

更新数据的示例如下:

说明 2022年3月20日之前创建的DTS订阅实例,源表的DELETE语句同步到kafka,其中old的值是数据,data的值是null。为了和开源社区保持一致,2022年3月20日起创建或重启的DTS订阅实例,data的值是数据,old的值是null。
2022年3月20日之前创建的DTS订阅实例
{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint(20)", 
        "shipping_type": "varchar(50)"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
2022年3月20日起创建或重启的DTS订阅实例
{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint(20)", 
        "shipping_type": "varchar(50)"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

DDL操作示例如下:

{  
    "database": "dbname",   表示同步的数据库名称  
    "es": 1600161894000,   表示源库数据写入到binlog的时间  
    "id": 58,     DTS缓存的偏移量  
    "isDdl": true,      是否同步DDL  
    "sql": "eg: create xxx",     Binlog的DDL语句  
    "table": "tablename",      同步的表名  
    "ts": 1600161894771,     DTS将数据写入目标的时间  
    "type": "DDL"                 
}