文档

数据转发到AMQP服务端订阅消费组消费

更新时间:

直接使用AMQP服务端订阅实时获取设备数据,只能从产品维度获取所有设备的数据,使用消息转发的云产品流转功能,可以将全部设备或指定设备发送到物联网平台的消息,先经过解析脚本处理和过滤,再转发到AMQP服务端订阅消费组,通过AMQP客户端消费。本文以物模型数据上报Topic为例,介绍流转消息数据的完整流程。

工作原理

image

云产品流转可将同一产品所有设备或指定设备的指定Topic消息,实时转发到一个或多个消费组中,每个消费组中包括多个消费者即AMQP客户端。每条消息转发到消费组时,消费组中随机一个消费者收到消息,不同消费组通过消费组ID区分。在上图中:

  1. 消费者:使用AMQP SDK注册消费组的消费者,用于接收物联网平台转发到AMQP消费组中的消息。

  2. 物联网平台:配置云产品流转数据源Topic数据目的解析器脚本,并启动解析器,将设备消息转发到AMQP服务端订阅的消费组。

  3. AMQP客户端:启动解析器后,物联网平台会自动将设备消息转发到AMQP客户端,不需要在AMQP客户端代码中订阅Topic,只要AMQP客户端在线就可以接收消息。

说明

云产品流转不支持从AMQP客户端下发消息给设备,如果需要下发指令给设备,请调用消息通信API

应用场景

业务服务器接收设备消息:云产品流转可以灵活地转发设备消息到AMQP服务端订阅的消息组。

  • 转发指定设备的消息。

  • 转发指定Topic的消息。

  • 消息过滤或处理后再转发。

AMQP客户端直接实时获取指定产品下所有设备的消息,可以直接配置AMQP服务端订阅。具体内容,请参见配置AMQP服务端订阅

前提条件

使用限制

  • AMQP客户端建立连接之后,需要立刻发送认证请求。如果15秒内没有认证成功,服务器会主动关闭连接。

  • AMQP客户端的一个连接限流1,000 TPS,消息转发TPS限流由实例的消息转发TPS规格决定,消息大小无限制。更多的AMQP服务端订阅限制,请参见服务端订阅使用限制

步骤一:在物联网平台配置数据目的和解析器

step1:配置数据目的

  1. 登录物联网平台控制台

  2. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

  3. 在左侧导航栏,选择消息转发 > 云产品流转

  4. 云产品流转页面,单击右上角体验新版,进入新版功能页面。

    说明

    如果您已执行过此操作,再次进入云产品流转页面,会直接进入新版功能页面。

  5. 单击数据目的页签,然后单击创建数据目的

  6. 创建数据目的对话框,输入数据目的名称,例如DataPurpose,按照以下参数说明,完成配置,然后单击确定

    选择操作

    参数

    描述

    选择操作

    选择发布到AMQP服务端订阅消费组

    消费组

    选择一个已创建的消费组作为数据转发目标。单击创建消费组可以进行消费组创建。

step2:配置并启动解析器

  1. 创建解析器,例如DataParser。具体操作,请参见创建解析器

  2. 解析器详情页面,关联数据源。

    1. 在配置向导的数据源下,单击关联数据源

    2. 在弹出的对话框中,单击数据源下拉列表,选择已创建的数据源DataSource,单击确定

  3. 解析器详情页面,关联数据目的。

    1. 单击配置向导的数据目的,然后单击数据目的列表右上方的关联数据目的

    2. 在弹出的对话框中,单击数据目的下拉列表,选择已创建的数据目的DataPurpose,单击确定

    3. 在数据目的列表,查看并保存数据目的ID,例如为1000

      后续解析脚本中,需使用此处的数据目的ID

  4. 解析器详情页面,单击解析器

  5. 在脚本输入框,输入解析脚本。

    解析脚本类似JavaScript语言,编辑脚本的语法参考JavaScript语法,详细编辑方法,请参见脚本语法

    转发数据到AMQP服务端订阅消息组,需要使用函数writeAmqp(destinationId, payload, tag)函数参数说明,请参见函数列表

    • 无需指定设备,转发产品下全部设备的数据:

      //通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。
      var data = payload("json");
      //直接流转物模型上报数据。
      writeAmqp(1000, data, "调试");
    • 指定某一个设备,仅转发该设备的消息:

      //通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。
      var data = payload("json");
      //获取上报消息的设备名称。
      var dn = deviceName();
      //流转指定设备的物模型上报数据。
      if (dn == 'device01') { 
          writeAmqp(1000, data, "调试");  
      }
  6. 单击调试,根据页面提示,选择产品和设备,输入Topic和Payload数据,验证脚本可执行。

    参数示例如下:调试示例

    运行结果如下,表示脚本执行成功。

    调试结果

  7. 单击发布

  8. 回到云产品流转页面的解析器页签,单击解析器DataParser对应的启动按钮,启动解析器。

