本文为您介绍序列化方式和数据库传输到文本协议的数据格式。
序列化方式的格式说明
您在使用数据传输同步源端数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,支持序列化方式控制数据同步至目标端的消息格式。序列化方式包括 Default、Canal、 DataWorks(支持 2.0 版本)、SharePlex、DefaultExtendColumnType、Debezium、DebeziumFlatten、DebeziumSmt 和 Avro。
目前仅 OceanBase 数据库 MySQL 租户支持序列化方式 Debezium、DebeziumFlatten 和 DebeziumSmt。
目前仅同步 OceanBase 数据库 MySQL 租户的数据至 Kafka 时,支持序列化方式 Avro。
Default JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 Default 使用如下 JSON 消息格式。
{
"prevStruct": { // 变更前镜像
"col1": "val1" // 键值对,包含全量键值
},
"postStruct": { // 变更后镜像
"col1": "val1" // 键值对,包含全量键值
},
"allMetaData" {
"checkpoint": "STRING", // 当前同步位点,增量阶段表示同步到的时间位点(秒级时间戳),全量阶段使用主键键值对表示
"record_primary_key": "STRING", // 主键列的名称。如果存在多列使用 \u0001 分割
"record_primary_value" "STRING", // 主键值。如果存在多列使用 \u0001 分割
"source_identity": "STRING", // 源端标识,如果是增量则是 subtopic,如果是全量则没有意义的序号
"dbType": "STRING", // 数据库的类型。包括 MYSQL/ORACLE/OCEANBASE(老模式,兼容使用)/OB_IN_ORACLE_MODE(老模式,兼容使用)/DB2(老模式,兼容使用)/OB_MYSQL/OB_ORACLE/DB2_LUW
"storeDataSequence": LONG, // 该字段只有在增量场景下 source.json 配置中包含 sequenceEnabled=true 才存在,默认是 true。用于排序,生成规则是一个同步进程中,时间戳 + 不超过五位序号递增。{时间戳}{递增序号}。
"table_name": "STRING", // 使用 SQL 语句进行变更的表的名称
"db": "STRING", // 使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,则包含租户,格式为 {tenant}.{database}
"timestamp": "STRING", // 数据变更秒级时间戳,仅增量存在
"uniqueId": "STRING", // 增量中表示 STORE 传递下来的事务序号标识,
"ddlType": "STRING", // DDL 具体类型,OMS 4.0.0 版本新增支持
},
"recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL" // 变更类型
}
DDL 的 Record 中,仅存在 "ddl" 为列名的键,值为 DDL 语句。
前镜像和后镜像:
prevStruct
:表示增量数据的前镜像信息,即 SQL 执行前的数据。postStruct
:表示增量数据的后镜像信息,即 SQL 执行后的数据。
DELETE
仅存在prevStruct
,INSERT
和DDL
仅存在postStruct
,UPDATE
同时存在prevStruct
和postStruct
,HEARTBEAT
(定期心跳消息)不存在postStruct
和postStruct。
数据示例如下:
INSERT(插入)数据的示例
{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table_name", "db":"tenant.database", "timestamp":"1609344671" }, "prevStruct":null, "recordType":"INSERT", "postStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.999999, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", } }
UPDATE(更新)数据的示例
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", } }
DELETE(删除)数据的示例
{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table_name", "db":"tenant.database", "timestamp":"1609344671" }, "prevStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col16":1.2222, "col7":9.99999999, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.999999999, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345" }, "recordType":"DELETE", "postStruct":null }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "prevStruct" : null, "postStruct" : { "ddl" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "allMetaData" : { "checkpoint" : "1671177057", "dbType" : "OB_MYSQL", "storeDataSequence" : null, "db" : "connector_test", "timestamp" : "1671177057", "uniqueId" : null, "ddlType" : "ALTER_TABLE", "record_primary_key" : null, "source_identity" : null, "record_primary_value" : null, "table_name" : "all_mysql_type_test" }, "recordType" : "DDL" }
Canal JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 Canal 使用如下 JSON 消息格式。
{
"database": "STRING", // 使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,仅存在数据库名称,无需租户名称。
"sqlType": {
"col1": INTEGER, // 变更列类型,数字参考 java.sql.Types
},
"data": [ // 变更后数据键值对,目前只会存在一条消息
{
"col1": "val1"
}
],
"pkNames": [ // 主键列名
"col1"
],
"old": [ // 仅 UPDATE 消息存在。表示 UPDATE 语句变更的列,即变更前的列值
{
"col1": "val1"
}
],
"mysqlType": { // 列类型描述
"col": "STRING"
},
"type": "STRING", // 变更类型
"table": "STRING", // 使用 SQL 语句进行变更的表的名称
"es": LONG, // 变更时间,毫秒级时间戳
"isDdl": BOOLEAN, // 是否是 DDL
"ts": LONG, // 写入目的端时间戳
"sql": "STRING", // 当前是空
}
数据示例如下:
INSERT
(插入)数据的示例{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"INSERT", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618323429026, "sql":"" }
UPDATE
(更新)数据的示例{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world 2020", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":[ { "string":"hello world" } ], "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"UPDATE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364572908, "sql":"" }
DELETE
(删除)数据的示例{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "int8", "int16" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"DELETE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364660278, "sql":"" }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "database" : "connector_test", "sqlType" : null, "data" : null, "pkNames" : null, "old" : null, "mysqlType" : null, "type" : "ALTER", "table" : "all_mysql_type_test", "es" : 1671177209000, "isDdl" : true, "ts" : 1671177291475, "sql" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }
DataWorks JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 DataWorks 使用如下 JSON 消息格式。
{
"version":"2.0", //协议版本,目前仅支持 DataWorks 2.0 版本
"schema": { //变更的元数据信息,仅指定列名与列类型信息
"source": {//变更来源信息
"dbType": "mysql", //数据源类型
"dbVersion": "5.7.35", //数据库版本
"dbName": "myDatabase", //数据库名称
"schema": "mySchema", //Schema 名称,存在 Schema 的系统必填
"table": "tableName" //表名
}
"column": [//变更的数据列信息,更新目标表记录内容
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [//有主键或唯⼀键必填,否则可以不填
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data":{
"id": 111,
"name":"scooter",
"mydata": "[base64 string]", //如果是二进制类型,需要进行 Base64 编码
"ts": 1590315269000.123456789 //时间戳,其整数部分 13 位,小数部分 9 位
}
},
"after": {
"data":{
"id": 222,
"name":"donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTE
R/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...",//大小写敏感
"timestamp": {
"eventTime": 1620457659000 // 变更在源端库发生时间,毫秒精度的 13 位时间戳
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "⾃增 ID"
},
"extend": { //extend 扩展字段,用于后续扩展需求。如果没有,可以不填
"load_fm":"CIBS", //记录来源系统。例如,"CIBS"
}
}
同步任务心跳消息:
{
"version":"2.0", //协议版本
"payload": {
"timestamp": {
"eventTime": 1620457659000 //⼼跳包时间
},
"op": "HEARTBEAT" //标识是⼼跳包
}
}
数据示例如下:
INSERT
(插入)数据的示例{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName12" ] }, "payload":{ "before":null, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"INSERT", "timestamp":{ "eventTime":1647581000000, "systemTime":1647581000795, "checkpointTime":1647581000 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
UPDATE
(更新)数据的示例{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world 2020", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"UPDATE", "timestamp":{ "eventTime":1647581038000, "systemTime":1647581038674, "checkpointTime":1647581038 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
DELETE
(删除)数据的示例{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":null, "op":"DELETE", "timestamp":{ "eventTime":1647581072000, "systemTime":1647581072976, "checkpointTime":1647581072 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "version" : "2.0", "schema" : { "source" : { "dbType" : "ob_mysql", "dbVersion" : null, "dbName" : "connector_test", "schema" : null, "table" : "all_mysql_type_test" }, "column" : null, "pk" : null }, "payload" : { "before" : null, "after" : null, "op" : "ALTER", "timestamp" : { "eventTime" : 1671177209000, "systemTime" : 1671177291485, "checkpointTime" : 1671177200 }, "ddl" : { "text" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "scn" : "null" }, "extend" : { } }
SharePlex JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 SharePlex 使用如下 JSON 消息格式。
{
"data": { // 变更数据键值对,如果是 INSERT / DELETE 是全量值,如果是 UPDETE 只有变更值
"col1": "val1"
},
"meta": {
"time": "YYYY-MM-DDTHH:mm:ss", // 变更时间
"op": "", // 变更类型,包括 ins/upd/del/ddl
"posttime": "YYYY-MM-DDTHH:mm:ss", // 写入目标端的时间
"idx":"STRING", //消息在事务中的索引/索引的消息数量。该参数已废弃。
"size": NUMBER, //事务内消息数量。该参数已废弃。
"seq": "STRING", // 排序序号,需要配合源端打开 transactionEnabled 才能存在
"table": "STRING", // SQL 变更库表名 {database}.{table}
"rowid": "STRING", // {变更库表名}-{主键值使用\u0001} 分割
"trans": "STRING", // 事务 ID
"scn": "STRING", // 该字段只有在增量场景下 source.json 配置中包含 sequenceEnabled=true 才存在,默认是 true。用于排序,生成规则是一个同步进程中,时间戳 + 不超过五位序号递增。
},
"key": { // 仅 UPDATE 存在,表示变更前的值
}
}
数据示例如下:
INSERT
(插入)数据的示例{ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ "posttime":"2020-12-07T13:22:00", "op":"ins", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3129", "trans":"shareplex_transaction_id", "scn":"123456789" } }
UPDATE
(更新)数据的示例{ "data":{ "string":"hello world 2020" }, "meta":{ "posttime":"2020-12-07T13:59:09", "op":"upd", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" }, "key":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" } }
DELETE
(删除)数据的示例{ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ "posttime":"2020-12-07T13:34:10", "op":"del", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "data" : {}, "meta" : { "posttime" : "2022-12-16T15:54:51", "op" : "ddl", "size" : 0, "time" : "2022-12-16T15:53:29", "idx" : "0/0", "seq" : 0, "table" : "connector_test.all_mysql_type_test", "rowid" : "connector_test.all_mysql_type_test-", "trans" : null, "scn" : "null" }, "sql" : { "ddl" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" } }
DefaultExtendColumnType JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 DefaultExtendColumnType 使用如下 JSON 消息格式。
DefaultExtendColumnType JSON 消息格式会在 DEFAULT
的基础上,在镜像内增加一个字段 __light_type
,用于表示字段的数据类型。
{
"prevStruct": { // 变更前镜像
},
"postStruct": { // 变更后镜像
"__light_type": {
"col":{ // 字段的名称
"schemaType":"type" // 值的类型
}
}
},
"allMetaData" {},
}
数据示例如下:
INSERT
(插入)数据的示例{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table", "db":"database", "timestamp":"1609344671" }, "prevStruct":null, "recordType":"INSERT", "postStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long":{ "schemaType":"TIMESTAMP" } } } }
UPDATE
(更新)数据的示例{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": { "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "UPDATE", "postStruct": { "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world 2020", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }
DELETE
(删除)数据的示例{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table", "db":"database", "timestamp":"1609344671" }, "prevStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType":"DELETE", "postStruct":null }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "prevStruct" : null, "postStruct" : { "ddl" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'", "__light_type" : { "ddl" : { "schemaType" : "VAR_STRING" } } }, "allMetaData" : { "checkpoint" : "1671177200", "dbType" : "OB_MYSQL", "storeDataSequence" : null, "db" : "connector_test", "timestamp" : "1671177209", "uniqueId" : null, "ddlType" : "ALTER_TABLE", "record_primary_key" : null, "source_identity" : null, "record_primary_value" : null, "table_name" : "all_mysql_type_test" }, "recordType" : "DDL" }
Debezium JSON 消息格式
同步 OceanBase 数据库 MySQL 租户的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 Debezium 使用如下 JSON 消息格式,共包含两种,通常默认仅显示 payload
中的结构。
存在
schema
和payload
{ "schema":{//描述 payload 字段信息的结构体,默认没有该结构体 "type": "struct",//struct 表示该字段内部还有结构 "optional": false,//是否必须包含该字段 "fields": [ { "type": "int64",//字段的类型 "optional": false,//是否必须包含该字段 "field": "ts_ms"//字段的名称 } ... ] }, "payload":{ "op":"c", //数据修改类型,包括 c(全量、插入)、u(更新)、d(删除)和 HEARTBEAT(心跳消息) "source":{ "version":"" //OMS 的版本 "connector":"OB_MYSQL", //数据源的类型 "name":"OMS", //固定值 OMS "ts_ms":0, //数据变更秒级时间戳,仅增量存在 "db":"test", //使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,仅存在数据库名称,无需租户名称 "table":"testTab" //使用 SQL 语句进行变更的表的名称 "pos":"553132@1668496109" //在 binlog 文件中的位置 [binlog 文件名]@[binlog 文件名 offset] }, "before":{ //变更前镜像 "column":"value" //键值对,包含全量键值 } "after":{ //变更后镜像 "column":"value" // 键值对,包含全量键值 }, "ts_ms":1668497367188 //数据处理时间戳 } }
仅存在
payload
{ "payload":{ "op":"c", //数据修改类型,包括 c(全量、插入)、u(更新)、d(删除)和 HEARTBEAT(心跳消息) "source":{ "version":"" //OMS 的版本 "connector":"OB_MYSQL", //数据源的类型 "name":"OMS", //固定值 OMS "ts_ms":0, //数据变更秒级时间戳,仅增量存在 "db":"test", //使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,仅存在数据库名称,无需租户名称 "table":"testTab" //使用 SQL 语句进行变更的表的名称 "pos":"553132@16684****" //在 binlog 文件中的位置 [binlog 文件名]@[binlog 文件名 offset] }, "before":{ //变更前镜像 "column":"value" //键值对,包含全量键值 } "after":{ //变更后镜像 "column":"value" // 键值对,包含全量键值 }, "ts_ms":1668497367188 //数据处理时间戳 } }
数据示例如下:
INSERT
(插入)数据的示例{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 } }
UPDATE
(更新)数据的示例{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 } }
DELETE
(删除)数据的示例{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 } }
DebeziumFlatten JSON 消息格式
同步 OceanBase 数据库 MySQL 租户的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 DebeziumFlatten 的 JSON 消息格式如下所示,和序列化方式 Debezium 相比,不再填充 schema
和 payload
。
{
"op":"c", //数据修改类型,包括 c(全量、插入)、u(更新)、d(删除)和 HEARTBEAT(心跳消息)
"source":{
"version":"" //OMS 的版本
"connector":"OB_MYSQL", //数据源的类型
"name":"OMS", //固定值 OMS
"ts_ms":0, //数据变更秒级时间戳,仅增量存在
"db":"test", //使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,仅存在数据库名称,无需租户名称
"table":"testTab" //使用 SQL 语句进行变更的表的名称
"pos":"553132@16684****" //在 binlog 文件中的位置 [binlog 文件名]@[binlog 文件名 offset]
},
"before":{ //变更前镜像
"column":"value" //键值对,包含全量键值
}
"after":{ //变更后镜像
"column":"value" // 键值对,包含全量键值
},
"ts_ms":1668497367188 //数据处理时间戳
}
}
数据示例如下:
INSERT
(插入)数据的示例{ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 }
UPDATE
(更新)数据的示例{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 }
DELETE
(删除)数据的示例{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 }
DebeziumSmt JSON 消息格式
DebeziumSmt 是 Debezium 提供的一种配置方式,使用事件扁平化单消息转换(Single Message Transform,SMT)对单条信息进行转换和处理。同步 OceanBase 数据库 MySQL 租户的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 DebeziumSmt 的 JSON 消息格式仅显示 after
中的 key:value
。
例如,使用序列化方式 Debezium 更新数据:
{
"op": "u",
"source": {
"connector":"OB_MYSQL",
"name":"OMS",
...
},
"ts_ms" : 1668496119717,
"before" : {
"field1" : "before_value1",
"field2" : "before_value2"
},
"after" : {
"field1" : "after_value1",
"field2" : "after_value2"
}
}
SMT 对上述示例的消息进行处理后,简化了消息格式。即使用序列化方式 DebeziumSmt,JSON 消息格式如下所示。
{
"field1" : "after_value1",
"field2" : "after_value2"
}
数据示例如下:
INSERT
(插入)数据的示例{ "field1" : "after_value1", "field2" : "after_value2", "__deleted": "false" }
UPDATE
(更新)数据的示例{ "field1" : "after_value1", "field2" : "after_value2", "__deleted": "false" }
DELETE
(删除)数据的示例{ "field1" : "after_value1", "field2" : "after_value2", "__deleted": "true" }
Avro JSON 消息格式
同步 OceanBase 数据库 MySQL 租户的数据至 Kafka 时,序列化方式 Avro 使用如下 JSON 消息格式。
全量迁移
{ "version": 1, "id": 0, "sourceTimestamp": 1702371565, // 时间戳安全位点。 "sourcePosition": "", // 全量迁移无 position 等信息。 "safeSourcePosition": "", "sourceTxid": "", "source": { "sourceType": "MySQL", // 固定值 MySQL。 "version": "OBMySQL" // 固定值 OBMySQL。 }, }, "operation": "INIT", // 全量类型为 INIT。 "objectName": "test***", "processTimestamps": [1702371565238], // 只有投递时间。 "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // 只有主键类型。 }, "fields": [ {"name": "id", "dataTypeNumber": 246}, // 每个列的类型。 {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15}, {"name": "address", "dataTypeNumber": 254} ], "beforeImages": null, // 全量迁移前镜像为空。 "afterImages": [ // 后镜像。INTEGER 类型的 precision 衡为 8,FLOAT 类型的 precision 衡为 8、scale 衡为 64。 {"value": "1", "precision": 1, "scale": 0}, {"precision": 8, "value": "11"}, {"charset": "utf8mb4", "value": {"bytes": "yyy"}}, null ] }
增量同步 DML
INSERT
(插入)数据的示例{ "version": 1, "id": 170236922143600000, "sourceTimestamp": 1702369092, "sourcePosition": "1702369080", // OceanBase 数据库 MySQL 租户的 checkpoint。 "safeSourcePosition": "1702369080", // OceanBase 数据库 MySQL 租户的 checkpoint。 "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "INSERT", "objectName": "test***", "processTimestamps": [1702369221480], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": null, // INSERT 前镜像为空。 "afterImages": [ {"precision": 8, "value": "2"}, {"precision": 8, "value": "12"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"} } ] }
UPDATE
(更新)数据的示例{ "version": 1, "id": 170236975822100001, "sourceTimestamp": 1702369757, "sourcePosition": "1702369756", "safeSourcePosition": "1702369756", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "UPDATE", "objectName": "test***", "processTimestamps": [1702369758237], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": [ // UPDATE 存在前镜像和后镜像。 {"precision": 8, "value": "3"}, {"precision": 8, "value": "22"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ], "afterImages": [ {"precision": 8, "value": "3"}, {"precision": 8, "value": "44"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ] }
DELETE
(删除)数据的示例{ "version": 1, "id": 170236976527500000, "sourceTimestamp": 1702369764, "sourcePosition": "1702369763", "safeSourcePosition": "1702369763", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "DELETE", "objectName": "test***", "processTimestamps": [1702369765287], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": [ {"precision": 8, "value": "3"}, {"precision": 8, "value": "44"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ], "afterImages": null // DELETE 后镜像为空。 }
增量同步 DDL
{ "version": 1, "id": 170236979372400000, "sourceTimestamp": 1702369793, "sourcePosition": "1702369792", "safeSourcePosition": "1702369792", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "DDL", "objectName": "test***", "processTimestamps": [1702369794543], "tags": {}, "fields": null, // 增量同步 DDL 无 fields 和 beforeImages。 "beforeImages": null, "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // STRING 类型的 afterImages 为 DDL 语句。 }
数据库传输到文本协议的格式说明
同步 OceanBase 数据库的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时:
如果序列化方式为 Default、Canal、DataWorks(支持 2.0 版本)、SharePlex 或 DefaultExtendColumnType,OceanBase 数据库两种租户对应的映射说明如下。
OceanBase 数据库 MySQL 租户
数据类型
映射类型
描述
TINYINT
SMALLINT
MEDIUMINT
INT
INTEGER
YEAR
BOOL
BOOLEAN
Long
64 位以下的整型。
正常数值,例如 1000,不使用科学计数法。
对于 BOOL/BOOLEAN,则 true = 1,false = 0。
DECIMAL
NUMERIC
BigDecimal
精确小数数值类型以及超过 64 位的整型。
对于整型数值不会展示小数点及小数。
对于存在小数的数值,会根据数据库传入的数据进行位数展示,不会去除末尾的 0,使用科学计数法。
FLOAT
DOUBLE
Double
浮点数
根据源端是 FLOAT 或 DOUBLE 类型决定有效位数。FLOAT 是 7 位有效位数,DOUBLE 是 16 位有效位数。
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
String
字符串。
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BINARY
VARBINARY
BIT
Bytes
字节数组,默认以 BASE64 编码展示。
说明对于 BIT 定长的类型,增量接收到字节数组之后会将高位 0 去除,但是全量不会,所以看到的 BASE64 编码可能会不一致。但是实际结果一致,解码之后结果一致。
DATE
Date
日期类型,格式为
YYYY-MM-DD
。 如果是非法时间,会显示原有字符串。TIME
Time
时间类型,格式为
HH:mm:ss[.nnnnnnnnn]
。低于秒级的时间最多展示 9 位。如果是低于秒级的时间,会显示出所有非 0 的数值。如果是非法时间,会显示原有字符串。
DATETIME
DateTime
日期时间类型,包括时区。格式为
YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]
。低于秒级的时间最多展示 9 位。如果是低于秒级的时间,会显示出所有非 0 的数值。如果是非法时间,会显示原有字符串。
TIMESTAMP
Timestamp
时间戳类型,格式为
[秒级时间戳][.nnnnnnnnn]
。低于秒级的时间最多展示 9 位。如果是低于秒级的时间,会显示出所有非 0 的数值。如果是非法时间,会以 0000-00-00 00:00:00 格式显示。
OceanBase 数据库 Oracle 租户
数据类型
映射类型
描述
INTEGER
Long
64 位以下的整型。
正常数值,例如 1000,不使用科学计数法。
NUMBER
FLOAT
BigDecimal
精确小数数值类型以及超过 64 位的整型。
BINARY_FLOAT BINARY_DOUBLE
Double
浮点数
根据源端是 FLOAT 或 DOUBLE 类型决定有效位数。FLOAT 是 7 位有效位数,DOUBLE 是 16 位有效位数。
VARCHAR2
NVARCHAR2
INTERVAL YEAR TO MOTH
INTERVAL DAY TO SECOND
CLOB
NCLOB
ROWID
UROWID
String
字符串
BLOB
BFILE
RAW
Bytes
字节数组
默认以 BASE64 编码展示。
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
DateTime
日期时间类型,包括时区。格式为
YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]
。低于秒级的时间最多展示 9 位。如果是低于秒级的时间,会显示出所有非 0 的数值。如果是非法时间,会显示原有字符串。
如果序列化方式为 Debezium,OceanBase 数据库 MySQL 租户对应的映射说明如下。
重要同步 OceanBase 数据库 Oracle 租户的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,不支持选择序列化方式 Debezium。
数据类型
映射类型
描述
BOOLEAN
BOOL
BOOLEAN
取值包括 true 和 false。
TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
BIGINT
YEAR
LONG
-263 ~ 263范围的整型。
BIGINT
STRING
使用字符串完整展示数据。
FLOAT
DOUBLE
DOUBLE
浮点数。
DECIMAL
NUMERIC
STRING
使用字符串完整展示数据。对于存在小数的数值,会根据数据库传入的数据进行位数展示,不会去除末尾的 0,使用科学计数法。
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BYTES
字节数组,base16 编码。
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
STRING
字符串。
TIMESTAMP
STRING
格式为 YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z,时区为 0 时区。
DATE
LONG
表示自 1970-01-01 以来的天数。
TIME
LONG
表示自 00:00:00 以来的时间值(以微秒为单位),不包括时区信息。
DATETIME
LONG
表示自 1970-01-01 00:00:00 以来的毫秒数,不包括时区信息。
如果序列化方式为 Avro,OceanBase 数据库 MySQL 租户对应的映射说明如下。
重要仅同步 OceanBase 数据库 MySQL 租户的数据至 Kafka 时,支持选择序列化方式 Avro。
类型名称
映射类型
TINYINT
BOOLEAN
SMALLINT
MEDIUMINT
INT
BIGINT
BIT
INTEGER
FLOAT
DOUBLE
FLOAT
DECIMAL
NUMERIC
DECIMAL
VARCHAR
CHAR
TINYTEXT
MEDIUMTEXT
LONGTEXT
TEXT
CHARACTER
BINARY
VARBINARY
TINYBLOB
MEDIUMBLOB
LONGBLOB
BLOB
BinaryObject
TIMESTAMP
TimestampObject
说明对于 TIMESTAMP 类型,全量和增量均会转换至时间戳,非法时间为
-9223372022400L
。除非法时间外,您可以使用 Java 的
Instant.ofEpochSecond(ts, nanos)
方法获取正确的墙上时间。DATE
TIME
DATETIME
YEAR
DATETIME
JSON
ENUM
SET
TextObject
GEOMETRY
TextGeometry
说明目前数据传输使用 EWKT 格式透传,所以映射为 TextGeometry 类型。