Simple Message Queue (formerly MNS) connector


The SMQ connector lets Flink jobs consume messages from Simple Message Queue (formerly MNS) queues in real time, including OSS event notifications pushed by MNS.

Overview

For example, in a real-time image processing pipeline, the SMQ connector receives OSS event notifications for images newly uploaded to an OSS bucket, the FETCH_CONTENT function downloads each image, and a large language model (LLM) integration performs multimodal analysis on it.

  • Supported type: SQL source

  • Execution mode: Stream

  • Data formats: ORC, Parquet, Avro, CSV, JSON, and raw

  • Monitoring metrics: duplicateMessages, deletedMessages, failedDeletes, deserializationErrors

  • API type: Flink SQL

Prerequisites

Before you begin, make sure that you have:

Limitations

  • Version requirement: The SMQ connector requires Ververica Runtime (VVR) 11.6.0 or later.

  • No offset-based consumption: Unlike Apache Kafka, the SMQ connector does not support consuming from a specific offset or seeking.

  • Fixed parallelism of 1: The SMQ connector achieves exactly-once semantics through connector-side deduplication and supports a parallelism of 1 only.

  • Checkpoints required: The connector deletes consumed messages only after a checkpoint completes. Without checkpoints, messages are never deleted, so they become visible again after each visibility timeout and are consumed repeatedly without end.

  • Message body size limit: A single MNS message body cannot exceed 64 KB. For larger messages, see Transmit oversized messages.

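  • Visibility timeout configuration: Set the queue's visibility timeout to a value greater than the checkpoint interval. A received message stays invisible only until its visibility timeout expires; if no checkpoint has completed and deleted it by then, the message becomes visible again and is redelivered.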

  • No strict ordering: MNS standard queues are not strictly First-In, First-Out (FIFO). Message redelivery after a timeout can alter the consumption order.
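Because messages are deleted only when a checkpoint completes, checkpointing must be enabled for every job that reads from SMQ. The following is a minimal sketch, assuming your deployment accepts SQL SET statements (otherwise, set the same keys in the deployment's runtime configuration). The 60-second interval is an illustrative value, not a recommendation, and the visibility timeout itself is an attribute of the queue that you configure in SMQ, not in Flink.

```sql
-- Enable periodic checkpoints; the SMQ connector deletes consumed messages
-- only after a checkpoint completes.
SET 'execution.checkpointing.interval' = '60s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';

-- With a 60 s interval, configure the queue's visibility timeout in SMQ to be
-- longer than 60 s (for example, 120 s). A message received just after a
-- checkpoint starts must stay invisible until the next checkpoint deletes it.
```

Leaving headroom above the interval also covers the time a checkpoint takes to complete.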

Syntax

CREATE TABLE mns_source (
    data STRING
) WITH (
    'connector' = 'mns',
    'endpoint' = '${endpoint}',
    'region' = '${region}',
    'queueName' = '${queueName}',
    'accessKeyId' = '${accessKeyId}',
    'accessKeySecret' = '${accessKeySecret}',
    'format' = 'json',
    'batchSize' = '8',
    'pollingWaitTime' = '10s',
    'messageType' = 'RAW'
);

Connector options

General

  • connector (String, required): The connector type. Fixed to mns.

  • endpoint (String, required): The MNS service endpoint, in the format http://{account-id}.mns.{region}.aliyuncs.com. For details, see Endpoints.

  • region (String, required): The region where the SMQ queue resides, for example ap-southeast-1. For details, see Endpoints.

  • queueName (String, required): The name of the MNS queue to consume from.

  • accessKeyId (String, required): The AccessKey ID for SMQ authentication. Use an existing AccessKey or create an AccessKey pair.

  • accessKeySecret (String, required): The AccessKey secret for SMQ authentication. Use an existing AccessKey or create an AccessKey pair.

  • format (String, required): The message deserialization format. Valid values: csv, json, avro, parquet, orc, raw.

  • batchSize (Integer, optional, default 1): The maximum number of messages per batch poll request. Valid range: 1–16. Larger values increase throughput. The SMQ API allows up to 16 messages per call.

  • pollingWaitTime (Duration, optional, default 10s): The long-polling wait time when no messages are available in the queue. Valid range: 0s–30s. A value of 0s disables long polling.

  • messageType (String, optional, default RAW): The message payload type. Valid values: RAW and OSS.

  • deleteMaxRetries (Integer, optional, default 3): The maximum number of retries if message deletion fails.

  • startTimeMs (Long, optional, default -1): The consumption start time as a Unix timestamp in milliseconds. -1 disables timestamp filtering and consumes all visible messages. Unlike Kafka, this option filters messages by time only and does not support message replay.

messageType values:

  • RAW: Deserializes the message body as-is using the configured format.

  • OSS: Parses JSON from OSS event notifications. Requires an MNS subscription to OSS events. The connector extracts the first element from the events array, then flattens and maps nested fields to the table schema.

Note

When messageType is set to OSS, format must be json.
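The following sketch sets the optional options together on a single-column raw source. The endpoint, queue name, and option values are illustrative assumptions. Here startTimeMs skips messages older than 2025-01-01 00:00:00 UTC (1735689600000 ms); it only filters messages that are still in the queue and cannot bring back messages that were already deleted.

```sql
CREATE TEMPORARY TABLE mns_tuned_source (
  data STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = 'http://your-account-id.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queueName' = 'my-events-queue',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'format' = 'raw',                -- keep each message body as one STRING
  'messageType' = 'RAW',
  'batchSize' = '16',              -- maximum allowed per SMQ API call
  'pollingWaitTime' = '30s',       -- longest long-polling wait
  'deleteMaxRetries' = '5',        -- a few more delete retries than the default 3
  'startTimeMs' = '1735689600000'  -- 2025-01-01 00:00:00 UTC
);
```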

Read OSS event notifications

When 'messageType' = 'OSS', the SMQ connector automatically parses the OSS event notification JSON and maps its fields to a flat table structure. Declare columns using the field names below; each column is filled from the listed JSON path. Field names are matched case-insensitively:

  • eventName (JSON path eventName): The event type.

  • eventSource (JSON path eventSource): The event source.

  • eventTime (JSON path eventTime): The event time.

  • eventVersion (JSON path eventVersion): The event protocol version.

  • region (JSON path region): The bucket region.

  • ossBucketArn (JSON path oss.bucket.arn): The bucket's unique identifier.

  • ossBucketName (JSON path oss.bucket.name): The bucket name.

  • ossBucketOwnerIdentity (JSON path oss.bucket.ownerIdentity): The user ID of the bucket creator.

  • ossObjectKey (JSON path oss.object.key): The object name.

  • ossObjectSize (JSON path oss.object.size): The object size.

  • ossObjectETag (JSON path oss.object.eTag): The object's ETag, used to detect content changes.

  • ossObjectDeltaSize (JSON path oss.object.deltaSize): The change in object size.

  • ossObjectReadFrom (JSON path oss.object.readFrom): The file read start position.

  • ossObjectReadTo (JSON path oss.object.readTo): The file read end position.

  • ossOssSchemaVersion (JSON path oss.ossSchemaVersion): The OSS schema version number.

  • ossRuleId (JSON path oss.ruleId): The ID of the matched rule.

  • requestParametersSourceIPAddress (JSON path requestParameters.sourceIPAddress): The source IP address of the request.

  • responseElementsRequestId (JSON path responseElements.requestId): The request ID.

  • userIdentityPrincipalId (JSON path userIdentity.principalId): The user ID (UID) of the requester.

The following JSON format parameters control parsing behavior:

  • json.fail-on-missing-field: Whether to fail if a field is missing. Default: false.

  • json.ignore-parse-errors: Whether to ignore parse errors. Default: false.

  • json.timestamp-format.standard: The timestamp format standard. Default: ISO-8601.

  • json.timestamp-format.pattern: A custom pattern for the timestamp format.
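These parameters go in the WITH clause next to 'format' = 'json'. The following is a minimal sketch, assuming you prefer to tolerate malformed notifications rather than fail the job: with json.ignore-parse-errors enabled, Flink's JSON format skips fields and rows that fail to parse and sets affected fields to NULL. The table name is hypothetical, and the connection values mirror Example 2.

```sql
-- Tolerant OSS event source: malformed notifications do not fail the job.
CREATE TEMPORARY TABLE oss_event_tolerant (
  `eventName` STRING,
  `eventTime` TIMESTAMP(3),
  `ossObjectKey` STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = 'http://123456789.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queueName' = 'oss-events-queue',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'format' = 'json',
  'messageType' = 'OSS',
  'json.fail-on-missing-field' = 'false',        -- absent fields are read as NULL
  'json.ignore-parse-errors' = 'true',           -- skip fields and rows that fail to parse
  'json.timestamp-format.standard' = 'ISO-8601'  -- parse eventTime as ISO-8601
);
```

Enabling ignore-parse-errors trades job stability for silently dropped or partially NULL rows, so it suits sources where occasional malformed messages are expected.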

For detailed field descriptions, see OSS event notifications.
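To make the JSON paths listed above concrete, the following sketch reads the same notifications with 'messageType' = 'RAW' and 'format' = 'raw', then extracts a few fields with Flink's built-in JSON_VALUE function from the first element of the events array. It illustrates the field mapping only and is not how the connector parses internally. It assumes the message body is the plain notification JSON; the table name oss_event_raw is hypothetical.

```sql
-- Read each notification as one raw STRING.
CREATE TEMPORARY TABLE oss_event_raw (
  body STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = 'http://123456789.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queueName' = 'oss-events-queue',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'format' = 'raw',
  'messageType' = 'RAW'
);

-- Equivalent of the eventName, ossBucketName, ossObjectKey, and ossObjectSize
-- columns, addressed by their JSON paths under events[0].
SELECT
  JSON_VALUE(body, '$.events[0].eventName')                       AS eventName,
  JSON_VALUE(body, '$.events[0].oss.bucket.name')                 AS ossBucketName,
  JSON_VALUE(body, '$.events[0].oss.object.key')                  AS ossObjectKey,
  JSON_VALUE(body, '$.events[0].oss.object.size' RETURNING BIGINT) AS ossObjectSize
FROM oss_event_raw;
```

With 'messageType' = 'OSS', the connector performs this mapping itself, which avoids repeating JSON paths in every query.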

Examples

Example 1: Consume standard JSON messages

-- Create an MNS source table.
-- Field names must match the keys in the JSON message body.
CREATE TEMPORARY TABLE mns_source (
  `userId` BIGINT,
  `action` STRING,
  `timestamp` TIMESTAMP(3),
  `payload` STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = 'http://your-account-id.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queueName' = 'my-events-queue',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'format' = 'json'
);

-- Create a print sink to test output.
CREATE TEMPORARY TABLE print_sink (
  `userId` BIGINT,
  `action` STRING,
  `timestamp` TIMESTAMP(3),
  `payload` STRING
) WITH (
  'connector' = 'print'
);

-- Consume messages and write to the sink.
INSERT INTO print_sink
SELECT userId, action, `timestamp`, payload
FROM mns_source;

Example 2: Consume OSS event notification messages

-- Create an MNS source table to automatically parse OSS event notification JSON.
-- For field name mappings, see the "Read OSS event notifications" section.
CREATE TEMPORARY TABLE oss_event_source (
  `eventName` STRING,
  `eventSource` STRING,
  `eventTime` TIMESTAMP(3),
  `region` STRING,
  `ossBucketName` STRING,
  `ossObjectKey` STRING,
  `ossObjectSize` BIGINT,
  `responseElementsRequestId` STRING,
  `userIdentityPrincipalId` STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = 'http://123456789.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queueName' = 'oss-events-queue',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'format' = 'json',
  'messageType' = 'OSS'
);

-- Create a print sink to test output.
CREATE TEMPORARY TABLE print_sink (
  `eventName` STRING,
  `ossBucketName` STRING,
  `ossObjectKey` STRING,
  `ossObjectSize` BIGINT,
  `eventTime` TIMESTAMP(3)
) WITH (
  'connector' = 'print'
);

-- Filter for ObjectCreated events and output results.
INSERT INTO print_sink
SELECT eventName, ossBucketName, ossObjectKey, ossObjectSize, eventTime
FROM oss_event_source
WHERE eventName LIKE 'ObjectCreated:%';