文档

数据消费格式

更新时间:

本文介绍实时数据订阅功能的数据消费格式定义说明和示例,默认格式为Debezium Format V2.0。

数据消费定义说明

数据消费格式如下代码,字段说明如下表所示。

{ 
  "payload": { 
    "op": "u", 
    "ts_ms": 1465491411815, 
    "before": { 
      "id": 1004,
      "name": "Jane"
    },
    "after": { 
      "id": 1004,
      "name": "Anne"
    },
    "source": { 
      "version": "v1.0",
      "db": "ld-xxxx",
      "namespace": "default",
      "table": "customers",
      "ts_ms": 1465491411807
    }
  },
  "schema": { 
  "type": "struct",
  "fields": [
     {
        "type": "string",
        "optional": false,
        "field": "op"
   }, {
        "type": "int64",
        "optional": false,
        "field": "ts_ms"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
     }, {
            "type": "string",
            "optional": false,
            "field": "name"
          }
        ],
        "optional": true,
        "field": "before"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
     }, {
            "type": "string",
            "optional": false,
            "field": "name"
     }
        ],
        "optional": true,
        "field": "after"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
         }, {
            "type": "string",
            "optional": false,
            "field": "db"
     }, {
            "type": "string",
            "optional": false,
            "field": "namespace"
     }, {
            "type": "string",
            "optional": false,
            "field": "table"
     }, {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
     }
        ],
        "optional": false,
        "field": "source"
   }
    ],
  "optional": false
 }
}

Field name

描述

payload.op

  • c表示对数据进行新增操作。

  • u表示对数据进行更新操作。

  • d表示对数据进行删除操作。

  • r表示对数据进行全量导出操作,暂时不会涉及到。

payload.ts_ms

表示写入Kafka的unix时间戳。

payload.before

表示导出整行数据更新前的值。

payload.after

表示导出整行数据的最新值。

payload.source

表示操作的额外信息,支持额外添加。

  • version:消息对应Lindorm数据库的版本号。

  • db:源集群。

  • namespace:表的namespace。

  • table:表名。

  • ts_ms:记录更新Lindorm的时间。

schema

根据payload内容自动生成,对整个JSON的结构及所有字段类型进行说明。默认都包括schema信息。整个schema的结构是递归的。

  • field:字段名称。

  • type:field字段名对应的字段类型。

  • name:schema字段所属模式名称。

  • fields:递归说明当前字段包含的子字段内容。

  • optional:该字段是否可选。

数据消费示例

在Lindorm数据库中创建如下schema。

CREATE TABLE customers (id varchar,first_name varchar,last_name varchar,constraint pk primary key(id))
  • 插入数据的数据消费示例。

    {
      "schema": {}, 
      "payload": { 
        "op": "c", 
        "ts_ms": 1465491411815, 
        "before": null, 
        "after": { 
          "id": "1004",
          "first_name": "Anne",
          "last_name": "Kretchmar"
        },
        "source": { 
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 更新数据的数据消费示例。

    {
      "schema": {},
      "payload": { 
        "op": "u", 
        "ts_ms": 1465491411815,
        "before": {
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": {
          "id": "1004",
          "first_name": "Anne",
          "last_name": "Kretchmar"
        },
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 删除一行数据的数据消费示例。

    {
      "schema": {},
      "payload": { 
        "op": "d", 
        "ts_ms": 1465491411815,
        "before": { 
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": null,
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 删除列的数据消费示例。

    {
      "schema": {},
      "payload": { 
        "op": "u", 
        "ts_ms": 1465491411815,
        "before": { 
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": { 
          "id": "1004",
          "first_name": "Anne Marie"
        }, 
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 本页导读 (1)
文档反馈