本文介绍如何创建MongoDB Connector,将MongoDB数据库和云消息队列 Kafka 版的数据进行双向同步。

前提条件

步骤一:创建数据表

  1. 登录云数据库MongoDB管理控制台,创建实例或使用已有实例。以创建分片集群实例为例,详细步骤,请参见创建分片集群实例
    重要
    • 创建实例或使用已有实例时,请保证实例与前提条件中云消息队列 Kafka 版实例使用相同的VPC,否则将会导致链路不通。
    • 创建实例时会自动创建root用户名,设置密码时请勿包含at(@)或冒号(:)。
    • 创建实例时,请选择与云消息队列 Kafka 版实例相同的vSwitch,若使用已有实例,请检查vSwitch是否相同,若不在同一vSwitch且在同一VPC下,可以将Kafka实例的vSwitch网段添加至MongoDB访问白名单中。详情请见设置白名单。vSwitch的网段信息可以在专有网络控制台的交换机详情页面获取。
  2. 实例创建完成后,在实例列表页面单击目标实例,然后在实例详情页面的左侧导航栏,完成以下操作。
    1. 将DMS服务器的IP地址加入白名单。更多信息,请参见设置白名单
    2. 基本信息页面的连接信息区域,记录专有网络的连接地址。
      连接地址
  3. 基本信息页面,单击登录数据库进入DMS数据管理服务平台。更多信息,请参见通过DMS连接MongoDB分片集群实例
  4. 在目标实例中创建数据库和集合。
    • SQL Console页面中,执行以下命令,创建test数据库。
      use test
    • 在test数据库中执行以下命令,创建名为mongo的集合。
      db.createCollection("mongo")

    更多信息,请参见创建数据库和集合并写入数据

步骤二:创建Connector

Source Connector

  1. 下载MongoDB Connector1.8.1版本文件,解压至本地,目录结构如下所示。
    .
    ├── assets
    │   ├── mongodb-leaf.png
    │   └── mongodb-logo.png
    ├── doc
    │   ├── LICENSE.txt
    │   └── README.md
    ├── etc
    │   ├── MongoSinkConnector.properties
    │   └── MongoSourceConnector.properties
    ├── lib
    │   └── mongo-kafka-connect-1.9.1-confluent.jar
    └── manifest.json
  2. Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下,然后将其压缩成.ZIP文件,上传至提前创建好的OSS Bucket。更多信息,请参见控制台上传文件
  3. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
  4. 在左侧导航栏,选择Connector生态集成 > 消息流入(Source)
  5. 消息流入(Source)页面,单击创建任务
  6. 消息流入创建面板。设置任务名称,选择流入类型Apache Kafka Connect
    1. 在连接器配置区域,设置以下参数。
      配置项参数说明
      Kafka Connect插件Bucket存储桶选择OSS Bucket。
      文件选择上传的.ZIP文件。
      Kafka资源信息Kafka实例选择前提条件中创建的实例。
      专有网络VPC选择VPC ID。
      交换机选择vSwitch ID。
      安全组选择安全组。
      Kafka Connect配置信息解析当前ZIP包下的properties文件选择新建properties文件。选择.ZIP文件中包含的SourceConnector对应的.properties文件。路径为/etc/MongoSourceConnector.properties。
      在输入框中更新相关字段的取值。展开查看字段描述
      字段名描述
      connector.class运行的Connector包名称,无需修改。
      tasks.maxTask的最大数量。在MongoDB Source Connector中此参数取值只能为1。
      connection.url填写步骤一:创建数据表中获取的MongoDB数据库的专有网络连接地址。地址中的****需替换为root账号的密码。
      databaseMongoDB数据库名称。
      collectionMongoDB集合名称。
      topic.namespace.map目标Topic信息,为Key-Value 结构,Key的组成为database.{collection},Value为目标Topic名称,此参数表示指定Collection的数据变化会传输至指定Topic中。在投递数据前,请提前创建好目标Topic。
      copy.existing是否将源MongoDB Collection中的已存在数据全量同步至Kafka中。若为true,则在Connector首次启动时,就会将MongoDB Collection中的存量数据全量同步至下游Kafka Topic中。建议在首次全量同步完成后,将该值更新为false,防止后续将Connector删除重建后,再次进行数据全量同步,导致重复消费。
      展开查看示例代码
      connector.class=com.mongodb.kafka.connect.MongoSourceConnector
      name=mongo-source
      batch.size=0
      change.stream.full.document=updateLookup
      collection=testCollection
      connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
      database=testDatabase
      poll.await.time.ms=5000
      poll.max.batch.size=1000
      tasks.max=1
      topic.namespace.map={"testDatabase.testCollection": "targetTopic"}
      # 是否将源MongoDB Collection中的已存在数据全量同步至Kafka中。
      # 若为true,则在Connector首次启动时,就会将MongoDB Collection中的存量数据全量同步至下游Kafka Topic中。
      # 建议在首次全量同步完成后,将该值更新为false,防止后续将Connector删除重建后,再次进行数据全量同步,导致重复消费。
      copy.existing=true

      Connector全量参数,请参见MongoDB Source Connector Properties

    2. 实例配置区域,设置以下参数。
      配置项参数说明
      Worker规格Worker规格选择合适的Worker规格。
      最小Worker数设置为1。
      最大Worker数设置为1。
      Kafka Connect Worker配置自动创建Kafka Connect Worker依赖资源建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:
      • Offset Topic:用于存储源数据偏移量,命名规则为connect-eb-offset-<任务名称>
      • Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为connect-eb-config-<任务名称>
      • Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为connect-eb-status-<任务名称>
      • Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为connect-eb-cluster-<任务名称>
      • Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Kafka Topic中的数据,命名规则为connector-eb-cluster-<任务名称>-<connector名称>
    3. 运行配置区域,将日志投递方式设置为投递至SLS或者投递至Kafka,在角色授权卡片设置Connect依赖的角色配置
      重要 建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。
    4. 单击确定
    等待任务状态变为运行中,此时Connector已经在正常工作中。

