本文为您介绍DataHub不同数据类型对应操作的支持情况,不同数据类型的分片策略、数据格式及相关消息示例。

不同数据类型对应操作的支持情况

Topic是DataHub订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据。目前支持Tuple与Blob两种类型:
DataHub类型 写入DML消息 写入上游心跳消息 写入DDL消息 源表和目标topic映射方式 数据类型
Tuple 支持 不支持 不支持 单表对单topic DataHub支持的类型
Blob 支持 支持 支持 单库(多表)对单topic Blob二进制数据
  • Tuple类型由于schema各字段在topic创建后无法更改,所以适用于schema固定,且源表无add column、drop column等改变schema的DDL操作场景。Tuple类型不支持保留上游传递的DDL消息以及心跳消息,即Tuple不能将此类消息透穿给消费DataHub的下游。而且源表和topic的映射方式为单表对单topic,如果源表数量过多,则需要创建大量的topic,将不便于下游消费。
  • Blob类型由于不存在schema,topic内只存放Blob二进制数据,因此有较大的自由度。支持存放源表的DDL消息和源端的心跳消息,可传递给下游消费,且采用单库(多表)对单topic的映射方式,不论源表数量多少仅需创建一个topic,方便下游消费。更适用于DataHub作为中间消息队列进行整库迁移的场景。

不同数据类型分片策略

Shard表示对DataHub的一个topic进行数据传输的并发通道,单个Shard写入速率有上限,多个Shard可以提高写入性能,但DataHub仅能保证单个Shard在消费时的有序性,不保证多个Shard之间消息的顺序。因此,为了既能通过增加Shard数量提高写入性能,又能够保证多个Shard之间消息的有序性,同时避免数据倾斜,现针对Blob和Tuple类型提供以下分片策略。

场景 Tuple Blob
有主键(包含自定义主键) 按主键进行分片 按主键进行分片
顺序保证 同一主键消息保证有序 同一主键消息保证有序
无主键 随机分片 按表名进行分片
顺序保证 不保证有序 同一表消息保证有序

