OSS CDC(公测中)

更新时间:
复制 MD 格式

OSS CDC连接器通过消费阿里云消息服务MNS队列中的OSS事件通知,实时捕获对象存储OSS的变更事件(新增、删除文件等),实现OSS数据变更的增量读取。同时支持全量扫描指定路径下的对象元数据。本文为您介绍如何使用OSS CDC连接器。

背景信息

OSS CDC连接器支持的信息如下。

类别

详情

支持类型

源表

运行模式

仅支持流模式

数据格式

暂不适用(固定Schema)

API种类

SQL和数据摄入YAML

是否支持更新或删除结果表数据

不涉及

特色功能

  • 事件驱动:通过MNS队列实时消费OSS事件通知,延迟低,无需轮询OSS接口。

  • 全量+增量:支持SNAPSHOT模式全量扫描指定路径下的对象,INITIAL模式先全量再增量,实现全增量一体化。

  • 路径过滤:SNAPSHOTINITIAL模式支持通过path参数指定多个路径前缀,仅扫描目标目录。

  • 限速控制:支持scan.rate-limit.records-per-second参数限制读取速率,防止下游压力过大。

  • 灵活启动:支持6种启动模式(EARLIEST_OFFSET、LATEST_OFFSET、COMMITTED_OFFSET、TIMESTAMP、INITIAL、SNAPSHOT),满足不同业务场景。

前提条件

使用OSS CDC连接器前,需满足以下条件。

配置项

要求

OSS事件通知

已在OSS Bucket上配置事件通知,将ObjectCreated、ObjectRemove等事件推送到MNS队列。

MNS队列(可选)

已创建MNS队列,且与Flink作业所在VPC网络互通。

说明

仅在使用增量启动模式时,才需要MNS消息队列。

AccessKey

已创建阿里云AccessKey,且具有MNSOSS的访问权限。

OSS V4签名(可选)

OSS开启了V4签名,需设置oss-enable-v4-signaturetrue并配置oss-region

MNS事件通知的配置方式请参见OSS事件通知

使用限制

  • 仅实时计算引擎VVR 11.8.0.preview.2及以上版本支持。

  • 仅支持源表,不支持维表和结果表。

  • DDL中定义的列必须为固定Schema中支持的列(参见数据类型映射),不支持自定义列。

  • 源表不支持定义Watermark。

  • MNS队列中的消息保留时间有限,若作业停止时间过长导致消息过期,可能丢失事件。

  • SNAPSHOTINITIAL模式依赖OSS ListObjects API,对大规模Bucket的全量扫描可能耗时较长。

注意事项

  • DDL中的key字段必须声明为NOT NULL,该字段作为对象的唯一标识。

  • 列名匹配不区分大小写,但建议使用固定Schema中定义的列名。

  • 使用SNAPSHOTINITIAL模式时,必须配置oss-endpointoss-bucket参数。若配置了path参数,则oss-bucket为必填。

  • 使用TIMESTAMP启动模式时,必须配置scan.startup.timestamp-millis参数。

  • 建议根据业务需求合理设置batch-sizepolling-wait-time,以平衡吞吐量和延迟。

准备工作

配置MNS消息队列

若需通过MNS同步OSS增量文件变更,需完成以下配置。若仅需同步一次全量数据(SNAPSHOT模式),可跳过此步骤。

  1. 登录OSS管理控制台,创建或选择需要监控的Bucket。

  2. 登录MNS控制台,在左侧导航栏单击队列模型 > 队列列表,创建队列。输入队列名称,其他参数保持默认即可。

  3. MNS控制台左侧导航栏,单击事件通知 > 对象存储OSS,单击创建规则

  4. 在弹窗中输入规则名称,勾选需要监控的事件动作。建议仅勾选创建、复制、删除三类事件,不要勾选下载文件事件,避免出现事件循环。

  5. 选择需要监控的OSS Bucket及匹配前缀。例如,仅监控images/虚拟目录下的文件变更,可在匹配规则处选择前缀并输入images/

  6. 创建完成后,回到队列模型 > 队列列表,复制接入点中的公网访问地址备用。

