文档

使用Debezium将MySQL数据同步至云消息队列 Kafka 版

更新时间:

本文介绍如何创建Debezium MySQL Source Connector,将MySQL的数据同步至云消息队列 Kafka 版

前提条件

步骤一:创建数据表

  1. 登录RDS管理控制台,创建RDS MySQL实例。更多信息,请参见快速创建RDS MySQL实例

    创建实例时,请选择与前提条件中已购买部署的Kafka实例相同的VPC,并将此VPC网段加入白名单。image..png

  2. 实例创建完成后,在实例列表页面单击目标实例,然后在实例详情页面的左侧导航栏,完成以下操作。

    1. 单击账号管理,创建一个新账号,也可使用已有账号。更多信息,请参见创建数据库和账号

    2. 单击数据库管理,创建数据库,也可使用已有数据库。更多信息,请参见创建数据库和账号

    3. 单击数据库连接,记录内网地址和端口号。image..png

  3. 基本信息页面,单击登录数据库进入DMS数据管理服务平台,完成以下操作。

    1. 在左侧双击数据库名称,切换到已创建的数据库。image..png

    2. SQL Console页签,使用SQL语句创建表格。例如,创建一个列参数分别为idnumber的表格,命令如下。更多信息,请参见SQL Commands

      CREATE TABLE sql_table(id INT ,number INT);

