OSS CDC连接器通过消费阿里云消息服务MNS队列中的OSS事件通知,实时捕获对象存储OSS的变更事件(新增、删除文件等),实现OSS数据变更的增量读取。同时支持全量扫描指定路径下的对象元数据。本文为您介绍如何使用OSS CDC连接器。
背景信息
OSS CDC连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表 |
运行模式 | 仅支持流模式 |
数据格式 | 暂不适用(固定Schema) |
API种类 | SQL和数据摄入YAML |
是否支持更新或删除结果表数据 | 不涉及 |
特色功能
事件驱动:通过MNS队列实时消费OSS事件通知,延迟低,无需轮询OSS接口。
全量+增量:支持SNAPSHOT模式全量扫描指定路径下的对象,INITIAL模式先全量再增量,实现全增量一体化。
路径过滤:SNAPSHOT和INITIAL模式支持通过
path参数指定多个路径前缀,仅扫描目标目录。限速控制:支持
scan.rate-limit.records-per-second参数限制读取速率,防止下游压力过大。灵活启动:支持6种启动模式(EARLIEST_OFFSET、LATEST_OFFSET、COMMITTED_OFFSET、TIMESTAMP、INITIAL、SNAPSHOT),满足不同业务场景。
前提条件
使用OSS CDC连接器前,需满足以下条件。
配置项 | 要求 |
OSS事件通知 | 已在OSS Bucket上配置事件通知,将ObjectCreated、ObjectRemove等事件推送到MNS队列。 |
MNS队列(可选) | 已创建MNS队列,且与Flink作业所在VPC网络互通。 说明 仅在使用增量启动模式时,才需要MNS消息队列。 |
AccessKey | 已创建阿里云AccessKey,且具有MNS和OSS的访问权限。 |
OSS V4签名(可选) | 若OSS开启了V4签名,需设置 |
MNS事件通知的配置方式请参见OSS事件通知。
使用限制
仅实时计算引擎VVR 11.8.0.preview.2及以上版本支持。
仅支持源表,不支持维表和结果表。
DDL中定义的列必须为固定Schema中支持的列(参见数据类型映射),不支持自定义列。
源表不支持定义Watermark。
MNS队列中的消息保留时间有限,若作业停止时间过长导致消息过期,可能丢失事件。
SNAPSHOT和INITIAL模式依赖OSS ListObjects API,对大规模Bucket的全量扫描可能耗时较长。
注意事项
DDL中的
key字段必须声明为NOT NULL,该字段作为对象的唯一标识。列名匹配不区分大小写,但建议使用固定Schema中定义的列名。
使用SNAPSHOT或INITIAL模式时,必须配置
oss-endpoint和oss-bucket参数。若配置了path参数,则oss-bucket为必填。使用TIMESTAMP启动模式时,必须配置
scan.startup.timestamp-millis参数。建议根据业务需求合理设置
batch-size和polling-wait-time,以平衡吞吐量和延迟。
准备工作
配置MNS消息队列
若需通过MNS同步OSS增量文件变更,需完成以下配置。若仅需同步一次全量数据(SNAPSHOT模式),可跳过此步骤。
配置OSS鉴权
无论使用CDC YAML还是Flink SQL作业,均需在作业的运行参数配置 > 其他配置中增加OSS鉴权参数。详情请参见配置Bucket鉴权信息。
SQL
语法结构
CREATE TEMPORARY TABLE oss_cdc_source (
`url` STRING,
`key` STRING NOT NULL,
region STRING,
bucket STRING,
fileName STRING,
`size` BIGINT,
modificationTime TIMESTAMP_LTZ(3),
eTag STRING
) WITH (
'connector' = 'oss-cdc',
'endpoint' = '<yourMnsEndpoint>',
'region' = '<yourMnsRegion>',
'queue-name' = '<yourMnsQueueName>',
'access-key-id' = '<yourAccessKeyId>',
'access-key-secret' = '<yourAccessKeySecret>'
); DDL中声明的列可以是固定Schema支持列的子集,但key列必须声明且为NOT NULL。不支持声明固定Schema以外的列。
WITH参数
以下为OSS CDC连接器在Flink SQL中的WITH参数。connector固定填写oss-cdc。
通用参数
参数 | 说明 | 类型 | 是否必填 | 默认值 | 备注 |
connector | 连接器类型。 | STRING | 是 | 无 | 固定填写 |
endpoint | MNS数据面服务Endpoint。 | STRING | 是 | 无 | 格式: |
region | MNS服务所在地域。 | STRING | 是 | 无 | 例如 |
queue-name | MNS队列名称。 | STRING | 是 | 无 | 用于消费OSS事件通知的MNS队列名称。 |
access-key-id | 阿里云AccessKey ID。 | STRING | 是 | 无 | 无 |
access-key-secret | 阿里云AccessKey Secret。 | STRING | 是 | 无 | 无 |
启动模式参数
参数 | 说明 | 类型 | 是否必填 | 默认值 | 备注 |
scan.startup.mode | 消费启动模式。 | STRING | 否 | COMMITTED_OFFSET | 取值如下:
|
scan.startup.timestamp-millis | 启动消费的时间戳(毫秒)。 | LONG | 否 | 无 | 仅在 |
消费调优参数
参数 | 说明 | 类型 | 是否必填 | 默认值 | 备注 |
batch-size | 单次批量拉取MNS消息的最大条数。 | INTEGER | 否 | 1 | 取值范围:1~16。 |
polling-wait-time | MNS长轮询等待时间。 | DURATION | 否 | 10s | 取值范围:0~30秒。 |
delete-max-retries | MNS消息删除失败时的最大重试次数。 | INTEGER | 否 | 3 | 无 |
scan.rate-limit.records-per-second | 每秒最大输出记录数。 | DOUBLE | 否 | 无 | 用于限制源表读取速率。不设置则不限速。 |
scan.parallelism | 源表并行度。 | INTEGER | 否 | 无 | 不设置则使用作业默认并行度。 |
OSS参数(SNAPSHOT/INITIAL模式)
参数 | 说明 | 类型 | 是否必填 | 默认值 | 备注 |
oss-endpoint | OSS服务Endpoint。 | STRING | 否 | 无 | SNAPSHOT和INITIAL模式下必填。 |
oss-region | OSS服务地域。 | STRING | 否 | 无 | 若未设置,则回退至 |
oss-bucket | OSS Bucket名称。 | STRING | 否 | 无 | SNAPSHOT和INITIAL模式下必填。配置 |
oss-enable-v4-signature | 是否启用OSS V4签名。 | BOOLEAN | 否 | false | 启用时必须配置 |
path | OSS对象路径前缀。 | STRING | 否 | 无 | 多个前缀以英文逗号( |
数据类型映射
OSS CDC使用固定Schema,DDL中声明的列必须为以下支持的列。
OSS事件字段 | DDL列名 | Flink SQL类型 | 是否必填 | 说明 |
对象Key |
| STRING | 是(NOT NULL) | OSS对象的唯一标识,即对象路径。 |
对象完整URL |
| STRING | 否 | OSS URL,格式如 |
所在地域 |
| STRING | 否 | OSS Bucket所在地域。 |
Bucket名称 |
| STRING | 否 | OSS Bucket名称。 |
文件名 |
| STRING | 否 | 对象文件名。 |
文件大小 |
| BIGINT | 否 | 对象大小,单位:字节。 |
修改时间 |
| TIMESTAMP_LTZ(3) | 否 | 对象最后修改时间。 |
ETag |
| STRING | 否 | 对象的ETag值。 |
DDL中只需声明业务需要的列,未声明的列不会被读取。列名匹配不区分大小写。
使用示例
示例一:从MNS队列消费OSS事件(增量模式)
-- OSS CDC源表:消费MNS队列中的OSS事件通知
CREATE TEMPORARY TABLE oss_cdc_source (
`key` STRING NOT NULL,
bucket STRING,
fileName STRING,
`size` BIGINT
) WITH (
'connector' = 'oss-cdc',
'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
'region' = 'cn-hangzhou',
'queue-name' = '<yourQueueName>',
'access-key-id' = '<yourAccessKeyId>',
'access-key-secret' = '<yourAccessKeySecret>'
);
-- 打印到控制台
CREATE TEMPORARY TABLE print_sink (
`key` STRING,
bucket STRING,
fileName STRING,
`size` BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO print_sink
SELECT `key`, bucket, fileName, `size`
FROM oss_cdc_source; 示例二:SNAPSHOT模式全量扫描OSS路径
CREATE TEMPORARY TABLE oss_cdc_snapshot (
`key` STRING NOT NULL,
bucket STRING,
fileName STRING,
`size` BIGINT,
modificationTime TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'oss-cdc',
'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
'region' = 'cn-hangzhou',
'queue-name' = '<yourQueueName>',
'access-key-id' = '<yourAccessKeyId>',
'access-key-secret' = '<yourAccessKeySecret>',
'scan.startup.mode' = 'SNAPSHOT',
'oss-endpoint' = 'https://oss-cn-hangzhou.aliyuncs.com',
'oss-bucket' = '<yourBucketName>',
'path' = 'data/log/,data/csv/'
); 示例三:INITIAL模式先全量再增量
CREATE TEMPORARY TABLE oss_cdc_initial (
`key` STRING NOT NULL,
bucket STRING,
`size` BIGINT,
eTag STRING
) WITH (
'connector' = 'oss-cdc',
'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
'region' = 'cn-hangzhou',
'queue-name' = '<yourQueueName>',
'access-key-id' = '<yourAccessKeyId>',
'access-key-secret' = '<yourAccessKeySecret>',
'scan.startup.mode' = 'INITIAL',
'oss-endpoint' = 'https://oss-cn-hangzhou.aliyuncs.com',
'oss-bucket' = '<yourBucketName>',
'path' = 'data/'
); 示例四:带调优参数的增量消费
CREATE TEMPORARY TABLE oss_cdc_tuned (
`key` STRING NOT NULL,
bucket STRING,
fileName STRING,
`size` BIGINT
) WITH (
'connector' = 'oss-cdc',
'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
'region' = 'cn-hangzhou',
'queue-name' = '<yourQueueName>',
'access-key-id' = '<yourAccessKeyId>',
'access-key-secret' = '<yourAccessKeySecret>',
'batch-size' = '8',
'polling-wait-time' = '5s',
'scan.parallelism' = '4'
); 示例五:使用FETCH_CONTENT函数获取文件内容
CREATE TEMPORARY TABLE oss_cdc_source (
`key` STRING NOT NULL,
`url` STRING,
bucket STRING,
`size` BIGINT,
eTag STRING
) WITH (
'connector' = 'oss-cdc',
'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
'region' = 'cn-hangzhou',
'queue-name' = '<yourQueueName>',
'access-key-id' = '<yourAccessKeyId>',
'access-key-secret' = '<yourAccessKeySecret>',
'scan.startup.mode' = 'INITIAL',
'oss-endpoint' = 'oss-cn-hangzhou.aliyuncs.com',
'oss-bucket' = '<yourBucketName>',
'path' = 'images/'
);
CREATE TEMPORARY TABLE print_sink (
`key` STRING,
bucket STRING,
`size` BIGINT,
eTag STRING,
`value` BYTES
) WITH (
'connector' = 'print'
);
INSERT INTO print_sink
SELECT `key`, bucket, `size`, eTag, FETCH_CONTENT(url) AS `value`
FROM oss_cdc_source; Flink SQL完整入湖示例
以下示例展示通过Flink SQL将OSS变更数据写入Paimon Blob存储的完整流程。
步骤一:创建Paimon目标表
BLOB列在建表时需通过blob-field和blob-as-descriptor选项指定。详情请参见DLF Paimon Blob存储。
CREATE TABLE my_catalog.my_db.image_table (
key STRING,
size BIGINT,
eTag STRING,
image BYTES,
embedding VECTOR<FLOAT>
) WITH (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image',
'blob-as-descriptor' = 'true'
); 步骤二:创建OSS CDC源表
CREATE TEMPORARY TABLE oss_cdc_source (
`key` STRING NOT NULL,
`url` STRING,
bucket STRING,
`size` BIGINT,
eTag STRING
) WITH (
'connector' = 'oss-cdc',
'endpoint' = 'http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com',
'region' = 'cn-hangzhou',
'queue-name' = '<yourQueueName>',
'access-key-id' = '<yourAccessKeyId>',
'access-key-secret' = '<yourAccessKeySecret>',
'scan.startup.mode' = 'INITIAL',
'oss-endpoint' = 'oss-cn-hangzhou.aliyuncs.com',
'oss-bucket' = '<yourBucketName>',
'path' = 'images/'
); 步骤三:写入Paimon表
通过dlf_catalog.sys.path_to_descriptor内置过程将URL转换为Blob描述符写入。
INSERT INTO dlf_catalog.my_db.image_table
SELECT
`key`, `size`, eTag, dlf_catalog.sys.path_to_descriptor(url) AS image
FROM
oss_cdc_source; 数据摄入
语法结构
source:
type: oss-cdc
name: OSS CDC Source
endpoint: http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com
region: cn-hangzhou
queue-name: <yourQueueName>
access-key-id: <yourAccessKeyId>
access-key-secret: <yourAccessKeySecret>
transform:
- source-table: <yourBucketName>
projection: <columns>
table-options: <tableOptions>
route:
- source-table: <yourBucketName>
sink-table: <sinkDatabase>.<sinkTable>
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: rest
catalog.properties.uri: <dlfEndpoint>
catalog.properties.warehouse: <warehouseName>
catalog.properties.token.provider: dlf配置项
以下为OSS CDC连接器在数据摄入YAML中的配置项。type固定填写oss-cdc。
通用参数
参数 | 说明 | 类型 | 是否必填 | 默认值 | 备注 |
type | 连接器类型。 | STRING | 是 | 无 | 固定填写 |
endpoint | MNS数据面服务Endpoint。 | STRING | 是 | 无 | 格式: |
region | MNS服务所在地域。 | STRING | 是 | 无 | 例如 |
queue-name | MNS队列名称。 | STRING | 是 | 无 | 用于消费OSS事件通知的MNS队列名称。 |
access-key-id | 阿里云AccessKey ID。 | STRING | 是 | 无 | 无 |
access-key-secret | 阿里云AccessKey Secret。 | STRING | 是 | 无 | 无 |
启动模式参数
参数 | 说明 | 类型 | 是否必填 | 默认值 | 备注 |
scan.startup.mode | 消费启动模式。 | STRING | 否 | COMMITTED_OFFSET | 取值如下:
|
scan.startup.timestamp-millis | 启动消费的时间戳(毫秒)。 | LONG | 否 | 无 | 仅在 |
消费调优参数
参数 | 说明 | 类型 | 是否必填 | 默认值 | 备注 |
batch-size | 单次批量拉取MNS消息的最大条数。 | INTEGER | 否 | 1 | 取值范围:1~16。 |
polling-wait-time | MNS长轮询等待时间。 | DURATION | 否 | 10s | 取值范围:0~30秒。 |
delete-max-retries | MNS消息删除失败时的最大重试次数。 | INTEGER | 否 | 3 | 无 |
scan.rate-limit.records-per-second | 每秒最大输出记录数。 | DOUBLE | 否 | 无 | 用于限制源表读取速率。不设置则不限速。 |
scan.parallelism | 源表并行度。 | INTEGER | 否 | 无 | 不设置则使用作业默认并行度。 |
OSS参数(SNAPSHOT/INITIAL模式)
参数 | 说明 | 类型 | 是否必填 | 默认值 | 备注 |
oss-endpoint | OSS服务Endpoint。 | STRING | 否 | 无 | SNAPSHOT和INITIAL模式下必填。 |
oss-region | OSS服务地域。 | STRING | 否 | 无 | 若未设置,则回退至 |
oss-bucket | OSS Bucket名称。 | STRING | 否 | 无 | SNAPSHOT和INITIAL模式下必填。配置 |
oss-enable-v4-signature | 是否启用OSS V4签名。 | BOOLEAN | 否 | false | 启用时必须配置 |
path | OSS对象路径前缀。 | STRING | 否 | 无 | 多个前缀以英文逗号( |
使用示例
文件元数据入湖
以下示例仅将OSS元数据的引用写入Paimon表,不会拷贝OSS文件的实际内容。
source:
type: oss-cdc
name: OSS CDC Source
endpoint: http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com
region: cn-hangzhou
queue-name: <yourQueueName>
access-key-id: <yourAccessKeyId>
access-key-secret: <yourAccessKeySecret>
transform:
- source-table: <yourBucketName>
projection: key, url, fileName, eTag
table-options: blob-field=url;blob-descriptor-field=url;blob-as-descriptor=true;row-tracking.enabled=true;data-evolution.enabled=true
route:
- source-table: <yourBucketName>
sink-table: paimon_db.paimon_table
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: rest
catalog.properties.uri: http://cn-shenzhen-vpc.dlf.aliyuncs.com
catalog.properties.warehouse: dlf_paimon
catalog.properties.token.provider: dlf
文件元数据和内容同时入湖
以下示例通过FETCH_CONTENT函数获取文件内容,将OSS文件内容和元数据一并写入Paimon表。(替换对应的OSS Bucket和AK/SK)
source:
type: oss-cdc
name: OSS CDC Source
endpoint: http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com
region: cn-hangzhou
queue-name: <yourQueueName>
access-key-id: <yourAccessKeyId>
access-key-secret: <yourAccessKeySecret>
transform:
- source-table: <yourBucketName>
projection: key, url, fileName, eTag, FETCH_CONTENT(`url`) as blob_field
table-options: blob-field=url,blob_field;blob-descriptor-field=url;blob-as-descriptor=true;row-tracking.enabled=true;data-evolution.enabled=true
route:
- source-table: <yourBucketName>
sink-table: paimon_db.paimon_table
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: rest
catalog.properties.uri: http://cn-shenzhen-vpc.dlf.aliyuncs.com
catalog.properties.warehouse: dlf_paimon
catalog.properties.token.provider: dlf