文档

JMS P2P(主账号与RAM账号场景)

更新时间:
一键部署

本文说明JMS客户端如何在主账号与RAM账号场景接入云消息队列 RabbitMQ 版并实现P2P消息收发。

前提条件

背景信息

  • JMS P2P

    JMS P2P(点对点)消息收一条消息被一个指定的订阅者消费。某个发布者向某个Queue(队列)发送消息后,某个指定的订阅者从该Queue接收消息。JMS P2P model发布者和订阅者之间不存在时间依赖性。发布者向Queue发送消息时不需要订阅者同时处于活跃状态,订阅者从Queue接收消息时同样不需发布者处于活跃状态。

  • 开源RabbitMQ客户端接入云上服务时,需要先通过 AccessKey IDAccessKey Secret生成用户名和密码,将用户名和密码设置到开源客户端SDK的userNamepassWord参数中。云消息队列 RabbitMQ 版会通过用户名和密码进行权限认证。

    重要

    您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection

收发消息流程

消息收发流程

获取接入点

您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。

    类型

    说明

    示例值

    公网接入点

    公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC接入点

    VPC环境可读写。按量付费实例和预付费实例默认都支持。

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装JMS依赖库

在pom.xml中添加以下依赖。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

生成用户名密码

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击静态用户名密码

  4. 静态用户名密码页面,单击创建用户名密码

  5. 创建用户名密码面板,输入AccessKey IDAccessKey Secret,然后单击确定

    说明

    AccessKey IDAccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey

    静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。用户名密码

  6. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。

配置Object类型消息

创建Object类型消息类Person.java

import java.io.Serializable;

public class Person implements Serializable{
    
    private static final long serialVersionUID = -5809782578272943****;
    String name = null;
    int age = 0;
    String address =null;

    public Person(String name, int age, String address){
        this.name = name;
        this.age = age;
        this.address = address;
    }

    public String getName(){
        return name;
    }

    public int getAge(){
        return age;
    }

    public String getAddress(){
        return address;
    }
}

发送消息

创建并编译运行QueueSend.java

重要

编译运行QueueSend.java发送消息之前,您需要根据代码提示信息适配配置参数列表中所列举的参数。

配置参数列表

参数

示例值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。您可以在云消息队列 RabbitMQ 版控制台实例详情页面获取。

Port

5672

默认端口。非加密端口为5672,加密端口为5671。

userName

MjoxODgwNzcwODY5MD****

云消息队列 RabbitMQ 版控制台将阿里云账号或RAM账号的AccessKey ID、AccessKey Secret和云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码页面获取。

passWord

NDAxREVDQzI2MjA0OT****

云消息队列 RabbitMQ 版控制台将阿里云账号或RAM账号的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码页面,根据实例ID搜索已创建的用户名以及对应密码。

virtualHost

Test

云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
import java.io.Serializable;

public class QueueSend {

    public void sendTextMsg(Session session, String MsgContent, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        Message msg = session.createTextMessage(MsgContent);
        msgProducer.send(msg);
        System.out.println("文本类型消息已发送");
    }

    public void sendMap(Session session, MapMessage map, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        msgProducer.send(map);
        System.out.println("Map类型消息已发送");
    }

    public void sendObj(Session session, Person obj, String name) throws JMSException {
        Destination queue = new RMQDestination(name, true, false);
        MessageProducer msgProducer = session.createProducer(queue);
        ObjectMessage objMsg = session.createObjectMessage((Serializable) obj);//发送对象时必须让该对象实现serializable接口
        MessageProducer msgPorducer = session.createProducer(queue);
        msgPorducer.send(objMsg);
        System.out.println("Object类型的消息已发送");
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueSend qs = new QueueSend();
        qs.sendTextMsg(session, "发送JMS文本类型消息", "queue.msgText");
        MapMessage mapMsg = session.createMapMessage();
        mapMsg.setString("name", "李某");
        mapMsg.setBoolean("IsHero", false);
        mapMsg.setInt("age", 23);
        qs.sendMap(session, mapMsg, "queue.msgMap");
        Person person = new Person("李某", 23, "北京.大兴");//发送Object类型消息
        qs.sendObj(session, person, "queue.msgObj");
        session.close();
        connection.close();
    }
}

订阅消息

创建并编译运行QueueAccept.java

重要

编译运行QueueAccept.java订阅消息之前,您需要根据代码提示信息适配配置参数列表中所列举的参数。

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class QueueAccept implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage text = (TextMessage) message;
            try {
                System.out.println("发送的文本消息内容为:" + text.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof MapMessage) {
            MapMessage map = (MapMessage) message;
            try {
                System.out.println("姓名:" + map.getString("name"));
                System.out.println("是否是英雄:" + map.getBoolean("IsHero"));
                System.out.println("年龄:" + map.getInt("age"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (message instanceof ObjectMessage) {
            ObjectMessage objMsg = (ObjectMessage) message;
            try {
                Person person = (Person) objMsg.getObject();
                System.out.println("用户名:" + person.getName() + ",年龄:" + person.getAge() + ",地址:" + person.getAddress());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        try {
            message.acknowledge();
            System.out.println("消息手动确认");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        if (connectionFactory == null) {
            connectionFactory = new RMQConnectionFactory();
            connectionFactory.setHost("xxx.xxx.aliyuncs.com");
            connectionFactory.setUsername("${Username}");
            connectionFactory.setPassword("${Password}");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("${VhostName}");
            connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
                @Override
                public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                    cf.setAutomaticRecoveryEnabled(true);
                    cf.setNetworkRecoveryInterval(5000);
                    System.out.println(cf.getPassword());
                }
            });
        }
        if (connection == null) {
            connection = connectionFactory.createConnection();
            connection.start();
        }
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = new RMQDestination("queue.msgText", true, false);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new QueueAccept());
        Destination queue1 = new RMQDestination("queue.msgMap", true, false);
        MessageConsumer consumer1 = session.createConsumer(queue1);
        consumer1.setMessageListener(new QueueAccept());
        Destination queue2 = new RMQDestination("queue.msgObj", true, false);
        MessageConsumer consumer2 = session.createConsumer(queue2);
        consumer2.setMessageListener(new QueueAccept());
    }
}
  • 本页导读 (1)
文档反馈