步骤二:创建Connector任务

  1. 下载Debezium MySQL Source Connector文件,上传至提前创建好的OSS bucket,更多信息,请参见控制台上传文件

    重要

    下载Debezium MySQL Connector文件时请选择适配Java 8的版本。

  2. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  3. 在左侧导航栏,选择Connector生态集成 > 任务列表

  4. 任务列表页面,单击创建任务列表

  5. 创建任务面板。设置任务名称,配置以下配置项。

    • 任务创建

      1. Source(源)配置向导,选择数据提供方Apache Kafka Connect,单击下一步

      2. 连接器配置配置向导,设置以下配置项,然后单击下一步

        配置项

        参数

        说明

        Kafka Connect插件

        Bucket存储桶

        选择OSS Bucket。

        文件

        选择上传的.ZIP文件。

        Kafka资源信息

        Kafka 参数配置

        选择Source Connect。

        Kafka实例

        选择前提条件中创建的实例。

        专有网络VPC

        默认选择部署Kafka实例时选择的VPC ID且不可更改。

        交换机

        默认选择部署Kafka实例时选择的vSwitch ID且不可更改。

        安全组

        选择安全组。

        Kafka Connect配置信息

        解析当前ZIP包下的properties文件

        选择.ZIP文件中包含的SourceConnector对应的.properties文件。路径为/etc/xxx.properties。在输入框中更新下列字段的取值。

        • connector.class:运行Connector的包名称,无需修改。

        • tasks.max:Task的最大数量。该场景下只支持设置为1。

        • database.hostname:填写步骤一:创建数据表中获取的源数据库内网地址。

        • database.port:填写步骤一:创建数据表中获取的端口号。

        • database.user:数据库登录账号。

        • database.password:数据库登录密码。

        • database.server.name:MySQL数据库服务的逻辑名称。

          • 仅允许包含英文字母、数字以及下划线(_)。

          • 该字段用于组成数据库及表格的目标Topic的名称,数据库目标Topic的命名规则为{database.server.name},表格目标Topic的命名规则为{database.server.name}.{databaseName}.{tableName}。请在运行Connector任务前按照命名规则提前创建对应的目标Topic。

        • database.include.list:源数据库名称。若有多个数据库,用英文逗号(,)分隔。

        • table.include.list:源表格名称,单个表格的格式为 {databaseName}.{tableName};若有多个表格,用英文逗号(,)分隔。

        • database.history.kafka.bootstrap.servers:Kafka实例连接地址。

          • 该实例用于记录数据库所有Schema变动记录。

          • 可使用Connector任务中已配置的目标Kafka实例,也可使用新实例。

          • 实例需要与数据库实例处于同一个VPC中。

          • 该实例需要开启自由使用Group能力。更多信息,请参见自由使用Group

        • database.history.kafka.topic:此Topic用于记录数据库所有Schema的变动记录,请在运行Connector任务前提前创建。

        • include.schema.changes:是否监控Schema变动记录,若取值为true,则会将这些变动记录写入名为{database.server.name}的Topic中。

        示例代码如下:

        connector.class=io.debezium.connector.mysql.MySqlConnector
        database.hostname=xxx
        database.password=xxx
        database.port=3306
        # 数据库账号,建议使用高权限账号。
        database.user=xxx
        name=debezium-mysql-source
        # MySQL数据库服务的逻辑名称,仅允许包含英文字母、数字以及下划线。
        # 该字段用于组成数据库以及表格的目标Topic的名称,数据库下游Topic的命名规则为<database.server.name>;表格下游Topic的命名规则为<database.server.name>.<databaseName>.<tableName>。
        # 注意,运行前建议提前创建对应的下游Topic。
        database.server.name=fulfillment
        # 源数据库名称,多个数据库用逗号分隔。
        database.include.list=test_database
        # 源表格名称,多个表格用逗号分隔,单个表格的格式为<databaseName>.<tableName>
        table.include.list=test_database.test_table
        # 注意,这里只能有1个task进行消费。
        tasks.max=1
        # Kafka实例连接地址,该实例用于记录数据库所有Schema变动记录。
        # 该实例需要与数据库实例处于同一个VPC中,同时该实例需要打开“自由使用 Group”能力。
        database.history.kafka.bootstrap.servers=xxx
        # 该Topic用于记录数据库所有Schema变动记录。
        database.history.kafka.topic=xxx
        # 决定是否监控schema变动记录,若为true,则会将这些变动记录写入名为<database.server.name> 的Topic中。
        include.schema.changes=true
        # 决定是否将执行的sql也记录下来,若为true,需要MySQL端的参数binlog_rows_query_log_events=ON。
        # include.query=true
        # 指定运行快照的模式,包含:
        #   1. initial: the connector runs a snapshot only when no offsets have been recorded for the logical server name.
        #   2. when_needed: the connector runs a snapshot upon startup whenever it deems it necessary. That is, when no offsets are available, or when a previously recorded offset specifies a binlog location or GTID that is not available in the server.
        #   3. never: the connector never uses snapshots. Upon first startup with a logical server name, the connector reads from the beginning of the binlog. Configure this behavior with care. It is valid only when the binlog is guaranteed to contain the entire history of the database.
        #   4. schema_only: the connector runs a snapshot of the schemas and not the data. This setting is useful when you do not need the topics to contain a consistent snapshot of the data but need them to have only the changes since the connector was started.
        #   5. schema_only_recovery: this is a recovery setting for a connector that has already been capturing changes. When you restart the connector, this setting enables recovery of a corrupted or lost database history topic. You might set it periodically to "clean up" a database history topic that has been growing unexpectedly. Database history topics require infinite retention.
        snapshot.mode=when_needed
        
        # 消息Value格式转换组件。
        value.converter=org.apache.kafka.connect.json.JsonConverter
        # 消息Value内容中是否包含结构体schema信息。
        value.converter.schemas.enable=false

        Connector全量参数,请参见Debezium Connector Properties

      3. 实例配置配置向导,设置以下参数,然后单击下一步

        配置项

        参数

        说明

        Worker规格

        Worker规格

        选择合适的Worker规格。

        最小Worker数

        设置为1。

        最大Worker数

        设置为1。此数值不得超过Task的最大数量。

        Kafka Connect Worker 配置

        自动创建Kafka Connect Worker依赖资源

        建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:

        • Offset Topic:用于存储源数据偏移量,命名规则为connect-eb-offset-<任务名称>

        • Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为connect-eb-config-<任务名称>

        • Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为connect-eb-status-<任务名称>

        • Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为connect-eb-cluster-<任务名称>

        • Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Topic中的数据,命名规则为connector-eb-cluster-<任务名称>-<connector名称>

      4. 运行配置区域,将日志投递方式设置为投递至SLS或者投递至Kafka,在角色授权卡片设置Connect依赖的角色配置,然后单击保存

        重要

        建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。

    • 任务属性

      设置此任务的重试策略及死信队列。更多信息,请参见重试和死信

    等待任务状态变为运行中,此时Connector已经在正常工作中。

