The SMQ connector lets Flink jobs consume messages from Simple Message Queue (formerly MNS) queues in real time, including OSS event notifications pushed by MNS.
Overview
For example, in a real-time image processing pipeline, the SMQ connector detects newly uploaded files in an OSS bucket, the FETCH_CONTENT function downloads the image, and AI large language model integration performs multimodal analysis.
| Category | Details |
| --- | --- |
| Supported type | SQL source |
| Execution mode | Stream |
| Data format | ORC, Parquet, Avro, CSV, JSON, and raw |
| Monitoring metrics | duplicateMessages, deletedMessages, failedDeletes, deserializationErrors |
| API type | Flink SQL |
Prerequisites
Before you begin, make sure that you have:
- Activated Simple Message Queue (formerly MNS) and granted the required permissions. For internal network access, the SMQ queue must be in the same region as your Flink workspace.
- (For public internet access) Enabled public access for your Flink workspace and added its public IP addresses to the MNS queue allowlist. For details, see Access control.
Limitations
- VVR version: The SMQ connector requires Ververica Runtime (VVR) 11.6.0 or later.
- No offset-based consumption: Unlike Apache Kafka, the SMQ connector does not support seeking or consuming from a specific offset.
- Fixed parallelism of 1: The SMQ connector supports a parallelism of 1 only. It achieves exactly-once semantics through connector-side deduplication.
- Checkpoints required: The connector relies on checkpoints to acknowledge and delete messages. Without checkpoints, messages are never deleted, which causes infinite repeated consumption.
- Message body size limit: A single MNS message body cannot exceed 64 KB. For larger messages, see Transmit oversized messages.
- Visibility timeout configuration: Set the visibility timeout to a value greater than the checkpoint interval to avoid message redelivery.
- No strict ordering: MNS standard queues are not strictly First-In, First-Out (FIFO). Message redelivery after a timeout can alter the consumption order.
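The checkpoint and visibility timeout limitations interact, so a small configuration sketch may help. It assumes the job accepts Flink's standard execution.checkpointing.interval key through a SET statement (some deployments expect this key in the deployment configuration instead). The 30-second and 60-second values are illustrative only.

```sql
-- Sketch: enable periodic checkpoints so the connector can acknowledge and delete messages.
-- Assumption: the environment accepts SET statements for job configuration.
SET 'execution.checkpointing.interval' = '30s';

-- The visibility timeout is an attribute of the queue itself, not part of this SQL.
-- Keep it above the checkpoint interval (for example, 60 seconds for a 30-second
-- interval) so that messages are not redelivered before they are deleted.
```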
Syntax
CREATE TABLE mns_source (
  `data` STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = '${endpoint}',
  'region' = '${region}',
  'queueName' = '${queueName}',
  'accessKeyId' = '${accessKeyId}',
  'accessKeySecret' = '${accessKeySecret}',
  'format' = 'json',
  'batchSize' = '8',
  'pollingWaitTime' = '10s',
  'messageType' = 'RAW'
);
Connector options
General
| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | String | Yes | - | The connector type. Fixed to mns. |
| endpoint | String | Yes | - | The MNS service endpoint, for example http://your-account-id.mns.cn-hangzhou.aliyuncs.com (as used in Example 1). |
| region | String | Yes | - | The region where the SMQ queue resides. Example: cn-hangzhou. |
| queueName | String | Yes | - | The name of the MNS queue to consume from. |
| accessKeyId | String | Yes | - | The AccessKey ID for SMQ authentication. Use an existing AccessKey or create an AccessKey pair. |
| accessKeySecret | String | Yes | - | The AccessKey secret for SMQ authentication. Use an existing AccessKey or create an AccessKey pair. |
| format | String | Yes | - | The message deserialization format, for example json. For the supported formats, see the Data format row in the overview table. |
| batchSize | Integer | No | 1 | The maximum number of messages per batch poll request. Valid range: 1–16. Larger values increase throughput. The SMQ API allows up to 16 messages per call. |
| pollingWaitTime | Duration | No | 10s | The long-polling wait time when no messages are available in the queue. Valid range: 0s–30s. |
| messageType | String | No | | The message payload type. Valid values: RAW and OSS. |
| deleteMaxRetries | Integer | No | | The maximum number of retries if message deletion fails. |
| startTimeMs | Long | No | | The consumption start time as a Unix timestamp in milliseconds. |
messageType values:
- RAW: Treats the message body as standard data and deserializes it using the configured format.
- OSS: Parses JSON from OSS event notifications. Requires an MNS subscription to OSS events. The connector extracts the first element from the events array, then flattens and maps nested fields to the table schema.
When messageType is set to OSS, format must be json.
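To show how the optional settings combine, here is a minimal sketch of a source table that sets each of them explicitly. The table name mns_tuned_source and all option values are illustrative assumptions, not recommended defaults. In particular, the deleteMaxRetries value is hypothetical because this document does not state its default.

```sql
-- Sketch only: option names come from the table above; values are illustrative.
CREATE TEMPORARY TABLE mns_tuned_source (
  `data` STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = '${endpoint}',
  'region' = '${region}',
  'queueName' = '${queueName}',
  'accessKeyId' = '${accessKeyId}',
  'accessKeySecret' = '${accessKeySecret}',
  'format' = 'json',
  'messageType' = 'RAW',
  'batchSize' = '16',              -- upper bound of the documented 1–16 range
  'pollingWaitTime' = '30s',       -- upper bound of the documented 0s–30s range
  'deleteMaxRetries' = '3',        -- hypothetical value
  'startTimeMs' = '1735689600000'  -- 2025-01-01T00:00:00Z in Unix milliseconds
);
```

Because the connector does not support offset-based consumption, startTimeMs is the only start-position control listed in the options table.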
Read OSS event notifications
When 'messageType' = 'OSS', the SMQ connector automatically parses OSS event notification JSON and maps the fields to a flat table structure. The following field name mappings are supported (case-insensitive):
| Field name | JSON path | Description |
| --- | --- | --- |
| eventName | eventName | The event type. |
| eventSource | eventSource | The event source. |
| eventTime | eventTime | The event time. |
| eventVersion | eventVersion | The event protocol version. |
| region | region | The bucket region. |
| ossBucketArn | oss.bucket.arn | The bucket's unique identifier. |
| ossBucketName | oss.bucket.name | The bucket name. |
| ossBucketOwnerIdentity | oss.bucket.ownerIdentity | The user ID of the bucket creator. |
| ossObjectKey | oss.object.key | The object name. |
| ossObjectSize | oss.object.size | The object size. |
| ossObjectETag | oss.object.eTag | The object's ETag, used to detect content changes. |
| ossObjectDeltaSize | oss.object.deltaSize | The change in object size. |
| ossObjectReadFrom | oss.object.readFrom | The file read start position. |
| ossObjectReadTo | oss.object.readTo | The file read end position. |
| ossOssSchemaVersion | oss.ossSchemaVersion | The OSS schema version number. |
| ossRuleId | oss.ruleId | The ID of the matched rule. |
| requestParametersSourceIPAddress | requestParameters.sourceIPAddress | The source IP address of the request. |
| responseElementsRequestId | responseElements.requestId | The request ID. |
| userIdentityPrincipalId | userIdentity.principalId | The user ID (UID) of the requester. |
The following JSON format parameters control parsing behavior:
- json.fail-on-missing-field: Whether to fail if a field is missing. Default: false.
- json.ignore-parse-errors: Whether to ignore parse errors. Default: false.
- json.timestamp-format.standard: The timestamp format standard. Default: ISO-8601.
- json.timestamp-format.pattern: A custom pattern for the timestamp format.
For detailed field descriptions, see OSS event notifications.
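As a rough illustration of the JSON format parameters, the following sketch declares an OSS event source that tolerates malformed notifications instead of failing the job. The table name oss_event_lenient and the choice of columns are assumptions made for this example, and the sketch assumes the parameters are set in the WITH clause alongside the connector options.

```sql
-- Sketch: skip messages that cannot be parsed rather than failing the job.
CREATE TEMPORARY TABLE oss_event_lenient (
  `eventName` STRING,
  `eventTime` TIMESTAMP(3),
  `ossBucketName` STRING,
  `ossObjectKey` STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = '${endpoint}',
  'region' = '${region}',
  'queueName' = '${queueName}',
  'accessKeyId' = '${accessKeyId}',
  'accessKeySecret' = '${accessKeySecret}',
  'format' = 'json',
  'messageType' = 'OSS',
  'json.fail-on-missing-field' = 'false',  -- missing fields do not fail the job
  'json.ignore-parse-errors' = 'true'      -- unparsable messages are ignored
);
```

Ignoring parse errors trades strictness for availability, so it is worth confirming that skipped notifications are acceptable for the pipeline before enabling it.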
Examples
Example 1: Consume standard JSON messages
-- Create an MNS source table.
-- Field names must match the keys in the JSON message body.
CREATE TEMPORARY TABLE mns_source (
  `userId` BIGINT,
  `action` STRING,
  `timestamp` TIMESTAMP(3),
  `payload` STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = 'http://your-account-id.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queueName' = 'my-events-queue',
  'accessKeyId' = 'your-ak',
  'accessKeySecret' = 'your-sk',
  'format' = 'json'
);
-- Create a print sink to test output.
CREATE TEMPORARY TABLE print_sink (
  `userId` BIGINT,
  `action` STRING,
  `timestamp` TIMESTAMP(3),
  `payload` STRING
) WITH (
  'connector' = 'print'
);
-- Consume messages and write to the sink.
-- Column names are quoted with backticks because timestamp is a reserved keyword.
INSERT INTO print_sink
SELECT `userId`, `action`, `timestamp`, `payload`
FROM mns_source;
Example 2: Consume OSS event notification messages
-- Create an MNS source table to automatically parse OSS event notification JSON.
-- For field name mappings, see the "Read OSS event notifications" section.
CREATE TEMPORARY TABLE oss_event_source (
  `eventName` STRING,
  `eventSource` STRING,
  `eventTime` TIMESTAMP(3),
  `region` STRING,
  `ossBucketName` STRING,
  `ossObjectKey` STRING,
  `ossObjectSize` BIGINT,
  `responseElementsRequestId` STRING,
  `userIdentityPrincipalId` STRING
) WITH (
  'connector' = 'mns',
  'endpoint' = 'http://123456789.mns.cn-hangzhou.aliyuncs.com',
  'region' = 'cn-hangzhou',
  'queueName' = 'oss-events-queue',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'format' = 'json',
  'messageType' = 'OSS'
);
-- Create a print sink to test output.
CREATE TEMPORARY TABLE print_sink (
  `eventName` STRING,
  `ossBucketName` STRING,
  `ossObjectKey` STRING,
  `ossObjectSize` BIGINT,
  `eventTime` TIMESTAMP(3)
) WITH (
  'connector' = 'print'
);
-- Filter for ObjectCreated events and output results.
INSERT INTO print_sink
SELECT eventName, ossBucketName, ossObjectKey, ossObjectSize, eventTime
FROM oss_event_source
WHERE eventName LIKE 'ObjectCreated:%';
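For the image processing scenario mentioned in the overview, the same source can be narrowed to image uploads before any download or analysis step. The following is a minimal sketch that assumes the oss_event_source table from Example 2 exists in the same session. The view name image_uploads and the .jpg and .png suffixes are hypothetical choices for illustration.

```sql
-- Sketch: keep only newly created objects whose key looks like an image file.
CREATE TEMPORARY VIEW image_uploads AS
SELECT ossBucketName, ossObjectKey, ossObjectSize, eventTime
FROM oss_event_source
WHERE eventName LIKE 'ObjectCreated:%'
  AND (LOWER(ossObjectKey) LIKE '%.jpg'
       OR LOWER(ossObjectKey) LIKE '%.png');
```

Downstream statements can then read from image_uploads instead of repeating the filter.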