使用MongoDB Connector连接MongoDB数据库

本文介绍如何创建MongoDB Connector,将MongoDB数据库和云消息队列 Kafka 版的数据进行双向同步。

前提条件

步骤一:创建数据表

  1. 登录云数据库MongoDB管理控制台,创建实例或使用已有实例。以创建分片集群实例为例,详细步骤,请参见创建分片集群实例

    重要
    • 创建实例或使用已有实例时,请保证实例与前提条件中云消息队列 Kafka 版实例使用相同的VPC,否则将会导致链路不通。

    • 创建实例时会自动创建root用户名,设置密码时请勿包含at(@)或冒号(:)。

    • 创建实例时,请选择与云消息队列 Kafka 版实例相同的vSwitch,若使用已有实例,请检查vSwitch是否相同,若不在同一vSwitch且在同一VPC下,可以将Kafka实例的vSwitch网段添加至MongoDB访问白名单中。详情请见设置白名单。vSwitch的网段信息可以在专有网络控制台的交换机详情页面获取。

  2. 实例创建完成后,在实例列表页面单击目标实例,然后在实例详情页面的左侧导航栏,完成以下操作。

    1. 将DMS服务器的IP地址加入白名单。更多信息,请参见设置白名单

    2. 基本信息页面的连接信息区域,记录专有网络的连接地址。

      连接地址
  3. 基本信息页面,单击登录数据库进入DMS数据管理服务平台。更多信息,请参见通过DMS连接MongoDB分片集群实例

  4. 在目标实例中创建数据库和集合。

    • SQL Console页面中,执行以下命令,创建test数据库。

      use test
    • 在test数据库中执行以下命令,创建名为mongo的集合。

      db.createCollection("mongo")

    更多信息,请参见创建数据库和集合并写入数据

步骤二:创建Connector

Source Connector

  1. 下载MongoDB Connector1.8.1版本文件,解压至本地,目录结构如下所示。

    重要

    下载MongoDB Connector文件时请选择适配Java 8的版本。

    .
    ├── assets
    │   ├── mongodb-leaf.png
    │   └── mongodb-logo.png
    ├── doc
    │   ├── LICENSE.txt
    │   └── README.md
    ├── etc
    │   ├── MongoSinkConnector.properties
    │   └── MongoSourceConnector.properties
    ├── lib
    │   └── mongo-kafka-connect-1.9.1-confluent.jar
    └── manifest.json
  2. Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下,然后将其压缩成.ZIP文件,上传至提前创建好的OSS Bucket。更多信息,请参见控制台上传文件

  3. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  4. 在左侧导航栏,选择Connector生态集成 > 任务列表

  5. 任务列表页面,单击创建任务列表

  6. 创建任务面板。设置任务名称,配置以下配置项。

    • 任务创建

      1. Source(源)配置向导,选择数据提供方Apache Kafka Connect,单击下一步

      2. 连接器配置配置向导,设置以下配置项,然后单击下一步

        配置项

        参数

        说明

        Kafka Connect插件

        Bucket存储桶

        选择OSS Bucket。

        文件

        选择上传的.ZIP文件。

        Kafka资源信息

        Kafka参数配置

        选择Source Connect。

        Kafka实例

        选择前提条件中创建的实例。

        专有网络VPC

        选择VPC ID。

        交换机

        选择vSwitch ID。

        安全组

        选择安全组。

        Kafka Connect配置信息

        解析当前ZIP包下的properties文件

        选择新建properties文件。选择.ZIP文件中包含的SourceConnector对应的.properties文件。路径为/etc/MongoSourceConnector.properties。

        在输入框中更新相关字段的取值。展开查看字段描述

        字段名

        描述

        connector.class

        运行的Connector包名称,无需修改。

        tasks.max

        Task的最大数量。在MongoDB Source Connector中此参数取值只能为1。

        connection.url

        填写步骤一:创建数据表中获取的MongoDB数据库的专有网络连接地址。地址中的****需替换为root账号的密码。

        database

        MongoDB数据库名称。

        collection

        MongoDB集合名称。

        topic.namespace.map

        目标Topic信息,为Key-Value 结构,Key的组成为database.{collection},Value为目标Topic名称,此参数表示指定Collection的数据变化会传输至指定Topic中。在投递数据前,请提前创建好目标Topic。

        copy.existing

        是否将源MongoDB Collection中的已存在数据全量同步至Kafka中。若为true,则在Connector首次启动时,就会将MongoDB Collection中的存量数据全量同步至下游Kafka Topic中。建议在首次全量同步完成后,将该值更新为false,防止后续将Connector删除重建后,再次进行数据全量同步,导致重复消费。

        展开查看示例代码

        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        name=mongo-source
        batch.size=0
        change.stream.full.document=updateLookup
        collection=testCollection
        connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
        database=testDatabase
        poll.await.time.ms=5000
        poll.max.batch.size=1000
        tasks.max=1
        topic.namespace.map={"testDatabase.testCollection": "targetTopic"}
        # 是否将源MongoDB Collection中的已存在数据全量同步至Kafka中。
        # 若为true,则在Connector首次启动时,就会将MongoDB Collection中的存量数据全量同步至下游Kafka Topic中。
        # 建议在首次全量同步完成后,将该值更新为false,防止后续将Connector删除重建后,再次进行数据全量同步,导致重复消费。
        copy.existing=true

        Connector全量参数,请参见MongoDB Source Connector Properties

      3. 实例配置配置向导,设置以下参数,然后单击下一步

        配置项

        参数

        说明

        Worker规格

        Worker规格

        选择合适的Worker规格。

        最小Worker数

        设置为1。

        最大Worker数

        设置为1。

        Kafka Connect Worker配置

        自动创建Kafka Connect Worker依赖资源

        建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:

        • Offset Topic:用于存储源数据偏移量,命名规则为connect-eb-offset-<任务名称>

        • Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为connect-eb-config-<任务名称>

        • Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为connect-eb-status-<任务名称>

        • Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为connect-eb-cluster-<任务名称>

        • Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Kafka Topic中的数据,命名规则为connector-eb-cluster-<任务名称>-<connector名称>

      4. 运行配置配置向导,将日志投递方式设置为投递至SLS或者投递至Kafka,在角色授权卡片设置Connect依赖的角色配置,然后单击保存

        重要

        建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。

    • 任务属性

      设置此任务的重试策略及死信队列。更多信息,请参见重试和死信

    等待任务状态变为运行中,此时Connector已经在正常工作中。