步骤三:测试Connector任务

  1. 在DMS数据管理服务平台,向步骤一:创建数据表中创建的数据表插入一条数据。例如,插入一条id为123,number为20000的数据,命令如下。

    INSERT INTO sql_table(id, number) VALUES(123,20000);
  2. 登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。

  3. 在目标实例页面,单击目标Topic,然后单击消息查询,查看插入的消息数据,消息Value示例如下。

    • 数据库目标Topic(命名规则为{database.server.name})示例:

      {
          "source":{
              "version":"1.5.0.Final",
              "connector":"mysql",
              "name":"fulfillment",
              "ts_ms":1686283675404,
              "snapshot":"true",
              "db":"wbdb",
              "sequence":null,
              "table":"sql_table",
              "server_id":0,
              "gtid":null,
              "file":"mysql-bin.000006",
              "pos":188032,
              "row":0,
              "thread":null,
              "query":null
          },
          "databaseName":"wbdb",
          "schemaName":null,
          "ddl":"DROP TABLE IF EXISTS sql_table",
          "tableChanges":[
      
          ]
      }
    • 数据表格目标Topic(命名规则为{database.server.name}.{databaseName}.{tableName})示例:

      {
          "before":null,
          "after":{
              "id":123,
              "number":20000
          },
          "source":{
              "version":"1.5.0.Final",
              "connector":"mysql",
              "name":"fulfillment",
              "ts_ms":1686283675675,
              "snapshot":"last",
              "db":"wbdb",
              "sequence":null,
              "table":"sql_table",
              "server_id":0,
              "gtid":null,
              "file":"mysql-bin.000006",
              "pos":188032,
              "row":0,
              "thread":null,
              "query":null
          },
          "op":"r",
          "ts_ms":1686283675675,
          "transaction":null
      }
    • database.history.kafka.topic字段指定的Topic示例:

      {
          "source":{
              "server":"fulfillment"
          },
          "position":{
              "ts_sec":1686283675,
              "file":"mysql-bin.000006",
              "pos":188032,
              "gtids":"be4286e6-05ce-11ee-b8c2-00163e20****:1-****",
              "snapshot":true
          },
          "databaseName":"wbdb",
          "ddl":"CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci",
          "tableChanges":[
      
          ]
      }

    常见报错

    场景一:所有Tasks运行失败

    错误信息:

    All tasks under connector mongo-source failed, please check the error trace of the task.

    解决方法:在消息流入任务详情页面,单击基础信息区域的诊断链接,即可跳转到Connector监控页面,可以看到Tasks运行失败的详细错误信息。

    场景二:Kafka Connect退出

    错误信息:

    Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

    解决方法:由于状态获取可能会有延迟,建议您先尝试刷新页面。若刷新后仍然是失败状态,您可以按照以下步骤查看错误信息。

    1. 在消息流入任务详情页面的Worker信息区域,单击SAE应用后的实例名称,跳转到SAE应用详情页面。

    2. 基本信息页面,单击实例部署信息页签。

    3. 在实例右侧操作列,单击Webshell登录Kafka Connect运行环境。实例部署信息

      • 执行vi /home/admin/connector-bootstrap.log命令,查看Connector启动日志,查找其中是否包含错误信息。

      • 执行vi /opt/kafka/logs/connect.log命令,查看Connector运行日志,在其中查找ERROR或者WARN字段来查看是否有错误信息。

    基于错误信息提示进行修复操作后,可以重新启动对应任务。

    场景三:Connector参数校验失败

    错误信息:

    Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
    Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
    You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

    解决方法:此时需要根据错误信息,找出具体哪个参数出错,更新对应参数即可。若基于上述错误信息无法定位具体的出错参数,可以参考上文场景二中的步骤登录Kafka Connect运行环境,执行以下命令,查询参数是否校验通过。

    curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

    该指令会返回Connector参数中每个参数是否校验通过,若不通过,则errors属性非空,如下所示。

    "value":{
        "name":"snapshot.mode",
        "value":null,
        "recommended_values":[
            "never",
            "initial_only",
            "when_needed",
            "initial",
            "schema_only",
            "schema_only_recovery"
        ],
        "errors":[
            "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
        ],
        "visible":true
    }

    场景四:获取Topic元数据超时

    错误信息:

    org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

    解决方法:出现此报错的原因是您在启动Connector任务前未按照命名规则提前创建数据库目标Topic{database.server.name}和表格目标Topic{database.server.name}.{databaseName}.{tableName},建议您停用任务后创建目标Topic再启用任务。

  • 本页导读 (1)