附录:消息格式

本文介绍写入Kafka消息的消息结构及各字段含义。

背景信息

同步整库数据至kafka任务,是将从上游数据源读取的数据,按照下面描述的JSON格式写入到Kafkatopic。消息总体格式包括变更记录的列信息、以及数据变更前后的状态信息等。为确保消费Kafka中数据时能够准确判断同步任务进度,同步任务还将定时产生op字段作为MHEARTBEAT的同步任务心跳记录写入Kafkatopic中。以下为您介绍写入Kafka消息总体格式同步任务心跳消息格式源端更改数据对应的消息格式,关于字段类型及参数说明等信息,详情请参见字段类型参数说明

消息总体格式

写入Kafka消息的总体格式如下所示:

{
    "schema": { //变更的元数据信息,仅指定列名与列类型信息
        "dataColumn": [//变更的数据列信息,更新目标表记录内容
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
              "name":"rowid",// 数据源为Oracle时,rowid会放在数据列中
              "type":"STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"//字符串类型,Oracle的rowid信息
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"//字符串类型,Oracle的rowid信息
            }
        },
        "sequenceId":"XXX",//字符串类型,用于增全量数据合并的数据排序,
        "scn":"xxxx",//字符串类型,Oracle的scn信息
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",//大小写敏感,
        "timestamp": {
            "eventTime": 1,//必选,记录源端库发生变更的时间,毫秒精度的13位时间戳
            "systemTime": 2,//可选,同步任务处理该条变更消息的时间,毫秒精度的13位时间戳
            "checkpointTime": 3//可选,重置同步位点时的设置时间,毫秒精度的13位时间戳,一般等于eventTime
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
        }
    },
    "version":"1.0.0"
}
说明

关于字段类型及参数说明等信息,详情请参见字段类型参数说明

同步任务心跳消息格式

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
说明

关于字段类型及参数说明等信息,详情请参见字段类型参数说明

源端更改数据对应的消息格式

  • 源端插入数据对应的Kafka消息格式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • 源端更新数据对应的Kafka消息格式:

    • 当未勾选源端update变更对应一条Kafka记录时,源端更新数据对应的Kafka消息格式包含两条Kafka消息,分别描述更新前的数据状态和更新后的数据状态。具体消息格式如下:

      更新前的数据状态消息格式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }

      更新后的数据状态消息格式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • 当勾选了源端update变更对应一条Kafka记录时,源端更新数据对应的Kafka消息格式包含一条Kafka消息,描述更新前的数据状态和更新后的数据状态。具体消息格式如下:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • 源端删除数据对应的Kafka消息格式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
说明

关于字段类型及参数说明等信息,详情请参见字段类型参数说明

字段类型

写入Kafka topic中的消息将从源端读取数据映射为BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六种类型,再以不同的JSON格式写入kafka topic中。

类型

说明

BOOLEAN

对应JSON中的布尔类型,取值为true,false

DATE

对应JSON中的数值类型,取值为13位数字时间戳,精确到毫秒(ms)级。

BYTES

对应JSON中的字符串类型,写入Kafka前会先对字节数组进行base64编码转换为字符串,消费时需要进行base64解码(编码Base64.getEncoder().encodeToString(text.getBytes("UTF-8"));解码Base64.getDecoder().decode(encodedText))

STRING

对应JSON中的字符串类型

LONG

对应JSON中的数值类型

DOUBLE

对应JSON中的数值类型

参数说明

以下为您介绍写入Kafka的消息中的各个字段的含义及说明。

一级元素

二级元素

说明

schema

dataColumn

JSONArray类型,数据列的类型信息。dataColumn记录上游数据变更记录的所有列和对应的列类型信息。变更操作包括数据库对数据的更改(新增、删除及修改)和数据库表结构等变更。

  • name:列名

  • type:列类型

primaryKey

List类型,主键信息。

pk:主键名。

source

Object 类型,源端数据库或表信息。

  • dbType:String类型,数据库类型

  • dbVersion:String类型,数据库版本

  • dbName:String类型,数据库名

  • schemaName:String类型,Schema名(针对PostgresSQL Server等)

  • tableName:String 类型,数据表名

payload

before

JSONObject类型,修改前的数据。例如:数据源端为mysql,做了一次记录的update操作,before字段存储记录被update之前的数据内容。

  • 在从源端读取到更新、删除操作消息时,在写入记录中填充该字段。

  • dataColumn:JSONObject类型,表示数据信息。格式为列名:列值, 列名为字符串,列值BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING。

after

修改后的数据。格式同before相同。

sequenceId

字符串类型,Streamx产生,用于增量数据和全量数据合并的数据排序,每个streamx record都是唯一的。

说明

对于从源端读取的更新操作消息,会生成两条写入记录,一条update before记录和一条update after记录,这两条记录的sequenceId相同。

scn

当源端为Oracle数据库时有效,对应Oraclescn信息。

op

对应源端读取到的消息类型,取值如下:

  • INSERT:数据插入

  • UPDATE_BEFOR:数据更新前

  • UPDATE_AFTER:数据更新后

  • DELETE:数据删除

  • TRANSACTION_BEGIN:数据库事务开始

  • TRANSACTION_END:数据库事务结束

  • CREATE:数据库建表

  • ALTER:数据库表变更

  • QUERY:数据库变更的原始SQL

  • TRUNCATE:数据库表清空

  • RENAME:数据库表重命名

  • CINDEX:创建索引

  • DINDEX:删除索引

  • MHEARTBEAT:用于在源端无新增数据时标识同步仍正常进行的心跳消息

timestamp

JSONObject 类型,本条数据的相关时间戳。

  • eventTime:Long类型,记录源端库发生变更的时间,毫秒精度的13位时间戳。

  • systemTime:Long类型,同步任务处理该条变更消息的时间,毫秒精度的13位时间戳。

  • checkpointTime:Long类型,重置同步位点时的设置时间,毫秒精度的13位时间戳,一般与eventTime值一致。

ddl

该字段只在更改数据库的表结构时才会填充数据,更改数据(包括新增、删除和修改)时对应的ddl直接填充为null。

  • text:String类型,数据库DDL语句文本。

  • ddlMeta:String类型,将数据库ddl类型变更记录到一个Java对象,使用对象序列化后再进行base64编码得到的字符串。

version

格式的版本号。