您可以使用规则引擎,将物联网平台数据转发到消息队列(Kafka)中存储,从而实现消息从设备、物联网平台、Kafka到应用服务器之间的全链路高可靠传输能力。本文以物模型数据上报Topic为例,介绍流转消息数据的完整流程。
前提条件
背景信息
转发的数据目的配置完成后,会自动完成以下配置,实现设备数据通过物联网平台的规则引擎转发到消息队列(Kafka)。
- 物联网平台占用Kafka实例所在虚拟交换机的2个IP地址。
- 在Kafka实例所在的VPC网络下创建托管安全组,安全组名称默认以sg-nsm-开头。
创建数据目的
配置并启动解析器
- 创建解析器,例如DataParser。具体操作,请参见创建解析器。
- 在解析器详情页面,关联数据源。
- 在配置向导的数据源下,单击关联数据源。
- 在弹出的对话框中,单击数据源下拉列表,选择已创建的数据源DataSource,单击确定。
- 在解析器详情页面,关联数据目的。
- 在解析器详情页面,单击解析器。
- 在脚本输入框,输入解析脚本。脚本编辑方法,请参见脚本示例。函数参数说明,请参见函数列表。
//通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。 var data = payload("json"); //直接流转物模型上报数据。 writeKafka(1000, data, "调试");
- 单击调试,根据页面提示,选择产品和设备,输入Topic和Payload数据,验证脚本可执行。参数示例如下:
运行结果如下,表示脚本执行成功。
action: transmit to kafka[destinationId=1000], data:{"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}} variables: data : {"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
- 单击发布。
- 回到云产品流转页面的解析器页签,单击解析器DataParser对应的启动按钮,启动解析器。
- 在消息队列Kafka版控制台对应实例的Topic详情页面,查询流转的消息。