本文介绍如何使用轻量消息队列MNS连接器。
背景信息
轻量消息队列(原 MNS)是一种高效、可靠、安全、便捷、可弹性扩展的分布式消息服务。MNS提供OSS事件通知能力,通过创建事件通知规则,MNS可以将对象存储 OSS指定资源上产生的事件(例如新文件被创建)以消息的方式推送到MNS队列中。
Flink作业可以使用MNS连接器消费这些事件,例如在实时图像处理场景,用户可以使用MNS连接器实时获取OSS Bucket中的新文件的路径,再配合实时计算Flink版提供的FETCH_CONTENT来下载图片内容并调用AI大语言模型集成相关能力进行实时多模态分析。
类别 | 详情 |
支持类型 | 源表 |
运行模式 | 流模式 |
数据格式 | Orc、Parquet、Avro、Csv、JSON和Raw |
特有监控指标 | duplicateMessages(重复消息数)、deletedMessages(已删除消息数)、failedDeletes(删除失败数)、deserializationErrors(反序列化错误数) |
API种类 | SQL |
前提条件
已开通轻量消息队列(原 MNS)并授权与Flink工作空间处于同一地域(内网访问)。
MNS 通过公网访问时,Flink 工作空间需开启公网访问,并将 Flink 的公网 IP 加入 MNS 队列的白名单,详情请参见MNS访问控制。
使用限制
仅实时计算引擎VVR 11.6.0及以上版本支持MNS连接器。
MNS连接器与Kafka不同,不支持从指定位点消费消息,也不支持回溯。详情请参见轻量消息队列(原 MNS)。
并行度固定为1:MNS 连接器通过 Connector 侧去重实现 Exactly-Once 语义,仅支持单并发消费。
必须开启Checkpoint:MNS连接器依赖Checkpoint实现消息的确认和删除。如果未开启Checkpoint,消息将不会被删除,导致无限重复消费。
消息体大小限制:MNS单条消息Body大小不能超过64 KB。如需处理更大消息,请参见MNS的超大消息传输最佳实践。
消息可见性设置:用户在创建MNS队列时需设置消息可见性(Visibility Timeout)。消息可见性超时时间建议大于 Checkpoint 间隔,避免消息重复投递。
MNS连接器不保证消费事件时的严格顺序。MNS 普通队列非严格 FIFO 队列,消费超时后消息重投递可能导致顺序变化。
语法结构
CREATE TABLE mns_source (
data STRING
) WITH (
'connector' = 'mns',
'endpoint' = '${endpoint}',
'region' = '${region}',
'queueName' = '${queueName}',
'accessKeyId' = '${accessKeyId}',
'accessKeySecret' = '${accessKeySecret}',
'format' = 'json',
'batchSize' = '8',
'pollingWaitTime' = '10s',
'messageType' = 'RAW'
);WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
mns。endpoint
MNS服务接入点
String
是
无
URI格式,例如
http://{account-id}.mns.{region}.aliyuncs.com。详情请参见地域接入点。
region
MNS服务所在地域。
String
是
无
例如cn-hangzhou,支持的地域请参见接入点。
queueName
MNS队列名。
String
是
无
在MNS控制台创建队列时设置的队列名称。
accessKeyId
访问MNS服务所需AK
String
是
无
请使用已有AccessKey或者参考创建AccessKey重新创建。
accessKeySecret
访问MNS服务所需SK
String
是
无
format
数据格式。
String
是
无
参数取值如下:
csv
json
avro
parquet
orc
raw
batchSize
单次从MNS队列拉取消息的最大条数。
Integer
否
1
有效范围:1-16。该参数影响读性能,调大可增加吞吐。注意由于MNS服务接口限制,单次最多返回16条消息。
pollingWaitTime
每次从MNS队列拉取消息时的最长等待时间。
Duration
否
10s
取值范围0-30s。设为0则代表不等待。
messageType
MNS消息体载荷类型。
String
否
RAW
取值为
RAW或OSS。RAW(默认值):将消息体视为标准JSON,由配置的format反序列化器进行解析。OSS:自动解析OSS事件通知格式的JSON(需要MNS服务订阅OSS事件)。连接器会自动从events数组中提取第一个元素,并将嵌套字段扁平化映射到表结构。说明当
messageType设为OSS时,format必须为json。deleteMaxRetries
当MNS SDK删除消息失败时的最大重试次数。
Integer
否
3
无。
startTimeMs
消息消费起始时间(Unix时间戳,单位毫秒)。
Long
否
-1
-1代表不按时间戳过滤事件,即会尝试消费队列中所有可见消息。
说明MNS连接器不支持类似Kafka那样的消息回溯,此处的startTimeMs只用来过滤。MNS连接器不会将进入MNS队列时间早于该时间戳的消息发送到下游。
读取OSS事件通知
当设置 'messageType' = 'OSS' 时,MNS连接器会自动解析OSS事件通知中的JSON字段并映射为扁平的表结构。支持以下字段名映射(不区分大小写):
用户表字段名 | JSON路径 | 说明 |
eventName | eventName | 事件类型。 |
eventSource | eventSource | 事件源。 |
eventTime | eventTime | 事件产生的时间。 |
eventVersion | eventVersion | 事件协议的版本。 |
region | region | Bucket所在的地域。 |
ossBucketArn | oss.bucket.arn | Bucket的唯一标识符。 |
ossBucketName | oss.bucket.name | Bucket的名称。 |
ossBucketOwnerIdentity | oss.bucket.ownerIdentity | 创建Bucket的用户ID。 |
ossObjectKey | oss.object.key | Object的名称。 |
ossObjectSize | oss.object.size | Object的大小。 |
ossObjectETag | oss.object.eTag | Object 的 ETag 值,可用于校验内容是否变化。 |
ossObjectDeltaSize | oss.object.deltaSize | Object的大小变化量。 |
ossObjectReadFrom | oss.object.readFrom | 文件开始读取的位置。 |
ossObjectReadTo | oss.object.readTo | 文件最后读取的位置。 |
ossOssSchemaVersion | oss.ossSchemaVersion | OSS模式的版本号。 |
ossRuleId | oss.ruleId | 事件匹配的规则ID。 |
requestParametersSourceIPAddress | requestParameters.sourceIPAddress | 请求的源IP。 |
responseElementsRequestId | responseElements.requestId | 请求对应的Request ID。 |
userIdentityPrincipalId | userIdentity.principalId | 请求发起者的UID。 |
也可以通过JSON格式参数控制解析行为:
json.fail-on-missing-field:缺失字段时是否报错(默认false)json.ignore-parse-errors:是否忽略解析错误(默认false)json.timestamp-format.standard:时间戳格式(默认iso-8601)json.timestamp-format.pattern:自定义时间戳格式模式
以上字段名详细介绍参考OSS事件通知。
使用示例
示例一:消费标准JSON消息
-- 创建MNS源表 -- 字段名来自于消息体的json格式 CREATE TEMPORARY TABLE mns_source ( `userId` BIGINT, `action` STRING, `timestamp` TIMESTAMP(3), `payload` STRING ) WITH ( 'connector' = 'mns', 'endpoint' = 'http://your-account-id.mns.cn-hangzhou.aliyuncs.com', 'region' = 'cn-hangzhou', 'queueName' = 'my-events-queue', 'accessKeyId' = 'your-ak', 'accessKeySecret' = 'your-sk', 'format' = 'json' ); -- 创建结果表用于测试输出 CREATE TEMPORARY TABLE print_sink ( `userId` BIGINT, `action` STRING, `timestamp` TIMESTAMP(3), `payload` STRING ) WITH ( 'connector' = 'print' ); -- 消费并输出 INSERT INTO print_sink SELECT userId, action, timestamp, payload FROM mns_source;示例二:消费OSS事件通知消息
-- 创建MNS源表,自动解析OSS事件通知JSON -- 字段名定义参考“读取OSS事件通知“ CREATE TEMPORARY TABLE oss_event_source ( `eventName` STRING, `eventSource` STRING, `eventTime` TIMESTAMP(3), `region` STRING, `ossBucketName` STRING, `ossObjectKey` STRING, `ossObjectSize` BIGINT, `responseElementsRequestId` STRING, `userIdentityPrincipalId` STRING ) WITH ( 'connector' = 'mns', 'endpoint' = 'http://123456789.mns.cn-hangzhou.aliyuncs.com', 'region' = 'cn-hangzhou', 'queueName' = 'oss-events-queue', 'accessKeyId' = '${secret_values.ak_id}', 'accessKeySecret' = '${secret_values.ak_secret}', 'format' = 'json', 'messageType' = 'OSS' ); -- 创建结果表用于测试输出 CREATE TEMPORARY TABLE print_sink ( `eventName` STRING, `ossBucketName` STRING, `ossObjectKey` STRING, `ossObjectSize` BIGINT, `eventTime` TIMESTAMP(3) ) WITH ( 'connector' = 'print' ); -- 过滤出ObjectCreated事件并输出 INSERT INTO print_sink SELECT eventName, ossBucketName, ossObjectKey, ossObjectSize, eventTime FROM oss_event_source WHERE eventName LIKE 'ObjectCreated:%';