直接使用AMQP服务端订阅实时获取设备数据,只能从产品维度获取所有设备的数据,使用消息转发的云产品流转功能,可以将全部设备或指定设备发送到物联网平台的消息,先经过SQL表达式处理和过滤,再转发到AMQP服务端订阅消费组,通过AMQP客户端消费。
工作原理
云产品流转可将同一产品所有设备或指定设备的指定Topic消息,实时转发到一个或多个消费组中,每个消费组中包括多个消费者即AMQP客户端。每条消息转发到消费组时,消费组中随机一个消费者收到消息,不同消费组通过消费组ID区分。在上图中:
消费者:使用AMQP SDK注册消费组的消费者,用于接收物联网平台转发到AMQP消费组中的消息。
物联网平台:配置云产品流转规则的数据源Topic、SQL表达式和数据目的,并启动规则,将设备消息转发到AMQP服务端订阅的消费组。
AMQP客户端:启动规则后,物联网平台会自动将设备消息转发到AMQP客户端,不需要在AMQP客户端代码中订阅Topic,只要AMQP客户端在线就可以接收消息。
云产品流转不支持从AMQP客户端下发消息给设备,如果需要下发指令给设备,请调用消息通信API。
应用场景
业务服务器接收设备消息:云产品流转可以灵活地转发设备消息到AMQP服务端订阅的消息组。
转发指定设备的消息。
转发指定Topic的消息。
消息过滤或处理后再转发。
AMQP客户端直接实时获取指定产品下所有设备的消息,可以直接配置AMQP服务端订阅。具体内容,请参见配置AMQP服务端订阅。
使用限制
前提条件
已创建消费组,作为数据转发目的地。您可使用物联网平台默认消费组(DEFAULT_GROUP)或创建消费组。
步骤一:配置数据转发目的
登录物联网平台控制台。
在实例概览页签的全部环境下,找到对应的实例,单击实例卡片。
在左侧导航栏,选择 。
单击规则对应的查看,进入数据流转规则页面。
重要若当前页面为云产品流转新版页面,需先单击右上角返回旧版,再单击目标规则对应的查看。
单击转发数据一栏对应的添加操作。
在添加操作对话框中,选择操作为发布到AMQP服务端订阅消费组。按照界面提示,设置其他信息,单击确认。
参数
描述
选择操作
选择发布到AMQP服务端订阅消费组。
消费组
选择一个已创建的消费组作为数据转发目标。单击创建消费组可以进行消费组创建。
Tag
设置tag后,所有通过该操作流转到AMQP服务端订阅消费组里的消息都会携带该tag。
tag长度为1~128个字符,可以输入常量或变量。
常量支持输入中文汉字、英文字母、数字。
变量格式为
${key}
,代表SQL处理后的JSON数据中key对应的value值。如果取不到value值,则消息不携带tag。
回到云产品流转页,单击规则对应的启动按钮启动规则。
步骤二:运行AMQP客户端
建议使用阿里云物联网平台提供的AMQP SDK接入示例。对于您自研的AMQP SDK,阿里云不提供后续技术支持服务。
本示例使用Java语言,其他语言的示例请参见AMQP客户端接入说明。
本示例购买Alibaba Cloud Linux操作系统的ECS实例,作为AMQP客户端的开发环境:
登录ECS实例。登录方式,请参见连接方式概述。
执行以下命令,下载Demo文件。
wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
执行以下命令,解压demo文件。
unzip amqp-demo.zip
在
src/main/java/com.aliyun.iotx.demo
目录下AmqpClient.java
文件中,参照下表修改AMQP的接入信息。重要本示例Demo代码中,添加了结束程序的代码(
Thread.sleep(60 * 1000);
),即程序启动成功,运行一分钟后会结束。实际场景中,您可根据需要自行设置运行时间。参数
说明
accessKey
阿里云主账号或RAM用户的AccessKey ID和AccessKey Secret。
登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。
重要为避免将AccessKey硬编码到业务代码中带来的安全风险,可采用配置环境变量的方法管理AccessKey。
您需在本地操作系统中添加环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并分别写入已准备好的AccessKey ID和AccessKey Secret。
在示例代码中可通过以下方法获取:
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
accessSecret
consumerGroupId
当前物联网平台对应实例中的消费组ID。
登录物联网平台控制台,在对应实例的
查看您的消费组ID。iotInstanceId
实例ID。您可在物联网平台控制台的实例概览页面,查看当前实例的ID。
若有ID值,必须传入该ID值。
若无实例概览页面或ID值,传入空值,即
iotInstanceId = ""
。
clientId
表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。
AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的 页签,单击消费组对应的查看,消费组详情页面将显示该参数,方便您识别区分不同的客户端。
connectionCount
启动AMQP客户端的连接数,最大不超过128个。用于实时消息推送的扩容。
消费组详情页面会以
${clientId}+"-"+数字
形式,显示连接的客户端。其中数字最小值为0。host
AMQP接入域名。
${YourHost}
对应的AMQP接入域名信息,请参见查看和配置实例终端节点信息(Endpoint)。在
pom.xml
文件中,已添加相关Maven依赖。在amqp-demo
根目录执行以下命令,重新加载Maven变更,构建项目。mvn clean package
在
amqp-demo/target
目录执行以下命令,运行生成的JAR包。java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
运行示例代码后返回如下信息,表示AMQP客户端已接入物联网平台并成功接收消息。
重要只有当AMQP客户端在线时,才能在服务器上收到设备消息。
10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5) 10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6) 10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message, topic = /g18******/device01/thing/event/property/post, messageId = 1731508564705******, content = {"temperature":10,"humidity":56}
在相应消费组显示在线的AMQP客户端。
amqp-demo
中connectionCount = 4
代表4个客户端。