数据转发到AMQP服务端订阅消费组

直接使用AMQP服务端订阅实时获取设备数据,只能从产品维度获取所有设备的数据,使用消息转发的云产品流转功能,可以将全部设备或指定设备发送到物联网平台的消息,先经过SQL表达式处理和过滤,再转发到AMQP服务端订阅消费组,通过AMQP客户端消费。

工作原理

image

云产品流转可将同一产品所有设备或指定设备的指定Topic消息,实时转发到一个或多个消费组中,每个消费组中包括多个消费者即AMQP客户端。每条消息转发到消费组时,消费组中随机一个消费者收到消息,不同消费组通过消费组ID区分。在上图中:

  1. 消费者:使用AMQP SDK注册消费组的消费者,用于接收物联网平台转发到AMQP消费组中的消息。

  2. 物联网平台:配置云产品流转规则数据源TopicSQL表达式数据目的,并启动规则,将设备消息转发到AMQP服务端订阅的消费组。

    • 数据源:支持的Topic类型消息,请参见数据格式

      例如:

      • 数据源Topic_产品A:将产品A所有设备的消息转发到消费组。

      • 数据源Topic_设备B:将产品B中指定的一个设备B的消息转发到消费组。

    • SQL表达式:编写SQL表达式来解析和处理设备消息的JSON数据。SQL表达式的语法说明,请参见SQL表达式

      说明

      二进制格式的数据不做解析,直接透传。

    • 数据目的:接收设备数据的AMQP服务端订阅消费组。具体内容,请参见管理AMQP消费组

  3. AMQP客户端:启动规则后,物联网平台会自动将设备消息转发到AMQP客户端,不需要在AMQP客户端代码中订阅Topic,只要AMQP客户端在线就可以接收消息。

说明

云产品流转不支持从AMQP客户端下发消息给设备,如果需要下发指令给设备,请调用消息通信API

应用场景

业务服务器接收设备消息:云产品流转可以灵活地转发设备消息到AMQP服务端订阅的消息组。

  • 转发指定设备的消息。

  • 转发指定Topic的消息。

  • 消息过滤或处理后再转发。

AMQP客户端直接实时获取指定产品下所有设备的消息,可以直接配置AMQP服务端订阅。具体内容,请参见配置AMQP服务端订阅

使用限制

  • AMQP客户端建立连接之后,需要立刻发送认证请求。如果15秒内没有认证成功,服务器会主动关闭连接。

  • AMQP客户端的一个连接限流1,000 TPS,消息转发TPS限流由实例的消息转发TPS规格决定,消息大小无限制。更多的AMQP服务端订阅限制,请参见服务端订阅使用限制

  • 云网关产品和设备,及MQTT型实例下产品和设备,不支持使用云产品流转(旧版)功能。如果流转对应产品和设备的数据,需要使用云产品流转(新版)功能。具体内容,请参见云产品流转(新版)

前提条件

步骤一:配置数据转发目的

  1. 登录物联网平台控制台

  2. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

  3. 在左侧导航栏,选择消息转发 > 云产品流转

  4. 单击规则对应的查看,进入数据流转规则页面。

    重要

    若当前页面为云产品流转新版页面,需先单击右上角返回旧版,再单击目标规则对应的查看

  5. 单击转发数据一栏对应的添加操作

  6. 添加操作对话框中,选择操作为发布到AMQP服务端订阅消费组。按照界面提示,设置其他信息,单击确认

    添加操作

    参数

    描述

    选择操作

    选择发布到AMQP服务端订阅消费组

    消费组

    选择一个已创建的消费组作为数据转发目标。单击创建消费组可以进行消费组创建。

    Tag

    设置tag后,所有通过该操作流转到AMQP服务端订阅消费组里的消息都会携带该tag。

    tag长度为1~128个字符,可以输入常量或变量。

    • 常量支持输入中文汉字、英文字母、数字。

    • 变量格式为${key},代表SQL处理后的JSON数据中key对应的value值。如果取不到value值,则消息不携带tag。

  7. 回到云产品流转页,单击规则对应的启动按钮启动规则。

步骤二:运行AMQP客户端

重要
  1. 登录ECS实例。登录方式,请参见连接方式概述

  2. 执行以下命令,下载Demo文件。

    wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
  3. 执行以下命令,解压demo文件。

    unzip amqp-demo.zip
  4. src/main/java/com.aliyun.iotx.demo目录下AmqpClient.java文件中,参照下表修改AMQP的接入信息。

    重要

    本示例Demo代码中,添加了结束程序的代码(Thread.sleep(60 * 1000);),即程序启动成功,运行一分钟后会结束。实际场景中,您可根据需要自行设置运行时间。

    参数

    说明

    accessKey

    阿里云主账号或RAM用户的AccessKey ID和AccessKey Secret。

    登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

    重要

    为避免将AccessKey硬编码到业务代码中带来的安全风险,可采用配置环境变量的方法管理AccessKey。

    您需在本地操作系统中添加环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并分别写入已准备好的AccessKey ID和AccessKey Secret。

    在示例代码中可通过以下方法获取:

    • System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")

    • System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    accessSecret

    consumerGroupId

    当前物联网平台对应实例中的消费组ID。

    登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表查看您的消费组ID。

    iotInstanceId

    实例ID。您可在物联网平台控制台实例概览页面,查看当前实例的ID。

    • 若有ID值,必须传入该ID值。

    • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ""

    clientId

    表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。

    AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表页签,单击消费组对应的查看消费组详情页面将显示该参数,方便您识别区分不同的客户端。

    connectionCount

    启动AMQP客户端的连接数,最大不超过128个。用于实时消息推送的扩容。

    消费组详情页面会以${clientId}+"-"+数字形式,显示连接的客户端。其中数字最小值为0。

    host

    AMQP接入域名。

    ${YourHost}对应的AMQP接入域名信息,请参见查看和配置实例终端节点信息(Endpoint)

  5. pom.xml文件中,已添加相关Maven依赖。在amqp-demo根目录执行以下命令,重新加载Maven变更,构建项目。

    mvn clean package
  6. amqp-demo/target目录执行以下命令,运行生成的JAR包。

    java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
  7. 运行示例代码后返回如下信息,表示AMQP客户端已接入物联网平台并成功接收消息。

    重要

    只有当AMQP客户端在线时,才能在服务器上收到设备消息。

    10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 
    10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5)
    10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6)
    10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message,
     topic = /g18******/device01/thing/event/property/post,
     messageId = 1731508564705******,
     content = {"temperature":10,"humidity":56}

    在相应消费组显示在线的AMQP客户端。amqp-democonnectionCount = 4代表4个客户端。

    image.png

后续操作

  1. 如果AMQP客户端不在线,AMQP服务端订阅消息会堆积,AMQP客户端重新上线后,物联网平台重新推送消息。如果不需要消费堆积的消息,可在AMQP客户端上线前,清空堆积的消息

  2. 所有配置完成,设备上报订阅数据并被AMQP客户端接收后,您可以登录物联网平台控制台,进入对应实例查看消息运行日志。

    • 监控运维 > 日志服务 > 云端运行日志页签,查看设备上报数据、物联网平台转发数据到AMQP客户端和AMQP客户端返回ACK的日志记录。具体操作,请参见查询云端运行日志

    • 消息转发 > 服务端订阅 > 消费组列表页签,单击目标消费组右侧操作列的查看,在消费组详情页面,查看消息消费速率、消息堆积量、消费日志等。具体操作,请参见查看和监控消费组

常见问题