云数据库MongoDB

更新时间: 2023-09-22 16:07:27

本文为您介绍如何使用MongoDB连接器。

背景信息

  • 云数据库MongoDB版完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份恢复、性能优化等功能。

  • MongoDB连接器支持的信息如下。

    类别

    详情

    支持类型

    结果表、源表(文档请参见MongoDB CDC(公测中)

    支持的模式

    流模式

    数据格式

    JSON

    特有监控指标

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    说明

    指标的含义及查看监控指标的操作,请参见上报和查看监控指标

    API种类

    SQL

    是否支持更新或删除结果表数据

前提条件

已创建云数据库MongoDB实例,详情请参见创建单节点实例

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持云数据库MongoDB连接器。

  • 目前MongoDB结果表不支持主键更新,支持重复插入。

语法结构

CREATE TABLE mongodb_sink(
  id INT, 
  number INT
) WITH (
  'connector' = 'mongodb',
  'database' = '<yourDatabase>',
  'collection' = '<yourCollection>', 
  'uri' = '<yourUri>'
);

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

结果表类型

String

固定值为mongodb。

database

数据库名称

String

无。

collection

数据集合名称

String

无。

uri

MongoDB连接串

String

例如mongodb://root:xxxxx@dds-xxxxxx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xxxxxxx

maxConnectionIdleTime

连接超时时长

Long

60000

单位为毫秒。

batchSize

每次批量写入的条数

Integer

1024

无。

类型映射

Flink字段类型

MongoDB字段类型

STRING

String

DOUBLE

Double

BOOLEAN

Boolean

INT

32-bit integer

BIGINT

64-bit integer

ARRAY

Array

DATE

Date

TIMESTAMP

Timestamp

代码示例

CREATE TEMPORARY TABLE datagen_source (
  v INT, 
  p INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongodb_sink(
  v INT, 
  p INT
) WITH (
  'connector'='mongodb',
  'database' = '<yourDatabase>',
  'collection' = '<yourCollection>', 
  'uri'='<yourUri>'
);

INSERT INTO mongodb_sink SELECT v, p FROM datagen_source;
阿里云首页 实时计算Flink版 相关技术圈