本文为您介绍如何使用MongoDB连接器。
背景信息
MongoDB是一个面向文档的非结构化数据库,能够简化应用程序的开发及扩展。MongoDB连接器支持的信息如下:
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 仅支持流模式 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API 种类 | DataStream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
MongoDB的CDC源表,即MongoDB的流式源表,会先读取数据库的历史全量数据,并平滑切换到oplog读取上,保证不多读一条也不少读一条。即使发生故障,也能保证通过Exactly Once语义处理数据。MongoDB CDC支持通过Change Stream API高效地捕获MongoDB的数据库和集合中的文档变更,监控文档的插入、修改、替换、删除事件,并将其转换为Flink能够处理的Changelog数据流。作为源表,支持以下功能特性:
支持利用MongoDB 3.6新增的Change Stream API,更高效地监控变化。
精确一次处理:在作业任何阶段失败都能保证Exactly-once语义。
支持全增量一体化监测:支持快照阶段完成后自动切换为增量读取阶段。
支持初始快照阶段的并行读取,需要MongoDB >= 4.0。
支持多种启动模式:
initial模式:在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的oplog。
latest-offset模式:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从oplog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
timestamp:跳过快照阶段,从指定的时间戳开始读取oplog事件,需要MongoDB >= 4.0。
支持产生Full Changelog事件流,需要MongoDB >= 6.0,详情请参见关于MongoDB的变更前后像记录功能。
实时计算Flink VVR 8.0.6及以上版本支持通过CREATE TABLE AS(CTAS)语句或CREATE DATABASE AS(CDAS)语句将MongoDB的数据和Schema变更同步到下游表。使用时需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见关于MongoDB的变更前后像记录功能。
实时计算Flink VVR 8.0.9及以上版本扩展维表关联读取能力,支持读取内置ObjectId 类型的
_id
字段。
前提条件
CDC源表
CDC连接器支持通过副本集或分片集架构模式读取阿里云云数据库MongoDB版的数据,也支持读取自建MongoDB数据库的数据 。
使用MongoDB CDC连接器的基础功能时,必须开启待监控的MongoDB数据库的副本集(Replica Set)功能,详情请参见Replication。
如需使用Full Changelog事件流功能,则需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见Document Preimages和关于MongoDB的变更前后像记录功能。
如果启用了MongoDB的鉴权功能,则需要使用具有以下数据库权限的MongoDB用户:
splitVector权限
listDatabases权限
listCollections权限
collStats权限
find权限
changeStream权限
config.collections和config.chunks集合的访问权限
维表和结果表
已创建MongoDB数据库和表
已设置IP白名单
使用限制
仅支持读写3.6及以上版本的MongoDB。
CDC源表
实时计算引擎VVR 8.0.1及以上版本支持使用MongoDB CDC连接器。
MongoDB 6.0及以上版本支持产生Full Changelog事件流。
MongoDB 4.0及以上版本支持指定时间戳的启动模式。
MongoDB 4.0及以上版本支持初始快照阶段并行读取。如果您需要启用并行模式进行初始快照,则需要将
scan.incremental.snapshot.enabled
配置项设置为true。由于MongoDB Change Stream流订阅限制,不支持读取admin、local、config数据库及system集合中的数据,详情请参见MongoDB文档。
结果表
实时计算引擎VVR 8.0.5以下版本仅支持插入数据。
实时计算引擎VVR 8.0.5及以上版本,结果表中声明主键时,支持插入、更新和删除数据,未声明主键时仅支持插入数据。
维表
实时计算引擎VVR 8.0.5及以上版本支持使用MongoDB维表。
语法结构
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)
在创建CDC源表时,您必须声明_id STRING
列,并将其作为唯一的主键。
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
连接器名称。
String
是
无
作为源表:
实时计算引擎VVR 8.0.4及之前版本,填写为mongodb-cdc。
实时计算引擎VVR 8.0.5及之后版本,填写为mongodb或mongodb-cdc。
作为维表或结果表时,固定值为mongodb。
uri
MongoDB连接uri。
String
否
无
说明参数
uri
与hosts
必须指定其中之一。若指定uri
,则无需指定scheme
、hosts
、username
、password
、connector.options
。当两者均指定时将使用uri
进行连接。hosts
MongoDB所在的主机名称。
String
否
无
可以使用英文逗号(
,
)分隔提供多个主机名。scheme
MongoDB使用的连接协议。
String
否
mongodb
可选的取值包括:
mongodb
:代表使用默认的MongoDB协议进行连接mongodb+srv
:代表使用DNS SRV记录协议进行连接
username
连接到MongoDB时使用的用户名。
String
否
无
开启身份验证功能时,必须配置该参数。
password
连接到MongoDB时使用的密码。
String
否
无
开启身份验证功能时,必须配置该参数。
重要为了避免您的密码信息泄露,建议您通过密钥管理的方式填写密码取值,详情请参见变量管理。
database
MongoDB数据库名称。
String
否
无
作为源表时,数据库名称支持正则表达式匹配。
不配置该参数代表监控全部数据库。
重要不支持监控admin、local、config数据库中的数据。
collection
MongoDB集合名称。
String
否
无
作为源表时,集合名称支持正则表达式匹配。
重要如果您要监控的集合名称中包含正则表达式特殊字符,则必须提供完全限定的名字空间(数据库名称.集合名称),否则无法捕获对应集合的变更。
不配置该参数代表监控全部集合。
重要不支持监控system集合中的数据。
connection.options
MongoDB侧的连接参数。
String
否
无
使用
&
分隔的key=value
式额外连接参数。例如connectTimeoutMS=12000&socketTimeoutMS=13000。源表独有
参数
说明
数据类型
是否必填
默认值
备注
scan.startup.mode
MongoDB CDC的启动模式。
String
否
initial
参数取值如下:
initial:从初始位点开始拉取全部数据。
latest-offset:从当前位点开始拉取变更数据。
timestamp:从指定的时间戳开始拉取变更数据。
详情请参见Startup Properties。
scan.startup.timestamp-millis
指定位点消费的起始时间戳。
Long
取决于 scan.startup.mode的取值
initial:否
latest-offset:否
timestamp:是
无
参数格式为自Linux Epoch时间戳以来的毫秒数。
仅适用于
timestamp
启动模式。initial.snapshotting.queue.size
进行初始快照时的队列大小限制。
Integer
否
10240
仅在
scan.startup.mode
选项设置为initial
时生效。batch.size
游标的批处理大小。
Integer
否
1024
无。
poll.max.batch.size
同一批处理的最多变更文档数量。
Integer
否
1024
此参数控制流处理时一次拉取最多变更文档的个数。取值越大,连接器内部分配的缓冲区越大。
poll.await.time.ms
两次拉取数据之间的时间间隔。
Integer
否
1000
单位为毫秒。
heartbeat.interval.ms
发送心跳包的时间间隔。
Integer
否
0
单位为毫秒。
MongoDB CDC连接器主动向数据库发送心跳包来保证回溯状态最新。设置为0代表永不发送心跳包。
重要对于更新不频繁的集合,强烈建议设定此选项。
scan.incremental.snapshot.enabled
是否启用并行模式进行初始快照。
Boolean
否
false
实验性功能。
scan.incremental.snapshot.chunk.size.mb
并行模式读取快照时的分片大小。
Integer
否
64
实验性功能。
单位为MB。
仅在启用并行快照时生效。
scan.full-changelog
产生完整的Full Changelog事件流。
Boolean
否
false
实验性功能。
说明MongoDB数据库需要为6.0及以上版本,并且已开启前像后像功能,开启方法请参见Document Preimages。
scan.flatten-nested-columns.enabled
是否将以
.
分隔的字段名解析为嵌套BSON文档读取。Boolean
否
false
若开启,在如下示例的BSON文档中,
col
字段在schema中名称为nested.col
。{"nested":{"col":true}}
说明仅VVR 8.0.5及以上版本支持该参数。
scan.primitive-as-string
是否将BSON文档中的原始类型都解析为字符串类型。
Boolean
否
false
说明仅VVR 8.0.5及以上版本支持该参数。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
lookup.cache
Cache策略。
String
否
NONE
目前支持以下两种缓存策略:
None:无缓存。
Partial:只在外部数据库中查找数据时缓存。
lookup.max-retries
查询数据库失败的最大重试次数。
Integer
否
3
无。
lookup.retry.interval
如果查询数据库失败,重试的时间间隔。
Duration
否
1s
无。
lookup.partial-cache.expire-after-access
缓存中的记录最长保留时间。
Duration
否
无
支持时间单位ms、s、min、h和d。
使用该配置时
lookup.cache
必须设置为PARTIAL
。lookup.partial-cache.expire-after-write
在记录写入缓存后该记录的最大保留时间。
Duration
否
无
使用该配置时
lookup.cache
必须设置为PARTIAL
。lookup.partial-cache.max-rows
缓存的最大条数。超过该值,最旧的行将过期。
Long
否
无
使用该配置时
lookup.cache
必须设置为PARTIAL
。lookup.partial-cache.cache-missing-key
在物理表中未关联到数据时,是否缓存空记录。
Boolean
否
True
使用该配置时
lookup.cache
必须设置为PARTIAL
。结果表独有
参数
说明
数据类型
是否必填
默认值
备注
sink.buffer-flush.max-rows
每次按批写入数据时的最大记录数。
Integer
否
1000
无。
sink.buffer-flush.interval
写入数据的刷新间隔。
Duration
否
1s
无。
sink.delivery-guarantee
写入数据时的语义保证。
String
否
at-least-once
可选的取值包括:
none
at-least-once
说明目前不支持exactly-once。
sink.max-retries
写入数据库失败时的最大重试次数。
Integer
否
3
无。
sink.retry.interval
写入数据库失败时的重试时间间隔。
Duration
否
1s
无。
sink.parallelism
自定义sink并行度。
Integer
否
空
无。
类型映射
CDC源表
BSON类型
Flink SQL类型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL(p, s)
Boolean
BOOLEAN
Date Timestamp
DATE
Date Timestamp
TIME
DateTime
TIMESTAMP(3)
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP(0)
TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
DBPointer
ROW<$ref STRING, $id STRING>
GeoJSON
Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
维表和结果表
BSON类型
Flink SQL类型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL
Boolean
BOOLEAN
DateTime
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP_LTZ(0)
String
ObjectId
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
使用示例
CDC源表
CREATE TEMPORARY TABLE mongo_source ( `_id` STRING, --must be declared name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA VIRTUAL, op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'scan.incremental.snapshot.enabled' = 'true', 'scan.full-changelog' = 'true' ); CREATE TEMPORARY TABLE productssink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING, db_name STRING, collection_name STRING, op_ts TIMESTAMP_LTZ(3) ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO productssink SELECT name, weight, tags, price.amount, suppliers[1].name, db_name, collection_name, op_ts FROM mongo_source;
维表
CREATE TEMPORARY TABLE datagen_source ( id STRING, a int, b BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_dim ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'lookup.cache' = 'PARTIAL', 'lookup.partial-cache.expire-after-access' = '10min', 'lookup.partial-cache.expire-after-write' = '10min', 'lookup.partial-cache.max-rows' = '100' ); CREATE TEMPORARY TABLE print_sink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO print_sink SELECT T.id, T.a, T.b, H.name FROM datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
结果表
CREATE TEMPORARY TABLE datagen_source ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>> ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_sink ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection' ); INSERT INTO mongo_sink SELECT * FROM datagen_source;
元数据
MongoDB CDC源表支持元数据列语法,您可以通过元数据列访问以下元数据。
元数据key | 元数据类型 | 描述 |
database_name | STRING NOT NULL | 包含该文档的数据库名。 |
collection_name | STRING NOT NULL | 包含该文档的集合名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该文档在数据库中的变更时间,如果该文档来自表的存量历史数据而不是从ChangeStream中获取,则该值总是0。 |
关于MongoDB的变更前后像记录功能
MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据,在未开启变更前后像记录功能时,利用已有信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖完整的 Insert、Update Before、Update After、Delete 变更流。
为了补充缺失的变更前事件,目前 Flink SQL Planner 会自动为 Upsert 类型的数据源生成一个 ChangelogNormalize 节点,该节点会在 Flink 状态中缓存所有文档的当前版本快照,在遇到被更新或删除的文档时,查表即可得知变更前的状态,但该算子节点需要存储体积巨大的状态数据。
MongoDB 6.0版本支持开启数据库的前像后像(Pre- and Post-images)记录功能,详情可参考Document Preimages。开启该功能后,MongoDB会在每次变更发生时,在一个特殊的集合中记录文档变更前后的完整状态。此时在作业中启用scan.full-changelog
配置项,MongoDB CDC会从变更文档记录中生成Update Before记录,从而支持产生完整事件流,消除了对ChangelogNormalize节点的依赖。
Mongo CDC DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
创建DataStream API程序并使用MongoDBSource。代码示例如下:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
XML
Maven中央仓库已经放置了VVR MongoDB连接器,以供您在作业开发时直接使用。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>
在使用DataStream API时,若要启用增量快照功能,请在构造MongoDBSource数据源时,使用com.ververica.cdc.connectors.mongodb.source
包中的MongoDBSource#builder()
;否则,使用com.ververica.cdc.connectors.mongodb
中的MongoDBSource#builder()
。
在构造MongoDBSource时,可以配置以下参数:
参数 | 说明 |
hosts | 需要连接的MongoDB数据库的主机名称。 |
username | MongoDB数据库服务的用户名。 说明 若MongoDB服务器未启用鉴权,则无需配置此参数。 |
password | MongoDB数据库服务的密码。 说明 若MongoDB服务器未启用鉴权,则无需配置此参数。 |
databaseList | 需要监控的MongoDB数据库名称。 说明 数据库名称支持正则表达式以读取多个数据库的数据,您可以使用 |
collectionList | 需要监控的MongoDB集合名称。 说明 集合名称支持正则表达式以读取多个集合的数据,您可以使用 |
startupOptions | 选择MongoDB CDC的启动模式。 合法的取值包括:
详情请参见Startup Properties。 |
deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:
|