配置OSS鉴权

无论使用CDC YAML还是Flink SQL作业,均需在作业的运行参数配置 > 其他配置中增加OSS鉴权参数。详情请参见配置Bucket鉴权信息

SQL

语法结构

CREATE TEMPORARY TABLE oss_cdc_source (
  `url` STRING,
  `key` STRING NOT NULL,
  region STRING,
  bucket STRING,
  fileName STRING,
  `size` BIGINT,
  modificationTime TIMESTAMP_LTZ(3),
  eTag STRING
) WITH (
  'connector' = 'oss-cdc',
  'endpoint' = '<yourMnsEndpoint>',
  'region' = '<yourMnsRegion>',
  'queue-name' = '<yourMnsQueueName>',
  'access-key-id' = '<yourAccessKeyId>',
  'access-key-secret' = '<yourAccessKeySecret>'
);        

DDL中声明的列可以是固定Schema支持列的子集,但key列必须声明且为NOT NULL。不支持声明固定Schema以外的列。

WITH参数

以下为OSS CDC连接器在Flink SQL中的WITH参数。connector固定填写oss-cdc

通用参数

参数

说明

类型

是否必填

默认值

备注

connector

连接器类型。

STRING

固定填写oss-cdc

endpoint

MNS数据面服务Endpoint。

STRING

格式:http://{account-id}.mns.{region}.aliyuncs.com

region

MNS服务所在地域。

STRING

例如cn-hangzhou

queue-name

MNS队列名称。

STRING

用于消费OSS事件通知的MNS队列名称。

access-key-id

阿里云AccessKey ID。

STRING

access-key-secret

阿里云AccessKey Secret。

STRING

启动模式参数

参数

说明

类型

是否必填

默认值

备注

scan.startup.mode

消费启动模式。

STRING

COMMITTED_OFFSET

取值如下:

  • EARLIEST_OFFSET:从MNS队列最早可用消息开始消费。

  • LATEST_OFFSET:从最新消息开始消费。

  • COMMITTED_OFFSET(默认):从上次提交的消费位点开始消费,若无已提交位点则等同于LATEST_OFFSET

  • TIMESTAMP:从指定时间戳开始消费,需配合scan.startup.timestamp-millis使用。

  • INITIAL:先全量扫描OSS指定路径,再切换为增量消费MNS事件。

  • SNAPSHOT:仅全量扫描OSS指定路径,不消费增量事件。

scan.startup.timestamp-millis

启动消费的时间戳(毫秒)。

LONG

仅在scan.startup.modeTIMESTAMP时生效。

消费调优参数

参数

说明

类型

是否必填

默认值

备注

batch-size

单次批量拉取MNS消息的最大条数。

INTEGER

1

取值范围:1~16。

polling-wait-time

MNS长轮询等待时间。

DURATION

10s

取值范围:0~30秒。

delete-max-retries

MNS消息删除失败时的最大重试次数。

INTEGER

3

scan.rate-limit.records-per-second

每秒最大输出记录数。

DOUBLE

用于限制源表读取速率。不设置则不限速。

scan.parallelism

源表并行度。

INTEGER

不设置则使用作业默认并行度。

OSS参数(SNAPSHOT/INITIAL模式)

参数

说明

类型

是否必填

默认值

备注

oss-endpoint

OSS服务Endpoint。

STRING

SNAPSHOTINITIAL模式下必填。

oss-region

OSS服务地域。

STRING

若未设置,则回退至region参数值。启用V4签名时必须配置。

oss-bucket

OSS Bucket名称。

STRING

SNAPSHOTINITIAL模式下必填。配置path时也为必填。

oss-enable-v4-signature

是否启用OSS V4签名。

BOOLEAN

false

启用时必须配置oss-region(或region)。

path

OSS对象路径前缀。

STRING

