轻量消息队列MNS

更新时间:
复制为 MD 格式

本文介绍如何使用轻量消息队列MNS连接器。

背景信息

轻量消息队列(原 MNS)是一种高效、可靠、安全、便捷、可弹性扩展的分布式消息服务。MNS提供OSS事件通知能力,通过创建事件通知规则,MNS可以将对象存储 OSS指定资源上产生的事件(例如新文件被创建)以消息的方式推送到MNS队列中。

Flink作业可以使用MNS连接器消费这些事件,例如在实时图像处理场景,用户可以使用MNS连接器实时获取OSS Bucket中的新文件的路径,再配合实时计算Flink版提供的FETCH_CONTENT来下载图片内容并调用AI大语言模型集成相关能力进行实时多模态分析。

类别

详情

支持类型

源表

运行模式

流模式

数据格式

Orc、Parquet、Avro、Csv、JSONRaw

特有监控指标

duplicateMessages(重复消息数)、deletedMessages(已删除消息数)、failedDeletes(删除失败数)、deserializationErrors(反序列化错误数)

API种类

SQL

前提条件

使用限制

  • 仅实时计算引擎VVR 11.6.0及以上版本支持MNS连接器。

  • MNS连接器与Kafka不同,不支持从指定位点消费消息,也不支持回溯。详情请参见轻量消息队列(原 MNS)

  • 并行度固定为1:MNS 连接器通过 Connector 侧去重实现 Exactly-Once 语义,仅支持单并发消费。

  • 必须开启Checkpoint:MNS连接器依赖Checkpoint实现消息的确认和删除。如果未开启Checkpoint,消息将不会被删除,导致无限重复消费。

  • 消息体大小限制:MNS单条消息Body大小不能超过64 KB。如需处理更大消息,请参见MNS超大消息传输最佳实践。

  • 消息可见性设置:用户在创建MNS队列时需设置消息可见性(Visibility Timeout)。消息可见性超时时间建议大于 Checkpoint 间隔,避免消息重复投递。

  • MNS连接器不保证消费事件时的严格顺序。MNS 普通队列非严格 FIFO 队列,消费超时后消息重投递可能导致顺序变化。

语法结构