Sink Connector

  1. 下载MongoDB Connector1.8.1版本文件,解压至本地,目录结构如下所示。

    重要

    下载MongoDB Connector文件时请选择适配Java 8的版本。

    .
    ├── assets
    │   ├── mongodb-leaf.png
    │   └── mongodb-logo.png
    ├── doc
    │   ├── LICENSE.txt
    │   └── README.md
    ├── etc
    │   ├── MongoSinkConnector.properties
    │   └── MongoSourceConnector.properties
    ├── lib
    │   └── mongo-kafka-connect-1.9.1-confluent.jar
    └── manifest.json
  2. Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下,然后将其压缩成.ZIP文件,上传至提前创建好的OSS Bucket。更多信息,请参见控制台上传文件

  3. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  4. 在左侧导航栏,选择Connector生态集成 > 任务列表

  5. 任务列表页面,单击创建任务列表

  6. 创建任务面板。设置任务名称,配置以下配置项。

    • 任务创建

      1. Source(源)配置向导,选择数据提供方Apache Kafka Connect,单击下一步

      2. 连接器配置配置向导,设置以下配置项,然后单击下一步

        配置项

        参数

        说明

        Kafka Connect插件

        Bucket存储桶

        选择OSS Bucket。

        文件

        选择上传的.ZIP文件。

        Kafka资源信息

        Kafka参数配置

        选择Sink Connect。

        Kafka实例

        选择前提条件中创建的实例。

        专有网络VPC

        选择VPC ID。

        交换机

        选择vSwitch ID。

        安全组

        选择安全组。

        Kafka Connect配置信息

        解析当前ZIP包下的properties文件

        选择新建properties文件。选择.ZIP文件中包含的SinkConnector对应的.properties文件。路径为/etc/MongoSinkConnector.properties。

        在输入框中更新相关字段的取值。展开查看字段描述

        字段名

        描述

        connector.class

        运行的Connector包名称,无需修改。

        tasks.max

        Task的最大数量。

        topics

        数据源Topic名称。不同Topic间以英文逗号(,)分隔。

        connection.url

        数据库连接地址。填写步骤一:创建数据表中获取的专有网络连接地址。地址中的****需替换为root账号的密码。

        database

        目标MongoDB数据库名称。

        connection

        目标MongoDB集合名称。

        展开查看示例代码

        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        name=mongo-sink
        collection=testCollection
        connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
        database=testDatabase
        delete.on.null.values=false
        key.converter=org.apache.kafka.connect.storage.StringConverter
        key.converter.schemas.enable=false
        max.batch.size=0
        rate.limiting.every.n=0
        rate.limiting.timeout=0
        tasks.max=2
        topics=testTopic
        value.converter=org.apache.kafka.connect.storage.StringConverter
        value.converter.schemas.enable=true
        writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

        Connector全量参数,请参见MongoDB Sink Connector Properties

      3. 实例配置配置向导,设置以下参数,然后单击下一步

        配置项

        参数

        说明

        Worker规格

        Worker规格

        选择合适的Worker规格。

        最小Worker数

        设置为1。

        最大Worker数

        设置为1。此值不能超过tasks.max的取值。

        Kafka Connect Worker 配置

        自动创建Kafka Connect Worker依赖资源

        建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:

        • Offset Topic:用于存储源数据偏移量,命名规则为connect-eb-offset-<任务名称>

        • Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为connect-eb-config-<任务名称>

        • Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为connect-eb-status-<任务名称>

        • Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为connect-eb-cluster-<任务名称>

        • Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Kafka Topic中的数据,命名规则为connector-eb-cluster-<任务名称>-<connector名称>

      4. 运行配置配置向导,将日志投递方式设置为投递至SLS或者投递至Kafka,在角色授权卡片设置Connect依赖的角色配置,然后单击保存

        重要

        建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。

    • 任务属性

      设置此任务的重试策略及死信队列。更多信息,请参见重试和死信

    等待任务状态变为运行中,此时Connector已经在正常工作中。

