本文为您介绍如何使用MongoDB连接器。

背景信息

MongoDB是一个面向文档的非结构化数据库,能够简化应用程序的开发及扩展。MongoDB连接器支持的信息如下:

类别

详情

支持类型

源表、维表和结果表

运行模式

仅支持流模式

特有监控指标

  • 源表

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • 维表和结果表:无。

说明

指标含义详情,请参见监控指标说明

API 种类

DataStream和SQL

是否支持更新或删除结果表数据

特色功能

  • MongoDB的CDC源表,即MongoDB的流式源表,会先读取数据库的历史全量数据,并平滑切换到oplog读取上,保证不多读一条也不少读一条。即使发生故障,也能保证通过Exactly Once语义处理数据。MongoDB CDC支持通过Change Stream API高效地捕获MongoDB的数据库和集合中的文档变更,监控文档的插入、修改、替换、删除事件,并将其转换为Flink能够处理的Changelog数据流。作为源表,支持以下功能特性:

    • 支持利用MongoDB 3.6新增的Change Stream API,更高效地监控变化。

    • 精确一次处理:在作业任何阶段失败都能保证Exactly-once语义。

    • 支持全增量一体化监测:支持快照阶段完成后自动切换为增量读取阶段。

    • 支持初始快照阶段的并行读取,需要MongoDB >= 4.0。

    • 支持多种启动模式:

      • initial模式:在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的oplog。

      • latest-offset模式:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从oplog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。

      • timestamp:跳过快照阶段,从指定的时间戳开始读取oplog事件,需要MongoDB >= 4.0。

    • 支持产生Full Changelog事件流,需要MongoDB >= 6.0,详情请参见关于MongoDB的变更前后像记录功能

  • 实时计算Flink VVR 8.0.6及以上版本支持通过CREATE TABLE AS(CTAS)语句CREATE DATABASE AS(CDAS)语句将MongoDB的数据和Schema变更同步到下游表。使用时需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见关于MongoDB的变更前后像记录功能

  • 实时计算Flink VVR 8.0.9及以上版本扩展维表关联读取能力,支持读取内置ObjectId 类型的_id字段。

前提条件

  • CDC源表

    • CDC连接器支持通过副本集或分片集架构模式读取阿里云云数据库MongoDB版的数据,也支持读取自建MongoDB数据库的数据 。

    • 使用MongoDB CDC连接器的基础功能时,必须开启待监控的MongoDB数据库的副本集(Replica Set)功能,详情请参见Replication

    • 如需使用Full Changelog事件流功能,则需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见Document Preimages关于MongoDB的变更前后像记录功能

    • 如果启用了MongoDB的鉴权功能,则需要使用具有以下数据库权限的MongoDB用户:

      • splitVector权限

      • listDatabases权限

      • listCollections权限

      • collStats权限

      • find权限

      • changeStream权限

      • config.collections和config.chunks集合的访问权限

  • 维表和结果表

    • 已创建MongoDB数据库和表

    • 已设置IP白名单

使用限制

  • 仅支持读写3.6及以上版本的MongoDB。

  • CDC源表

    • 实时计算引擎VVR 8.0.1及以上版本支持使用MongoDB CDC连接器。

    • MongoDB 6.0及以上版本支持产生Full Changelog事件流。

    • MongoDB 4.0及以上版本支持指定时间戳的启动模式。

    • MongoDB 4.0及以上版本支持初始快照阶段并行读取。如果您需要启用并行模式进行初始快照,则需要将scan.incremental.snapshot.enabled配置项设置为true。

    • 由于MongoDB Change Stream流订阅限制,不支持读取admin、local、config数据库及system集合中的数据,详情请参见MongoDB文档

  • 结果表

    • 实时计算引擎VVR 8.0.5以下版本仅支持插入数据。

    • 实时计算引擎VVR 8.0.5及以上版本,结果表中声明主键时,支持插入、更新和删除数据,未声明主键时仅支持插入数据。

  • 维表

    • 实时计算引擎VVR 8.0.5及以上版本支持使用MongoDB维表。

语法结构

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
说明

