本文介绍如何创建MongoDB Connector,将MongoDB数据库和云消息队列 Kafka 版的数据进行双向同步。
前提条件
-
已开通对象存储OSS服务并创建存储空间(Bucket)。更多信息,请参见控制台创建存储空间。
-
已开通Serverless应用引擎服务,更多信息,请参见准备工作。
-
已创建专有网络及交换机。更多信息,请参见搭建IPv4专有网络。
-
已购买并部署云消息队列 Kafka 版实例。更多信息,请参见购买并部署实例。
步骤一:创建数据表
-
登录云数据库MongoDB管理控制台,创建实例或使用已有实例。以创建分片集群实例为例,详细步骤,请参见创建分片集群实例。
重要-
创建实例或使用已有实例时,请保证实例与前提条件中云消息队列 Kafka 版实例使用相同的VPC,否则将会导致链路不通。
-
创建实例时会自动创建root用户名,设置密码时请勿包含at(@)或冒号(:)。
-
创建实例时,请选择与云消息队列 Kafka 版实例相同的vSwitch,若使用已有实例,请检查vSwitch是否相同,若不在同一vSwitch且在同一VPC下,可以将Kafka实例的vSwitch网段添加至MongoDB访问白名单中。详情请见设置白名单。vSwitch的网段信息可以在专有网络控制台的交换机详情页面获取。
-
-
实例创建完成后,在实例列表页面单击目标实例,然后在实例详情页面的左侧导航栏,完成以下操作。
-
将DMS服务器的IP地址加入白名单。更多信息,请参见设置白名单。
-
在基本信息页面的连接信息区域,记录专有网络的连接地址。
连接地址为 Connection String URI 格式,形如
mongodb://root:xxxxxx@<host1>:<port1>,<host2>:<port2>/admin,其中xxxxxx需替换为 root 账号的密码。为保障鉴权成功,请安装与 MongoDB 实例版本相对应的 Mongo Shell 版本。
-
-
在基本信息页面,单击登录数据库进入DMS数据管理服务平台。更多信息,请参见通过DMS连接MongoDB分片集群实例。
-
在目标实例中创建数据库和集合。
-
在SQL Console页面中,执行以下命令,创建test数据库。
use test -
在test数据库中执行以下命令,创建名为mongo的集合。
db.createCollection("mongo")
更多信息,请参见创建数据库和集合并写入数据。
-
步骤二:创建Connector
Source Connector
-
下载MongoDB Connector1.8.1版本文件,解压至本地。
重要下载MongoDB Connector文件时请选择适配Java 8的版本。
. ├── assets │ ├── mongodb-leaf.png │ └── mongodb-logo.png ├── doc │ ├── LICENSE.txt │ └── README.md ├── etc │ ├── MongoSinkConnector.properties │ └── MongoSourceConnector.properties ├── lib │ └── mongo-kafka-connect-1.9.1-confluent.jar └── manifest.json -
从Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下,然后将其压缩成.ZIP文件,上传至提前创建好的OSS Bucket。更多信息,请参见上传文件。
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
-
在左侧导航栏,选择。
-
在任务列表页面,单击创建任务。
-
在创建任务面板。设置任务名称,配置以下配置项。
-
任务创建
-
在Source(源)配置向导,选择数据提供方为Apache Kafka Connect,单击下一步。
-
在连接器配置配置向导,设置以下配置项,然后单击下一步。
配置项
参数
说明
Kafka Connect插件
Bucket存储桶
选择OSS Bucket。
文件
选择上传的.ZIP文件。
Kafka资源信息
Kafka参数配置
选择Source Connect。
Kafka实例
选择前提条件中创建的实例。
专有网络VPC
选择VPC ID。
交换机
选择vSwitch ID。
安全组
选择安全组。
Kafka Connect配置信息
解析当前ZIP包下的properties文件
选择新建 properties 文件。选择.ZIP文件中包含的SourceConnector对应的.properties文件。路径为/etc/MongoSourceConnector.properties。
Connector全量参数,请参见MongoDB Source Connector Properties。
-
在实例配置配置向导,设置以下参数,然后单击下一步。
配置项
参数
说明
Worker规格
Worker 规格
选择合适的Worker规格。
最小 Worker 数
设置为1。
最大 Worker 数
设置为1。
Kafka Connect Worker配置
自动创建Kafka Connect Worker依赖资源
建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:
-
Offset Topic:用于存储源数据偏移量,命名规则为
connect-eb-offset-<任务名称>。 -
Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为
connect-eb-config-<任务名称>。 -
Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为
connect-eb-status-<任务名称>。 -
Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为
connect-eb-cluster-<任务名称>。 -
Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Kafka Topic中的数据,命名规则为
connector-eb-cluster-<任务名称>-<connector名称>。
-
-
在运行配置配置向导,将日志投递方式设置为投递至 SLS或者投递至 Kafka,在角色授权卡片设置Connect依赖的角色配置,然后单击保存。
重要建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。
-
-
任务属性
设置此任务的重试策略及死信队列。更多信息,请参见重试和死信。
等待任务状态变为运行中,此时Connector已经在正常工作中。
-
Sink Connector
-
下载MongoDB Connector1.8.1版本文件,解压至本地。
重要下载MongoDB Connector文件时请选择适配Java 8的版本。
. ├── assets │ ├── mongodb-leaf.png │ └── mongodb-logo.png ├── doc │ ├── LICENSE.txt │ └── README.md ├── etc │ ├── MongoSinkConnector.properties │ └── MongoSourceConnector.properties ├── lib │ └── mongo-kafka-connect-1.9.1-confluent.jar └── manifest.json -
从Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下,然后将其压缩成.ZIP文件,上传至提前创建好的OSS Bucket。更多信息,请参见上传文件。
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
-
在左侧导航栏,选择。
-
在任务列表页面,单击创建任务。
-
在创建任务面板。设置任务名称,配置以下配置项。
-
任务创建
-
在Source(源)配置向导,选择数据提供方为Apache Kafka Connect,单击下一步。
-
在连接器配置配置向导,设置以下配置项,然后单击下一步。
配置项
参数
说明
Kafka Connect插件
Bucket存储桶
选择OSS Bucket。
文件
选择上传的.ZIP文件。
Kafka资源信息
Kafka参数配置
选择Sink Connect。
Kafka实例
选择前提条件中创建的实例。
专有网络VPC
选择VPC ID。
交换机
选择vSwitch ID。
安全组
选择安全组。
Kafka Connect配置信息
解析当前ZIP包下的properties文件
选择新建 properties 文件。选择.ZIP文件中包含的SinkConnector对应的.properties文件。路径为/etc/MongoSinkConnector.properties。
Connector全量参数,请参见MongoDB Sink Connector Properties。
-
在实例配置配置向导,设置以下参数,然后单击下一步。
配置项
参数
说明
Worker规格
Worker 规格
选择合适的Worker规格。
最小 Worker 数
设置为1。
最大 Worker 数
设置为1。此值不能超过tasks.max的取值。
Kafka Connect Worker 配置
自动创建Kafka Connect Worker依赖资源
建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:
-
Offset Topic:用于存储源数据偏移量,命名规则为
connect-eb-offset-<任务名称>。 -
Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为
connect-eb-config-<任务名称>。 -
Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为
connect-eb-status-<任务名称>。 -
Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为
connect-eb-cluster-<任务名称>。 -
Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Kafka Topic中的数据,命名规则为
connector-eb-cluster-<任务名称>-<connector名称>。
-
-
在运行配置配置向导,将日志投递方式设置为投递至 SLS或者投递至 Kafka,在角色授权卡片设置Connect依赖的角色配置,然后单击保存。
重要建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。
-
-
任务属性
设置此任务的重试策略及死信队列。更多信息,请参见重试和死信。
等待任务状态变为运行中,此时Connector已经在正常工作中。
-
步骤三:测试Connector
Source Connector
-
在DMS数据管理服务平台,向步骤一:创建数据表中创建的Connection插入一条数据。例如,插入一条Key为testKey,Value为testValue的数据,命令如下。
db.testCollection.insert({"testKey":"testValue"}) -
登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。
-
在目标实例页面,单击目标Topic,然后单击消息查询,查看插入的消息数据,消息Value示例如下。
{"_id": {"_data": "826464A63D000000012B022C0100296E5A1004CB11AB15FD6D4C409E37370B43A4B82246645F696400646464A624458CE6B7B626645B****"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1684317757, "i": 1}}, "fullDocument": {"_id": {"$oid": "6464a624458ce6b7b626****"}, "testKey": "testValue"}, "ns": {"db": "test", "coll": "mongo"}, "documentKey": {"_id": {"$oid": "6464a624458ce6b7b626****"}}}
Sink Connector
-
登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。
-
在左侧导航栏,单击Topic管理,然后单击目标Topic。
-
在Topic详情页面右上角,单击体验发送消息。
-
在快速体验消息收发面板,设置消息内容。例如在目标表格中添加一条Key为Key1,Value为Value1的数据,消息内容如下。
{"key1": "value1"} -
在DMS数据管理服务平台,执行以下命令,查看目标集合中接收的数据。
db.mongo.find()接收数据示例如下:
{ "_id":"ObjectId("643f4d5551daf4552246****")" "_insertedTS":"ISODate("2023-05-18T02:09:25.314Z")" "_modifiedTS":"ISODate("2023-05-18T02:09:25.314Z")" "key1":"value1" }
常见报错
场景一:所有Tasks运行失败
错误信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解决方法:在消息流入任务详情页面,单击基础信息区域的诊断链接,即可跳转到Connector监控页面,可以看到Tasks运行失败的详细错误信息。
场景二:Kafka Connect退出
错误信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解决方法:由于状态获取可能会有延迟,建议您先尝试刷新页面。若刷新后仍然是失败状态,您可以按照以下步骤查看错误信息。
-
在消息流入任务详情页面的Worker 信息区域,单击SAE 应用后的实例名称,跳转到SAE应用详情页面。
-
在基本信息页面,单击实例部署信息页签。
-
在实例右侧操作列,单击Webshell登录Kafka Connect运行环境。在实例列表中找到目标实例,单击其操作列的 Webshell 按钮,进入实例终端。
-
执行
vi /home/admin/connector-bootstrap.log命令,查看Connector启动日志,查找其中是否包含错误信息。 -
执行
vi /opt/kafka/logs/connect.log命令,查看Connector运行日志,在其中查找ERROR或者WARN字段来查看是否有错误信息。
-
基于错误信息提示进行修复操作后,可以重新启动对应任务。
场景三:Connector参数校验失败
错误信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解决方法:此时需要根据错误信息,找出具体哪个参数出错,更新对应参数即可。若基于上述错误信息无法定位具体的出错参数,可以参考上文场景二中的步骤登录Kafka Connect运行环境,执行以下命令,查询参数是否校验通过。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
该指令会返回Connector参数中每个参数是否校验通过,若不通过,则errors属性非空。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}
场景四:无法连接服务器
错误信息:
Start or update connector mongo-source failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):Unable to connect to the server.
解决方法:请查看Connector的配置信息是否正确,然后检查MongoDB实例与Kafka实例是否在同一个VPC及vSwitch下,如果不在一个vSwitch但是在同一VPC下,那么需要将Kafka实例所在的交换机网段加入MongoDB实例的白名单。
场景五:数据库用户名或者密码信息包含特殊字符
错误信息:
The connection string contains invalid user information. If the username or password contains a colon (:) or an at-sign (@) then it must be urlencoded\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
解决方法:请检查MongoDB账户名和密码中是否包含有at(@)或冒号(:),如有请转义对应的符号。at(@)和冒号(:)进行16进制URL编码后分别为%40和%3A。