Sink Connector

  1. 下载MongoDB Connector1.8.1版本文件,解压至本地,目录结构如下所示。
    .
    ├── assets
    │   ├── mongodb-leaf.png
    │   └── mongodb-logo.png
    ├── doc
    │   ├── LICENSE.txt
    │   └── README.md
    ├── etc
    │   ├── MongoSinkConnector.properties
    │   └── MongoSourceConnector.properties
    ├── lib
    │   └── mongo-kafka-connect-1.9.1-confluent.jar
    └── manifest.json
  2. Maven仓库中下载avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,将这两个jar包移动至MongoDB Connector文件夹中的lib目录下,然后将其压缩成.ZIP文件,上传至提前创建好的OSS Bucket。更多信息,请参见控制台上传文件
  3. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
  4. 在左侧导航栏,选择Connector生态集成 > 消息流出(Sink)
  5. 消息流出(Sink)页面,单击创建任务
  6. 消息流出创建面板。设置任务名称,选择流出类型Apache Kafka Connect
    1. 在连接器配置区域,设置以下参数。
      配置项参数说明
      Kafka Connect插件Bucket存储桶选择OSS Bucket。
      文件选择上传的.ZIP文件。
      Kafka资源信息Kafka实例选择前提条件中创建的实例。
      专有网络VPC选择VPC ID。
      交换机选择vSwitch ID。
      安全组选择安全组。
      Kafka Connect配置信息解析当前ZIP包下的properties文件选择新建properties文件。选择.ZIP文件中包含的SinkConnector对应的.properties文件。路径为/etc/MongoSinkConnector.properties。
      在输入框中更新相关字段的取值。展开查看字段描述
      字段名描述
      connector.class运行的Connector包名称,无需修改。
      tasks.maxTask的最大数量。
      topics数据源Topic名称。不同Topic间以英文逗号(,)分隔。
      connection.url数据库连接地址。填写步骤一:创建数据表中获取的专有网络连接地址。地址中的****需替换为root账号的密码。
      database目标MongoDB数据库名称。
      connection目标MongoDB集合名称。
      展开查看示例代码
      connector.class=com.mongodb.kafka.connect.MongoSinkConnector
      name=mongo-sink
      collection=testCollection
      connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
      database=testDatabase
      delete.on.null.values=false
      key.converter=org.apache.kafka.connect.storage.StringConverter
      key.converter.schemas.enable=false
      max.batch.size=0
      rate.limiting.every.n=0
      rate.limiting.timeout=0
      tasks.max=2
      topics=testTopic
      value.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter.schemas.enable=true
      writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

      Connector全量参数,请参见MongoDB Sink Connector Properties

    2. 实例配置区域,设置以下参数。
      配置项参数说明
      Worker规格Worker规格选择合适的Worker规格。
      最小Worker数设置为1。
      最大Worker数设置为1。此值不能超过tasks.max的取值。
      Kafka Connect Worker 配置自动创建Kafka Connect Worker依赖资源建议勾选此项,此时会在选择的Kafka实例中自动创建Kafka Connect运行所需的一些Internal Topic以及ConsumerGroup,并将这些必填配置自动填入配置框中,包括以下配置项:
      • Offset Topic:用于存储源数据偏移量,命名规则为connect-eb-offset-<任务名称>
      • Config Topic:用于存储Connectors以及Tasks的配置信息,命名规则为connect-eb-config-<任务名称>
      • Status Topic:用于存储Connectors以及Tasks状态信息,命名规则为connect-eb-status-<任务名称>
      • Kafka Connect Consumer Group:Kafka Connect Worker用于消费Internal Topics的消费组,命名规则为connect-eb-cluster-<任务名称>
      • Kafka Source Connector Consumer Group:只针对Sink Connector有效,用于消费源Kafka Topic中的数据,命名规则为connector-eb-cluster-<任务名称>-<connector名称>
    3. 运行配置区域,将日志投递方式设置为投递至SLS或者投递至Kafka,在角色授权卡片设置Connect依赖的角色配置
      重要 建议配置的角色包含AliyunSAEFullAccess权限,否则可能会导致任务运行失败。
    4. 单击确定
    等待任务状态变为运行中,此时Connector已经在正常工作中。