多个前缀以英文逗号(,)分隔。用于SNAPSHOTINITIAL模式下过滤扫描范围。

数据类型映射

OSS CDC使用固定Schema,DDL中声明的列必须为以下支持的列。

OSS事件字段

DDL列名

Flink SQL类型

是否必填

说明

对象Key

key

STRING

是(NOT NULL)

OSS对象的唯一标识,即对象路径。

对象完整URL

url

STRING

OSS URL,格式如oss://my-bucket/some/file.bin,可直接用于FETCH_CONTENT函数获取文件内容。

所在地域

region

STRING

OSS Bucket所在地域。

Bucket名称

bucket

STRING

OSS Bucket名称。

文件名

fileName

STRING

对象文件名。

文件大小

size

BIGINT

对象大小,单位:字节。

修改时间

modificationTime

TIMESTAMP_LTZ(3)

对象最后修改时间。

ETag

eTag

STRING

对象的ETag值。

DDL中只需声明业务需要的列,未声明的列不会被读取。列名匹配不区分大小写。

使用示例

示例一:从MNS队列消费OSS事件(增量模式)

-- OSS CDC源表:消费MNS队列中的OSS事件通知
CREATE TEMPORARY TABLE oss_cdc_source (
  `key` STRING NOT NULL,
  bucket STRING,
  fileName STRING,
  `size` BIGINT
) WITH (
  'connector' = 'oss-cdc',
  'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queue-name' = '<yourQueueName>',
  'access-key-id' = '<yourAccessKeyId>',
  'access-key-secret' = '<yourAccessKeySecret>'
);

