云数据库MongoDB
本文为您介绍如何使用MongoDB连接器。
背景信息
云数据库MongoDB版完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份恢复、性能优化等功能。
MongoDB连接器支持的信息如下。
类别
详情
支持类型
结果表、源表(文档请参见MongoDB CDC(公测中))
支持的模式
流模式
数据格式
JSON
特有监控指标
numRecordsOut
numRecordsOutPerSecond
currentSendTime
说明指标的含义及查看监控指标的操作,请参见上报和查看监控指标。
API种类
SQL
是否支持更新或删除结果表数据
是
前提条件
已创建云数据库MongoDB实例,详情请参见创建单节点实例。
使用限制
仅Flink计算引擎VVR 2.0.0及以上版本支持云数据库MongoDB连接器。
目前MongoDB结果表不支持主键更新,支持重复插入。
语法结构
CREATE TABLE mongodb_sink(
id INT,
number INT
) WITH (
'connector' = 'mongodb',
'database' = '<yourDatabase>',
'collection' = '<yourCollection>',
'uri' = '<yourUri>'
);
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 结果表类型 | String | 是 | 无 | 固定值为mongodb。 |
database | 数据库名称 | String | 是 | 无 | 无。 |
collection | 数据集合名称 | String | 是 | 无 | 无。 |
uri | MongoDB连接串 | String | 是 | 无 | 例如 |
maxConnectionIdleTime | 连接超时时长 | Long | 否 | 60000 | 单位为毫秒。 |
batchSize | 每次批量写入的条数 | Integer | 否 | 1024 | 无。 |
类型映射
Flink字段类型 | MongoDB字段类型 |
STRING | String |
DOUBLE | Double |
BOOLEAN | Boolean |
INT | 32-bit integer |
BIGINT | 64-bit integer |
ARRAY | Array |
DATE | Date |
TIMESTAMP | Timestamp |
代码示例
CREATE TEMPORARY TABLE datagen_source (
v INT,
p INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongodb_sink(
v INT,
p INT
) WITH (
'connector'='mongodb',
'database' = '<yourDatabase>',
'collection' = '<yourCollection>',
'uri'='<yourUri>'
);
INSERT INTO mongodb_sink SELECT v, p FROM datagen_source;