新版数据订阅支持使用0.11版本至1.1版本的Kafka客户端消费订阅数据,本文将介绍Kafka客户端demo代码的使用说明。

前提条件

Kafka客户端Demo代码下载

请下载Kafka客户端Demo代码,关于代码使用的详细介绍,请参见Demo中的Readme文档。

数据格式介绍

数据以Avro序列化存储,详细格式请参见Record.avsc文档。订阅到数据后,您需要根据avro schema定义进行数据解析。

Kafka客户端Demo代码说明

  • 代码中参数设置项。
    说明 您可以通过DTS控制台获取以下参数的取值,详情请参见获取数据订阅所需信息
    参数 说明
    dtsConnectIp 数据订阅通道的网络地址。
    dtsConnectPort 数据订阅通道的端口。
    topic 数据订阅通道的订阅Topic。
    sid 消费组ID。
    username 该消费组的的账号。
    password 该消费组账号对应的的密码。
    说明 如果忘记消费组密码,您可以修改消费组密码
    startTimeStamp 您需要保存已消费的数据时间点。当业务程序中断后,您可以通过订阅客户端传入已消费的数据时间点来继续消费数据,防止数据丢失。同时您还可以在订阅客户端启动时,传入所需的消费位点,对订阅位点进行调整,实现按需消费数据。
  • 关键代码。
    • makeProps

      功能:配置访问订阅通道的相关参数。

    • assignOffsetToConsumer
      功能:指定期望时间点开始消费数据。
      说明 您需要保存已消费的数据时间点,便于业务程序中断后,仍可按需设置时间点消费,保证数据不丢失。
    • consume

      功能:具体消息处理函数,主要作用是对数据类型进行转换。

    • avro数据格式与MySQL类型的对应关系

      功能:按类型对数据解析并使用。avro数据格式的解析数据中包含dataTypeNumber字段,该字段的不同取值对应不同的MySQL数据类型,详情请参见MySQL字段类型与dataTypeNumber数值的对应关系

获取数据订阅所需信息

本案例以RDS for MySQL数据订阅通道为例,介绍如何获取数据订阅所需信息。

  1. 登录数据传输控制台
  2. 在左侧导航栏,单击数据订阅
  3. 数据订阅列表页面上方,选择订阅通道所属地域。
  4. 定位目标数据订阅通道,单击该订阅ID。
  5. 订阅配置页面,您将获取到订阅Topic网络信息。

    说明
    • 网络区域框中展示的地址信息包含域名和端口号。
    • 如果您部署Kafka Client的ECS实例与数据订阅通道属于同一经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
  6. 在左侧导航栏,单击数据消费,您将获取到消费组ID和对应的账号信息。

    说明 如果忘记消费组密码,您可以修改消费组密码

MySQL字段类型与dataTypeNumber数值的对应关系

MySQL字段类型 对应dataTypeNumber数值
MYSQL_TYPE_DECIMAL 0
MYSQL_TYPE_INT8 1
MYSQL_TYPE_INT16 2
MYSQL_TYPE_INT32 3
MYSQL_TYPE_FLOAT 4
MYSQL_TYPE_DOUBLE 5
MYSQL_TYPE_NULL 6
MYSQL_TYPE_TIMESTAMP 7
MYSQL_TYPE_INT64 8
MYSQL_TYPE_INT24 9
MYSQL_TYPE_DATE 10
MYSQL_TYPE_TIME 11
MYSQL_TYPE_DATETIME 12
MYSQL_TYPE_YEAR 13
MYSQL_TYPE_DATE_NEW 14
MYSQL_TYPE_VARCHAR 15
MYSQL_TYPE_BIT 16
MYSQL_TYPE_TIMESTAMP_NEW 17
MYSQL_TYPE_DATETIME_NEW 18
MYSQL_TYPE_TIME_NEW 19
MYSQL_TYPE_JSON 245
MYSQL_TYPE_DECIMAL_NEW 246
MYSQL_TYPE_ENUM 247
MYSQL_TYPE_SET 248
MYSQL_TYPE_TINY_BLOB 249
MYSQL_TYPE_MEDIUM_BLOB 250
MYSQL_TYPE_LONG_BLOB 251
MYSQL_TYPE_BLOB 252
MYSQL_TYPE_VAR_STRING 253
MYSQL_TYPE_STRING 254
MYSQL_TYPE_GEOMETRY 255