ApsaraMQ for RocketMQ


This topic introduces the ApsaraMQ for RocketMQ connector.

Important

ApsaraMQ for RocketMQ 4.x Standard Edition instances share an elastic upper limit of 5,000 API calls per second. Exceeding this limit when connecting an instance to Realtime Compute for Apache Flink triggers throttling, which can destabilize your Flink jobs. Therefore, if you use or plan to use a Standard Edition RocketMQ instance to integrate with Flink, carefully evaluate the potential impact. If possible, consider an alternative messaging middleware, such as Kafka, Simple Log Service (SLS), or DataHub. If you must use an ApsaraMQ for RocketMQ 4.x Standard Edition instance for high message volumes, submit a ticket to request a higher throttling quota.

Background information

ApsaraMQ for RocketMQ is a distributed messaging middleware developed by Alibaba Cloud based on Apache RocketMQ. It provides low latency, high concurrency, high availability, and high reliability. ApsaraMQ for RocketMQ offers asynchronous decoupling and peak shaving for distributed application systems. It also provides features for Internet applications, such as massive message accumulation, high throughput, and reliable retries.

The following table describes the ApsaraMQ for RocketMQ connector.

| Item | Description |
| --- | --- |
| Supported type | Source table and sink table |
| Running mode | Streaming mode only |
| Data format | CSV and binary formats |
| Connector-specific metrics | Source table: numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond, currentEmitEventTimeLag, currentFetchEventTimeLag, sourceIdleTime. Sink table: numRecordsOut, numRecordsOutPerSecond, numBytesOut, numBytesOutPerSecond, currentSendTime. For details, see Metrics. |
| API type | DataStream API (for RocketMQ 4.x only) and SQL API |
| Support for data updates or deletions in a sink table | Only data insertion into a sink table is supported. Updates and deletions are not supported. |

Features

ApsaraMQ for RocketMQ source and sink tables support the following metadata fields.

  • Fields for a source table

    | Field | Type | Description |
    | --- | --- | --- |
    | topic | VARCHAR METADATA VIRTUAL | The message topic. |
    | queue-id | INT METADATA VIRTUAL | The queue ID. |
    | queue-offset | BIGINT METADATA VIRTUAL | The consumption offset. |
    | msg-id | VARCHAR METADATA VIRTUAL | The message ID. |
    | store-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The message storage timestamp. |
    | born-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The message generation timestamp. |
    | keys | VARCHAR METADATA VIRTUAL | The message keys. |
    | tags | VARCHAR METADATA VIRTUAL | The message tags. |

  • Fields for a sink table

    | Field | Type | Description |
    | --- | --- | --- |
    | keys | VARCHAR METADATA | The message keys. |
    | tags | VARCHAR METADATA | The message tags. |

Prerequisites

You have created an ApsaraMQ for RocketMQ resource. For instructions, see Create resources.

Limitations

  • ApsaraMQ for RocketMQ 5.x requires Realtime Compute for Apache Flink with Ververica Runtime (VVR) 8.0.3 or later.

  • The ApsaraMQ for RocketMQ connector uses pull consumers, distributing the workload across all subtasks.

Syntax

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH parameters

General

Parameter

Description

Type

Required

Default

Remarks

connector

The connector type.

String

Yes

None

  • For RocketMQ 4.x, the value is mq.

  • For RocketMQ 5.x, the value is mq5.

endPoint

The service endpoint.

String

Yes

None

ApsaraMQ for RocketMQ provides two types of endpoints:

  • Endpoint for the MQ service on the internal network (classic network or VPC): On the details page of the target instance in the MQ console, select Endpoints > TCP Protocol Client Endpoints > Internal Network Access to obtain the corresponding endpoint.

  • Public MQ service endpoint: On the details page of the target instance in the MQ console, select Endpoint > TCP Protocol > Client Endpoint > Public Access to obtain the corresponding endpoint.

Important

We recommend that you use a VPC endpoint. Public network connections may be unstable due to dynamic changes in Alibaba Cloud network security policies.

  • Internal networks do not support cross-region access. For example, if your Realtime Compute for Apache Flink service is in the China (Hangzhou) region and your ApsaraMQ for RocketMQ instance is in the China (Shanghai) region, the connection fails.

  • To connect over the public network, you must enable public access for the instance. For more information, see Network connections.

topic

The topic name.

String

Yes

None

None

accessId

  • For RocketMQ 4.x: The AccessKey ID of your Alibaba Cloud account.

  • For RocketMQ 5.x:

    The username of the RocketMQ instance.

String

  • For RocketMQ 4.x: Yes

  • For RocketMQ 5.x: No

None

Important

To avoid exposing your AccessKey pair, we recommend using a project variable to specify the AccessKey ID and AccessKey Secret.

  • For RocketMQ 5.x, this parameter is required in either of the following cases:

    • You use a public endpoint.

    • You use a VPC endpoint and authentication-free access over the internal network is disabled.

    This parameter is not required if you use a VPC endpoint and authentication-free access over the internal network is enabled.

accessKey

  • For RocketMQ 4.x: The AccessKey Secret of your Alibaba Cloud account.

  • For RocketMQ 5.x: The password of the instance.

String

  • For RocketMQ 4.x: Yes

  • For RocketMQ 5.x: No

None

tag

The message tag to subscribe to or write.

String

No

None

  • When RocketMQ is used as a source, you can read messages with a single tag.

  • When RocketMQ is used as a sink, you can specify multiple tags, separated by commas (,).

Note

When used as a sink, this parameter is supported only for RocketMQ 4.x. For RocketMQ 5.x, specify the message tag in the sink's metadata fields.

encoding

The encoding format.

String

No

UTF-8

None

instanceID

The ID of the ApsaraMQ for RocketMQ instance.

String

No

None

  • If the instance does not have a dedicated namespace, do not configure the instanceID parameter.

  • If the instance has a dedicated namespace, the instanceID parameter is required.

Note

This parameter is supported only for RocketMQ 4.x.

Source-specific

Parameter

Description

Type

Required

Default

Remarks

consumerGroup

The name of the consumer group.

String

Yes

None

None

pullIntervalMs

The polling interval in milliseconds for the source when no data is available.

Int

Yes

None

Unit: milliseconds.

A throttling mechanism is not available. You cannot set the rate for reading data from RocketMQ.

Note

This parameter is supported only for RocketMQ 4.x.

timeZone

The time zone.

String

No

None

Example: Asia/Shanghai.

startTimeMs

The start time for data consumption.

Long

No

None

A timestamp in milliseconds.

startMessageOffset

The message offset from which to start consuming.

Int

No

None

If this parameter is specified, consumption starts from this offset. This setting takes precedence over other start position settings.

lineDelimiter

The line delimiter used to parse records.

String

No

\n

None

fieldDelimiter

The field delimiter.

String

No

\u0001

The delimiter varies based on the terminal mode:

  • In read-only mode (default), the delimiter is \u0001. The delimiter is not visible in this mode.

  • In edit mode, the delimiter is ^A.

lengthCheck

The policy for checking the number of fields in each record.

String

No

NONE

Valid values:

  • NONE: The default value.

    • If a record has more fields than the schema defines, extra fields on the right are truncated.

    • If a record has fewer fields than the schema defines, the record is skipped.

  • SKIP: Skips any record where the field count does not match the schema.

  • EXCEPTION: Throws an exception if the field count does not match the schema.

  • PAD: Pads fields from left to right.

    • If a record has more fields than the schema defines, extra fields on the right are truncated.

    • If a record has fewer fields than the schema defines, missing fields on the right are padded with null values.

columnErrorDebug

Specifies whether to enable the debug mode for column parsing errors.

Boolean

No

false

If set to true, detailed logs for parsing exceptions are printed.

pullBatchSize

The maximum number of messages to pull in a single batch.

Int

No

64

Supported in VVR 8.0.7 and later versions.
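
The lineDelimiter, fieldDelimiter, and lengthCheck parameters described above can be read together as a small parsing pipeline. The following Java sketch mirrors the documented rules for illustration only. It is not the connector's code: the class name LengthCheckSketch, the parse method, and the handling of empty records are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical illustration of the documented rules; not the connector's parsing code. */
final class LengthCheckSketch {

    enum LengthCheck { NONE, SKIP, EXCEPTION, PAD }

    /** Splits one message body into records, then applies the field-count policy to each record. */
    static List<String[]> parse(String body, String lineDelimiter, String fieldDelimiter,
                                int schemaFieldCount, LengthCheck policy) {
        List<String[]> rows = new ArrayList<>();
        for (String line : body.split(Pattern.quote(lineDelimiter))) {
            if (line.isEmpty()) {
                continue; // Assumption: empty records are ignored.
            }
            // A limit of -1 keeps trailing empty fields, for example "1,name,".
            String[] fields = line.split(Pattern.quote(fieldDelimiter), -1);
            if (fields.length == schemaFieldCount) {
                rows.add(fields);
                continue;
            }
            boolean tooMany = fields.length > schemaFieldCount;
            switch (policy) {
                case NONE:
                    // Extra fields on the right are truncated; short records are skipped.
                    if (tooMany) {
                        rows.add(Arrays.copyOf(fields, schemaFieldCount));
                    }
                    break;
                case SKIP:
                    // Any mismatch drops the record.
                    break;
                case EXCEPTION:
                    throw new IllegalArgumentException(
                            "Expected " + schemaFieldCount + " fields but found " + fields.length);
                case PAD:
                    // Arrays.copyOf truncates on the right or pads on the right with null.
                    rows.add(Arrays.copyOf(fields, schemaFieldCount));
                    break;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        String body = "1,name,male\n2,name\n3,name,female,extra";
        LengthCheck[] policies = {LengthCheck.NONE, LengthCheck.SKIP, LengthCheck.PAD};
        for (LengthCheck policy : policies) {
            List<String[]> rows = parse(body, "\n", ",", 3, policy);
            System.out.println(policy + " keeps " + rows.size() + " record(s)");
        }
    }
}
```

With a three-column schema, the sample body contains one matching record, one short record, and one long record, so each policy keeps a different subset. This is a quick way to decide which policy fits your data before you set lengthCheck in the DDL.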

Sink-specific

Parameter

Description

Type

Required

Default

Remarks

producerGroup

The name of the producer group.

String

Yes

None

None

retryTimes

The number of times to retry a failed write operation.

Int

No

10

None

sleepTimeMs

The interval between retries, in milliseconds.

Long

No

5000

None

partitionField

The name of the field to use as the partition key.

String

No

None

This parameter is required if the mode parameter is set to partition.

Note

Supported in VVR 8.0.5 and later versions.

deliveryTimestampMode

The delivery mode for delayed messages. This parameter works with the deliveryTimestampValue parameter to determine when a delayed message is delivered.

String

No

None

Valid values:

  • fixed: fixed timestamp mode.

  • relative: relative delay mode.

  • field: field-based mode.

Note

Supported in VVR 11.1 and later versions.

deliveryTimestampType

The time reference type for delayed messages.

String

No

processing_time

Valid values:

  • event_time: event time.

  • processing_time: processing time.

Note

Supported in VVR 11.1 and later versions.

deliveryTimestampValue

The delivery time of a delayed message.

Long

No

None

The meaning of this parameter depends on the value of deliveryTimestampMode:

  • deliveryTimestampMode=fixed: The message is delayed until the specified timestamp in milliseconds. If the current time is later than the timestamp, the message is delivered immediately.

  • deliveryTimestampMode=relative: The delay duration, in milliseconds, relative to the time reference specified by deliveryTimestampType.

  • deliveryTimestampMode=field: This parameter is ignored. The delivery time is determined by the value of the field specified by deliveryTimestampField.

Note

Supported in VVR 11.1 and later versions.

deliveryTimestampField

Specifies the field used as the delivery time for delayed messages. The data type must be BIGINT.

String

No

None

Takes effect when deliveryTimestampMode is field.

Note

Supported in VVR 11.1 and later versions.
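
The interaction between deliveryTimestampMode, deliveryTimestampType, deliveryTimestampValue, and deliveryTimestampField can be summarized as a single resolution step. The following Java sketch restates the rules from the table above for illustration only. It is not the connector's implementation: the class and method names are hypothetical, and it assumes that the value of the deliveryTimestampField column is an epoch timestamp in milliseconds.

```java
/** Hypothetical illustration of the documented rules; not the connector's implementation. */
final class DeliveryTimestampSketch {

    enum Mode { FIXED, RELATIVE, FIELD }

    /**
     * Resolves the target delivery time in epoch milliseconds.
     *
     * @param mode                   the value of deliveryTimestampMode
     * @param deliveryTimestampValue the value of deliveryTimestampValue
     * @param referenceTimeMs        the event time or processing time selected by deliveryTimestampType
     * @param fieldValueMs           the value of the deliveryTimestampField column (assumed to be epoch ms)
     */
    static long resolveDeliveryTimeMs(Mode mode, long deliveryTimestampValue,
                                      long referenceTimeMs, long fieldValueMs) {
        switch (mode) {
            case FIXED:
                // The value is an absolute timestamp.
                return deliveryTimestampValue;
            case RELATIVE:
                // The value is a delay added to the time reference.
                return referenceTimeMs + deliveryTimestampValue;
            case FIELD:
                // The value is ignored; the column decides.
                return fieldValueMs;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    /** A delivery time in the past means immediate delivery (documented for fixed mode). */
    static long remainingDelayMs(long deliveryTimeMs, long nowMs) {
        return Math.max(0L, deliveryTimeMs - nowMs);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long target = resolveDeliveryTimeMs(Mode.RELATIVE, 60_000L, now, 0L);
        System.out.println("Remaining delay in ms: " + remainingDelayMs(target, now));
    }
}
```

In relative mode, the reference time passed in is either the event time or the processing time, depending on deliveryTimestampType. In fixed and field modes, the reference time does not affect the result.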

Type mapping

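| Flink type | RocketMQ type |
| --- | --- |
| BOOLEAN | STRING |
| VARBINARY | STRING |
| VARCHAR | STRING |
| TINYINT | STRING |
| INTEGER | STRING |
| BIGINT | STRING |
| FLOAT | STRING |
| DOUBLE | STRING |
| DECIMAL | STRING |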

Examples

Source table examples

  • CSV format

    Suppose a message contains the following data records in CSV format.

    1,name,male
    2,name,female

    Note

    An ApsaraMQ for RocketMQ message can contain zero or more data records, separated by \n.

    Use the following DDL in your Flink job to declare an ApsaraMQ for RocketMQ source table.

    • RocketMQ 5.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq5',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
    • RocketMQ 4.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'pullIntervalMs' = '1000',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
  • Binary format

    • RocketMQ 5.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq5',
        'endpoint' = '<yourEndpoint>',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;
    • RocketMQ 4.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '500',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;

Sink table examples

  • Create a sink table

    • RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
      Note

      For RocketMQ messages in binary format, the DDL must define a single field with the VARBINARY data type.

  • Create a sink table that maps the keys and tags metadata fields to the message's key and tag

    • RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );

DataStream API

Important

To read and write data using the DataStream API, use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For more information about configuring a DataStream connector, see Integrate DataStream connectors.

VVR provides MetaQSource to read from RocketMQ and MetaQOutputFormat, an implementation of OutputFormat, to write to RocketMQ. The following examples show how to read from and write to RocketMQ:

RocketMQ 5.x

Note

In ApsaraMQ for RocketMQ 5.x, the access key pair represents the username and password for the instance. You do not need to configure this pair if you access the instance over an internal network and ACL authentication is disabled.

import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
 * A demo that shows how to consume, convert, and then produce messages to ApsaraMQ for RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        Configuration conf = new Configuration();

        // The following two configurations are for local debugging only. Delete them before you package the job and upload it to Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "The absolute path of the uber JAR");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

RocketMQ 4.x

import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
 * A demo that shows how to consume, convert, and then produce messages to ApsaraMQ for RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        Configuration conf = new Configuration();

        // The following two configurations are for local debugging only. Delete them before you package the job and upload it to Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "The absolute path of the uber JAR");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Create and add the ApsaraMQ for RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Wrap each message in a single-element list for the sink.
                .map(RocketMQDataStreamDemo::convertMessages)
                // Create and add the ApsaraMQ for RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo.class.getSimpleName());
        // Compile and submit the job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consume
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // start timestamp in milliseconds. Set to -1 to disable starting from an offset.
                0, // start offset
                300_000, // partition discovery interval
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}

Maven dependencies

<!--ApsaraMQ for RocketMQ 5.x-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq5</artifactId>
    <version>${vvr-version}</version>
    <scope>provided</scope>
</dependency>

<!--ApsaraMQ for RocketMQ 4.x-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
Note

For more information about configuring the endpoint for ApsaraMQ for RocketMQ, see Announcement on the settings of TCP internal endpoints.

FAQ

How does RocketMQ detect partition count changes during topic scaling?