同步数据格式

  • Tuple
    数据类型为DataHub Tuple topic自身支持的类型。当使用数据集成创建的topic时,会增加部分元数据列。元数据列其中_sequence_id__excute_time__source_table_ _before_image__after_image_为元信息列。
    参数 描述
    _sequence_id_ string类型,由数字组成,每条消息唯一(update before和update after共用一个sequence id)。
    _excute_time_ 数据产生时间。
    _source_table_ 数据源表名。
    _before_image_ 前镜像(update before和delete为Y,update after和insert为N)。
    _after_image_ 后镜像(update before和delete为N,update after和insert为Y)。
    示例:下表为一条Insert、Update、Delete语句同步到DataHub的结果。
    _sequence_id_ _operation_type_ _excute_time_ _before_image_ _after_image_
    1649991610688000000 I 1649991726000 N Y
    1649991610688000001 U 1649991756000 Y N
    1649991610688000001 U 1649991756000 N Y
    1649991610688000002 D 1649991774000 Y N
  • Blob
    Blob类型的消息格式为JSON字符串转化的二进制数据,其对应的JSON格式如下:
    {
        "schema": { //变更的元数据信息,仅指定列名与列类型信息
            "dataColumn": [//变更的数据列信息,更新目标表记录内容
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "binData",
                    "type": "BYTES"
                },
                {
                    "name": "ts",
                    "type": "DATE"
                }
            ],
            "primaryKey": [
                "pkName1",
                "pkName2"
            ],
            "source": {
                "dbType": "mysql",
                "dbVersion": "1.0.0",
                "dbName": "myDatabase",
                "schemaName": "mySchema",
                "tableName": "tableName"
            }
        },
        "payload": {
            "before": {
                "dataColumn":{
                    "id": 111,
                    "name":"scooter",
                    "binData": "[base64 string]",
                    "ts": 1590315269000
                }
            },
            "after": {
                "dataColumn":{
                    "id": 222,
                    "name":"donald",
                    "binData": "[base64 string]",
                    "ts": 1590315269000
                }
            },
              "sequenceId":XXX//字符串类型,用于增全量数据合并的数据排序,
            "op": "INSERT/UPDATE/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT..."//大小写敏感,
            "timestamp": {
                "eventTime": 1,//必选,记录的变更时间,13为时间戳,ms精度
                "systemTime": 2,//可选,oracle CDC等部分数据源存在
                "checkpointTime": 3//可选,部分数据库如oceanbase等数据源包含
            },
            "ddl": {
                "text": "ADD COLUMN ...",
                "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
            }
        },
        "version":"1.0.0"
    }
    • Blob字段说明
      注意 消息中的所有字段类型范围为StreamX定义的BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六种类型。
      BOOLEAN:取值为true,false
      DATE:取值为13为整形,时间精确到ms级
      BYTES: 存储bytes类型,格式为base64编码后的字符串
      
      BASE64编解码使用java.util.Base64中的接口实现:
      String text = "测试text123";
      //编码
      Base64.getEncoder().encodeToString(text.getBytes("UTF-8"))
      //编码
      Base64.getDecoder().decode(encodedText)//解码
                                  
      一级元素 二级元素 说明
      schema dataColumn JSONArray类型,数据列的类型信息。dataColumn记录上游数据变更记录的所有列和对应的列类型信息。变更操作包括数据库对数据的更改(新增、删除及修改)和数据库表结构等变更。
      • name:列名
      • type:列类型
      primaryKey List类型,主键信息。

      pk:主键名。

      source Object 类型,源端数据库或表信息。
      • dbType:String类型,数据库类型
      • dbVersion:String类型,数据库版本
      • dbName:String类型,数据库名
      • schemaName:String类型,Schema名(针对Postgres和SQL Server等)
      • tableName:String 类型,数据表名
      payload before JSONObject类型,修改前的数据。例如:数据源端为mysql,做了一次记录的update操作,before字段存储记录被update之前的数据内容。
      • 在从源端读取到更新、删除操作消息时,在写入记录中填充该字段。
      • dataColumn:JSONObject类型,表示数据信息。格式为列名:列值, 列名为字符串,列值取决于本身类型,BYTES类型使用Base64 String进行表示,DATE类型采用long表示的13位时间戳,其余类型的值均为本身类型。
      after 修改后的数据。格式同before相同。
      说明 在更新、插入操作时必填。
      op 操作类型。取值如下:
      • INSERT:数据插入
      • UPDATE_BEFOR:数据更新前
      • UPDATE_AFTER:数据更新后
      • DELETE:数据删除
      • TRANSACTION_BEGIN:数据库事务开始
      • TRANSACTION_END:数据库事务结束
      • CREATE:数据库建表
      • ALTER:数据库表变更
      • QUERY:数据库变更的原始SQL
      • TRUNCATE:数据库表清空
      • RENAME:数据库表重命名
      • CINDEX:创建索引
      • DINDEX:删除索引
      • MHEARTBEAT:用于在源端无新增数据时标识同步仍正常进行的心跳消息
      timestamp JSONObject 类型,本条数据的相关时间戳。
      • eventTime:Long类型,记录源端库发生变更的时间,毫秒精度的13位时间戳。
      • systemTime:Long类型,同步任务处理该条变更消息的时间,毫秒精度的13位时间戳。
      • checkpointTime:Long类型,重置同步位点时的设置时间,毫秒精度的13位时间戳,一般与eventTime值一致。
      ddl 该字段只在更改数据库的表结构时才会填充数据,更改数据(包括新增、删除和修改)时对应的ddl直接填充为null。
      • text:String类型,数据库DDL语句文本。
      • ddlMeta:String类型,使用FastSQL对DDL进行解析后生成的SQLStatement Object进行序列化的二进制表示,并使用Base64编码为String存储。

        开启ddl支持时,需要传递的SQLStatement序列化对象,下游链路反序列化解析对象后,还原成目标数据源的ddl语句做变更。

      version 格式的版本号。
    • Blob序列化说明

      本文定义的JSON格式,一条消息对应一个JSONObject,JSONObject内部按照消息格式,逐级映射为相应的格式(JSONObject,JSONArray,相应类型的value等)。

      整个JSONObject中每个字段的存放类型均按照上述字段说明。序列化将JSONObject转换为String(如fastJSON的toJSONString方法)然后再采用String的getBytes(Charsets.UTF_8)方法,指定UTF_8字符集转化为byte[]。

相关消息的JSON样例

  • Insert:
    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "INSERT",
            "after": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "comment",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000004",
            "timestamp": {
                "eventTime": 1605339932000,
                "systemTime": 1605339932736,
                "checkpointTime": 1605339932000
            }
        },
        "version": "0.0.1"
    }
  • update before:
    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "UPDATE_BEFOR",
            "before": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "comment",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000005",
            "timestamp": {
                "eventTime": 1605339934000,
                "systemTime": 1605339934951,
                "checkpointTime": 1605339934000
            }
        },
        "version": "0.0.1"
    }
  • update after:
    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "UPDATE_AFTER",
            "after": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "com1",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000005",
            "timestamp": {
                "eventTime": 1605339934000,
                "systemTime": 1605339934951,
                "checkpointTime": 1605339934000
            }
        },
        "version": "0.0.1"
    }
  • delete:
    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "DELETE",
            "before": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "com1",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000006",
            "timestamp": {
                "eventTime": 1605339937000,
                "systemTime": 1605339937671,
                "checkpointTime": 1605339937000
            }
        },
        "version": "0.0.1"
    }
  • Heartbeat:
    {
        "schema": {},
        "payload": {
            "op": "MHEARTBEAT",
            "timestamp": {
                "eventTime": 1605339953629,
                "checkpointTime": 1605339953629
            }
        },
        "version": "0.0.1"
    }
  • DDL:
    {
        "schema": {
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_nopk"
            }
        },
        "payload": {
            "op": "ALTER",
            "sequenceId": "1605339516000000035",
            "ddl": {
                "text": "alter table t_shiyu_nopk add column holo text",
                "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwAC2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHdmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29pc1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA=="
            },
            "timestamp": {
                "eventTime": 1605342109000,
                "systemTime": 1605342109259,
                "checkpointTime": 1605342109000
            }
        },
        "version": "0.0.1"
    }