在创建CDC源表时,您必须声明_id STRING列,并将其作为唯一的主键。

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    连接器名称。

    String

    • 作为源表:

      • 实时计算引擎VVR 8.0.4及之前版本,填写为mongodb-cdc。

      • 实时计算引擎VVR 8.0.5及之后版本,填写为mongodb或mongodb-cdc。

    • 作为维表或结果表时,固定值为mongodb。

    uri

    MongoDB连接uri。

    String

    说明

    参数urihosts必须指定其中之一。若指定uri,则无需指定schemehostsusernamepasswordconnector.options。当两者均指定时将使用uri进行连接。

    hosts

    MongoDB所在的主机名称。

    String

    可以使用英文逗号(,)分隔提供多个主机名。

    scheme

    MongoDB使用的连接协议。

    String

    mongodb

    可选的取值包括:

    • mongodb:代表使用默认的MongoDB协议进行连接

    • mongodb+srv:代表使用DNS SRV记录协议进行连接

    username

    连接到MongoDB时使用的用户名。

    String

    开启身份验证功能时,必须配置该参数。

    password

    连接到MongoDB时使用的密码。

    String

    开启身份验证功能时,必须配置该参数。

    重要

    为了避免您的密码信息泄露,建议您通过密钥管理的方式填写密码取值,详情请参见变量管理

    database

    MongoDB数据库名称。

    String

    • 作为源表时,数据库名称支持正则表达式匹配。

    • 不配置该参数代表监控全部数据库。

    重要

    不支持监控admin、local、config数据库中的数据。

    collection

    MongoDB集合名称。

    String

    • 作为源表时,集合名称支持正则表达式匹配。

      重要

      如果您要监控的集合名称中包含正则表达式特殊字符,则必须提供完全限定的名字空间(数据库名称.集合名称),否则无法捕获对应集合的变更。

    • 不配置该参数代表监控全部集合。

    重要

    不支持监控system集合中的数据。

    connection.options

    MongoDB侧的连接参数。

    String

    使用&分隔的key=value式额外连接参数。例如connectTimeoutMS=12000&socketTimeoutMS=13000。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    scan.startup.mode

    MongoDB CDC的启动模式。

    String

    initial

    参数取值如下:

    • initial:从初始位点开始拉取全部数据。

    • latest-offset:从当前位点开始拉取变更数据。

    • timestamp:从指定的时间戳开始拉取变更数据。

    详情请参见Startup Properties

    scan.startup.timestamp-millis

    指定位点消费的起始时间戳。

    Long

    取决于 scan.startup.mode的取值

    • initial:否

    • latest-offset:否

    • timestamp:是

    参数格式为自Linux Epoch时间戳以来的毫秒数。

    仅适用于timestamp启动模式。

    initial.snapshotting.queue.size

    进行初始快照时的队列大小限制。

    Integer

    10240

    仅在scan.startup.mode选项设置为initial 时生效。

    batch.size

    游标的批处理大小。

    Integer

    1024

    无。

    poll.max.batch.size

    同一批处理的最多变更文档数量。

    Integer

    1024

    此参数控制流处理时一次拉取最多变更文档的个数。取值越大,连接器内部分配的缓冲区越大。

    poll.await.time.ms

    两次拉取数据之间的时间间隔。

    Integer

    1000

    单位为毫秒。

    heartbeat.interval.ms

    发送心跳包的时间间隔。

    Integer

    0

    单位为毫秒。

    MongoDB CDC连接器主动向数据库发送心跳包来保证回溯状态最新。设置为0代表永不发送心跳包。

    重要

    对于更新不频繁的集合,强烈建议设定此选项。

    scan.incremental.snapshot.enabled

    是否启用并行模式进行初始快照。

    Boolean

    false

    实验性功能。

    scan.incremental.snapshot.chunk.size.mb

    并行模式读取快照时的分片大小。

    Integer

    64

    实验性功能。

    单位为MB。

    仅在启用并行快照时生效。

    scan.full-changelog

    产生完整的Full Changelog事件流。

    Boolean

    false

    实验性功能。

    说明

    MongoDB数据库需要为6.0及以上版本,并且已开启前像后像功能,开启方法请参见Document Preimages

    scan.flatten-nested-columns.enabled

    是否将以.分隔的字段名解析为嵌套BSON文档读取。

    Boolean

    false

    若开启,在如下示例的BSON文档中,col字段在schema中名称为nested.col

    {"nested":{"col":true}}
    说明

    仅VVR 8.0.5及以上版本支持该参数。

    scan.primitive-as-string

    是否将BSON文档中的原始类型都解析为字符串类型。

    Boolean

    false

    说明

    仅VVR 8.0.5及以上版本支持该参数。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    lookup.cache

    Cache策略。

    String

    NONE

    目前支持以下两种缓存策略:

    • None:无缓存。

    • Partial:只在外部数据库中查找数据时缓存。

    lookup.max-retries

    查询数据库失败的最大重试次数。

    Integer

    3

    无。

    lookup.retry.interval

    如果查询数据库失败,重试的时间间隔。

    Duration

    1s

    无。

    lookup.partial-cache.expire-after-access

    缓存中的记录最长保留时间。

    Duration

    支持时间单位ms、s、min、h和d。

    使用该配置时 lookup.cache 必须设置为 PARTIAL

    lookup.partial-cache.expire-after-write

    在记录写入缓存后该记录的最大保留时间。

    Duration

    使用该配置时 lookup.cache 必须设置为 PARTIAL

    lookup.partial-cache.max-rows

    缓存的最大条数。超过该值,最旧的行将过期。

    Long

    使用该配置时 lookup.cache 必须设置为 PARTIAL

    lookup.partial-cache.cache-missing-key

    在物理表中未关联到数据时,是否缓存空记录。

    Boolean

    True

    使用该配置时 lookup.cache 必须设置为 PARTIAL

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sink.buffer-flush.max-rows

    每次按批写入数据时的最大记录数。

    Integer

    1000

    无。

    sink.buffer-flush.interval

    写入数据的刷新间隔。

    Duration

    1s

    无。

    sink.delivery-guarantee

    写入数据时的语义保证。

    String

    at-least-once

    可选的取值包括:

    • none

    • at-least-once

    说明

    目前不支持exactly-once。

    sink.max-retries

    写入数据库失败时的最大重试次数。

    Integer

    3

    无。

    sink.retry.interval

    写入数据库失败时的重试时间间隔。

    Duration

    1s

    无。

    sink.parallelism

    自定义sink并行度。

    Integer

    无。