CREATE TABLE mns_source (
    data STRING
) WITH (
    'connector' = 'mns',
    'endpoint' = '${endpoint}',
    'region' = '${region}',
    'queueName' = '${queueName}',
    'accessKeyId' = '${accessKeyId}',
    'accessKeySecret' = '${accessKeySecret}',
    'format' = 'json',
    'batchSize' = '8',
    'pollingWaitTime' = '10s',
    'messageType' = 'RAW'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为mns

    endpoint

    MNS服务接入点

    String

    URI格式,例如http://{account-id}.mns.{region}.aliyuncs.com

    详情请参见地域接入点

    region

    MNS服务所在地域。

    String

    例如cn-hangzhou,支持的地域请参见接入点

    queueName

    MNS队列名。

    String

    MNS控制台创建队列时设置的队列名称。

    accessKeyId

    访问MNS服务所需AK

    String

    请使用已有AccessKey或者参考创建AccessKey重新创建。

    accessKeySecret

    访问MNS服务所需SK

    String

    format

    数据格式。

    String

    参数取值如下:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

    batchSize

    单次从MNS队列拉取消息的最大条数。

    Integer

    1

    有效范围:1-16。该参数影响读性能,调大可增加吞吐。注意由于MNS服务接口限制,单次最多返回16条消息。

    pollingWaitTime

    每次从MNS队列拉取消息时的最长等待时间。

    Duration

    10s

    取值范围0-30s。设为0则代表不等待。

    messageType

    MNS消息体载荷类型。

    String

    RAW

    取值为RAWOSS

    RAW(默认值):将消息体视为标准JSON,由配置的 format 反序列化器进行解析。

    OSS:自动解析OSS事件通知格式的JSON(需要MNS服务订阅OSS事件)。连接器会自动从 events 数组中提取第一个元素,并将嵌套字段扁平化映射到表结构。

    说明

    当 messageType 设为 OSS 时,format 必须为 json

    deleteMaxRetries

    MNS SDK删除消息失败时的最大重试次数。

    Integer

    3

    无。

    startTimeMs

    消息消费起始时间(Unix时间戳,单位毫秒)。

    Long

    -1

    -1代表不按时间戳过滤事件,即会尝试消费队列中所有可见消息。

    说明

    MNS连接器不支持类似Kafka那样的消息回溯,此处的startTimeMs只用来过滤。MNS连接器不会将进入MNS队列时间早于该时间戳的消息发送到下游。

读取OSS事件通知

当设置 'messageType' = 'OSS' 时,MNS连接器会自动解析OSS事件通知中的JSON字段并映射为扁平的表结构。支持以下字段名映射(不区分大小写):

用户表字段名

JSON路径

说明

eventName

eventName

事件类型。

eventSource

eventSource

事件源。

eventTime

eventTime

事件产生的时间。

eventVersion

eventVersion

事件协议的版本。

region

region

Bucket所在的地域。

ossBucketArn

oss.bucket.arn

Bucket的唯一标识符。

ossBucketName

oss.bucket.name

Bucket的名称。

ossBucketOwnerIdentity

oss.bucket.ownerIdentity

创建Bucket的用户ID。

ossObjectKey

oss.object.key

Object的名称。

ossObjectSize

oss.object.size

Object的大小。

ossObjectETag

oss.object.eTag

Object 的 ETag 值,可用于校验内容是否变化。

ossObjectDeltaSize

oss.object.deltaSize

Object的大小变化量。

ossObjectReadFrom

oss.object.readFrom

文件开始读取的位置。

ossObjectReadTo

oss.object.readTo

文件最后读取的位置。

ossOssSchemaVersion

oss.ossSchemaVersion

OSS模式的版本号。

ossRuleId

oss.ruleId

事件匹配的规则ID。

requestParametersSourceIPAddress

requestParameters.sourceIPAddress

请求的源IP。

responseElementsRequestId

responseElements.requestId

请求对应的Request ID。

userIdentityPrincipalId

userIdentity.principalId

请求发起者的UID。

也可以通过JSON格式参数控制解析行为:

  • json.fail-on-missing-field:缺失字段时是否报错(默认false)

  • json.ignore-parse-errors:是否忽略解析错误(默认false)

  • json.timestamp-format.standard:时间戳格式(默认iso-8601)

  • json.timestamp-format.pattern:自定义时间戳格式模式

以上字段名详细介绍参考OSS事件通知

使用示例

  • 示例一:消费标准JSON消息

    -- 创建MNS源表
    -- 字段名来自于消息体的json格式
    CREATE TEMPORARY TABLE mns_source (
      `userId` BIGINT,
      `action` STRING,
      `timestamp` TIMESTAMP(3),
      `payload` STRING
    ) WITH (
      'connector' = 'mns',
      'endpoint' = 'http://your-account-id.mns.cn-hangzhou.aliyuncs.com',
      'region' = 'cn-hangzhou',
      'queueName' = 'my-events-queue',
      'accessKeyId' = 'your-ak',
      'accessKeySecret' = 'your-sk',
      'format' = 'json'
    );
    
    -- 创建结果表用于测试输出
    CREATE TEMPORARY TABLE print_sink (
      `userId` BIGINT,
      `action` STRING,
      `timestamp` TIMESTAMP(3),
      `payload` STRING
    ) WITH (
      'connector' = 'print'
    );
    
    -- 消费并输出
    INSERT INTO print_sink
    SELECT userId, action, timestamp, payload
    FROM mns_source;
  • 示例二:消费OSS事件通知消息

    -- 创建MNS源表,自动解析OSS事件通知JSON
    -- 字段名定义参考“读取OSS事件通知“
    CREATE TEMPORARY TABLE oss_event_source (
      `eventName` STRING,
      `eventSource` STRING,
      `eventTime` TIMESTAMP(3),
      `region` STRING,
      `ossBucketName` STRING,
      `ossObjectKey` STRING,
      `ossObjectSize` BIGINT,
      `responseElementsRequestId` STRING,
      `userIdentityPrincipalId` STRING
    ) WITH (
      'connector' = 'mns',
      'endpoint' = 'http://123456789.mns.cn-hangzhou.aliyuncs.com',
      'region' = 'cn-hangzhou',
      'queueName' = 'oss-events-queue',
      'accessKeyId' = '${secret_values.ak_id}',
      'accessKeySecret' = '${secret_values.ak_secret}',
      'format' = 'json',
      'messageType' = 'OSS'
    );
    
    -- 创建结果表用于测试输出
    CREATE TEMPORARY TABLE print_sink (
      `eventName` STRING,
      `ossBucketName` STRING,
      `ossObjectKey` STRING,
      `ossObjectSize` BIGINT,
      `eventTime` TIMESTAMP(3)
    ) WITH (
      'connector' = 'print'
    );
    
    -- 过滤出ObjectCreated事件并输出
    INSERT INTO print_sink
    SELECT eventName, ossBucketName, ossObjectKey, ossObjectSize, eventTime
    FROM oss_event_source
    WHERE eventName LIKE 'ObjectCreated:%';