直接使用AMQP服务端订阅实时获取设备数据,只能从产品维度获取所有设备的数据,使用消息转发的云产品流转功能,可以将全部设备或指定设备发送到物联网平台的消息,先经过解析脚本处理和过滤,再转发到AMQP服务端订阅消费组,通过AMQP客户端消费。本文以物模型数据上报Topic为例,介绍流转消息数据的完整流程。
工作原理
云产品流转可将同一产品所有设备或指定设备的指定Topic消息,实时转发到一个或多个消费组中,每个消费组中包括多个消费者即AMQP客户端。每条消息转发到消费组时,消费组中随机一个消费者收到消息,不同消费组通过消费组ID区分。在上图中:
消费者:使用AMQP SDK注册消费组的消费者,用于接收物联网平台转发到AMQP消费组中的消息。
物联网平台:配置云产品流转的数据源Topic、数据目的和解析器脚本,并启动解析器,将设备消息转发到AMQP服务端订阅的消费组。
数据源:支持的Topic类型消息,请参见数据格式(非云网关产品和设备)、自定义Topic(MQTT云网关)、消息转发Topic(GB/T 32960云网关)、消息转发Topic(JT/T 808云网关)、消息转发Topic(SL 651云网关)。
例如:
数据源Topic_产品A:将产品A所有设备的消息转发到消费组。
数据源Topic_设备B:将产品B中指定的一个设备B的消息转发到消费组。
数据目的:接收设备数据的AMQP服务端订阅消费组。具体内容,请参见管理AMQP消费组。
解析器脚本:配置通过数据流转函数
writeAmqp(destinationId, payload, tag)
将设备数据转发到AMQP客户端的消费组中。函数说明,请参见流转数据到数据目的函数。
AMQP客户端:启动解析器后,物联网平台会自动将设备消息转发到AMQP客户端,不需要在AMQP客户端代码中订阅Topic,只要AMQP客户端在线就可以接收消息。
云产品流转不支持从AMQP客户端下发消息给设备,如果需要下发指令给设备,请调用消息通信API。
应用场景
业务服务器接收设备消息:云产品流转可以灵活地转发设备消息到AMQP服务端订阅的消息组。
转发指定设备的消息。
转发指定Topic的消息。
消息过滤或处理后再转发。
AMQP客户端直接实时获取指定产品下所有设备的消息,可以直接配置AMQP服务端订阅。具体内容,请参见配置AMQP服务端订阅。
前提条件
使用限制
步骤一:在物联网平台配置数据目的和解析器
step1:配置数据目的
登录物联网平台控制台。
在实例概览页签的全部环境下,找到对应的实例,单击实例卡片。
在左侧导航栏,选择 。
在云产品流转页面,单击右上角体验新版,进入新版功能页面。
说明如果您已执行过此操作,再次进入云产品流转页面,会直接进入新版功能页面。
单击数据目的页签,然后单击创建数据目的。
在创建数据目的对话框,输入数据目的名称,例如DataPurpose,按照以下参数说明,完成配置,然后单击确定。
参数
描述
选择操作
选择发布到AMQP服务端订阅消费组。
消费组
选择一个已创建的消费组作为数据转发目标。单击创建消费组可以进行消费组创建。
step2:配置并启动解析器
创建解析器,例如DataParser。具体操作,请参见创建解析器。
在解析器详情页面,关联数据源。
在配置向导的数据源下,单击关联数据源。
在弹出的对话框中,单击数据源下拉列表,选择已创建的数据源DataSource,单击确定。
在解析器详情页面,关联数据目的。
单击配置向导的数据目的,然后单击数据目的列表右上方的关联数据目的。
在弹出的对话框中,单击数据目的下拉列表,选择已创建的数据目的DataPurpose,单击确定。
在数据目的列表,查看并保存数据目的ID,例如为1000。
后续解析脚本中,需使用此处的数据目的ID。
在解析器详情页面,单击解析器。
在脚本输入框,输入解析脚本。
解析脚本类似JavaScript语言,编辑脚本的语法参考JavaScript语法,详细编辑方法,请参见脚本语法。
转发数据到AMQP服务端订阅消息组,需要使用函数
writeAmqp(destinationId, payload, tag)
。函数参数说明,请参见函数列表。无需指定设备,转发产品下全部设备的数据:
//通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。 var data = payload("json"); //直接流转物模型上报数据。 writeAmqp(1000, data, "调试");
指定某一个设备,仅转发该设备的消息:
//通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。 var data = payload("json"); //获取上报消息的设备名称。 var dn = deviceName(); //流转指定设备的物模型上报数据。 if (dn == 'device01') { writeAmqp(1000, data, "调试"); }
单击调试,根据页面提示,选择产品和设备,输入Topic和Payload数据,验证脚本可执行。
参数示例如下:
运行结果如下,表示脚本执行成功。
单击发布。
回到云产品流转页面的解析器页签,单击解析器DataParser对应的启动按钮,启动解析器。
步骤二:运行AMQP客户端
建议使用阿里云物联网平台提供的AMQP SDK接入示例。对于您自研的AMQP SDK,阿里云不提供后续技术支持服务。
本示例使用Java语言,其他语言的示例请参见AMQP客户端接入说明。
本示例购买Alibaba Cloud Linux操作系统的ECS实例,作为AMQP客户端的开发环境:
登录ECS实例。登录方式,请参见连接方式概述。
执行以下命令,下载Demo文件。
wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
执行以下命令,解压demo文件。
unzip amqp-demo.zip
在
src/main/java/com.aliyun.iotx.demo
目录下AmqpClient.java
文件中,参照下表修改AMQP的接入信息。重要本示例Demo代码中,添加了结束程序的代码(
Thread.sleep(60 * 1000);
),即程序启动成功,运行一分钟后会结束。实际场景中,您可根据需要自行设置运行时间。参数
说明
accessKey
阿里云主账号或RAM用户的AccessKey ID和AccessKey Secret。
登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。
重要为避免将AccessKey硬编码到业务代码中带来的安全风险,可采用配置环境变量的方法管理AccessKey。
您需在本地操作系统中添加环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并分别写入已准备好的AccessKey ID和AccessKey Secret。
在示例代码中可通过以下方法获取:
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
accessSecret
consumerGroupId
当前物联网平台对应实例中的消费组ID。
登录物联网平台控制台,在对应实例的
查看您的消费组ID。iotInstanceId
实例ID。您可在物联网平台控制台的实例概览页面,查看当前实例的ID。
若有ID值,必须传入该ID值。
若无实例概览页面或ID值,传入空值,即
iotInstanceId = ""
。
clientId
表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。
AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的 页签,单击消费组对应的查看,消费组详情页面将显示该参数,方便您识别区分不同的客户端。
connectionCount
启动AMQP客户端的连接数,最大不超过128个。用于实时消息推送的扩容。
消费组详情页面会以
${clientId}+"-"+数字
形式,显示连接的客户端。其中数字最小值为0。host
AMQP接入域名。
${YourHost}
对应的AMQP接入域名信息,请参见查看和配置实例终端节点信息(Endpoint)。在
pom.xml
文件中,已添加相关Maven依赖。在amqp-demo
根目录执行以下命令,重新加载Maven变更,构建项目。mvn clean package
在
amqp-demo/target
目录执行以下命令,运行生成的JAR包。java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
运行示例代码后返回如下信息,表示AMQP客户端已接入物联网平台并成功接收消息。
重要只有当AMQP客户端在线时,才能在服务器上收到设备消息。
10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5) 10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6) 10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message, topic = /g18******/device01/thing/event/property/post, messageId = 1731508564705******, content = {"temperature":10,"humidity":56}
在相应消费组显示在线的AMQP客户端。
amqp-demo
中connectionCount = 4
代表4个客户端。