本文介绍如何创建Debezium MySQL Source Connector,将MySQL的数据同步至云消息队列 Kafka 版。
前提条件
已开通对象存储OSS服务并创建存储空间(Bucket)。更多信息,请参见控制台创建存储空间。
已开通Serverless应用引擎服务。更多信息,请参见准备工作。
已创建专有网络及交换机。更多信息,请参见步骤一:创建专有网络和交换机。
已购买并部署云消息队列 Kafka 版实例。更多信息,请参见购买和部署实例。
步骤一:创建数据表
登录RDS管理控制台,创建RDS MySQL实例。更多信息,请参见快速创建RDS MySQL实例。
创建实例时,请选择与前提条件中已购买部署的Kafka实例相同的VPC,并将此VPC网段加入白名单。
实例创建完成后,在实例列表页面单击目标实例,然后在实例详情页面的左侧导航栏,完成以下操作。
在基本信息页面,单击登录数据库进入DMS数据管理服务平台,完成以下操作。
在左侧双击数据库名称,切换到已创建的数据库。
在SQL Console页签,使用SQL语句创建表格。例如,创建一个列参数分别为id和number的表格,命令如下。更多信息,请参见SQL Commands。
CREATE TABLE sql_table(id INT ,number INT);
步骤二:创建Connector任务
下载Debezium MySQL Source Connector文件,上传至提前创建好的OSS bucket,更多信息,请参见控制台上传文件。
重要下载Debezium MySQL Connector文件时请选择适配Java 8的版本。
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在任务列表页面,单击创建任务。
在创建任务面板,设置任务名称,选择流入类型为Apache Kafka Connect。
在连接器配置区域,设置以下参数。
配置项
参数
说明
Kafka Connect插件
Bucket存储桶
选择OSS Bucket。
文件
选择上传的.ZIP文件。
Kafka资源信息
Kafka实例
选择前提条件中创建的实例。
专有网络VPC
默认选择部署Kafka实例时选择的VPC ID且不可更改。
交换机
默认选择部署Kafka实例时选择的vSwitch ID且不可更改。
安全组
选择安全组。
Kafka Connect配置信息
解析当前ZIP包下的properties文件
选择.ZIP文件中包含的SourceConnector对应的.properties文件。路径为/etc/xxx.properties。在输入框中更新下列字段的取值。
tasks.max:Task的最大数量。该场景下只支持设置为1。
database.hostname:填写步骤一:创建数据表中获取的源数据库内网地址。
database.port:填写步骤一:创建数据表中获取的端口号。
database.user:数据库登录账号。
database.password:数据库登录密码。
database.server.name:MySQL数据库服务的逻辑名称。
仅允许包含英文字母、数字以及下划线(_)。
该字段用于组成数据库及表格的目标Topic的名称,数据库目标Topic的命名规则为
{database.server.name}
,表格目标Topic的命名规则为{database.server.name}.{databaseName}.{tableName}
。请在运行Connector任务前按照命名规则提前创建对应的目标Topic。
database.include.list:源数据库名称。若有多个数据库,用英文逗号(,)分隔。
table.include.list:源表格名称,单个表格的格式为
{databaseName}.{tableName}
;若有多个表格,用英文逗号(,)分隔。database.history.kafka.bootstrap.servers:Kafka实例连接地址。
该实例用于记录数据库所有Schema变动记录。
可使用Connector任务中已配置的目标Kafka实例,也可使用新实例。
实例需要与数据库实例处于同一个VPC中。
该实例需要开启自由使用Group能力。更多信息,请参见自由使用Group。
database.history.kafka.topic:此Topic用于记录数据库所有Schema的变动记录,请在运行Connector任务前提前创建。
include.schema.changes:是否监控Schema变动记录,若取值为true,则会将这些变动记录写入名为
{database.server.name}
的Topic中。
Connector全量参数,请参见Debezium Connector Properties。
在实例配置区域,设置以下参数。
配置项
参数
说明
Worker规格
Worker规格
选择合适的Worker规格。
最小Worker数
设置为1。
最大Worker数
设置为1。此数值不得超过Task的最大数量。
Kafka Connect Worker 配置
自动创建Kafka Connect Worker依赖资源
建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:
Offset Topic:用于存储源数据偏移量,命名规则为
connect-eb-offset-<任务名称>
。Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为
connect-eb-config-<任务名称>
。Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为
connect-eb-status-<任务名称>
。Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为
connect-eb-cluster-<任务名称>
。Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Topic中的数据,命名规则为
connector-eb-cluster-<任务名称>-<connector名称>
。
在运行配置区域,将日志投递方式设置为投递至SLS或者投递至Kafka,在角色授权卡片设置Connect依赖的角色配置。
重要建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。
单击确定。
等待任务状态变为运行中,此时Connector已经在正常工作中。
步骤三:测试Connector任务
在DMS数据管理服务平台,向步骤一:创建数据表中创建的数据表插入一条数据。例如,插入一条id为123,number为20000的数据,命令如下。
INSERT INTO sql_table(id, number) VALUES(123,20000);
登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。
在目标实例页面,单击目标Topic,然后单击消息查询,查看插入的消息数据,消息Value示例如下。
数据库目标Topic(命名规则为
{database.server.name}
)示例:{ "source":{ "version":"1.5.0.Final", "connector":"mysql", "name":"fulfillment", "ts_ms":1686283675404, "snapshot":"true", "db":"wbdb", "sequence":null, "table":"sql_table", "server_id":0, "gtid":null, "file":"mysql-bin.000006", "pos":188032, "row":0, "thread":null, "query":null }, "databaseName":"wbdb", "schemaName":null, "ddl":"DROP TABLE IF EXISTS sql_table", "tableChanges":[ ] }
数据表格目标Topic(命名规则为
{database.server.name}.{databaseName}.{tableName}
)示例:{ "before":null, "after":{ "id":123, "number":20000 }, "source":{ "version":"1.5.0.Final", "connector":"mysql", "name":"fulfillment", "ts_ms":1686283675675, "snapshot":"last", "db":"wbdb", "sequence":null, "table":"sql_table", "server_id":0, "gtid":null, "file":"mysql-bin.000006", "pos":188032, "row":0, "thread":null, "query":null }, "op":"r", "ts_ms":1686283675675, "transaction":null }
database.history.kafka.topic字段指定的Topic示例:
{ "source":{ "server":"fulfillment" }, "position":{ "ts_sec":1686283675, "file":"mysql-bin.000006", "pos":188032, "gtids":"be4286e6-05ce-11ee-b8c2-00163e20****:1-****", "snapshot":true }, "databaseName":"wbdb", "ddl":"CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci", "tableChanges":[ ] }
常见报错
场景一:所有Tasks运行失败
错误信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解决方法:在消息流入任务详情页面,单击基础信息区域的诊断链接,即可跳转到Connector监控页面,可以看到Tasks运行失败的详细错误信息。
场景二:Kafka Connect退出
错误信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解决方法:由于状态获取可能会有延迟,建议您先尝试刷新页面。若刷新后仍然是失败状态,您可以按照以下步骤查看错误信息。
在消息流入任务详情页面的Worker信息区域,单击SAE应用后的实例名称,跳转到SAE应用详情页面。
在基本信息页面,单击实例部署信息页签。
在实例右侧操作列,单击Webshell登录Kafka Connect运行环境。
执行
vi /home/admin/connector-bootstrap.log
命令,查看Connector启动日志,查找其中是否包含错误信息。执行
vi /opt/kafka/logs/connect.log
命令,查看Connector运行日志,在其中查找ERROR或者WARN字段来查看是否有错误信息。
基于错误信息提示进行修复操作后,可以重新启动对应任务。
场景三:Connector参数校验失败
错误信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s): Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解决方法:此时需要根据错误信息,找出具体哪个参数出错,更新对应参数即可。若基于上述错误信息无法定位具体的出错参数,可以参考上文场景二中的步骤登录Kafka Connect运行环境,执行以下命令,查询参数是否校验通过。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
该指令会返回Connector参数中每个参数是否校验通过,若不通过,则errors属性非空,如下所示。
"value":{ "name":"snapshot.mode", "value":null, "recommended_values":[ "never", "initial_only", "when_needed", "initial", "schema_only", "schema_only_recovery" ], "errors":[ "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery" ], "visible":true }
场景四:获取Topic元数据超时
错误信息:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
解决方法:出现此报错的原因是您在启动Connector任务前未按照命名规则提前创建数据库目标Topic
{database.server.name}
和表格目标Topic{database.server.name}.{databaseName}.{tableName}
,建议您停用任务后创建目标Topic再启用任务。