Appendix: Message format


This topic describes the structure and fields of messages written to Kafka.

Background information

When you synchronize a full database to Kafka, the synchronization task reads data from an upstream data source and writes it to a Kafka topic in the JSON format described in this topic. The overall message format includes column information for the change record and the state of the data before and after the change. To allow consumers to track task progress, the synchronization task also periodically sends a heartbeat record to the Kafka topic. This heartbeat record has an op field with the value MHEARTBEAT. This topic describes the overall message format, the heartbeat message format, and the message formats for source data changes. For more information, see Field types and Parameters.

Message format for real-time single-table output

When you configure a Kafka destination for a real-time single-table synchronization task, you must specify the value format for the records written to Kafka. Supported formats include Canal CDC and JSON. For more details, see Appendix: Output format description.

Field types

The system reads data from the source, maps it to one of six types (BOOLEAN, DOUBLE, DATE, BYTES, LONG, or STRING), and writes it to a Kafka topic in JSON format.

Type

Description

BOOLEAN

Corresponds to the JSON boolean type. Valid values are true and false.

DATE

Corresponds to the JSON number type. The value is a 13-digit Unix timestamp with millisecond (ms) precision.

BYTES

Corresponds to the JSON string type. Before a byte array is written to Kafka, it is Base64-encoded into a string, so consumers must Base64-decode the string to recover the original bytes. In Java, encoding is Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) and decoding is Base64.getDecoder().decode(encodedText).

STRING

Corresponds to the JSON string type.

LONG

Corresponds to the JSON number type.

DOUBLE

Corresponds to the JSON number type.
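
As a rough illustration of how a consumer might turn these six types back into native values, the following Python sketch converts one column value according to its declared type. The function name decode_value is hypothetical and not part of any SDK. The sketch assumes that DATE values are epoch milliseconds interpreted in UTC and that BYTES values use standard Base64.

```python
import base64
from datetime import datetime, timezone


def decode_value(type_name, value):
    """Convert one JSON column value to a Python value based on its declared type.

    Hypothetical helper: the type names come from the table above,
    the conversions are one reasonable choice among several.
    """
    if value is None:
        return None
    if type_name == "BYTES":
        # The producer Base64-encodes byte arrays before writing them.
        return base64.b64decode(value)
    if type_name == "DATE":
        # 13-digit Unix timestamp in milliseconds; UTC is an assumption.
        return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
    if type_name == "BOOLEAN":
        return bool(value)
    if type_name == "LONG":
        return int(value)
    if type_name == "DOUBLE":
        return float(value)
    if type_name == "STRING":
        return str(value)
    raise ValueError(f"unknown field type: {type_name}")
```

Raising an error on an unknown type is a deliberate choice here, so that a new type added upstream is noticed rather than passed through silently.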

Parameters

The following table describes each field in a message written to Kafka.

Level-1 field

Level-2 field

Description

schema

dataColumn

A JSONArray that records the name and type of every column in the upstream change record. Change records cover both data changes (inserts, updates, and deletes) and table schema changes.

  • name: The column name.

  • type: The column type.

primaryKey

A list of primary key column names.

source

An object containing information about the source database or table.

  • dbType: The database type. String.

  • dbVersion: The database version. String.

  • dbName: The database name. String.

  • schemaName: The schema name, used for databases like PostgreSQL and SQL Server. String.

  • tableName: The table name. String.

payload

before

A JSONObject representing the data record before the change, also known as the "before image." For example, if a record in a MySQL data source is updated, the before field stores the content of the record before the update.

  • This field is populated for update and delete operations.

  • dataColumn: A JSONObject that holds the column data as column name:column value pairs. Each column name is a string, and each column value is one of the types BOOLEAN, DOUBLE, DATE, BYTES, LONG, or STRING.

after

The data after the change, also known as the "after image." The format is the same as the before field.

sequenceId

A string that StreamX generates for each change captured from the source. It is used to order records when full data and incremental data are merged.

Note

An update operation from the source generates two records: an "update before" record and an "update after" record. Both records share the same sequenceId.

scn

The System Change Number (SCN) from the source. This field applies only to Oracle sources.

op

The type of operation captured from the source. Valid values:

  • INSERT: A data insertion.

  • UPDATE_BEFOR: The before image of an update.

  • UPDATE_AFTER: The after image of an update.

  • DELETE: A data deletion.

  • TRANSACTION_BEGIN: The start of a database transaction.

  • TRANSACTION_END: The end of a database transaction.

  • CREATE: A table creation.

  • ALTER: A table alteration.

  • QUERY: The original SQL statement for a database change.

  • TRUNCATE: A table truncation.

  • RENAME: A table rename operation.

  • CINDEX: An index creation.

  • DINDEX: An index deletion.

  • MHEARTBEAT: A heartbeat message indicating the synchronization task is active, even if the source has no new data.

timestamp

A JSONObject containing timestamps related to the data record.

  • eventTime: The time when the change occurred in the source database. A 13-digit timestamp with millisecond precision. Long.

  • systemTime: The time when the synchronization task processed the change message. A 13-digit timestamp with millisecond precision. Long.

  • checkpointTime: The timestamp used when resetting a synchronization offset. This value is usually the same as eventTime. A 13-digit timestamp with millisecond precision. Long.

ddl

This field is populated only when the table schema changes. For data changes (inserts, updates, and deletes), ddl is null.

  • text: The DDL statement text. String.

  • ddlMeta: A Base64-encoded string of a serialized Java object that records the DDL change. String.

version

N/A

The version of the message format.
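
To make the op and timestamp fields more concrete, the following Python sketch groups op values into broad categories and computes how long the synchronization task took to process a change. The category names, the grouping of op values, and the function names are assumptions made for illustration. Only the op values and timestamp fields themselves come from the table above.

```python
# Hypothetical grouping of the op values listed above.
DATA_OPS = {"INSERT", "UPDATE_BEFOR", "UPDATE_AFTER", "DELETE"}
SCHEMA_OPS = {"CREATE", "ALTER", "TRUNCATE", "RENAME", "CINDEX", "DINDEX"}
TRANSACTION_OPS = {"TRANSACTION_BEGIN", "TRANSACTION_END"}


def classify_op(op):
    """Return a coarse category for an op value (values are case-sensitive)."""
    if op in DATA_OPS:
        return "data"
    if op in SCHEMA_OPS:
        return "schema"
    if op in TRANSACTION_OPS:
        return "transaction"
    if op == "MHEARTBEAT":
        return "heartbeat"
    return "other"  # for example QUERY, or values not listed in the table


def processing_lag_ms(timestamp):
    """Return systemTime - eventTime in milliseconds, or None if either is missing."""
    event_time = timestamp.get("eventTime")
    system_time = timestamp.get("systemTime")
    if event_time is None or system_time is None:
        return None
    return system_time - event_time
```

A consumer could use classify_op to route records to different handlers and processing_lag_ms to monitor delay between the source and the synchronization task.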

Overall message format

The following code shows the overall format of a Kafka message:

{
    "schema": { // Metadata about the change, including column names and types.
        "dataColumn": [// Information about the changed data columns, used to update records in the destination.
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
                "name": "rowid",// If the data source is Oracle, the rowid is included in the data columns.
                "type": "STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// String type. The rowid from the Oracle source.
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// String type. The rowid from the Oracle source.
            }
        },
        "sequenceId":"XXX",// String type. Used to sort data when merging full and incremental data.
        "scn":"xxxx",// String type. The System Change Number (SCN) from the Oracle source.
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",// The value is case-sensitive.
        "timestamp": {
            "eventTime": 1,// Required. The time of the change event in the source database. A 13-digit timestamp with millisecond precision.
            "systemTime": 2,// Optional. The time when the synchronization task processed this change message. A 13-digit timestamp with millisecond precision.
            "checkpointTime": 3// Optional. The timestamp used when resetting the synchronization offset. Typically the same as eventTime.
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[Base64-encoded string of the serialized SQLStatement object]"
        }
    },
    "version":"1.0.0"
}
Note

For more information about field types and parameters, see Field types and Parameters.
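
The envelope described above can be navigated with a few dictionary lookups. The following Python sketch parses one message value and extracts the table identity, the operation, and the primary key values. The function names and the shape of the returned summary are hypothetical. The sketch also assumes that the message value is plain JSON without the explanatory comments shown in the sample.

```python
import json


def qualified_table(source):
    """Join dbName, schemaName, and tableName, skipping parts that are absent."""
    if not source:
        return None
    parts = [source.get("dbName"), source.get("schemaName"), source.get("tableName")]
    return ".".join(p for p in parts if p)


def summarize(raw):
    """Parse one Kafka message value and return a small summary dict."""
    msg = json.loads(raw)
    schema = msg.get("schema") or {}
    payload = msg.get("payload") or {}
    # Prefer the after image; fall back to the before image (for deletes).
    image = payload.get("after") or payload.get("before") or {}
    row = image.get("dataColumn") or {}
    pk_names = schema.get("primaryKey") or []
    return {
        "table": qualified_table(schema.get("source")),
        "op": payload.get("op"),
        "key": {name: row.get(name) for name in pk_names},
        "version": msg.get("version"),
    }
```

Every lookup tolerates null values because fields such as source, primaryKey, before, and after can be null, as the heartbeat and change samples in this topic show.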

Heartbeat message format

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
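
Because heartbeat records arrive even when the source has no new data, a consumer can use them to confirm that the task is still progressing. The following Python sketch keeps track of the latest checkpoint seen across all messages and counts heartbeats. The ProgressTracker class is hypothetical, and falling back to eventTime when checkpointTime is absent is an assumption based on the two values usually being the same.

```python
import json


class ProgressTracker:
    """Hypothetical helper that tracks task progress from message timestamps."""

    def __init__(self):
        self.last_checkpoint_ms = None
        self.heartbeats = 0

    def observe(self, raw):
        """Update progress from one message value; return True for a heartbeat."""
        payload = json.loads(raw).get("payload") or {}
        ts = payload.get("timestamp") or {}
        checkpoint = ts.get("checkpointTime")
        if checkpoint is None:
            # Assumption: eventTime is an acceptable substitute.
            checkpoint = ts.get("eventTime")
        if checkpoint is not None and (
            self.last_checkpoint_ms is None or checkpoint > self.last_checkpoint_ms
        ):
            self.last_checkpoint_ms = checkpoint
        is_heartbeat = payload.get("op") == "MHEARTBEAT"
        if is_heartbeat:
            self.heartbeats += 1
        return is_heartbeat
```

A monitoring loop could compare last_checkpoint_ms with the current time and raise an alert when no heartbeat or change record has advanced it for longer than expected.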

Message formats for source data changes

  • Message format for an insert operation:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • Message format for an update operation:

    • If the "When one record in the source is updated, one Kafka record is generated." option is not selected, an update operation generates two Kafka messages: one for the data state before the update (the "before image") and one for the state after the update (the "after image"). The following sample code shows the formats:

      Message format for the before image:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }

      Message format for the after image:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • If the "When one record in the source is updated, one Kafka record is generated." option is selected, an update operation generates a single Kafka message that contains both the before and after images of the data. The following sample code shows the format:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • Message format for a delete operation:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
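
The insert, update, and delete formats above can be replayed against a local copy of the table. The following Python sketch applies parsed messages to an in-memory dictionary and handles both update modes: two messages (UPDATE_BEFOR followed by UPDATE_AFTER) and a single message that carries both images. The function names are hypothetical, and the caller must supply the key columns. Using #alibaba_rds_row_id# as the key for the sample table is an assumption, because the samples report primaryKey as null.

```python
def row_key(row, key_columns):
    """Build a hashable key from the chosen key columns of a row."""
    return tuple(row[c] for c in key_columns)


def image(payload, side):
    """Return the dataColumn object of the before or after image, or None."""
    img = payload.get(side)
    return img.get("dataColumn") if img else None


def apply_change(table, msg, key_columns):
    """Apply one parsed change message to an in-memory table (a dict)."""
    payload = msg["payload"]
    op = payload["op"]
    before = image(payload, "before")
    after = image(payload, "after")
    # Remove the old row for deletes and for any update that carries a before image.
    if op in ("DELETE", "UPDATE_BEFOR", "UPDATE_AFTER") and before is not None:
        table.pop(row_key(before, key_columns), None)
    # Write the new row for inserts and for any update that carries an after image.
    if op in ("INSERT", "UPDATE_AFTER") and after is not None:
        table[row_key(after, key_columns)] = after
    # Other op values (DDL, transactions, heartbeats) are ignored in this sketch.
```

Removing the before image first and then writing the after image means the same code path works even when an update changes the key columns themselves.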