本文介绍写入Kafka消息的消息结构及各字段含义。
背景信息
同步整库数据至kafka任务,是将从上游数据源读取的数据,按照下面描述的JSON格式写入到Kafka的topic。消息总体格式包括变更记录的列信息、以及数据变更前后的状态信息等。为确保消费Kafka中数据时能够准确判断同步任务进度,同步任务还将定时产生op字段作为MHEARTBEAT的同步任务心跳记录写入Kafka的topic中。以下为您介绍写入Kafka的消息总体格式、同步任务心跳消息格式及源端更改数据对应的消息格式,关于字段类型及参数说明等信息,详情请参见字段类型和参数说明。
消息总体格式
写入Kafka消息的总体格式如下所示:
{
"schema": { //变更的元数据信息,仅指定列名与列类型信息
"dataColumn": [//变更的数据列信息,更新目标表记录内容
{
"name": "id",
"type": "LONG"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "binData",
"type": "BYTES"
},
{
"name": "ts",
"type": "DATE"
},
{
"name":"rowid",// 数据源为Oracle时,rowid会放在数据列中
"type":"STRING"
}
],
"primaryKey": [
"pkName1",
"pkName2"
],
"source": {
"dbType": "mysql",
"dbVersion": "1.0.0",
"dbName": "myDatabase",
"schemaName": "mySchema",
"tableName": "tableName"
}
},
"payload": {
"before": {
"dataColumn":{
"id": 111,
"name":"scooter",
"binData": "[base64 string]",
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"//字符串类型,Oracle的rowid信息
}
},
"after": {
"dataColumn":{
"id": 222,
"name":"donald",
"binData": "[base64 string]",
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"//字符串类型,Oracle的rowid信息
}
},
"sequenceId":"XXX",//字符串类型,用于增全量数据合并的数据排序,
"scn":"xxxx",//字符串类型,Oracle的scn信息
"op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",//大小写敏感,
"timestamp": {
"eventTime": 1,//必选,记录源端库发生变更的时间,毫秒精度的13位时间戳
"systemTime": 2,//可选,同步任务处理该条变更消息的时间,毫秒精度的13位时间戳
"checkpointTime": 3//可选,重置同步位点时的设置时间,毫秒精度的13位时间戳,一般等于eventTime
},
"ddl": {
"text": "ADD COLUMN ...",
"ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
}
},
"version":"1.0.0"
}
同步任务心跳消息格式
{
"schema": {
"dataColumn": null,
"primaryKey": null,
"source": null
},
"payload": {
"before": null,
"after": null,
"sequenceId": null,
"timestamp": {
"eventTime": 1620457659000,
"checkpointTime": 1620457659000
},
"op": "MHEARTBEAT",
"ddl": null
},
"version": "0.0.1"
}
源端更改数据对应的消息格式
源端插入数据对应的Kafka消息格式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000000", "timestamp": { "eventTime": 1620457896000, "systemTime": 1620457896977, "checkpointTime": 1620457896000 }, "op": "INSERT", "ddl": null }, "version": "0.0.1" }
源端更新数据对应的Kafka消息格式:
当未勾选源端update变更对应一条Kafka记录时,源端更新数据对应的Kafka消息格式包含两条Kafka消息,分别描述更新前的数据状态和更新后的数据状态。具体消息格式如下:
更新前的数据状态消息格式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_BEFOR", "ddl": null }, "version": "0.0.1" }
更新后的数据状态消息格式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
当勾选了源端update变更对应一条Kafka记录时,源端更新数据对应的Kafka消息格式包含一条Kafka消息,描述更新前的数据状态和更新后的数据状态。具体消息格式如下:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
源端删除数据对应的Kafka消息格式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000002", "timestamp": { "eventTime": 1620458266000, "systemTime": 1620458266101, "checkpointTime": 1620458266000 }, "op": "DELETE", "ddl": null }, "version": "0.0.1" }
字段类型
写入Kafka topic中的消息将从源端读取数据映射为BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六种类型,再以不同的JSON格式写入kafka topic中。
类型 | 说明 |
BOOLEAN | 对应JSON中的布尔类型,取值为true,false |
DATE | 对应JSON中的数值类型,取值为13位数字时间戳,精确到毫秒(ms)级。 |
BYTES | 对应JSON中的字符串类型,写入Kafka前会先对字节数组进行base64编码转换为字符串,消费时需要进行base64解码(编码Base64.getEncoder().encodeToString(text.getBytes("UTF-8"));解码Base64.getDecoder().decode(encodedText))。 |
STRING | 对应JSON中的字符串类型 |
LONG | 对应JSON中的数值类型 |
DOUBLE | 对应JSON中的数值类型 |
参数说明
以下为您介绍写入Kafka的消息中的各个字段的含义及说明。
一级元素 | 二级元素 | 说明 |
schema | dataColumn | JSONArray类型,数据列的类型信息。dataColumn记录上游数据变更记录的所有列和对应的列类型信息。变更操作包括数据库对数据的更改(新增、删除及修改)和数据库表结构等变更。
|
primaryKey | List类型,主键信息。 pk:主键名。 | |
source | Object 类型,源端数据库或表信息。
| |
payload | before | JSONObject类型,修改前的数据。例如:数据源端为mysql,做了一次记录的update操作,before字段存储记录被update之前的数据内容。
|
after | 修改后的数据。格式同before相同。 | |
sequenceId | 字符串类型,Streamx产生,用于增量数据和全量数据合并的数据排序,每个streamx record都是唯一的。 说明 对于从源端读取的更新操作消息,会生成两条写入记录,一条update before记录和一条update after记录,这两条记录的sequenceId相同。 | |
scn | 当源端为Oracle数据库时有效,对应Oracle的scn信息。 | |
op | 对应源端读取到的消息类型,取值如下:
| |
timestamp | JSONObject 类型,本条数据的相关时间戳。
| |
ddl | 该字段只在更改数据库的表结构时才会填充数据,更改数据(包括新增、删除和修改)时对应的ddl直接填充为null。
| |
version | 无 | 格式的版本号。 |