-- 打印到控制台
CREATE TEMPORARY TABLE print_sink (
  `key` STRING,
  bucket STRING,
  fileName STRING,
  `size` BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT `key`, bucket, fileName, `size`
FROM oss_cdc_source;       

示例二:SNAPSHOT模式全量扫描OSS路径

CREATE TEMPORARY TABLE oss_cdc_snapshot (
  `key` STRING NOT NULL,
  bucket STRING,
  fileName STRING,
  `size` BIGINT,
  modificationTime TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'oss-cdc',
  'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queue-name' = '<yourQueueName>',
  'access-key-id' = '<yourAccessKeyId>',
  'access-key-secret' = '<yourAccessKeySecret>',
  'scan.startup.mode' = 'SNAPSHOT',
  'oss-endpoint' = 'https://oss-cn-hangzhou.aliyuncs.com',
  'oss-bucket' = '<yourBucketName>',
  'path' = 'data/log/,data/csv/'
);        

示例三:INITIAL模式先全量再增量

CREATE TEMPORARY TABLE oss_cdc_initial (
  `key` STRING NOT NULL,
  bucket STRING,
  `size` BIGINT,
  eTag STRING
) WITH (
  'connector' = 'oss-cdc',
  'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queue-name' = '<yourQueueName>',
  'access-key-id' = '<yourAccessKeyId>',
  'access-key-secret' = '<yourAccessKeySecret>',
  'scan.startup.mode' = 'INITIAL',
  'oss-endpoint' = 'https://oss-cn-hangzhou.aliyuncs.com',
  'oss-bucket' = '<yourBucketName>',
  'path' = 'data/'
);          

示例四:带调优参数的增量消费

CREATE TEMPORARY TABLE oss_cdc_tuned (
  `key` STRING NOT NULL,
  bucket STRING,
  fileName STRING,
  `size` BIGINT
) WITH (
  'connector' = 'oss-cdc',
  'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queue-name' = '<yourQueueName>',
  'access-key-id' = '<yourAccessKeyId>',
  'access-key-secret' = '<yourAccessKeySecret>',
  'batch-size' = '8',
  'polling-wait-time' = '5s',
  'scan.parallelism' = '4'
);          

示例五:使用FETCH_CONTENT函数获取文件内容

CREATE TEMPORARY TABLE oss_cdc_source (
  `key` STRING NOT NULL,
  `url` STRING,
  bucket STRING,
  `size` BIGINT,
  eTag STRING
) WITH (
  'connector' = 'oss-cdc',
  'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queue-name' = '<yourQueueName>',
  'access-key-id' = '<yourAccessKeyId>',
  'access-key-secret' = '<yourAccessKeySecret>',
  'scan.startup.mode' = 'INITIAL',
  'oss-endpoint' = 'oss-cn-hangzhou.aliyuncs.com',
  'oss-bucket' = '<yourBucketName>',
  'path' = 'images/'
);

CREATE TEMPORARY TABLE print_sink (
  `key` STRING,
  bucket STRING,
  `size` BIGINT,
  eTag STRING,
  `value` BYTES
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT `key`, bucket, `size`, eTag, FETCH_CONTENT(url) AS `value`
FROM oss_cdc_source;      

Flink SQL完整入湖示例

以下示例展示通过Flink SQLOSS变更数据写入Paimon Blob存储的完整流程。

步骤一:创建Paimon目标表

BLOB列在建表时需通过blob-fieldblob-as-descriptor选项指定。详情请参见DLF Paimon Blob存储

CREATE TABLE my_catalog.my_db.image_table (
    key STRING,
    size BIGINT,
    eTag STRING,
    image BYTES,
    embedding VECTOR<FLOAT>
) WITH (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'image',
    'blob-as-descriptor' = 'true'
);          

步骤二:创建OSS CDC源表

CREATE TEMPORARY TABLE oss_cdc_source (
  `key` STRING NOT NULL,
  `url` STRING,
  bucket STRING,
  `size` BIGINT,
  eTag STRING
) WITH (
  'connector' = 'oss-cdc',
  'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queue-name' = '<yourQueueName>',
  'access-key-id' = '<yourAccessKeyId>',
  'access-key-secret' = '<yourAccessKeySecret>',
  'scan.startup.mode' = 'INITIAL',
  'oss-endpoint' = 'oss-cn-hangzhou.aliyuncs.com',
  'oss-bucket' = '<yourBucketName>',
  'path' = 'images/'
);       

步骤三:写入Paimon

通过dlf_catalog.sys.path_to_descriptor内置过程将URL转换为Blob描述符写入。

INSERT INTO dlf_catalog.my_db.image_table
SELECT
  `key`, `size`, eTag, dlf_catalog.sys.path_to_descriptor(url) AS image
FROM
  oss_cdc_source;          

数据摄入

语法结构

source:
  type: oss-cdc
  name: OSS CDC Source
  endpoint: http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com
  region: cn-hangzhou
  queue-name: <yourQueueName>
  access-key-id: <yourAccessKeyId>
  access-key-secret: <yourAccessKeySecret>

transform:
  - source-table: <yourBucketName>
    projection: <columns>
    table-options: <tableOptions>

route:
  - source-table: <yourBucketName>
    sink-table: <sinkDatabase>.<sinkTable>

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: <dlfEndpoint>
  catalog.properties.warehouse: <warehouseName>
  catalog.properties.token.provider: dlf

配置项

以下为OSS CDC连接器在数据摄入YAML中的配置项。type固定填写oss-cdc

通用参数

参数

说明

类型

是否必填

默认值

备注

type

连接器类型。

STRING

固定填写oss-cdc

endpoint

MNS数据面服务Endpoint。

STRING

格式:http://{account-id}.mns.{region}.aliyuncs.com

region

MNS服务所在地域。

STRING

例如cn-hangzhou

queue-name

MNS队列名称。

STRING

用于消费OSS事件通知的MNS队列名称。

access-key-id

阿里云AccessKey ID。

STRING

access-key-secret

阿里云AccessKey Secret。

STRING

启动模式参数

参数

说明

类型

是否必填

默认值

备注

scan.startup.mode

消费启动模式。

STRING

COMMITTED_OFFSET

取值如下:

  • EARLIEST_OFFSET:从MNS队列最早可用消息开始消费。

  • LATEST_OFFSET:从最新消息开始消费。

  • COMMITTED_OFFSET(默认):从上次提交的消费位点开始消费,若无已提交位点则等同于LATEST_OFFSET

  • TIMESTAMP:从指定时间戳开始消费,需配合scan.startup.timestamp-millis使用。

  • INITIAL:先全量扫描OSS指定路径,再切换为增量消费MNS事件。

  • SNAPSHOT:仅全量扫描OSS指定路径,不消费增量事件。

scan.startup.timestamp-millis

启动消费的时间戳(毫秒)。

LONG

仅在scan.startup.modeTIMESTAMP时生效。

消费调优参数

参数

说明

类型

是否必填

默认值

备注

batch-size

单次批量拉取MNS消息的最大条数。

INTEGER

1

取值范围:1~16。

polling-wait-time

MNS长轮询等待时间。

DURATION

10s

取值范围:0~30秒。

delete-max-retries

MNS消息删除失败时的最大重试次数。

INTEGER

3

scan.rate-limit.records-per-second

每秒最大输出记录数。

DOUBLE

用于限制源表读取速率。不设置则不限速。

scan.parallelism

源表并行度。

INTEGER

不设置则使用作业默认并行度。

OSS参数(SNAPSHOT/INITIAL模式)

参数

说明

类型

是否必填

默认值

备注

oss-endpoint

OSS服务Endpoint。

STRING

SNAPSHOTINITIAL模式下必填。

oss-region

OSS服务地域。

STRING

若未设置,则回退至region参数值。启用V4签名时必须配置。

oss-bucket

OSS Bucket名称。

STRING

SNAPSHOTINITIAL模式下必填。配置path时也为必填。

oss-enable-v4-signature

是否启用OSS V4签名。

BOOLEAN

false

启用时必须配置oss-region(或region)。

path

OSS对象路径前缀。

STRING

多个前缀以英文逗号(,)分隔。用于SNAPSHOTINITIAL模式下过滤扫描范围。

使用示例

文件元数据入湖

以下示例仅将OSS元数据的引用写入Paimon表,不会拷贝OSS文件的实际内容。

source:
  type: oss-cdc
  name: OSS CDC Source
  endpoint: http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com
  region: cn-hangzhou
  queue-name: <yourQueueName>
  access-key-id: <yourAccessKeyId>
  access-key-secret: <yourAccessKeySecret>

transform:
  - source-table: <yourBucketName>
    projection: key, url, fileName, eTag
    table-options: blob-field=url;blob-descriptor-field=url;blob-as-descriptor=true;row-tracking.enabled=true;data-evolution.enabled=true

route:
  - source-table: <yourBucketName>
    sink-table: paimon_db.paimon_table

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: http://cn-shenzhen-vpc.dlf.aliyuncs.com
  catalog.properties.warehouse: dlf_paimon
  catalog.properties.token.provider: dlf
          

文件元数据和内容同时入湖

以下示例通过FETCH_CONTENT函数获取文件内容,将OSS文件内容和元数据一并写入Paimon表。(替换对应的OSS BucketAK/SK)

source:
  type: oss-cdc
  name: OSS CDC Source
  endpoint: http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com
  region: cn-hangzhou
  queue-name: <yourQueueName>
  access-key-id: <yourAccessKeyId>
  access-key-secret: <yourAccessKeySecret>

transform:
  - source-table: <yourBucketName>
    projection: key, url, fileName, eTag, FETCH_CONTENT(`url`) as blob_field
    table-options: blob-field=url,blob_field;blob-descriptor-field=url;blob-as-descriptor=true;row-tracking.enabled=true;data-evolution.enabled=true

route:
  - source-table: <yourBucketName>
    sink-table: paimon_db.paimon_table

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: http://cn-shenzhen-vpc.dlf.aliyuncs.com
  catalog.properties.warehouse: dlf_paimon
  catalog.properties.token.provider: dlf