This topic introduces the ApsaraMQ for RocketMQ connector.
ApsaraMQ for RocketMQ 4.x Standard Edition instances share an elastic upper limit of 5,000 API calls per second. Exceeding this limit when connecting an instance to Realtime Compute for Apache Flink triggers throttling, which can destabilize your Flink jobs. Therefore, if you use or plan to use a Standard Edition RocketMQ instance to integrate with Flink, carefully evaluate the potential impact. If possible, consider an alternative messaging middleware, such as Kafka, Simple Log Service (SLS), or DataHub. If you must use an ApsaraMQ for RocketMQ 4.x Standard Edition instance for high message volumes, submit a ticket to request a higher throttling quota.
Background information
ApsaraMQ for RocketMQ is a distributed messaging middleware developed by Alibaba Cloud based on Apache RocketMQ. It provides low latency, high concurrency, high availability, and high reliability. ApsaraMQ for RocketMQ offers asynchronous decoupling and peak shaving for distributed application systems. It also provides features for Internet applications, such as massive message accumulation, high throughput, and reliable retries.
The following table describes the ApsaraMQ for RocketMQ connector.
| Item | Description |
| --- | --- |
| Supported type | Source table and sink table |
| Running mode | Streaming mode only |
| Data format | CSV and binary formats |
| Connector-specific metrics | |
| API type | DataStream API (for RocketMQ 4.x only) and SQL API |
| Support for data updates or deletions in a sink table | Only data insertion into a sink table is supported. Updates and deletions are not supported. |
Features
ApsaraMQ for RocketMQ source and sink tables support the following metadata fields.
- Fields for a source table
| Field | Type | Description |
| --- | --- | --- |
| topic | VARCHAR METADATA VIRTUAL | The message topic. |
| queue-id | INT METADATA VIRTUAL | The queue ID. |
| queue-offset | BIGINT METADATA VIRTUAL | The consumption offset. |
| msg-id | VARCHAR METADATA VIRTUAL | The message ID. |
| store-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The message storage timestamp. |
| born-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The message generation timestamp. |
| keys | VARCHAR METADATA VIRTUAL | The message keys. |
| tags | VARCHAR METADATA VIRTUAL | The message tags. |
- Fields for a sink table
| Field | Type | Description |
| --- | --- | --- |
| keys | VARCHAR METADATA | The message keys. |
| tags | VARCHAR METADATA | The message tags. |
Prerequisites
You have created ApsaraMQ for RocketMQ resources. For more information, see Create resources.
Limitations
- ApsaraMQ for RocketMQ 5.x is supported only in Realtime Compute for Apache Flink that uses VVR 8.0.3 or later.
- The ApsaraMQ for RocketMQ connector consumes messages by using pull consumers, and the consumption workload is distributed across all subtasks.
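This topic does not describe how the connector spreads queues across subtasks. The following Java sketch assumes a simple round-robin assignment (queue i goes to subtask i mod parallelism) to illustrate a common consequence of pull-based, queue-partitioned consumption: if the source parallelism exceeds the total number of queues, the extra subtasks receive no queues and stay idle. The class and method names are hypothetical and are not part of the connector API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: round-robin assignment of message queues to
// source subtasks. The connector's real assignment strategy may differ.
class QueueAssignmentSketch {

    // Returns, for each subtask index, the list of queue IDs it would consume.
    static List<List<Integer>> assign(int queueCount, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            assignment.add(new ArrayList<>());
        }
        for (int queueId = 0; queueId < queueCount; queueId++) {
            // Queue i goes to subtask (i mod parallelism).
            assignment.get(queueId % parallelism).add(queueId);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 8 queues, 3 subtasks: every subtask gets work.
        System.out.println(assign(8, 3)); // [[0, 3, 6], [1, 4, 7], [2, 5]]
        // 4 queues, 6 subtasks: subtasks 4 and 5 receive no queues and stay idle.
        System.out.println(assign(4, 6)); // [[0], [1], [2], [3], [], []]
    }
}
```

Under this assumption, choosing a source parallelism that is at most the total number of queues in the topic keeps every subtask busy.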
Syntax
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);
WITH parameters
General
| Parameter | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The connector type. | String | Yes | None | Set to mq for RocketMQ 4.x or mq5 for RocketMQ 5.x. |
| endPoint | The service endpoint. | String | Yes | None | ApsaraMQ for RocketMQ provides two types of endpoints: Important: We recommend that you use a VPC endpoint. Public network connections may be unstable because Alibaba Cloud network security policies change dynamically. |
| topic | The topic name. | String | Yes | None | None |
| accessId | The AccessKey ID. | String | | None | Important: To avoid exposing your AccessKey pair, we recommend that you use project variables to specify the AccessKey ID and AccessKey secret. |
| accessKey | The AccessKey secret. | String | | None | |
| tag | The message tag to subscribe to or write. | String | No | None | Note: When the connector is used as a sink, this parameter is supported only for RocketMQ 4.x. For RocketMQ 5.x, specify the message tag by using the tags metadata field of the sink table. |
| encoding | The encoding format. | String | No | UTF-8 | None |
| instanceID | The ID of the ApsaraMQ for RocketMQ instance. | String | No | None | Note: This parameter is supported only for RocketMQ 4.x. |
Source-specific
| Parameter | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| consumerGroup | The name of the consumer group. | String | Yes | None | None |
| pullIntervalMs | The interval at which the source polls for messages when no data is available. | Int | Yes (RocketMQ 4.x only) | None | Unit: milliseconds. The connector provides no throttling mechanism, so you cannot limit the rate at which data is read from RocketMQ. Note: This parameter is supported only for RocketMQ 4.x. |
| timeZone | The time zone. | String | No | None | Example: Asia/Shanghai. |
| startTimeMs | The start time for data consumption. | Long | No | None | A timestamp in milliseconds. |
| startMessageOffset | The message offset from which to start consuming. | Int | No | None | If this parameter is specified, data loading starts from the offset specified by |
| lineDelimiter | The line delimiter used to split a message into records. | String | No | \n | None |
| fieldDelimiter | The field delimiter. | String | No | \u0001 | The delimiter varies based on the terminal mode: |
| lengthCheck | The policy for checking the number of fields in each record. | String | No | NONE | Valid values: |
| columnErrorDebug | Specifies whether to enable debug mode for column parsing errors. | Boolean | No | false | If set to true, detailed logs are printed for parsing exceptions. |
| pullBatchSize | The maximum number of messages to pull in a single batch. | Int | No | 64 | Supported in VVR 8.0.7 and later versions. |
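startTimeMs expects an epoch timestamp in milliseconds. The following minimal Java sketch, which uses only the standard java.time API, shows one way to compute that value for a wall-clock time in a specific zone such as Asia/Shanghai (the example value of timeZone). The helper class is hypothetical, and the sketch makes no claim about how the connector itself applies the timeZone option.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper: converts a wall-clock time in a given time zone into the
// epoch-millisecond value expected by the startTimeMs option.
class StartTimeMsSketch {

    static long toEpochMillis(LocalDateTime localTime, String zoneId) {
        return localTime.atZone(ZoneId.of(zoneId)).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long startTimeMs = toEpochMillis(LocalDateTime.of(2024, 1, 1, 0, 0), "Asia/Shanghai");
        // Midnight in Shanghai (UTC+8) is 2023-12-31T16:00:00Z.
        System.out.println(startTimeMs); // 1704038400000
    }
}
```

The resulting value can then be placed in the WITH clause, for example 'startTimeMs' = '1704038400000'.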
Sink-specific
| Parameter | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| producerGroup | The name of the producer group. | String | Yes | None | None |
| retryTimes | The number of times to retry a failed write operation. | Int | No | 10 | None |
| sleepTimeMs | The interval between retries, in milliseconds. | Long | No | 5000 | None |
| partitionField | The name of the field to use as the partition key. | String | No | None | This parameter is required if the … Note: Supported in VVR 8.0.5 and later versions. |
| deliveryTimestampMode | The delivery mode for delayed messages. This parameter works with the … | String | No | None | Valid values: … Note: Supported in VVR 11.1 and later versions. |
| deliveryTimestampType | The time reference type for delayed messages. | String | No | processing_time | Valid values: … Note: Supported in VVR 11.1 and later versions. |
| deliveryTimestampValue | The delivery time of a delayed message. | Long | No | None | The meaning of this parameter depends on the value of … Note: Supported in VVR 11.1 and later versions. |
| deliveryTimestampField | The field used as the delivery time for delayed messages. The data type must be … | String | No | None | Takes effect when … Note: Supported in VVR 11.1 and later versions. |
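With the defaults retryTimes = 10 and sleepTimeMs = 5000, a write that keeps failing can spend up to 10 × 5,000 ms = 50 seconds waiting between retries before the sink gives up. The following Java sketch models such a fixed-interval retry policy. It assumes that retryTimes counts retries after the first attempt (so at most retryTimes + 1 attempts); the class and method names are hypothetical, and the connector's exact behavior after the last retry is not specified in this topic.

```java
import java.util.concurrent.Callable;

// Hypothetical model of a fixed-interval retry policy such as the one configured
// by retryTimes and sleepTimeMs. Assumes retryTimes counts retries after the first
// attempt; the connector's internal logic may differ.
class FixedIntervalRetrySketch {

    static <T> T callWithRetry(Callable<T> action, int retryTimes, long sleepTimeMs) throws Exception {
        if (retryTimes < 0) {
            throw new IllegalArgumentException("retryTimes must not be negative");
        }
        Exception lastError = null;
        for (int attempt = 0; attempt <= retryTimes; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastError = e;
                if (attempt < retryTimes) {
                    Thread.sleep(sleepTimeMs); // wait a fixed interval before the next attempt
                }
            }
        }
        throw lastError; // every attempt failed
    }

    // Worst-case total sleep time before giving up.
    static long maxWaitMs(int retryTimes, long sleepTimeMs) {
        return retryTimes * sleepTimeMs;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(maxWaitMs(10, 5000)); // 50000
        int[] calls = {0};
        // Simulated send that fails twice and then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("send failed");
            }
            return "SEND_OK";
        }, 10, 10);
        System.out.println(result + " after " + calls[0] + " attempts"); // SEND_OK after 3 attempts
    }
}
```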
Type mapping
| Flink type | RocketMQ type |
| --- | --- |
| BOOLEAN | STRING |
| VARBINARY | STRING |
| VARCHAR | STRING |
| TINYINT | STRING |
| INTEGER | STRING |
| BIGINT | STRING |
| FLOAT | STRING |
| DOUBLE | STRING |
| DECIMAL | STRING |
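Because every Flink type maps to STRING on the RocketMQ side, field values are carried as text and must be converted to the declared column type. The following Java sketch is a hypothetical illustration of such a conversion for the types in the table; it is not the connector's parser, and it leaves out details such as null handling and the treatment of malformed values (see columnErrorDebug). Encoding VARBINARY with UTF-8 mirrors the default of the encoding parameter and is an assumption of the sketch.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: converting a text field to a Java value according to
// the declared Flink column type. Not the connector's actual parser.
class StringFieldConverterSketch {

    static Object convert(String flinkType, String text) {
        switch (flinkType) {
            case "BOOLEAN":   return Boolean.parseBoolean(text);
            case "VARBINARY": return text.getBytes(StandardCharsets.UTF_8); // assumes UTF-8 encoding
            case "VARCHAR":   return text;
            case "TINYINT":   return Byte.parseByte(text);
            case "INTEGER":   return Integer.parseInt(text);
            case "BIGINT":    return Long.parseLong(text);
            case "FLOAT":     return Float.parseFloat(text);
            case "DOUBLE":    return Double.parseDouble(text);
            case "DECIMAL":   return new BigDecimal(text);
            default: throw new IllegalArgumentException("Unsupported type: " + flinkType);
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("BIGINT", "42"));     // 42
        System.out.println(convert("DECIMAL", "19.90")); // 19.90
        System.out.println(convert("BOOLEAN", "true"));  // true
    }
}
```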
Examples
Source table examples
- CSV format
Suppose that a message contains the following data records in CSV format:
1,name,male
2,name,female
Note: An ApsaraMQ for RocketMQ message can contain zero or more data records, separated by \n.
Use the following DDL in your Flink job to declare an ApsaraMQ for RocketMQ source table.
  - RocketMQ 5.x
CREATE TABLE mq_source(
  id varchar,
  name varchar,
  gender varchar,
  topic varchar metadata virtual
) WITH (
  'connector' = 'mq5',
  'topic' = 'mq-test',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = 'mq-group',
  'fieldDelimiter' = ','
);
  - RocketMQ 4.x
CREATE TABLE mq_source(
  id varchar,
  name varchar,
  gender varchar,
  topic varchar metadata virtual
) WITH (
  'connector' = 'mq',
  'topic' = 'mq-test',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '1000',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'consumerGroup' = 'mq-group',
  'fieldDelimiter' = ','
);
- Binary format
  - RocketMQ 5.x
CREATE TEMPORARY TABLE source_table (
  mess varbinary
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = 'mq-test',
  'consumerGroup' = 'mq-group'
);
CREATE TEMPORARY TABLE out_table (
  commodity varchar
) WITH (
  'connector' = 'print'
);
INSERT INTO out_table SELECT CAST(mess AS varchar) FROM source_table;
  - RocketMQ 4.x
CREATE TEMPORARY TABLE source_table (
  mess varbinary
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '500',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'topic' = 'mq-test',
  'consumerGroup' = 'mq-group'
);
CREATE TEMPORARY TABLE out_table (
  commodity varchar
) WITH (
  'connector' = 'print'
);
INSERT INTO out_table SELECT CAST(mess AS varchar) FROM source_table;
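To make the record and field splitting in the CSV source example concrete, the following Java sketch splits a message body first by lineDelimiter (default \n) and then by fieldDelimiter (',' in the example, \u0001 by default). It treats both delimiters as literal strings, keeps trailing empty fields, and skips empty lines so that an empty body yields zero records; these are assumptions about the connector's behavior, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration of how a CSV message body could be split into records
// (lineDelimiter) and fields (fieldDelimiter). Not the connector's implementation.
class CsvMessageSplitSketch {

    static List<List<String>> split(String body, String lineDelimiter, String fieldDelimiter) {
        List<List<String>> records = new ArrayList<>();
        // Pattern.quote treats the delimiters as literal text rather than regular expressions.
        for (String line : body.split(Pattern.quote(lineDelimiter))) {
            if (line.isEmpty()) {
                continue; // a message may contain zero records
            }
            // Limit -1 keeps trailing empty fields such as the last field of "1,name,".
            records.add(Arrays.asList(line.split(Pattern.quote(fieldDelimiter), -1)));
        }
        return records;
    }

    public static void main(String[] args) {
        String body = "1,name,male\n2,name,female";
        System.out.println(split(body, "\n", ",")); // [[1, name, male], [2, name, female]]
        System.out.println(split("", "\n", ","));   // []
    }
}
```

Each inner list corresponds to one row of mq_source (id, name, gender) in the DDL above.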
Sink table examples
- Create a sink table
  - RocketMQ 5.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);
  - RocketMQ 4.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);
Note: For RocketMQ messages in binary format, the DDL must define a single field of the VARBINARY data type.
- Create a sink table that maps the keys and tags metadata fields to the message keys and tags
  - RocketMQ 5.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR,
  keys VARCHAR METADATA,
  tags VARCHAR METADATA
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);
  - RocketMQ 4.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR,
  keys VARCHAR METADATA,
  tags VARCHAR METADATA
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'topic' = '<yourTopicName>',
  'producerGroup' = '<yourGroupName>'
);
DataStream API
To read and write data using the DataStream API, use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For more information about configuring a DataStream connector, see Integrate DataStream connectors.
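For RocketMQ 4.x, VVR provides MetaQSource for reading from RocketMQ and MetaQOutputFormat, an implementation of OutputFormat, for writing to RocketMQ. For RocketMQ 5.x, the corresponding classes are RocketMQSource and RocketMQOutputFormat. The following examples show how to read from and write to RocketMQ: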
RocketMQ 5.x
In ApsaraMQ for RocketMQ 5.x, the access key pair represents the username and password for the instance. You do not need to configure this pair if you access the instance over an internal network and ACL authentication is disabled.
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
* A demo that shows how to consume, convert, and then produce messages to ApsaraMQ for RocketMQ.
*/
public class RocketMQ5DataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Set up the streaming execution environment
Configuration conf = new Configuration();
// The following two configurations are for local debugging only. Delete them before you package the job and upload it to Realtime Compute for Apache Flink.
conf.setString("pipeline.classpaths", "file://" + "The absolute path of the uber JAR");
conf.setString(
"classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
final DataStreamSource<String> ds =
env.fromSource(
RocketMQSource.<String>builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopic(SOURCE_TOPIC)
.setConsumerGroup(CONSUMER_GROUP)
.setDeserializationSchema(new MyDeserializer())
.setStartOffset(1)
.build(),
WatermarkStrategy.noWatermarks(),
"source");
ds.map(new ToMessage())
.addSink(
new OutputFormatSinkFunction<>(
new RocketMQOutputFormat.Builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.build()));
env.execute();
}
private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
@Override
public void deserialize(List<MessageExt> record, Collector<String> out) {
for (MessageExt messageExt : record) {
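// Decode the message body explicitly as UTF-8 instead of relying on the platform default charset.
out.collect(new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8));
}
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
// Wraps each string in a single-element list of messages for the sink.
private static class ToMessage implements MapFunction<String, List<MessageExt>> {
public ToMessage() {
}
@Override
public List<MessageExt> map(String s) {
final MessageExt message = new MessageExt();
// Encode the body as UTF-8 so that it round-trips with the deserializer above.
message.setBody(s.getBytes(java.nio.charset.StandardCharsets.UTF_8));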
message.setWaitStoreMsgOK(true);
return Collections.singletonList(message);
}
}
}
RocketMQ 4.x
import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
* A demo that shows how to consume, convert, and then produce messages to ApsaraMQ for RocketMQ.
*/
public class RocketMQDataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String INSTANCE_ID = "<instanceID>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Set up the streaming execution environment
Configuration conf = new Configuration();
// The following two configurations are for local debugging only. Delete them before you package the job and upload it to Realtime Compute for Apache Flink.
conf.setString("pipeline.classpaths", "file://" + "The absolute path of the uber JAR");
conf.setString("classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// Create and add the ApsaraMQ for RocketMQ source.
env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
// Wrap each message in a single-element list for the sink.
.map(RocketMQDataStreamDemo::convertMessages)
// Declare the result type explicitly because the generic List type is erased at runtime.
.returns(new ListTypeInfo<>(MessageExt.class))
// Create and add the ApsaraMQ for RocketMQ sink.
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
.name(RocketMQDataStreamDemo.class.getSimpleName());
// Compile and submit the job.
env.execute("RocketMQ connector end-to-end DataStream demo");
}
private static MetaQSource<MessageExt> createRocketMQSource() {
Properties mqProperties = createMQProperties();
return new MetaQSource<>(SOURCE_TOPIC,
CONSUMER_GROUP,
null, // always null
null, // tag of the messages to consume
Long.MAX_VALUE, // stop timestamp in milliseconds
-1, // start timestamp in milliseconds. Set to -1 to disable starting from an offset.
0, // start offset
300_000, // partition discovery interval
mqProperties,
Boundedness.CONTINUOUS_UNBOUNDED,
new MyDeserializationSchema());
}
private static MetaQOutputFormat createRocketMQOutputFormat() {
return new MetaQOutputFormat.Builder()
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.setMqProperties(createMQProperties())
.build();
}
private static Properties createMQProperties() {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, ENDPOINT);
properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
return properties;
}
private static List<MessageExt> convertMessages(MessageExt messages) {
return Collections.singletonList(messages);
}
public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
@Override
public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
for (MessageExt messageExt : list) {
collector.collect(messageExt);
}
}
@Override
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
}
}
XML dependencies
- ApsaraMQ for RocketMQ 4.x: MQ DataStream connector
- ApsaraMQ for RocketMQ 5.x: MQ DataStream connector
<!--ApsaraMQ for RocketMQ 5.x-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq5</artifactId>
<version>${vvr-version}</version>
<scope>provided</scope>
</dependency>
<!--ApsaraMQ for RocketMQ 4.x-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>
For more information about configuring the endpoint for ApsaraMQ for RocketMQ, see Announcement on the settings of TCP internal endpoints.
FAQ
How does RocketMQ detect partition count changes during topic scaling?