步骤二:运行AMQP客户端

重要
  1. 登录ECS实例。登录方式,请参见连接方式概述

  2. 执行以下命令,下载Demo文件。

    wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
  3. 执行以下命令,解压demo文件。

    unzip amqp-demo.zip
  4. src/main/java/com.aliyun.iotx.demo目录下AmqpClient.java文件中,参照下表修改AMQP的接入信息。

    重要

    本示例Demo代码中,添加了结束程序的代码(Thread.sleep(60 * 1000);),即程序启动成功,运行一分钟后会结束。实际场景中,您可根据需要自行设置运行时间。

    参数

    说明

    accessKey

    阿里云主账号或RAM用户的AccessKey ID和AccessKey Secret。

    登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

    重要

    为避免将AccessKey硬编码到业务代码中带来的安全风险,可采用配置环境变量的方法管理AccessKey。

    您需在本地操作系统中添加环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并分别写入已准备好的AccessKey ID和AccessKey Secret。

    在示例代码中可通过以下方法获取:

    • System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")

    • System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    accessSecret

    consumerGroupId

    当前物联网平台对应实例中的消费组ID。

    登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表查看您的消费组ID。

    iotInstanceId

    实例ID。您可在物联网平台控制台实例概览页面,查看当前实例的ID。

    • 若有ID值,必须传入该ID值。

    • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ""

    clientId

    表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。

    AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表页签,单击消费组对应的查看消费组详情页面将显示该参数,方便您识别区分不同的客户端。

    connectionCount

    启动AMQP客户端的连接数,最大不超过128个。用于实时消息推送的扩容。

    消费组详情页面会以${clientId}+"-"+数字形式,显示连接的客户端。其中数字最小值为0。

    host

    AMQP接入域名。

    ${YourHost}对应的AMQP接入域名信息,请参见查看和配置实例终端节点信息(Endpoint)

  5. pom.xml文件中,已添加相关Maven依赖。在amqp-demo根目录执行以下命令,重新加载Maven变更,构建项目。

    mvn clean package
  6. amqp-demo/target目录执行以下命令,运行生成的JAR包。

    java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
  7. 运行示例代码后返回如下信息,表示AMQP客户端已接入物联网平台并成功接收消息。

    重要

    只有当AMQP客户端在线时,才能在服务器上收到设备消息。

    10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 
    10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5)
    10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6)
    10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message,
     topic = /g18******/device01/thing/event/property/post,
     messageId = 1731508564705******,
     content = {"temperature":10,"humidity":56}

    在相应消费组显示在线的AMQP客户端。amqp-democonnectionCount = 4代表4个客户端。

    image.png

后续操作

  1. 如果AMQP客户端不在线,AMQP服务端订阅消息会堆积,AMQP客户端重新上线后,物联网平台重新推送消息。如果不需要消费堆积的消息,可在AMQP客户端上线前,清空堆积的消息

  2. 所有配置完成,设备上报订阅数据并被AMQP客户端接收后,您可以登录物联网平台控制台,进入对应实例查看消息运行日志。

    • 监控运维 > 日志服务 > 云端运行日志页签,查看设备上报数据、物联网平台转发数据到AMQP客户端和AMQP客户端返回ACK的日志记录。具体操作,请参见查询云端运行日志

    • 消息转发 > 服务端订阅 > 消费组列表页签,单击目标消费组右侧操作列的查看,在消费组详情页面,查看消息消费速率、消息堆积量、消费日志等。具体操作,请参见查看和监控消费组

常见问题