使用服务端订阅
本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
使用服务端订阅
管理消费组
消费组是消息消费端的身份标识,多个消费者组成消息消费组接入物联网平台。设置服务端订阅后,物联网平台会将收到的设备消息转发到相应消费组,每条被流转到消费组的消息会被其中一个消费者接收。本文介绍如何在物联网平台创建、查看和删除消费组。
工作原理
服务端订阅会将同一产品所有设备的指定类型消息,转发到一个或多个消费组中,每个消费组中随机一个消费者收到消息。不同消费组通过消费组 ID 区分,每个客户端只能配置一个消费组 ID,每个消费组最多包括 64 个客户端。您需要先创建消费组,然后将消费组 ID 配置到客户端,最后配置服务端订阅
**订阅关系 1:**将产品 1 的消息转发到消费组 1 和消费组 2。
**订阅关系 2:**将产品 2 的消息转发到消费组 2。
服务端订阅只能流转同一产品所有设备的指定类型消息,如果需要更灵活的流转设备消息。例如转发部分设备消息到客户端进行消费,可以使用物联网平台提供的云产品流转功能,先指定设备 Topic 消息转发至服务端订阅的消费组,进而流转到消费对应消费组的客户端。
创建消费组
登录物联网平台控制台
在左侧导航栏,选择消息转发 > 服务端订阅,单击消费组列表页签。
单击创建消费组。
在创建消费组对话框中,输入组名,单击确认。消费组名称支持中文、英文字母、日文、数字和下划线(_),长度范围为 4~30 个字符。一个中文或日文占 2 个字符。
查看消费组
您可以查看消费组内订阅的产品,以及消费日志。
在消费组列表中,找到要查看的消费组,并单击对应的查看。
在消费组详情页面,单击消费日志页签,可查看具体的消费记录。
删除消费组
删除消费组后,消费组内的所有消费端会停止接收消息,消费组相关的服务端订阅服务不可用,可能导致用户业务中断,请谨慎操作。
用户创建的消费组可以删除,物联网平台的默认消费组不可删除。
解除订阅。如果消费组已关联订阅关系,则需先解除订阅;如果消费组无订阅关系,请忽略此步骤。
在服务端订阅页面的消费组列表页签下,单击消费组对应的删除,然后单击确认。
配置服务端订阅
当需要实时获取设备上报的数据时,如果使用云端 API 只能获取物模型数据,而且可能无法实时获取,使用服务端订阅功能,可以在业务服务器实时、可靠地获取设备上报消息。本文为您介绍配置服务端订阅的操作步骤。
前提条件
已创建产品。
已创建消费组。您可使用物联网平台默认消费组(DEFAULT_GROUP)或创建消费组。
设备上报的物模型 Topic 数据必须符合格式,才能触发服务端订阅。设备上报的自定义 Topic 数据没有格式要求。
使用限制
建立连接之后,需要立刻发送认证请求。如果 15 秒内没有认证成功,服务器会主动关闭连接。
如何配置
步骤一:创建消费组
参照前文创建一个消费组。
步骤二:创建服务端订阅
在物联网平台控制台创建订阅,关联消费组以及对应的设备消息类型。
在左侧导航栏,选择消息转发 > 服务端订阅。
在服务端订阅页面,单击创建订阅。
在创建订阅对话框中,完成配置,单击确认。
参数 | 说明 |
产品 | 物联网平台会转发该产品下所有设备的消息。一个产品只能创建一个相同类型的服务端订阅。 |
订阅类型 | 支持选择 AMQP 或者 Kafka 两种类型。 |
消费组 | 物联网平台提供默认消费组。 选择消费组列表,在右侧选择目标消费组面板,可以选择多个消费组,也可以 单击右下角的创建消费组。 |
推送消息类型 |
|
步骤三:运行客户端
AMQP 客户端
//参数详见下方表格
// 指定单个进程启动的连接数
private static int connectionCount = 4;
//业务处理异步线程池
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 3600, TimeUnit.SECONDS,
new LinkedBlockingQueue(50000));
public static void main(String[] args) throws Exception {
List<Connection> connections = new ArrayList<>();
for (int i = 0; i < connectionCount; i++) {
long timeStamp = System.currentTimeMillis();
//签名方法
String signMethod = "hmacsha1";
String userName = clientId + "-" + i + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent, accessSecret, signMethod);
String connectionUrl = "failover:(amqp://" + host + ":5672?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", consumerGroupId);
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// 创建连接。
Connection connection = cf.createConnection(userName, password);
connections.add(connection);
((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建Receiver连接。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
logger.info("amqp demo is started successfully, and will exit after 60s ");
// 结束程序运行
Thread.sleep(60 * 1000 * 30);
logger.info("run shutdown");
connections.forEach(c-> {
try {
c.close();
} catch (JMSException e) {
logger.error("failed to close connection", e);
}
});
executorService.shutdown();
if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
logger.info("shutdown success");
} else {
logger.info("failed to handle messages");
}
}
/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
long generateTime = message.getLongProperty("generateTime");
logger.info("receive message"
+ ",\n topic = " + topic
+ ",\n messageId = " + messageId
+ ",\n generateTime = " + generateTime
+ ",\n content = " + content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {...};
/**
* 计算签名
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
参数说明如下:
参数 | 说明 |
accessKey | 平台颁发给开发者的,用于调用接口的密钥ID和密钥,在系统管理>密钥管理获取。 |
accessSecret | |
consumerGroupId | 当前物联网平台对应实例中的消费组ID。 登录物联网平台控制台,在消息转发 > 服务端订阅 > 消费组管理查看您的消费组ID。 |
clientId | 表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。 |
signMethod | 签名方式。支持如下三种:
|
host | AMQP接入域名。
|
Kafka 客户端
在物联网控制台 系统管理 > 开发配置 可以获取到Kafka服务器地址和端口。
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "alikafka-post-cn-29t3s87td001-1-vpc.alikafka.aliyuncs.com:9094,alikafka-post-cn-29t3s87td001-2-vpc.alikafka.aliyuncs.com:9094,alikafka-post-cn-29t3s87td001-3-vpc.alikafka.aliyuncs.com:9094"); // Kafka 服务器地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DEFAULT_GROUP"); // Group为您的消费者组ID,默认为DEFAULT_GROUP
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("DEFAULT_GROUP")); // 订阅主题,主题为您的消费组ID,默认为DEFAULT_GROUP
try {
while (true) {
for (ConsumerRecord<String, String> record : consumer.poll(100)) {
System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
管理服务端订阅
设置服务端订阅成功后,在服务端订阅页面的订阅列表下,找到已订阅的产品名称,可执行以下操作。
编辑:单击产品对应操作列的编辑,在编辑订阅对话框,修改消费组或推送消息类型。
删除:单击产品对应操作列的删除。单击确认。