本文为您介绍如何使用MongoDB连接器。
背景信息
MongoDB是一个面向文档的非结构化数据库,能够简化应用程序的开发及扩展。MongoDB连接器支持的信息如下:
| 类别 | 详情 | 
| 支持类型 | 源表、维表、结果表、数据摄入 | 
| 运行模式 | 仅支持流模式 | 
| 特有监控指标 | |
| API 种类 | DataStream和SQL | 
| 是否支持更新或删除结果表数据 | 是 | 
特色功能
MongoDB CDC源表通过Change Stream API实现全增量一体化数据捕获,先读取历史全量数据(快照),再无缝切换至增量 oplog 读取 ,确保数据不重不漏,并支持Exactly-Once 语义 ,保证故障恢复时数据一致性。
- 基于Change Stream API - 使用MongoDB 3.6的Change Stream API,高效捕获数据库/集合的插入、更新、替换、删除等变更事件,转化为 Flink 可处理的 Changelog 流。 
- 全量 + 增量一体化 - 自动完成初始快照读取,并平滑过渡到增量模式,无需手动干预。 
- 并行快照读取 - 支持并行读取历史数据,提升性能(需 MongoDB ≥ 4.0)。 
- 多种启动模式 - initial:首次启动执行全量快照,之后持续读取 oplog。
- latest-offset:仅从当前 oplog 末尾开始,不读历史数据。
- timestamp:从指定时间戳开始读取 oplog,跳过快照(需 MongoDB ≥ 4.0)。
 
- Full Changelog支持 - 支持输出包含变更前(before)和变更后(after)的完整 changelog(需 MongoDB ≥ 6.0,且开启前像/后像记录功能)。 
Flink 集成增强
- VVR 8.0.6+ - 支持通过CREATE TABLE AS(CTAS)语句或CREATE DATABASE AS(CDAS)语句,同步 MongoDB 的数据与 Schema 变更至下游,开启前像/后像记录功能。 
- VVR 8.0.9+ - 扩展维表关联能力,支持读取内置ObjectId 类型的 - _id字段。
前提条件
- MongoDB实例要求 - 仅支持3.6及以上版本的阿里云 MongoDB(副本集/分片集群)或自建 MongoDB。 
- 必须开启待监控的MongoDB数据库的副本集(Replica Set)功能,详情请参见Replication。 
 
- MongoDB功能依赖 - 使用Full Changelog事件流功能,需要开启前像/后像记录功能。 
- 启用了MongoDB的鉴权功能,需要具备以下数据库权限。 
 
- MongoDB网络与其他准备 - 已配置IP白名单,允许Flink访问MongoDB。 
- 已创建目标MongoDB数据和表。 
 
使用限制
- CDC源表 - MongoDB 4.0及以上版本支持初始快照阶段并行读取。如果您需要启用并行模式进行初始快照,则需要将 - scan.incremental.snapshot.enabled配置项设置为true。
- 由于MongoDB Change Stream流订阅限制,不支持读取admin、local、config数据库及system集合中的数据,详情请参见MongoDB文档。 
 
- 结果表 - 实时计算引擎VVR 8.0.5以下版本仅支持插入数据。 
- 实时计算引擎VVR 8.0.5及以上版本,结果表中声明主键时,支持插入、更新和删除数据,未声明主键时仅支持插入数据。 
 
- 维表 - 实时计算引擎VVR 8.0.5及以上版本支持使用MongoDB维表。 
 
SQL
语法结构
CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)在创建CDC源表时,您必须声明_id STRING列,并将其作为唯一的主键。
WITH参数
通用
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | 连接器名称。 | String | 是 | 无 | 
 | 
