全部产品
云市场

使用开源 Java SDK 访问阿里云 MQ

更新时间:2019-09-13 22:00:00

如果您已使用开源 Java SDK 进行生产,只需参考方法,重新配置参数,即可实现无缝上云。

前提条件

  • 已在阿里云 MQ 控制台创建资源,包括 Topic、Group ID(GID)、接入点(Endpoint),以及 AccessKeyId 和 AccessKeySecret。
  • 已下载开源 RocketMQ 4.5.1 或以上版本,以支持连接阿里云 MQ。

添加 Maven 依赖

请在 pom.xml 文件中添加以下依赖:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.5.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-acl</artifactId>
  9. <version>4.5.1</version>
  10. </dependency>

设置参数

下文中只涉及部分示例代码,完整的代码请参见 Example

注意事项

在设置代码时,需注意以下关键配置项:

  • AccessChannel

    阿里云 MQ 和开源 rocketmq 默认使用的鉴权通道不同:

    • 本地自建使用:AccessChannel:LOCAL
    • 上云连接阿里云 MQ 设置:AccessChannel:CLOUD
  • 接入点

    阿里云 MQ 使用接入点来做 Name Server 的负载均衡,并屏蔽了具体的 IP 地址。此外,针对实例化用户通过在接入点前加入实例 ID 用于区分,因此要接入阿里云直接使用接入点即可。

  • ACL

    阿里云提供一套完整的鉴权控制,开源版本的 SDK 提供了 ACL 功能,能够兼容阿里云的鉴权算法,这里只要配置正确的 AccessKeyId 和 AccessKeySecret 即可实现互联互通。

生产者示例代码

  1. //设置为云上创建的 GID, 以及替换为自己的 AccessKeyId 和 AccessKeySecret
  2. DefaultMQProducer producer = new DefaultMQProducer("GID_XXXX", new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)));
  3. //设置为自己的云上接入点
  4. producer.setNamesrvAddr("http://XXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
  5. // 云上消息轨迹需要设置为 CLOUD
  6. producer.setAccessChannel(AccessChannel.CLOUD);
  7. producer.start();
  8. // 设置为云上创建的 Topic 名字
  9. Message msg = new Message("TOPIC", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  10. SendResult sendResult = producer.send(msg);

消费者示例代码

  1. //设置为云上创建的 GID, 以及替换为自己的 AccessKeyId 和 AccessKeySecret
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_XXXX", new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)), new AllocateMessageQueueAveragely());
  3. //设置为云上接入点
  4. consumer.setNamesrvAddr("http://XXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
  5. // 云上消息轨迹需要设置为 CLOUD
  6. consumer.setAccessChannel(AccessChannel.CLOUD);
  7. // 设置为云上创建的 Topic
  8. consumer.subscribe("TOPIC", "*");
  9. consumer.registerMessageListener(new MessageListenerConcurrently() {
  10. @Override
  11. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  12. ConsumeConcurrentlyContext context) {
  13. System.out.printf("Receive New Messages: %s %n", msgs);
  14. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  15. }
  16. });
  17. consumer.start();

更多信息

如需查看完整的示例信息,请参见 Example