步骤三:测试Connector

Source Connector

  1. 在DMS数据管理服务平台,向步骤一:创建数据表中创建的Connection插入一条数据。例如,插入一条Key为testKey,Value为testValue的数据,命令如下。
    db.testCollection.insert({"testKey":"testValue"})
  2. 登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。
  3. 在目标实例页面,单击目标Topic,然后单击消息查询,查看插入的消息数据,消息Value示例如下。
    {"_id": {"_data": "826464A63D000000012B022C0100296E5A1004CB11AB15FD6D4C409E37370B43A4B82246645F696400646464A624458CE6B7B626645B****"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1684317757, "i": 1}}, "fullDocument": {"_id": {"$oid": "6464a624458ce6b7b626****"}, "testKey": "testValue"}, "ns": {"db": "test", "coll": "mongo"}, "documentKey": {"_id": {"$oid": "6464a624458ce6b7b626****"}}}

Sink Connector

  1. 登录云消息队列 Kafka 版控制台,在实例列表页面,单击目标实例。
  2. 在左侧导航栏,单击Topic管理,然后单击目标Topic。
  3. 在Topic详情页面右上角,单击体验发送消息
  4. 快速体验消息收发面板,设置消息内容。例如在目标表格中添加一条Key为Key1,Value为Value1的数据,消息内容如下。
    {"key1": "value1"}
  5. 在DMS数据管理服务平台,执行以下命令,查看目标集合中接收的数据。
    db.mongo.find()
    接收数据示例如下:
    {
        "_id":"ObjectId("643f4d5551daf4552246****")"
        "_insertedTS":"ISODate("2023-05-18T02:09:25.314Z")"
        "_modifiedTS":"ISODate("2023-05-18T02:09:25.314Z")"
        "key1":"value1"
    }

常见报错

场景一:所有Tasks运行失败

错误信息:
All tasks under connector mongo-source failed, please check the error trace of the task.

解决方法:在消息流入任务详情页面,单击基础信息区域的诊断链接,即可跳转到Connector监控页面,可以看到Tasks运行失败的详细错误信息。

场景二:Kafka Connect退出

错误信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解决方法:由于状态获取可能会有延迟,建议您先尝试刷新页面。若刷新后仍然是失败状态,您可以按照以下步骤查看错误信息。
  1. 在消息流入任务详情页面的Worker信息区域,单击SAE应用后的实例名称,跳转到SAE应用详情页面。
  2. 基本信息页面,单击实例部署信息页签。
  3. 在实例右侧操作列,单击Webshell登录Kafka Connect运行环境。实例部署信息
    • 执行vi /home/admin/connector-bootstrap.log命令,查看Connector启动日志,查找其中是否包含错误信息。
    • 执行vi /opt/kafka/logs/connect.log命令,查看Connector运行日志,在其中查找ERROR或者WARN字段来查看是否有错误信息。

基于错误信息提示进行修复操作后,可以重新启动对应任务。

场景三:Connector参数校验失败

错误信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解决方法:此时需要根据错误信息,找出具体哪个参数出错,更新对应参数即可。若基于上述错误信息无法定位具体的出错参数,可以参考上文场景二中的步骤登录Kafka Connect运行环境,执行以下命令,查询参数是否校验通过。
curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
该指令会返回Connector参数中每个参数是否校验通过,若不通过,则errors属性非空,如下所示。
"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}

场景四:无法连接服务器

错误信息:
Start or update connector mongo-source failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):Unable to connect to the server.

解决方法:请查看Connector的配置信息是否正确,然后检查MongoDB实例与Kafka实例是否在同一个VPC及vSwitch下,如果不在一个vSwitch但是在同一VPC下,那么需要将Kafka实例所在的交换机网段加入MongoDB实例的白名单。

场景五:数据库用户名或者密码信息包含特殊字符

错误信息:
The connection string contains invalid user information. If the username or password contains a colon (:) or an at-sign (@) then it must be urlencoded\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

解决方法:请检查MongoDB账户名和密码中是否包含有at(@)或冒号(:),如有请转义对应的符号。at(@)和冒号(:)进行16进制URL编码后分别为%40%3A