数据消费格式

本文介绍实时数据订阅功能的数据消费格式定义说明和示例,默认格式为Debezium Format V2.0。

数据消费定义说明

数据消费格式如下代码,字段说明如下表所示。

{ 
  "payload": { 
    "op": "u", 
    "ts_ms": 1465491411815, 
    "before": { 
      "id": 1004,
      "name": "Jane"
    },
    "after": { 
      "id": 1004,
      "name": "Anne"
    },
    "source": { 
      "version": "v1.0",
      "db": "ld-xxxx",
      "namespace": "default",
      "table": "customers",
      "ts_ms": 1465491411807
    }
  },
  "schema": { 
  "type": "struct",
  "fields": [
     {
        "type": "string",
        "optional": false,
        "field": "op"
   }, {
        "type": "int64",
        "optional": false,
        "field": "ts_ms"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
     }, {
            "type": "string",
            "optional": false,
            "field": "name"
          }
        ],
        "optional": true,
        "field": "before"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
     }, {
            "type": "string",
            "optional": false,
            "field": "name"
     }
        ],
        "optional": true,
        "field": "after"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
         }, {
            "type": "string",
            "optional": false,
            "field": "db"
     }, {
            "type": "string",
            "optional": false,
            "field": "namespace"
     }, {
            "type": "string",
            "optional": false,
            "field": "table"
     }, {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
     }
        ],
        "optional": false,
        "field": "source"
   }
    ],
  "optional": false
 }
}

Field name

描述

payload.op

  • c表示对数据进行新增操作。

  • u表示对数据进行更新操作。

  • d表示对数据进行删除操作。

  • r表示对数据进行全量导出操作,暂时不会涉及到。

payload.ts_ms

表示写入Kafka的Unix时间戳。

payload.before

表示导出整行数据更新前的值。

payload.after

表示导出整行数据的最新值。

payload.source

表示操作的额外信息,支持额外添加。

  • version:消息对应Lindorm数据库的版本号。

  • db:源集群。

  • namespace:表的namespace。

  • table:表名。

  • ts_ms:记录更新Lindorm的时间。

schema

根据payload内容自动生成,对整个JSON的结构及所有字段类型进行说明。默认都包括schema信息。整个schema的结构是递归的。

  • field:字段名称。

  • type:field字段名对应的字段类型。

  • name:schema字段所属模式名称。

  • fields:递归说明当前字段包含的子字段内容。

  • optional:该字段是否可选。

说明

HBase表的订阅格式跟SQL表一致,但在结构上存在以下两点不同:

  • HBase表在数据库中存储的是原始的二进制数据,通过数据订阅消费到的数据是对二进制数据进行Base64编码后的字符串。

  • HBase表存在列族的概念,因此非主键列的列名格式为列族_列名,主键列的列名固定为ROW

数据消费示例

SQL表

在Lindorm数据库中创建如下Schema。

CREATE TABLE customers (id VARCHAR,first_name VARCHAR,last_name VARCHAR, PRIMARY KEY(id));
  • 插入数据的数据消费示例。

    {
      "schema": {}, 
      "payload": { 
        "op": "c", 
        "ts_ms": 1465491411815, 
        "before": null, 
        "after": { 
          "id": "1004",
          "first_name": "Anne",
          "last_name": "Kretchmar"
        },
        "source": { 
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 更新数据的数据消费示例。

    {
      "schema": {},
      "payload": { 
        "op": "u", 
        "ts_ms": 1465491411815,
        "before": {
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": {
          "id": "1004",
          "first_name": "Anne",
          "last_name": "Kretchmar"
        },
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 删除一行数据的数据消费示例。

    {
      "schema": {},
      "payload": { 
        "op": "d", 
        "ts_ms": 1465491411815,
        "before": { 
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": null,
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 删除列的数据消费示例。

    {
      "schema": {},
      "payload": { 
        "op": "u", 
        "ts_ms": 1465491411815,
        "before": { 
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": { 
          "id": "1004",
          "first_name": "Anne Marie"
        }, 
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }

HBase表

向HBase表中插入一条数据:

Put put = new Put(Bytes.toBytes("user1"));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lucky"));
table.put(put);

对应的变更消息为:

{
    "schema": {},
    "payload": {
        "op": "c",
        "ts_ms": 1725258859839,
        "before": null,
        "after": {
            "ROW": "dXNlcjE=",
            "f_name": "bHVja3k="
        },
        "source": {
            "version": "v2.0",
            "db": "ld-xxxx",
            "namespace": "default",
            "table": "customers",
            "ts_ms": 1725258833727
        }
    }
}