本文为您介绍如何使用MongoDB连接器。
背景信息
MongoDB是一个面向文档的非结构化数据库,能够简化应用程序的开发及扩展。MongoDB连接器支持的信息如下:
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 仅支持流模式 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API 种类 | DataStream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
MongoDB的CDC源表,即MongoDB的流式源表,会先读取数据库的历史全量数据,并平滑切换到oplog读取上,保证不多读一条也不少读一条。即使发生故障,也能保证通过Exactly Once语义处理数据。MongoDB CDC支持通过Change Stream API高效地捕获MongoDB的数据库和集合中的文档变更,监控文档的插入、修改、替换、删除事件,并将其转换为Flink能够处理的Changelog数据流。作为源表,支持以下功能特性:
支持利用MongoDB 3.6新增的Change Stream API,更高效地监控变化。
精确一次处理:在作业任何阶段失败都能保证Exactly-once语义。
支持全增量一体化监测:支持快照阶段完成后自动切换为增量读取阶段。
支持初始快照阶段的并行读取,需要MongoDB >= 4.0。
支持多种启动模式:
initial模式:在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的oplog。
latest-offset模式:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从oplog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
timestamp:跳过快照阶段,从指定的时间戳开始读取oplog事件,需要MongoDB >= 4.0。
支持产生Full Changelog事件流,需要MongoDB >= 6.0,详情请参见关于MongoDB的变更前后像记录功能。
实时计算Flink VVR 8.0.6及以上版本支持通过CREATE TABLE AS(CTAS)语句或CREATE DATABASE AS(CDAS)语句将MongoDB的数据和Schema变更同步到下游表。使用时需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见关于MongoDB的变更前后像记录功能。
前提条件
CDC源表
CDC连接器支持通过副本集或分片集架构模式读取阿里云云数据库MongoDB版的数据,也支持读取自建MongoDB数据库的数据 。
使用MongoDB CDC连接器的基础功能时,必须开启待监控的MongoDB数据库的副本集(Replica Set)功能,详情请参见Replication。
如需使用Full Changelog事件流功能,则需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见Document Preimages和关于MongoDB的变更前后像记录功能。
如果启用了MongoDB的鉴权功能,则需要使用具有以下数据库权限的MongoDB用户:
splitVector权限
listDatabases权限
listCollections权限
collStats权限
find权限
changeStream权限
config.collections和config.chunks集合的访问权限
维表和结果表
已创建MongoDB数据库和表
已设置IP白名单
使用限制
仅支持读写3.6及以上版本的MongoDB。
CDC源表
实时计算引擎VVR 8.0.1及以上版本支持使用MongoDB CDC连接器。
MongoDB 6.0及以上版本支持产生Full Changelog事件流。
MongoDB 4.0及以上版本支持指定时间戳的启动模式。
MongoDB 4.0及以上版本支持初始快照阶段并行读取。如果您需要启用并行模式进行初始快照,则需要将
scan.incremental.snapshot.enabled
配置项设置为true。
结果表
实时计算引擎VVR 8.0.5以下版本仅支持插入数据。
实时计算引擎VVR 8.0.5及以上版本,结果表中声明主键时,支持插入、更新和删除数据,未声明主键时仅支持插入数据。
维表
实时计算引擎VVR 8.0.5及以上版本支持使用MongoDB维表。
语法结构
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)
在创建CDC源表时,您必须声明_id STRING
列,并将其作为唯一的主键。
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
连接器名称。
String
是
无
作为源表:
实时计算引擎VVR 8.0.4及之前版本,填写为mongodb-cdc。
实时计算引擎VVR 8.0.5及之后版本,填写为mongodb或mongodb-cdc。
作为维表或结果表时,固定值为mongodb。
uri
MongoDB连接uri。
String
否
无
说明参数
uri
与hosts
必须指定其中之一。若指定uri
,则无需指定scheme
、hosts
、username
、password
、connector.options
。当两者均指定时将使用uri
进行连接。hosts
MongoDB所在的主机名称。
String
否
无
可以使用英文逗号(
,
)分隔提供多个主机名。scheme
MongoDB使用的连接协议。
String
否
mongodb
可选的取值包括:
mongodb
:代表使用默认的MongoDB协议进行连接mongodb+srv
:代表使用DNS SRV记录协议进行连接
username
连接到MongoDB时使用的用户名。
String
否
无
开启身份验证功能时,必须配置该参数。
password
连接到MongoDB时使用的密码。
String
否
无
开启身份验证功能时,必须配置该参数。
重要为了避免您的密码信息泄露,建议您通过密钥管理的方式填写密码取值,详情请参见变量和密钥管理。
database
MongoDB数据库名称。
String
否
无
作为源表时,数据库名称支持正则表达式匹配。
不配置该参数代表监控全部数据库。
collection
MongoDB集合名称。
String
否
无
作为源表时,集合名称支持正则表达式匹配。
重要如果您要监控的集合名称中包含正则表达式特殊字符,则必须提供完全限定的名字空间(数据库名称.集合名称),否则无法捕获对应集合的变更。
不配置该参数代表监控全部集合。
connection.options
MongoDB侧的连接参数。
String
否
无
使用
&
分隔的key=value
式额外连接参数。例如connectTimeoutMS=12000&socketTimeoutMS=13000。源表独有
参数
说明
数据类型
是否必填
默认值
备注
scan.startup.mode
MongoDB CDC的启动模式。
String
否
initial
参数取值如下:
initial:从初始位点开始拉取全部数据。
latest-offset:从当前位点开始拉取变更数据。
timestamp:从指定的时间戳开始拉取变更数据。
详情请参见Startup Properties。
scan.startup.timestamp-millis
指定位点消费的起始时间戳。
Long
取决于 scan.startup.mode的取值
initial:否
latest-offset:否
timestamp:是
无
参数格式为自Linux Epoch时间戳以来的毫秒数。
仅适用于
timestamp
启动模式。initial.snapshotting.queue.size
进行初始快照时的队列大小限制。
Integer
否
10240
仅在
scan.startup.mode
选项设置为initial
时生效。batch.size
游标的批处理大小。
Integer
否
1024
无。
poll.max.batch.size
同一批处理的最多变更文档数量。
Integer
否
1024
此参数控制流处理时一次拉取最多变更文档的个数。取值越大,连接器内部分配的缓冲区越大。
poll.await.time.ms
两次拉取数据之间的时间间隔。
Integer
否
1000
单位为毫秒。
heartbeat.interval.ms
发送心跳包的时间间隔。
Integer
否
0
单位为毫秒。
MongoDB CDC连接器主动向数据库发送心跳包来保证回溯状态最新。设置为0代表永不发送心跳包。
重要对于更新不频繁的集合,强烈建议设定此选项。
scan.incremental.snapshot.enabled
是否启用并行模式进行初始快照。
Boolean
否
false
实验性功能。
scan.incremental.snapshot.chunk.size.mb
并行模式读取快照时的分片大小。
Integer
否
64
实验性功能。
单位为MB。
仅在启用并行快照时生效。
scan.full-changelog
产生完整的Full Changelog事件流。
Boolean
否
false
实验性功能。
说明MongoDB数据库需要为6.0及以上版本,并且已开启前像后像功能,开启方法请参见Document Preimages。
scan.flatten-nested-columns.enabled
是否将以
.
分隔的字段名解析为嵌套BSON文档读取。Boolean
否
false
若开启,在如下示例的BSON文档中,
col
字段在schema中名称为nested.col
。{"nested":{"col":true}}
说明仅VVR 8.0.5及以上版本支持该参数。
scan.primitive-as-string
是否将BSON文档中的原始类型都解析为字符串类型。
Boolean
否
false
说明仅VVR 8.0.5及以上版本支持该参数。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
lookup.cache
Cache策略。
String
否
NONE
目前支持以下两种缓存策略:
None:无缓存。
Partial:只在外部数据库中查找数据时缓存。
lookup.max-retries
查询数据库失败的最大重试次数。
Integer
否
3
无。
lookup.retry.interval
如果查询数据库失败,重试的时间间隔。
Duration
否
1s
无。
lookup.partial-cache.expire-after-access
缓存中的记录最长保留时间。
Duration
否
无
支持时间单位ms、s、min、h和d。
使用该配置时
lookup.cache
必须设置为PARTIAL
。lookup.partial-cache.expire-after-write
在记录写入缓存后该记录的最大保留时间。
Duration
否
无
使用该配置时
lookup.cache
必须设置为PARTIAL
。lookup.partial-cache.max-rows
缓存的最大条数。超过该值,最旧的行将过期。
Long
否
无
使用该配置时
lookup.cache
必须设置为PARTIAL
。lookup.partial-cache.cache-missing-key
在物理表中未关联到数据时,是否缓存空记录。
Boolean
否
True
使用该配置时
lookup.cache
必须设置为PARTIAL
。结果表独有
参数
说明
数据类型
是否必填
默认值
备注
sink.buffer-flush.max-rows
每次按批写入数据时的最大记录数。
Integer
否
1000
无。
sink.buffer-flush.interval
写入数据的刷新间隔。
Duration
否
1s
无。
sink.delivery-guarantee
写入数据时的语义保证。
String
否
at-least-once
可选的取值包括:
none
at-least-once
说明目前不支持exactly-once。
sink.max-retries
写入数据库失败时的最大重试次数。
Integer
否
3
无。
sink.retry.interval
写入数据库失败时的重试时间间隔。
Duration
否
1s
无。
sink.parallelism
自定义sink并行度。
Integer
否
空
无。
类型映射
CDC源表
BSON类型
Flink SQL类型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL(p, s)
Boolean
BOOLEAN
Date Timestamp
DATE
Date Timestamp
TIME
DateTime
TIMESTAMP(3)
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP(0)
TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
DBPointer
ROW<$ref STRING, $id STRING>
GeoJSON
Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
维表和结果表
BSON类型
Flink SQL类型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL
Boolean
BOOLEAN
DateTime
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP_LTZ(0)
String
ObjectId
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
使用示例
CDC源表
CREATE TEMPORARY TABLE mongo_source ( `_id` STRING, --must be declared name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA VIRTUAL, op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'scan.incremental.snapshot.enabled' = 'true', 'scan.full-changelog' = 'true' ); CREATE TEMPORARY TABLE productssink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING, db_name STRING, collection_name STRING, op_ts TIMESTAMP_LTZ(3) ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO productssink SELECT name, weight, tags, price.amount, suppliers[1].name, db_name, collection_name, op_ts FROM mongo_source;
维表
CREATE TEMPORARY TABLE datagen_source ( id STRING, a int, b BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_dim ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'lookup.cache' = 'PARTIAL', 'lookup.partial-cache.expire-after-access' = '10min', 'lookup.partial-cache.expire-after-write' = '10min', 'lookup.partial-cache.max-rows' = '100' ); CREATE TEMPORARY TABLE print_sink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO print_sink SELECT T.id, T.a, T.b, H.name FROM datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
结果表
CREATE TEMPORARY TABLE datagen_source ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>> ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_sink ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection' ); INSERT INTO mongo_sink SELECT * FROM datagen_source;
元数据
MongoDB CDC源表支持元数据列语法,您可以通过元数据列访问以下元数据。
元数据key | 元数据类型 | 描述 |
database_name | STRING NOT NULL | 包含该文档的数据库名。 |
collection_name | STRING NOT NULL | 包含该文档的集合名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该文档在数据库中的变更时间,如果该文档来自表的存量历史数据而不是从ChangeStream中获取,则该值总是0。 |
关于MongoDB的变更前后像记录功能
MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据,在未开启变更前后像记录功能时,利用已有信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖完整的 Insert、Update Before、Update After、Delete 变更流。
为了补充缺失的变更前事件,目前 Flink SQL Planner 会自动为 Upsert 类型的数据源生成一个 ChangelogNormalize 节点,该节点会在 Flink 状态中缓存所有文档的当前版本快照,在遇到被更新或删除的文档时,查表即可得知变更前的状态,但该算子节点需要存储体积巨大的状态数据。
MongoDB 6.0版本支持开启数据库的前像后像(Pre- and Post-images)记录功能,详情可参考Document Preimages。开启该功能后,MongoDB会在每次变更发生时,在一个特殊的集合中记录文档变更前后的完整状态。此时在作业中启用scan.full-changelog
配置项,MongoDB CDC会从变更文档记录中生成Update After记录,从而支持产生完整事件流,消除了对ChangelogNormalize节点的依赖。
Mongo CDC DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
创建DataStream API程序并使用MongoDBSource。代码示例如下:
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
在使用DataStream API时,若要启用增量快照功能,请在构造MongoDBSource数据源时,使用com.ververica.cdc.connectors.mongodb.source
包中的MongoDBSource#builder()
;否则,使用com.ververica.cdc.connectors.mongodb
中的MongoDBSource#builder()
。
在构造MongoDBSource时,可以配置以下参数:
参数 | 说明 |
hosts | 需要连接的MongoDB数据库的主机名称。 |
username | MongoDB数据库服务的用户名。 说明 若MongoDB服务器未启用鉴权,则无需配置此参数。 |
password | MongoDB数据库服务的密码。 说明 若MongoDB服务器未启用鉴权,则无需配置此参数。 |
databaseList | 需要监控的MongoDB数据库名称。 说明 数据库名称支持正则表达式以读取多个数据库的数据,您可以使用 |
collectionList | 需要监控的MongoDB集合名称。 说明 集合名称支持正则表达式以读取多个集合的数据,您可以使用 |
startupOptions | 选择MongoDB CDC的启动模式。 合法的取值包括:
详情请参见Startup Properties。 |
deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:
|