使用服务端订阅

更新时间:
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

使用服务端订阅

管理消费组

消费组是消息消费端的身份标识,多个消费者组成消息消费组接入物联网平台。设置服务端订阅后,物联网平台会将收到的设备消息转发到相应消费组,每条被流转到消费组的消息会被其中一个消费者接收。本文介绍如何在物联网平台创建、查看和删除消费组。

工作原理

image9.svg

服务端订阅会将同一产品所有设备的指定类型消息,转发到一个或多个消费组中,每个消费组中随机一个消费者收到消息。不同消费组通过消费组 ID 区分,每个客户端只能配置一个消费组 ID,每个消费组最多包括 64 个客户端。您需要先创建消费组,然后将消费组 ID 配置到客户端,最后配置服务端订阅

  • **订阅关系 1:**将产品 1 的消息转发到消费组 1 和消费组 2。

  • **订阅关系 2:**将产品 2 的消息转发到消费组 2。

服务端订阅只能流转同一产品所有设备的指定类型消息,如果需要更灵活的流转设备消息。例如转发部分设备消息到客户端进行消费,可以使用物联网平台提供的云产品流转功能,先指定设备 Topic 消息转发至服务端订阅的消费组,进而流转到消费对应消费组的客户端。

创建消费组

  1. 登录物联网平台控制台

  2. 在左侧导航栏,选择消息转发 > 服务端订阅,单击消费组列表页签。

  3. 单击创建消费组

  4. 创建消费组对话框中,输入组名,单击确认。消费组名称支持中文、英文字母、日文、数字和下划线(_),长度范围为 4~30 个字符。一个中文或日文占 2 个字符。

查看消费组

您可以查看消费组内订阅的产品,以及消费日志。

  1. 消费组列表中,找到要查看的消费组,并单击对应的查看

  2. 消费组详情页面,单击消费日志页签,可查看具体的消费记录。

删除消费组

警告

删除消费组后,消费组内的所有消费端会停止接收消息,消费组相关的服务端订阅服务不可用,可能导致用户业务中断,请谨慎操作。

用户创建的消费组可以删除,物联网平台的默认消费组不可删除。

  1. 解除订阅。如果消费组已关联订阅关系,则需先解除订阅;如果消费组无订阅关系,请忽略此步骤。

  2. 服务端订阅页面的消费组列表页签下,单击消费组对应的删除,然后单击确认

配置服务端订阅

当需要实时获取设备上报的数据时,如果使用云端 API 只能获取物模型数据,而且可能无法实时获取,使用服务端订阅功能,可以在业务服务器实时、可靠地获取设备上报消息。本文为您介绍配置服务端订阅的操作步骤。

前提条件

  • 已创建产品。

  • 已创建消费组。您可使用物联网平台默认消费组(DEFAULT_GROUP)或创建消费组。

  • 设备上报的物模型 Topic 数据必须符合格式,才能触发服务端订阅。设备上报的自定义 Topic 数据没有格式要求。

使用限制

  • 建立连接之后,需要立刻发送认证请求。如果 15 秒内没有认证成功,服务器会主动关闭连接。

如何配置

步骤一:创建消费组

  1. 参照前文创建一个消费组。

步骤二:创建服务端订阅

在物联网平台控制台创建订阅,关联消费组以及对应的设备消息类型。

  1. 在左侧导航栏,选择消息转发 > 服务端订阅

  2. 服务端订阅页面,单击创建订阅

  3. 创建订阅对话框中,完成配置,单击确认

参数

说明

产品

物联网平台会转发该产品下所有设备的消息。一个产品只能创建一个相同类型的服务端订阅。

订阅类型

支持选择 AMQP 或者 Kafka 两种类型。

消费组

物联网平台提供默认消费组。

选择消费组列表,在右侧选择目标消费组面板,可以选择多个消费组,也可以

单击右下角的创建消费组

推送消息类型

  • 物模型历史数据上报:如果订阅异步服务调用响应数据,设备端返回的响应消息Id必须与物联网平台下发消息的Id相同,才可实现数据正常订阅。

  • OTA模块版本号上报:当设备上报OTA模块版本号,且版本号有变更时进行消息转发。

  • OTA升级设备状态通知:包括升级包验证和批量升级时,设备升级成功、失败、取消和进度的事件通知。

  • OTA升级批次状态通知:设备OTA升级批次状态变化通知。

  • 设备状态变化通知:该产品下的设备上下线状态变化时通知的消息。

  • 网关子设备发现上报:网关将发现的子设备信息上报给物联网平台。需要网关上的应用程序支持。网关产品特有消息类型。

  • 设备拓扑关系变更:子设备和网关之间的拓扑关系建立和解除消息。网关产品特有消息类型。

  • 设备生命周期变更:设备创建、删除等消息。

