更新时间:2020-01-14 16:40
本文将引导您快速体验 SOFAStack 消息队列,从创建资源、配置接入点到使用 SDK 收发消息。具体操作步骤如下:
在使用 SOFAStack 消息队列时,请注意以下网络访问限制:
要使用消息队列,您需要确保 SOFAStack 控制台已创建至少一个工作空间。如 SOFAStack 未创建工作空间或您需要创建一个新的工作空间,可参见 管理工作空间 > 添加工作空间。创建好工作空间后,将为您自动创建一个消息队列实例。
Topic 是消息队列里对消息的一级归类。消息生产者将消息发送到一个 Topic,而消息消费者则通过订阅该 Topic 来获取和消费消息。
消息类型的更多信息,请参见 消息类型。
创建完 Topic 后,您需要为消息的消费者(或生产者)创建客户端 ID ,即 Group ID 作为标识。
Group ID 和 Topic 的关系是 N:N,即一个消费者可以订阅多个 Topic,同一个 Topic 也可以被多个消费者订阅;一个生产者可以向多个 Topic 发送消息,同一个 Topic 也可以接收来自多个生产者的消息。
说明:消费者必须有对应的 Group ID,生产者不做强制要求。
阿里云 AccessKey 用于收发消息时进行账户鉴权。
在调用 SDK 发送和订阅消息的时候,除了需要指定创建的 Topic 和 Group ID 以外,还需输入您在 RAM 控制台创建的身份验证信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AcessKeySecret。
创建 AccessKey 的具体步骤,参见 创建 AccessKey。
在控制台创建好资源后,您需通过控制台获取工作空间的接入点。在收发消息时,您需要为生产端和消费端配置该接入点,以此接入某个具体工作空间或地域的服务。
您可以通过控制台发送测试消息或通过调用 TCP Java SDK 发送消息。
用于快速验证 Topic 资源的可用性,主要用作测试。
通过 Maven 方式引入依赖。Java SDK 的最新版本号,可参见 SDK 版本说明。
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofamq-client-all</artifactId>
<version>"XXX"</version>
//设置为 Java SDK 的最新版本号
</dependency>
<repositories>
<repository>
<id>antcloudrelease</id>
<name>Ant Cloud</name>
<url>http://mvn.cloud.alipay.com/nexus/content/groups/open</url>
</repository>
</repositories>
根据以下说明设置相关参数,运行示例代码:
import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendResult;
public class Main {
public static void main(String... args) {
Properties credentials = new Properties();
// 鉴权用 AccessKeyId,在阿里云服务器管理控制台创建
credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
// 鉴权用 AccessKeySecret,在阿里云服务器管理控制台创建
credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
// 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
.withCredentials(credentials).build();
Properties properties = new Properties();
// 设置用户实例,进入控制台的概览页面查看接入点配置
properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
// 您在控制台创建的 Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
Producer producer = accessPoint.createProducer(properties);
producer.start();
Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
}
消息发送成功后,需要启动消费者来订阅消息。
调用 TCP Java SDK 订阅消息。
您可以运行以下示例代码来启动消费者,并测试订阅消息的功能。请按照说明正确设置相关参数。
import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Action;
import io.openmessaging.api.ConsumeContext;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
public class Main {
public static void main(String... args) {
Properties credentials = new Properties();
// 鉴权用 AccessKeyId,在阿里云服务器管理控制台创建
credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
// 鉴权用 AccessKeySecret,在阿里云服务器管理控制台创建
credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
// 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
.withCredentials(credentials).build();
Properties properties = new Properties();
// 设置用户实例,进入控制台的概览页面查看接入点配置
properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
// 您在控制台创建的 Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
Consumer consumer = accessPoint.createConsumer(properties);
consumer.subscribe("YOUR_TOPIC", "YOUR_TAG", new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
System.out.println(new String(message.getBody()));
return Action.CommitMessage;
}
});
consumer.start();
}
}
在文档使用中是否遇到以下问题
更多建议
匿名提交