本文介绍如何创建MongoDB Connector,将MongoDB数据库和云消息队列 Kafka 版的数据进行双向同步。
前提条件
- 开通事件总线EventBridge并授权。
- 已开通对象存储OSS服务并创建存储空间(Bucket)。更多信息,请参见控制台创建存储空间。
- 已开通Serverless应用引擎服务,更多信息,请参见准备工作。
- 已创建专有网络及交换机。更多信息,请参见步骤一:创建专有网络和交换机。
- 已购买并部署云消息队列 Kafka 版实例。更多信息,请参见购买并部署实例。
步骤一:创建数据表
- 登录云数据库MongoDB管理控制台,创建实例或使用已有实例。以创建分片集群实例为例,详细步骤,请参见创建分片集群实例。重要
- 创建实例或使用已有实例时,请保证实例与前提条件中云消息队列 Kafka 版实例使用相同的VPC,否则将会导致链路不通。
- 创建实例时会自动创建root用户名,设置密码时请勿包含at(@)或冒号(:)。
- 创建实例时,请选择与云消息队列 Kafka 版实例相同的vSwitch,若使用已有实例,请检查vSwitch是否相同,若不在同一vSwitch且在同一VPC下,可以将Kafka实例的vSwitch网段添加至MongoDB访问白名单中。详情请见设置白名单。vSwitch的网段信息可以在专有网络控制台的交换机详情页面获取。
- 实例创建完成后,在实例列表页面单击目标实例,然后在实例详情页面的左侧导航栏,完成以下操作。
- 在基本信息页面,单击登录数据库进入DMS数据管理服务平台。更多信息,请参见通过DMS连接MongoDB分片集群实例。
- 在目标实例中创建数据库和集合。
- 在SQL Console页面中,执行以下命令,创建test数据库。
use test
- 在test数据库中执行以下命令,创建名为mongo的集合。
db.createCollection("mongo")
更多信息,请参见创建数据库和集合并写入数据。
- 在SQL Console页面中,执行以下命令,创建test数据库。
步骤二:创建Connector
Source Connector
- 下载MongoDB Connector1.8.1版本文件,解压至本地,目录结构如下所示。
. ├── assets │ ├── mongodb-leaf.png │ └── mongodb-logo.png ├── doc │ ├── LICENSE.txt │ └── README.md ├── etc │ ├── MongoSinkConnector.properties │ └── MongoSourceConnector.properties ├── lib │ └── mongo-kafka-connect-1.9.1-confluent.jar └── manifest.json
- 从Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下,然后将其压缩成.ZIP文件,上传至提前创建好的OSS Bucket。更多信息,请参见控制台上传文件。
- 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
- 在左侧导航栏,选择 。
- 在消息流入(Source)页面,单击创建任务。
- 在消息流入创建面板。设置任务名称,选择流入类型为Apache Kafka Connect。等待任务状态变为运行中,此时Connector已经在正常工作中。
Sink Connector
- 下载MongoDB Connector1.8.1版本文件,解压至本地,目录结构如下所示。
. ├── assets │ ├── mongodb-leaf.png │ └── mongodb-logo.png ├── doc │ ├── LICENSE.txt │ └── README.md ├── etc │ ├── MongoSinkConnector.properties │ └── MongoSourceConnector.properties ├── lib │ └── mongo-kafka-connect-1.9.1-confluent.jar └── manifest.json
- 从Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下,然后将其压缩成.ZIP文件,上传至提前创建好的OSS Bucket。更多信息,请参见控制台上传文件。
- 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
- 在左侧导航栏,选择 。
- 在消息流出(Sink)页面,单击创建任务。
- 在消息流出创建面板。设置任务名称,选择流出类型为Apache Kafka Connect。等待任务状态变为运行中,此时Connector已经在正常工作中。
步骤三:测试Connector
Source Connector
- 在DMS数据管理服务平台,向步骤一:创建数据表中创建的Connection插入一条数据。例如,插入一条Key为testKey,Value为testValue的数据,命令如下。
db.testCollection.insert({"testKey":"testValue"})
- 登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。
- 在目标实例页面,单击目标Topic,然后单击消息查询,查看插入的消息数据,消息Value示例如下。
{"_id": {"_data": "826464A63D000000012B022C0100296E5A1004CB11AB15FD6D4C409E37370B43A4B82246645F696400646464A624458CE6B7B626645B****"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1684317757, "i": 1}}, "fullDocument": {"_id": {"$oid": "6464a624458ce6b7b626****"}, "testKey": "testValue"}, "ns": {"db": "test", "coll": "mongo"}, "documentKey": {"_id": {"$oid": "6464a624458ce6b7b626****"}}}
Sink Connector
- 登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。
- 在左侧导航栏,单击Topic管理,然后单击目标Topic。
- 在Topic详情页面右上角,单击体验发送消息。
- 在快速体验消息收发面板,设置消息内容。例如在目标表格中添加一条Key为Key1,Value为Value1的数据,消息内容如下。
{"key1": "value1"}
- 在DMS数据管理服务平台,执行以下命令,查看目标集合中接收的数据。
db.mongo.find()
接收数据示例如下:{ "_id":"ObjectId("643f4d5551daf4552246****")" "_insertedTS":"ISODate("2023-05-18T02:09:25.314Z")" "_modifiedTS":"ISODate("2023-05-18T02:09:25.314Z")" "key1":"value1" }
常见报错
场景一:所有Tasks运行失败
错误信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解决方法:在消息流入任务详情页面,单击基础信息区域的诊断链接,即可跳转到Connector监控页面,可以看到Tasks运行失败的详细错误信息。
场景二:Kafka Connect退出
错误信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解决方法:由于状态获取可能会有延迟,建议您先尝试刷新页面。若刷新后仍然是失败状态,您可以按照以下步骤查看错误信息。
- 在消息流入任务详情页面的Worker信息区域,单击SAE应用后的实例名称,跳转到SAE应用详情页面。
- 在基本信息页面,单击实例部署信息页签。
- 在实例右侧操作列,单击Webshell登录Kafka Connect运行环境。
- 执行
vi /home/admin/connector-bootstrap.log
命令,查看Connector启动日志,查找其中是否包含错误信息。 - 执行
vi /opt/kafka/logs/connect.log
命令,查看Connector运行日志,在其中查找ERROR或者WARN字段来查看是否有错误信息。
- 执行
基于错误信息提示进行修复操作后,可以重新启动对应任务。
场景三:Connector参数校验失败
错误信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解决方法:此时需要根据错误信息,找出具体哪个参数出错,更新对应参数即可。若基于上述错误信息无法定位具体的出错参数,可以参考上文场景二中的步骤登录Kafka Connect运行环境,执行以下命令,查询参数是否校验通过。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
该指令会返回Connector参数中每个参数是否校验通过,若不通过,则errors属性非空,如下所示。"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}
场景四:无法连接服务器
错误信息:
Start or update connector mongo-source failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):Unable to connect to the server.
解决方法:请查看Connector的配置信息是否正确,然后检查MongoDB实例与Kafka实例是否在同一个VPC及vSwitch下,如果不在一个vSwitch但是在同一VPC下,那么需要将Kafka实例所在的交换机网段加入MongoDB实例的白名单。
场景五:数据库用户名或者密码信息包含特殊字符
错误信息:
The connection string contains invalid user information. If the username or password contains a colon (:) or an at-sign (@) then it must be urlencoded\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
解决方法:请检查MongoDB账户名和密码中是否包含有at(@)或冒号(:),如有请转义对应的符号。at(@)和冒号(:)进行16进制URL编码后分别为%40
和%3A
。