类型映射

  • CDC源表

    BSON类型

    Flink SQL类型

    Int32

    INT

    Int64

    BIGINT

    Double

    DOUBLE

    Decimal128

    DECIMAL(p, s)

    Boolean

    BOOLEAN

    Date Timestamp

    DATE

    Date Timestamp

    TIME

    DateTime

    TIMESTAMP(3)

    TIMESTAMP_LTZ(3)

    Timestamp

    TIMESTAMP(0)

    TIMESTAMP_LTZ(0)

    String

    ObjectId

    UUID

    Symbol

    MD5

    JavaScript

    Regex

    STRING

    Binary

    BYTES

    Object

    ROW

    Array

    ARRAY

    DBPointer

    ROW<$ref STRING, $id STRING>

    GeoJSON

    Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

    Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

  • 维表和结果表

    BSON类型

    Flink SQL类型

    Int32

    INT

    Int64

    BIGINT

    Double

    DOUBLE

    Decimal128

    DECIMAL

    Boolean

    BOOLEAN

    DateTime

    TIMESTAMP_LTZ(3)

    Timestamp

    TIMESTAMP_LTZ(0)

    String

    ObjectId

    STRING

    Binary

    BYTES

    Object

    ROW

    Array

    ARRAY

使用示例

  • CDC源表

    CREATE TEMPORARY TABLE mongo_source (
      `_id` STRING, --must be declared
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      collection_name STRING METADATA VIRTUAL,
      op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection',
      'scan.incremental.snapshot.enabled' = 'true',
      'scan.full-changelog' = 'true'
    );
    
    CREATE TEMPORARY TABLE  productssink (
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price_amount DECIMAL,
      suppliers_name STRING,
      db_name STRING,
      collection_name STRING,
      op_ts TIMESTAMP_LTZ(3)
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO productssink  
    SELECT
      name,
      weight,
      tags,
      price.amount,
      suppliers[1].name,
      db_name,
      collection_name,
      op_ts
    FROM
      mongo_source;
  • 维表

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      a int,
      b BIGINT,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mongo_dim (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection',
      'lookup.cache' = 'PARTIAL',
      'lookup.partial-cache.expire-after-access' = '10min',
      'lookup.partial-cache.expire-after-write' = '10min',
      'lookup.partial-cache.max-rows' = '100'
    );
    
    CREATE TEMPORARY TABLE print_sink (
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price_amount DECIMAL,
      suppliers_name STRING
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO print_sink
    SELECT
      T.id,
      T.a,
      T.b,
      H.name
    FROM
      datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
  • 结果表

    CREATE TEMPORARY TABLE datagen_source (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mongo_sink (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection'
    );
    
    INSERT INTO mongo_sink
    SELECT * FROM datagen_source;

元数据

MongoDB CDC源表支持元数据列语法,您可以通过元数据列访问以下元数据。

元数据key

元数据类型

描述

database_name

STRING NOT NULL

包含该文档的数据库名。

collection_name

STRING NOT NULL

包含该文档的集合名。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

该文档在数据库中的变更时间,如果该文档来自表的存量历史数据而不是从ChangeStream中获取,则该值总是0。

关于MongoDB的变更前后像记录功能

MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据,在未开启变更前后像记录功能时,利用已有信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖完整的 Insert、Update Before、Update After、Delete 变更流。

为了补充缺失的变更前事件,目前 Flink SQL Planner 会自动为 Upsert 类型的数据源生成一个 ChangelogNormalize 节点,该节点会在 Flink 状态中缓存所有文档的当前版本快照,在遇到被更新或删除的文档时,查表即可得知变更前的状态,但该算子节点需要存储体积巨大的状态数据。

image.png

MongoDB 6.0版本支持开启数据库的前像后像(Pre- and Post-images)记录功能,详情可参考Document Preimages。开启该功能后,MongoDB会在每次变更发生时,在一个特殊的集合中记录文档变更前后的完整状态。此时在作业中启用scan.full-changelog配置项,MongoDB CDC会从变更文档记录中生成Update Before记录,从而支持产生完整事件流,消除了对ChangelogNormalize节点的依赖。

Mongo CDC DataStream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法

创建DataStream API程序并使用MongoDBSource。代码示例如下:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Maven中央仓库已经放置了VVR MongoDB连接器,以供您在作业开发时直接使用。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
说明

在使用DataStream API时,若要启用增量快照功能,请在构造MongoDBSource数据源时,使用com.ververica.cdc.connectors.mongodb.source包中的MongoDBSource#builder();否则,使用com.ververica.cdc.connectors.mongodb中的MongoDBSource#builder()

在构造MongoDBSource时,可以配置以下参数:

参数

说明

hosts

需要连接的MongoDB数据库的主机名称。

username

MongoDB数据库服务的用户名。

说明

若MongoDB服务器未启用鉴权,则无需配置此参数。

password

MongoDB数据库服务的密码。

说明

若MongoDB服务器未启用鉴权,则无需配置此参数。

databaseList

需要监控的MongoDB数据库名称。

说明

数据库名称支持正则表达式以读取多个数据库的数据,您可以使用.*匹配所有数据库。

collectionList

需要监控的MongoDB集合名称。

说明

集合名称支持正则表达式以读取多个集合的数据,您可以使用.*匹配所有集合。

startupOptions

选择MongoDB CDC的启动模式。

合法的取值包括:

  • StartupOptions.initial()

    • 从初始位点开始拉取全部数据

  • StartupOptions.latest-offset()

    • 从当前位点开始拉取变更数据

  • StartupOptions.timestamp()

    • 从指定的时间戳开始拉取变更数据

详情请参见Startup Properties

deserializer

反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:

  • MongoDBConnectorDeserializationSchema:将Upsert模式下产生的SourceRecord转成Flink Table API或SQL API内部数据结构RowData。

  • MongoDBConnectorFullChangelogDeserializationSchema:将Full Changelog模式下产生的SourceRecord转成Flink Table或SQL内部数据结构RowData。

  • JsonDebeziumDeserializationSchema:将SourceRecord转成JSON格式的String。