步骤三:运行客户端

  1. AMQP 客户端

//参数详见下方表格

// 指定单个进程启动的连接数
private static int connectionCount = 4;

//业务处理异步线程池
private final static ExecutorService executorService = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors() * 2, 3600, TimeUnit.SECONDS,
    new LinkedBlockingQueue(50000));

public static void main(String[] args) throws Exception {
    List<Connection> connections = new ArrayList<>();

    for (int i = 0; i < connectionCount; i++) {
        long timeStamp = System.currentTimeMillis();
        //签名方法
        String signMethod = "hmacsha1";
        
        String userName = clientId + "-" + i + "|authMode=aksign"
                + ",signMethod=" + signMethod
                + ",timestamp=" + timeStamp
                + ",authId=" + accessKey
                + ",consumerGroupId=" + consumerGroupId
                + "|";

        //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent, accessSecret, signMethod);
        String connectionUrl = "failover:(amqp://" + host + ":5672?amqp.idleTimeout=80000)"
                + "?failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", connectionUrl);
        hashtable.put("queue.QUEUE", consumerGroupId);
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // 创建连接。
        Connection connection = cf.createConnection(userName, password);
        connections.add(connection);

        ((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);
        // 创建会话。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        connection.start();
        // 创建Receiver连接。
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    logger.info("amqp demo is started successfully, and will exit after 60s ");

    // 结束程序运行 
    Thread.sleep(60 * 1000 * 30);
    logger.info("run shutdown");

    connections.forEach(c-> {
        try {
            c.close();
        } catch (JMSException e) {
            logger.error("failed to close connection", e);
        }
    });

    executorService.shutdown();
    if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        logger.info("shutdown success");
    } else {
        logger.info("failed to handle messages");
    }
}

/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
    try {
        byte[] body = message.getBody(byte[].class);
        String content = new String(body);
        String topic = message.getStringProperty("topic");
        String messageId = message.getStringProperty("messageId");
        long generateTime = message.getLongProperty("generateTime");
        logger.info("receive message"
            + ",\n topic = " + topic
            + ",\n messageId = " + messageId
            + ",\n generateTime = " + generateTime
            + ",\n content = " + content);
    } catch (Exception e) {
        logger.error("processMessage occurs error ", e);
    }
}

private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {...};

/**
 * 计算签名
 */
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
    SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
    Mac mac = Mac.getInstance(signMethod);
    mac.init(signingKey);
    byte[] rawHmac = mac.doFinal(toSignString.getBytes());
    return Base64.encodeBase64String(rawHmac);
}

参数说明如下:

参数

说明

accessKey

平台颁发给开发者的,用于调用接口的密钥ID和密钥,在系统管理>密钥管理获取。

accessSecret

consumerGroupId

当前物联网平台对应实例中的消费组ID。

登录物联网平台控制台,在消息转发 > 服务端订阅 > 消费组管理查看您的消费组ID。

clientId

表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。

signMethod

签名方式。支持如下三种:

  • hmacmd5

  • hmacsha1

  • hmacsha256

host

AMQP接入域名。

${YourHost}对应的AMQP接入域名信息,请在物联网控制台 系统管理 > 开发配置 查看AMQP服务器的host。

  1. Kafka 客户端

在物联网控制台 系统管理 > 开发配置 可以获取到Kafka服务器地址和端口。

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "alikafka-post-cn-29t3s87td001-1-vpc.alikafka.aliyuncs.com:9094,alikafka-post-cn-29t3s87td001-2-vpc.alikafka.aliyuncs.com:9094,alikafka-post-cn-29t3s87td001-3-vpc.alikafka.aliyuncs.com:9094"); // Kafka 服务器地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DEFAULT_GROUP"); // Group为您的消费者组ID,默认为DEFAULT_GROUP
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("DEFAULT_GROUP")); // 订阅主题,主题为您的消费组ID,默认为DEFAULT_GROUP

try {
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(100)) {
            System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}

管理服务端订阅

设置服务端订阅成功后,在服务端订阅页面的订阅列表下,找到已订阅的产品名称,可执行以下操作。

  • 编辑:单击产品对应操作列的编辑,在编辑订阅对话框,修改消费组推送消息类型

  • 删除:单击产品对应操作列的删除。单击确认