| uri | MongoDB连接uri。 | String | 否 | 无 | 说明  参数 | 
| hosts | MongoDB所在的主机名称。 | String | 否 | 无 | 可以使用英文逗号( | 
| scheme | MongoDB使用的连接协议。 | String | 否 | mongodb | 可选的取值包括: 
 | 
| username | 连接到MongoDB时使用的用户名。 | String | 否 | 无 | 开启身份验证功能时,必须配置该参数。 | 
| password | 连接到MongoDB时使用的密码。 | String | 否 | 无 | 开启身份验证功能时,必须配置该参数。 重要  为了避免您的密码信息泄露,建议您使用变量的方式填写密码取值,详情请参见项目变量。 | 
| database | MongoDB数据库名称。 | String | 否 | 无 | 
 重要  不支持监控admin、local、config数据库中的数据。 | 
| collection | MongoDB集合名称。 | String | 否 | 无 | 
 重要  不支持监控system集合中的数据。 | 
| connection.options | MongoDB侧的连接参数。 | String | 否 | 无 | 使用 重要  默认情况下,MongoDB CDC不会自动设置Socket连接超时时间,这可能会在网络抖动时产生长时间的中断。 建议您始终在此处设置socketTimeoutMS为一个合理的值来避免此问题。 | 
源表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| scan.startup.mode | MongoDB CDC的启动模式。 | String | 否 | initial | 参数取值如下: 
 详情请参见Startup Properties。 | 
| scan.startup.timestamp-millis | 指定位点消费的起始时间戳。 | Long | 取决于 scan.startup.mode的取值 
 | 无 | 参数格式为自Linux Epoch时间戳以来的毫秒数。 仅适用于 | 
| initial.snapshotting.queue.size | 进行初始快照时的队列大小限制。 | Integer | 否 | 10240 | 仅在 | 
| batch.size | 游标的批处理大小。 | Integer | 否 | 1024 | 无。 | 
| poll.max.batch.size | 同一批处理的最多变更文档数量。 | Integer | 否 | 1024 | 此参数控制流处理时一次拉取最多变更文档的个数。取值越大,连接器内部分配的缓冲区越大。 | 
| poll.await.time.ms | 两次拉取数据之间的时间间隔。 | Integer | 否 | 1000 | 单位为毫秒。 | 
| heartbeat.interval.ms | 发送心跳包的时间间隔。 | Integer | 否 | 0 | 单位为毫秒。 MongoDB CDC连接器主动向数据库发送心跳包来保证回溯状态最新。设置为0代表永不发送心跳包。 重要  对于更新不频繁的集合,强烈建议设定此选项。 | 
| scan.incremental.snapshot.enabled | 是否启用并行模式进行初始快照。 | Boolean | 否 | false | 实验性功能。 | 
| scan.incremental.snapshot.chunk.size.mb | 并行模式读取快照时的分片大小。 | Integer | 否 | 64 | 实验性功能。 单位为MB。 仅在启用并行快照时生效。 | 
| scan.full-changelog | 产生完整的Full Changelog事件流。 | Boolean | 否 | false | 实验性功能。 说明  MongoDB数据库需要为6.0及以上版本,并且已开启前像后像功能,开启方法请参见Document Preimages。 | 
| scan.flatten-nested-columns.enabled | 是否将以 | Boolean | 否 | false | 若开启,在如下示例的BSON文档中, 说明  仅VVR 8.0.5及以上版本支持该参数。 | 
| scan.primitive-as-string | 是否将BSON文档中的原始类型都解析为字符串类型。 | Boolean | 否 | false | 说明  仅VVR 8.0.5及以上版本支持该参数。 | 
| scan.ignore-delete.enabled | 是否忽略delete(-D)类型的消息。 | Boolean | 否 | false | 在对MongoDB源端数据进行归档时,可能在OpLog中产生大量的 DELETE 事件。如果您不希望将这些事件同步到下游,可开启此参数忽略删除事件。 说明  
 | 
| scan.incremental.snapshot.backfill.skip | 是否跳过增量快照算法的回填水位过程。 | Boolean | 否 | false | 启用此开关只能提供at-least-once语义。 说明  仅VVR 11.1及以上版本支持该参数。 | 
| initial.snapshotting.pipeline | MongoDB 管道操作,在快照读取阶段,会把该操作下推到 MongoDB,只筛选所需的数据,从而提高读取效率。 | String | 否 | 无。 | 
 | 
| initial.snapshotting.max.threads | 执行数据复制时使用的线程数。 | Integer | 否 | 无。 | 仅在 scan.startup.mode 选项设置为 initial 时生效。 说明  仅VVR 11.1及以上版本支持该参数。 | 
| initial.snapshotting.queue.size | 进行初始快照时的队列大小。 | Integer | 否 | 16000 | 仅在 scan.startup.mode 选项设置为 initial 时生效。 说明  仅VVR 11.1及以上版本支持该参数。 | 
| scan.change-stream.reading.parallelism | 订阅 Change Stream 时的并行度。 | Integer | 否 | 1 | 仅当 scan.incremental.snapshot.enabled 参数开启时生效。 重要  如需多并发订阅 Change Stream 流,需要同时设置 heartbeat.interval.ms 参数。 说明  仅 VVR 11.2 及以上版本支持该参数。 | 
| scan.change-stream.reading.queue-size | 并发订阅 Change Stream 时的消息队列大小。 | Integer | 否 | 16384 | 仅当 scan.change-stream.reading.parallelism 参数开启时有效。 说明  仅 VVR 11.2 及以上版本支持该参数。 | 
维表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| lookup.cache | Cache策略。 | String | 否 | NONE | 目前支持以下两种缓存策略: 
 | 
| lookup.max-retries | 查询数据库失败的最大重试次数。 | Integer | 否 | 3 | 无。 | 
| lookup.retry.interval | 如果查询数据库失败,重试的时间间隔。 | Duration | 否 | 1s | 无。 | 
| lookup.partial-cache.expire-after-access | 缓存中的记录最长保留时间。 | Duration | 否 | 无 | 支持时间单位ms、s、min、h和d。 使用该配置时  | 
| lookup.partial-cache.expire-after-write | 在记录写入缓存后该记录的最大保留时间。 | Duration | 否 | 无 | 使用该配置时  | 
| lookup.partial-cache.max-rows | 缓存的最大条数。超过该值,最旧的行将过期。 | Long | 否 | 无 | 使用该配置时  | 
| lookup.partial-cache.cache-missing-key | 在物理表中未关联到数据时,是否缓存空记录。 | Boolean | 否 | True | 使用该配置时  | 
结果表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| sink.buffer-flush.max-rows | 每次按批写入数据时的最大记录数。 | Integer | 否 | 1000 | 无。 | 
| sink.buffer-flush.interval | 写入数据的刷新间隔。 | Duration | 否 | 1s | 无。 | 
| sink.delivery-guarantee | 写入数据时的语义保证。 | String | 否 | at-least-once | 可选的取值包括: 
 说明  目前不支持exactly-once。 | 
| sink.max-retries | 写入数据库失败时的最大重试次数。 | Integer | 否 | 3 | 无。 | 
| sink.retry.interval | 写入数据库失败时的重试时间间隔。 | Duration | 否 | 1s | 无。 | 
| sink.parallelism | 自定义sink并行度。 | Integer | 否 | 空 | 无。 | 
类型映射
CDC源表
| BSON类型 | Flink SQL类型 | 
| Int32 | INT | 
| Int64 | BIGINT | 
| Double | DOUBLE | 
| Decimal128 | DECIMAL(p, s) | 
| Boolean | BOOLEAN | 
| Date Timestamp | DATE | 
| Date Timestamp | TIME | 
| DateTime | TIMESTAMP(3) TIMESTAMP_LTZ(3) | 
| Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) | 
| String ObjectId UUID Symbol MD5 JavaScript Regex | STRING | 
| Binary | BYTES | 
| Object | ROW | 
| Array | ARRAY | 
| DBPointer | ROW<$ref STRING, $id STRING> | 
| GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> | 
维表和结果表
| BSON类型 | Flink SQL类型 | 
| Int32 | INT | 
| Int64 | BIGINT | 
| Double | DOUBLE | 
| Decimal128 | DECIMAL | 
| Boolean | BOOLEAN | 
| DateTime | TIMESTAMP_LTZ(3) | 
| Timestamp | TIMESTAMP_LTZ(0) | 
| String ObjectId | STRING | 
| Binary | BYTES | 
| Object | ROW | 
| Array | ARRAY | 
使用示例
CDC源表
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --must be declared
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE  productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;维表
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;结果表
CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;元数据
MongoDB CDC源表支持元数据列语法,您可以通过元数据列访问以下元数据。
| 元数据key | 元数据类型 | 描述 | 
| database_name | STRING NOT NULL | 包含该文档的数据库名。 | 
| collection_name | STRING NOT NULL | 包含该文档的集合名。 | 
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该文档在数据库中的变更时间,如果该文档来自表的存量历史数据而不是从ChangeStream中获取,则该值总是0。 | 
| row_kind | STRING NOT NULL | 表示数据变更类型,取值如下: 
 说明  仅VVR 11.1及以上版本支持使用。 | 