步骤三:测试Connector

Source Connector

  1. 在DMS数据管理服务平台,向步骤一:创建数据表中创建的Connection插入一条数据。例如,插入一条Key为testKey,Value为testValue的数据,命令如下。

    db.testCollection.insert({"testKey":"testValue"})
  2. 登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。

  3. 在目标实例页面,单击目标Topic,然后单击消息查询,查看插入的消息数据,消息Value示例如下。

    {"_id": {"_data": "826464A63D000000012B022C0100296E5A1004CB11AB15FD6D4C409E37370B43A4B82246645F696400646464A624458CE6B7B626645B****"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1684317757, "i": 1}}, "fullDocument": {"_id": {"$oid": "6464a624458ce6b7b626****"}, "testKey": "testValue"}, "ns": {"db": "test", "coll": "mongo"}, "documentKey": {"_id": {"$oid": "6464a624458ce6b7b626****"}}}

Sink Connector

  1. 登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。

  2. 在左侧导航栏,单击Topic管理,然后单击目标Topic。

  3. 在Topic详情页面右上角,单击体验发送消息

  4. 快速体验消息收发面板,设置消息内容。例如在目标表格中添加一条Key为Key1,Value为Value1的数据,消息内容如下。

    {"key1": "value1"}
  5. 在DMS数据管理服务平台,执行以下命令,查看目标集合中接收的数据。

    db.mongo.find()

    接收数据示例如下:

    {
        "_id":"ObjectId("643f4d5551daf4552246****")"
        "_insertedTS":"ISODate("2023-05-18T02:09:25.314Z")"
        "_modifiedTS":"ISODate("2023-05-18T02:09:25.314Z")"
        "key1":"value1"
    }

常见报错

场景一:所有Tasks运行失败

错误信息:

All tasks under connector mongo-source failed, please check the error trace of the task.

解决方法:在消息流入任务详情页面,单击基础信息区域的诊断链接,即可跳转到Connector监控页面,可以看到Tasks运行失败的详细错误信息。

场景二:Kafka Connect退出

错误信息:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

解决方法:由于状态获取可能会有延迟,建议您先尝试刷新页面。若刷新后仍然是失败状态,您可以按照以下步骤查看错误信息。

  1. 在消息流入任务详情页面的Worker信息区域,单击SAE应用后的实例名称,跳转到SAE应用详情页面。

  2. 基本信息页面,单击实例部署信息页签。

  3. 在实例右侧操作列,单击Webshell登录Kafka Connect运行环境。实例部署信息

    • 执行vi /home/admin/connector-bootstrap.log命令,查看Connector启动日志,查找其中是否包含错误信息。

    • 执行vi /opt/kafka/logs/connect.log命令,查看Connector运行日志,在其中查找ERROR或者WARN字段来查看是否有错误信息。

基于错误信息提示进行修复操作后,可以重新启动对应任务。

场景三:Connector参数校验失败

错误信息:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

解决方法:此时需要根据错误信息,找出具体哪个参数出错,更新对应参数即可。若基于上述错误信息无法定位具体的出错参数,可以参考上文场景二中的步骤登录Kafka Connect运行环境,执行以下命令,查询参数是否校验通过。

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

该指令会返回Connector参数中每个参数是否校验通过,若不通过,则errors属性非空,如下所示。

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}

场景四:无法连接服务器

错误信息:

Start or update connector mongo-source failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):Unable to connect to the server.

解决方法:请查看Connector的配置信息是否正确,然后检查MongoDB实例与Kafka实例是否在同一个VPC及vSwitch下,如果不在一个vSwitch但是在同一VPC下,那么需要将Kafka实例所在的交换机网段加入MongoDB实例的白名单。

场景五:数据库用户名或者密码信息包含特殊字符

错误信息:

The connection string contains invalid user information. If the username or password contains a colon (:) or an at-sign (@) then it must be urlencoded\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

解决方法:请检查MongoDB账户名和密码中是否包含有at(@)或冒号(:),如有请转义对应的符号。at(@)和冒号(:)进行16进制URL编码后分别为%40%3A