This topic describes the structure and fields of messages written to Kafka.
Background information
When you synchronize a full database to Kafka, the synchronization task reads data from an upstream data source and writes it to a Kafka topic in the JSON format described in this topic. The overall message format includes column information for the change record and the state of the data before and after the change. To allow consumers to track task progress, the synchronization task also periodically sends a heartbeat record to the Kafka topic. This heartbeat record has an op field with the value MHEARTBEAT. This topic describes the overall message format, the heartbeat message format, and the message formats for source data changes. For more information, see Field types and Parameters.
Message format for real-time single-table output
When you configure a Kafka destination for a real-time single-table synchronization task, you must specify the value format for the records written to Kafka. Supported formats include Canal CDC and JSON. For more details, see Appendix: Output format description.
Field types
The system reads data from the source, maps it to one of six types (BOOLEAN, DOUBLE, DATE, BYTES, LONG, or STRING), and writes it to a Kafka topic in JSON format.
| Type | Description |
| BOOLEAN | Corresponds to the JSON boolean type. Valid values are true and false. |
| DATE | Corresponds to the JSON number type. The value is a 13-digit Unix timestamp with millisecond (ms) precision. |
| BYTES | Corresponds to the JSON string type. Before the data is written to Kafka, the byte array is Base64-encoded into a string, so consumers must Base64-decode the value. Encoding: Base64.getEncoder().encodeToString(text.getBytes("UTF-8")). Decoding: Base64.getDecoder().decode(encodedText). |
| STRING | Corresponds to the JSON string type. |
| LONG | Corresponds to the JSON number type. |
| DOUBLE | Corresponds to the JSON number type. |
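The following Java sketch shows how a consumer might turn BYTES and DATE values back into native types using only the JDK. It reuses the encoding and decoding calls from the BYTES row; the class name `FieldTypeDecoder` is hypothetical, and the sample values are hard-coded rather than read from a real message.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;

// Hypothetical helper that decodes field values written in the formats described above.
final class FieldTypeDecoder {

    // BYTES: the value is a Base64-encoded string; decode it back to the original byte array.
    static byte[] decodeBytes(String base64Value) {
        return Base64.getDecoder().decode(base64Value);
    }

    // DATE: the value is a 13-digit Unix timestamp in milliseconds.
    static Instant decodeDate(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // Producer side, mirroring the encoding call in the BYTES row.
        String encoded = Base64.getEncoder()
                .encodeToString("scooter".getBytes(StandardCharsets.UTF_8));

        // Consumer side: reverse the encoding.
        String decoded = new String(decodeBytes(encoded), StandardCharsets.UTF_8);
        System.out.println(decoded);                    // scooter
        System.out.println(decodeDate(1590315269000L)); // 2020-05-24T10:14:29Z
    }
}
```

Decoding with an explicit charset avoids depending on the platform default encoding.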
Parameters
The following table describes each field in a message written to Kafka.
| Level-1 field | Level-2 field | Description |
| schema | dataColumn | A JSONArray that records the name and type of every column in the upstream change record. Change records cover both data changes (inserts, deletes, and updates) and table schema changes. |
| | primaryKey | A list of primary key column names. |
| | source | An object that describes the source database and table, with fields such as dbType, dbVersion, dbName, schemaName, and tableName. |
| payload | before | A JSONObject that holds the data record before the change, also known as the "before image." For example, if a record in a MySQL data source is updated, the before field contains the values of the record before the update. |
| | after | The data after the change, also known as the "after image." The format is the same as that of the before field. |
| | sequenceId | A unique string that StreamX generates for each change captured from the source. It is used to sort data when full data and incremental data are merged. Note: An update operation in the source generates two records, an "update before" record and an "update after" record. Both records share the same sequenceId. |
| | scn | The System Change Number (SCN) from the source. This field applies only to Oracle sources. |
| | op | The type of operation captured from the source. The value is case-sensitive. Valid values include INSERT, UPDATE_BEFOR, UPDATE_AFTER, DELETE, TRANSACTION_BEGIN, TRANSACTION_END, CREATE, ALTER, ERASE, QUERY, TRUNCATE, RENAME, CINDEX, DINDEX, GTID, XACOMMIT, XAROLLBACK, and MHEARTBEAT. |
| | timestamp | A JSONObject that contains timestamps for the data record: eventTime (required; the time of the change event in the source database), systemTime (optional; the time when the synchronization task processed the message), and checkpointTime (optional; the timestamp used when the synchronization offset is reset, typically the same as eventTime). eventTime and systemTime are 13-digit timestamps with millisecond precision. |
| | ddl | Populated only when a table schema changes. It contains text (the DDL statement) and ddlMeta (a Base64-encoded string of the serialized SQLStatement object). For data changes such as inserts, deletes, and updates, ddl is null. |
| version | N/A | The version of the message format. |
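Because the "update before" and "update after" records of one update share a sequenceId (see the update samples later in this topic), a consumer can join them on that key. The Java sketch below assumes messages have already been deserialized into a minimal hypothetical `ChangeRecord` type; it does not parse JSON, and the names `ChangeRecord` and `UpdatePairer` are illustrative only. It uses a Java record, so it needs Java 16 or later.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, already-deserialized view of one Kafka message: payload.op, payload.sequenceId,
// and the dataColumn maps of payload.before and payload.after (null when absent).
record ChangeRecord(String op, String sequenceId,
                    Map<String, Object> before, Map<String, Object> after) {}

// Joins the two halves of an update, which share the same sequenceId.
final class UpdatePairer {
    // Before images waiting for their matching after image, keyed by sequenceId.
    private final Map<String, Map<String, Object>> pendingBefore = new HashMap<>();

    // Returns a record carrying both images once an update is complete, null while the
    // after image is still missing, and any other record unchanged.
    ChangeRecord accept(ChangeRecord r) {
        switch (r.op()) {
            case "UPDATE_BEFOR": // spelled exactly as the task emits it
                pendingBefore.put(r.sequenceId(), r.before());
                return null;
            case "UPDATE_AFTER": {
                // In single-message mode the before image is already present.
                Map<String, Object> before = r.before() != null
                        ? r.before()
                        : pendingBefore.remove(r.sequenceId());
                return new ChangeRecord("UPDATE_AFTER", r.sequenceId(), before, r.after());
            }
            default:
                return r;
        }
    }
}
```

The joined record keeps op UPDATE_AFTER with both images filled in, which matches the shape of the single-message update format, so downstream code can handle both modes the same way.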
Overall message format
The following code shows the overall format of a Kafka message:
{
"schema": { // Metadata about the change, including column names and types.
"dataColumn": [// Information about the changed data columns, used to update records in the destination.
{
"name": "id",
"type": "LONG"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "binData",
"type": "BYTES"
},
{
"name": "ts",
"type": "DATE"
},
{
"name":"rowid",// If the data source is Oracle, the rowid is included in the data columns.
"type":"STRING"
}
],
"primaryKey": [
"pkName1",
"pkName2"
],
"source": {
"dbType": "mysql",
"dbVersion": "1.0.0",
"dbName": "myDatabase",
"schemaName": "mySchema",
"tableName": "tableName"
}
},
"payload": {
"before": {
"dataColumn":{
"id": 111,
"name":"scooter",
"binData": "[base64 string]",
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"// String type. The rowid from the Oracle source.
}
},
"after": {
"dataColumn":{
"id": 222,
"name":"donald",
"binData": "[base64 string]",
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"// String type. The rowid from the Oracle source.
}
},
"sequenceId":"XXX",// String type. Used to sort data when merging full and incremental data.
"scn":"xxxx",// String type. The System Change Number (SCN) from the Oracle source.
"op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",// The value is case-sensitive.
"timestamp": {
"eventTime": 1,// Required. The time of the change event in the source database. A 13-digit timestamp with millisecond precision.
"systemTime": 2,// Optional. The time when the synchronization task processed this change message. A 13-digit timestamp with millisecond precision.
"checkpointTime": 3// Optional. The timestamp used when resetting the synchronization offset. Typically the same as eventTime.
},
"ddl": {
"text": "ADD COLUMN ...",
"ddlMeta": "[Base64-encoded string of the serialized SQLStatement object]"
}
},
"version":"1.0.0"
}
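A consumer usually has to route each message before processing it. The Java sketch below applies the rules stated in the Parameters table: op is MHEARTBEAT for heartbeats, ddl is non-null only for schema changes, and before/after carry row images for data changes. The class and enum names are hypothetical, and treating everything else (for example, transaction markers) as OTHER is an assumption.

```java
import java.util.Map;

// Hypothetical router for messages that have already been parsed into maps.
final class MessageClassifier {

    enum Kind { HEARTBEAT, SCHEMA_CHANGE, DATA_CHANGE, OTHER }

    // op is payload.op; ddl, before, and after are payload.ddl, payload.before,
    // and payload.after, or null when the field is null in the message.
    static Kind classify(String op, Map<String, Object> ddl,
                         Map<String, Object> before, Map<String, Object> after) {
        if ("MHEARTBEAT".equals(op)) { // op values are case-sensitive
            return Kind.HEARTBEAT;
        }
        if (ddl != null) {             // ddl is populated only for schema changes
            return Kind.SCHEMA_CHANGE;
        }
        if (before != null || after != null) {
            return Kind.DATA_CHANGE;
        }
        return Kind.OTHER;             // assumption: e.g. transaction markers
    }
}
```

Checking the ddl field rather than listing DDL op values keeps the router independent of the exact op set, which the format marks as open-ended.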
For more information about field types and parameters, see Field types and Parameters.
Heartbeat message format
{
"schema": {
"dataColumn": null,
"primaryKey": null,
"source": null
},
"payload": {
"before": null,
"after": null,
"sequenceId": null,
"timestamp": {
"eventTime": 1620457659000,
"checkpointTime": 1620457659000
},
"op": "MHEARTBEAT",
"ddl": null
},
"version": "0.0.1"
}
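Heartbeats let a consumer see that the task is still advancing even when the source table is idle. The Java sketch below is one way to use them, assuming the consumer has already extracted payload.op and payload.timestamp.eventTime from each message. Treating the newest eventTime seen as the task's progress is an assumption based on the heartbeat's stated purpose, and `ProgressTracker` is a hypothetical name.

```java
// Hypothetical consumer-side progress tracker. eventTime is a millisecond Unix timestamp
// taken from payload.timestamp.eventTime.
final class ProgressTracker {
    private long lastEventTime = -1; // -1 until the first message is seen

    // Records progress and returns true if the message should be passed on for processing,
    // or false for heartbeats, which only report progress.
    boolean onMessage(String op, long eventTime) {
        lastEventTime = Math.max(lastEventTime, eventTime);
        return !"MHEARTBEAT".equals(op);
    }

    // Milliseconds between the newest eventTime seen and nowMillis, or -1 before any message.
    long lagMillis(long nowMillis) {
        return lastEventTime < 0 ? -1 : nowMillis - lastEventTime;
    }

    long lastEventTime() {
        return lastEventTime;
    }
}
```

Calling lagMillis(System.currentTimeMillis()) gives a rough lag estimate; it is only meaningful if the source database and consumer clocks are roughly in sync.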
For more information about field types and parameters, see Field types and Parameters.
Message formats for source data changes
- Message format for an insert operation:
  {
    "schema": {
      "dataColumn": [
        { "name": "name", "type": "STRING" },
        { "name": "job", "type": "STRING" },
        { "name": "sex", "type": "STRING" },
        { "name": "#alibaba_rds_row_id#", "type": "LONG" }
      ],
      "primaryKey": null,
      "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" }
    },
    "payload": {
      "before": null,
      "after": {
        "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 }
      },
      "sequenceId": "1620457642589000000",
      "timestamp": { "eventTime": 1620457896000, "systemTime": 1620457896977, "checkpointTime": 1620457896000 },
      "op": "INSERT",
      "ddl": null
    },
    "version": "0.0.1"
  }
- Message format for an update operation:
  - If the "When one record in the source is updated, one Kafka record is generated." option is not selected, an update operation generates two Kafka messages: one for the data before the update (the "before image") and one for the data after the update (the "after image"). The following samples show the formats.
    Message format for the before image:
    {
      "schema": {
        "dataColumn": [
          { "name": "name", "type": "STRING" },
          { "name": "job", "type": "STRING" },
          { "name": "sex", "type": "STRING" },
          { "name": "#alibaba_rds_row_id#", "type": "LONG" }
        ],
        "primaryKey": null,
        "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" }
      },
      "payload": {
        "before": {
          "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 }
        },
        "after": null,
        "sequenceId": "1620457642589000001",
        "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 },
        "op": "UPDATE_BEFOR",
        "ddl": null
      },
      "version": "0.0.1"
    }
    Message format for the after image:
    {
      "schema": {
        "dataColumn": [
          { "name": "name", "type": "STRING" },
          { "name": "job", "type": "STRING" },
          { "name": "sex", "type": "STRING" },
          { "name": "#alibaba_rds_row_id#", "type": "LONG" }
        ],
        "primaryKey": null,
        "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" }
      },
      "payload": {
        "before": null,
        "after": {
          "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 }
        },
        "sequenceId": "1620457642589000001",
        "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 },
        "op": "UPDATE_AFTER",
        "ddl": null
      },
      "version": "0.0.1"
    }
  - If the "When one record in the source is updated, one Kafka record is generated." option is selected, an update operation generates a single Kafka message that contains both the before and after images of the data. The following sample shows the format:
    {
      "schema": {
        "dataColumn": [
          { "name": "name", "type": "STRING" },
          { "name": "job", "type": "STRING" },
          { "name": "sex", "type": "STRING" },
          { "name": "#alibaba_rds_row_id#", "type": "LONG" }
        ],
        "primaryKey": null,
        "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" }
      },
      "payload": {
        "before": {
          "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 }
        },
        "after": {
          "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 }
        },
        "sequenceId": "1620457642589000001",
        "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 },
        "op": "UPDATE_AFTER",
        "ddl": null
      },
      "version": "0.0.1"
    }
- Message format for a delete operation:
  {
    "schema": {
      "dataColumn": [
        { "name": "name", "type": "STRING" },
        { "name": "job", "type": "STRING" },
        { "name": "sex", "type": "STRING" },
        { "name": "#alibaba_rds_row_id#", "type": "LONG" }
      ],
      "primaryKey": null,
      "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" }
    },
    "payload": {
      "before": {
        "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 }
      },
      "after": null,
      "sequenceId": "1620457642589000002",
      "timestamp": { "eventTime": 1620458266000, "systemTime": 1620458266101, "checkpointTime": 1620458266000 },
      "op": "DELETE",
      "ddl": null
    },
    "version": "0.0.1"
  }
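To show how the insert, update, and delete messages fit together, the Java sketch below applies them to a hypothetical in-memory copy of one table. It assumes the consumer has already extracted payload.op and the dataColumn maps of payload.before and payload.after. Rows are keyed by a single column chosen by the caller; because the samples in this topic have no primary key, the usage below keys on "#alibaba_rds_row_id#". The class name `ReplicaTable` is illustrative, and the arrow-style switch requires Java 14 or later.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory replica of one table, keyed by a single column.
final class ReplicaTable {
    private final String keyColumn;
    private final Map<Object, Map<String, Object>> rows = new HashMap<>();

    ReplicaTable(String keyColumn) {
        this.keyColumn = keyColumn;
    }

    // op is payload.op; before and after are the dataColumn maps of payload.before and
    // payload.after, or null when the message has no such image.
    void apply(String op, Map<String, Object> before, Map<String, Object> after) {
        switch (op) {
            case "INSERT" -> rows.put(after.get(keyColumn), after);
            // Two-message update mode: remove the old row now, re-add it on UPDATE_AFTER.
            case "DELETE", "UPDATE_BEFOR" -> rows.remove(before.get(keyColumn));
            case "UPDATE_AFTER" -> {
                // Single-message mode also carries the before image; drop the old key first
                // in case the key column itself changed.
                if (before != null) {
                    rows.remove(before.get(keyColumn));
                }
                rows.put(after.get(keyColumn), after);
            }
            default -> { } // Heartbeats, DDL, and transaction markers are ignored in this sketch.
        }
    }

    Map<String, Object> get(Object key) {
        return rows.get(key);
    }

    int size() {
        return rows.size();
    }
}
```

For example, new ReplicaTable("#alibaba_rds_row_id#") fed the insert, update, and delete samples above in order ends up empty. Removing the row on UPDATE_BEFOR keeps key changes correct in two-message mode, at the cost of the row being briefly absent until the matching UPDATE_AFTER arrives.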
For more information about field types and parameters, see Field types and Parameters.