本文为您介绍如何使用MongoDB连接器。

背景信息

  • 云数据库MongoDB版完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份恢复、性能优化等功能。
  • MongoDB连接器支持的信息如下。
    类别详情
    支持类型结果表
    支持的模式流模式
    数据格式JSON
    特有监控指标
    • numRecordsOut
    • numRecordsOutPerSecond
    • currentSendTime
    说明 指标的含义及查看监控指标的操作,请参见查看监控指标
    API种类SQL
    是否支持更新或删除结果表数据

前提条件

已创建云数据库MongoDB实例,详情请参见创建单节点实例

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持云数据库MongoDB连接器。
  • 目前MongoDB结果表不支持主键更新,支持重复插入。

语法结构

CREATE TABLE mongodb_sink(
  id INT, 
  number INT
) WITH (
  'connector' = 'mongodb',
  'database' = '<yourDatabase>',
  'collection' = '<yourCollection>', 
  'uri' = '<yourUri>'
);

WITH参数

参数说明数据类型是否必填默认值备注
connector结果表类型String固定值为mongodb。
database数据库名称String无。
collection数据集合名称String无。
uriMongoDB连接串String例如mongodb://root:xxxxx@dds-xxxxxx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xxxxxxx
maxConnectionIdleTime连接超时时长Long60000单位为毫秒。
batchSize每次批量写入的条数Integer1024无。

类型映射

Flink字段类型MongoDB字段类型
STRINGString
DOUBLEDouble
BOOLEANBoolean
INT32-bit integer
BIGINT64-bit integer
ARRAYArray
DATEDate
TIMESTAMPTimestamp

代码示例

CREATE TEMPORARY TABLE datagen_source (
  v INT, 
  p INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongodb_sink(
  v INT, 
  p INT
) WITH (
  'connector'='mongodb',
  'database' = '<yourDatabase>',
  'collection' = '<yourCollection>', 
  'uri'='<yourUri>'
);

INSERT INTO mongodb_sink SELECT v, p FROM datagen_source;