直接使用AMQP服务端订阅实时获取设备数据,只能从产品维度获取所有设备的数据,使用消息转发的云产品流转功能,可以将全部设备或指定设备发送到物联网平台的消息,先经过SQL表达式处理和过滤,再转发到AMQP服务端订阅消费组,通过AMQP客户端消费。
工作原理
云产品流转可将同一产品所有设备或指定设备的指定Topic消息,实时转发到一个或多个消费组中,每个消费组中包括多个消费者即AMQP客户端。每条消息转发到消费组时,消费组中随机一个消费者收到消息,不同消费组通过消费组ID区分。在上图中:
- 消费者:使用AMQP SDK注册消费组的消费者,用于接收物联网平台转发到AMQP消费组中的消息。 
- 物联网平台:配置云产品流转规则的数据源Topic、SQL表达式和数据目的,并启动规则,将设备消息转发到AMQP服务端订阅的消费组。 
- AMQP客户端:启动规则后,物联网平台会自动将设备消息转发到AMQP客户端,不需要在AMQP客户端代码中订阅Topic,只要AMQP客户端在线就可以接收消息。 
云产品流转不支持从AMQP客户端下发消息给设备,如果需要下发指令给设备,请调用消息通信API。
应用场景
业务服务器接收设备消息:云产品流转可以灵活地转发设备消息到AMQP服务端订阅的消息组。
- 转发指定设备的消息。 
- 转发指定Topic的消息。 
- 消息过滤或处理后再转发。 
AMQP客户端直接实时获取指定产品下所有设备的消息,可以直接配置AMQP服务端订阅。具体内容,请参见配置AMQP服务端订阅。
使用限制
前提条件
- 已创建消费组,作为数据转发目的地。您可使用物联网平台默认消费组(DEFAULT_GROUP)或创建消费组。 
步骤一:配置数据转发目的
- 登录物联网平台控制台。 
- 在实例概览页签的全部环境下,找到对应的实例,单击实例卡片。 
- 在左侧导航栏,选择。 
- 单击规则对应的查看,进入数据流转规则页面。 重要- 若当前页面为云产品流转新版页面,需先单击右上角返回旧版,再单击目标规则对应的查看。 
- 单击转发数据一栏对应的添加操作。 
- 在添加操作对话框中,选择操作为发布到AMQP服务端订阅消费组。按照界面提示,设置其他信息,单击确认。  - 参数 - 描述 - 选择操作 - 选择发布到AMQP服务端订阅消费组。 - 消费组 - 选择一个已创建的消费组作为数据转发目标。单击创建消费组可以进行消费组创建。 - Tag - 设置tag后,所有通过该操作流转到AMQP服务端订阅消费组里的消息都会携带该tag。 - tag长度为1~128个字符,可以输入常量或变量。 - 常量支持输入中文汉字、英文字母、数字。 
- 变量格式为 - ${key},代表SQL处理后的JSON数据中key对应的value值。如果取不到value值,则消息不携带tag。
 
- 回到云产品流转页,单击规则对应的启动按钮启动规则。 
步骤二:运行AMQP客户端
- 建议使用阿里云物联网平台提供的AMQP SDK接入示例。对于您自研的AMQP SDK,阿里云不提供后续技术支持服务。 
- 本示例使用Java语言,其他语言的示例请参见AMQP客户端接入说明。 
- 本示例购买Alibaba Cloud Linux操作系统的ECS实例,作为AMQP客户端的开发环境: 
- 登录ECS实例。登录方式,请参见连接方式概述。 
- 执行以下命令,下载Demo文件。 - wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
- 执行以下命令,解压demo文件。 - unzip amqp-demo.zip
- 在 - src/main/java/com.aliyun.iotx.demo目录下- AmqpClient.java文件中,参照下表修改AMQP的接入信息。重要- 本示例Demo代码中,添加了结束程序的代码( - Thread.sleep(60 * 1000);),即程序启动成功,运行一分钟后会结束。实际场景中,您可根据需要自行设置运行时间。- 参数 - 说明 - accessKey - 阿里云主账号或RAM用户的AccessKey ID和AccessKey Secret。 - 登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。 重要- 为避免将AccessKey硬编码到业务代码中带来的安全风险,可采用配置环境变量的方法管理AccessKey。 - 您需在本地操作系统中添加环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并分别写入已准备好的AccessKey ID和AccessKey Secret。 - 在示例代码中可通过以下方法获取: - System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
- System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
 - accessSecret - consumerGroupId - 当前物联网平台对应实例中的消费组ID。 - 登录物联网平台控制台,在对应实例的查看您的消费组ID。 - iotInstanceId - 实例ID。您可在物联网平台控制台的实例概览页面,查看当前实例的ID。 - 若有ID值,必须传入该ID值。 
- 若无实例概览页面或ID值,传入空值,即 - iotInstanceId = ""。
 - clientId - 表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。 - AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的页签,单击消费组对应的查看,消费组详情页面将显示该参数,方便您识别区分不同的客户端。 - connectionCount - 启动AMQP客户端的连接数,最大不超过128个。用于实时消息推送的扩容。 - 消费组详情页面会以 - ${clientId}+"-"+数字形式,显示连接的客户端。其中数字最小值为0。- host - AMQP接入域名。 - ${YourHost}对应的AMQP接入域名信息,请参见查看和配置实例终端节点信息(Endpoint)。
- 在 - pom.xml文件中,已添加相关Maven依赖。在- amqp-demo根目录执行以下命令,重新加载Maven变更,构建项目。- mvn clean package
- 在 - amqp-demo/target目录执行以下命令,运行生成的JAR包。- java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
- 运行示例代码后返回如下信息,表示AMQP客户端已接入物联网平台并成功接收消息。 重要- 只有当AMQP客户端在线时,才能在服务器上收到设备消息。 - 10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5) 10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6) 10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message, topic = /g18******/device01/thing/event/property/post, messageId = 1731508564705******, content = {"temperature":10,"humidity":56}- 在相应消费组显示在线的AMQP客户端。 - amqp-demo中- connectionCount = 4代表4个客户端。