关于MongoDB的变更前后像记录功能
MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据,在未开启变更前后像记录功能时,利用已有信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖完整的 Insert、Update Before、Update After、Delete 变更流。
为了补充缺失的变更前事件,目前 Flink SQL Planner 会自动为 Upsert 类型的数据源生成一个 ChangelogNormalize 节点,该节点会在 Flink 状态中缓存所有文档的当前版本快照,在遇到被更新或删除的文档时,查表即可得知变更前的状态,但该算子节点需要存储体积巨大的状态数据。

MongoDB 6.0版本支持开启数据库的前像后像(Pre- and Post-images)记录功能,详情可参考使用MongoDB变更流(Change Stream)实时捕获数据变更。开启该功能后,MongoDB会在每次变更发生时,在一个特殊的集合中记录文档变更前后的完整状态。此时在作业中启用scan.full-changelog配置项,MongoDB CDC会从变更文档记录中生成Update Before记录,从而支持产生完整事件流,消除了对ChangelogNormalize节点的依赖。
Mongo CDC DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
创建DataStream API程序并使用MongoDBSource。代码示例如下:
Java
MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();XML
Maven中央仓库已经放置了VVR MongoDB连接器,以供您在作业开发时直接使用。
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>在使用DataStream API时,若要启用增量快照功能,请在构造MongoDBSource数据源时,使用com.ververica.cdc.connectors.mongodb.source包中的MongoDBSource#builder();否则,使用com.ververica.cdc.connectors.mongodb中的MongoDBSource#builder()。
在构造MongoDBSource时,可以配置以下参数:
| 参数 | 说明 | 
| hosts | 需要连接的MongoDB数据库的主机名称。 | 
| username | MongoDB数据库服务的用户名。 说明  若MongoDB服务器未启用鉴权,则无需配置此参数。 | 
| password | MongoDB数据库服务的密码。 说明  若MongoDB服务器未启用鉴权,则无需配置此参数。 | 
| databaseList | 需要监控的MongoDB数据库名称。 说明  数据库名称支持正则表达式以读取多个数据库的数据,您可以使用 | 
| collectionList | 需要监控的MongoDB集合名称。 说明  集合名称支持正则表达式以读取多个集合的数据,您可以使用 | 
| startupOptions | 选择MongoDB CDC的启动模式。 合法的取值包括: 
 详情请参见Startup Properties。 | 
| deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下: 
 |