云产品流转
使用物联网平台规则引擎的数据流转功能,可将Topic中的数据消息转发至其他Topic或其他阿里云产品进行存储或处理。
同一组数据请勿同时开通云产品流转和服务端订阅两种订阅模式,避免消息干扰。
配置SQL
-
登录物联网平台控制台,单击实例名称,在对应实例下的左侧导航栏,选择消息转发 > 云产品流转。
在云产品流转页面,单击创建规则。
重要若当前页面显示新版功能,先单击右上角返回旧版,进入旧版功能页面,再单击创建规则。
-
填写以下参数后,单击确认。
规则名称设置为充电桩数据流转,数据格式选择JSON。
-
单击前往编辑,编辑数据流转规则。
-
在数据流转规则页单击编写SQL,编写处理消息字段的SQL。
本案例中按以下方式编写SQL,然后单击确认。
在编写SQL对话框中,设置字段为
productKey,deviceName,requestId,code,message,data,Topic依次选择物模型数据上报 > 充电桩 > 全部设备(+) >thing/downlink/reply/message。
调试SQL
-
编写SQL后,单击SQL调试。
-
在SQL调试对话框的调试参数页签下,输入用于调试数据,然后单击调试。
请根据Topic上报的数据格式,输入调试用的Payload数据。数据格式说明:
-
若Topic为自定义Topic,输入的Payload数据格式需与Topic上报的数据格式一致。
-
若Topic为系统Topic,请参见设备下行指令结果。
本案例中按以下方式调试SQL。
选择产品为充电桩,设备为device-1。在Payload 数据区域输入JSON格式的调试数据,包含
gmtCreate、productKey、deviceName、requestId(值为1234)、code(值为200)、message(值为success)、topic和data等字段。单击调试后,可查看调试结果。
调试结果页签显示返回的JSON数据,其中
code为200、message为success,表示SQL调试通过。 -
转发数据
您可以使用规则引擎将设备发送到物联网平台的消息,经过规则SQL处理后,可转发到多个云产品中。本案例转发消息到AMQP服务端订阅消费组,并通过AMQP客户端消费消息。
-
单击转发数据一栏的添加操作,设置数据转发目的地为AMQP服务端订阅消费组,然后单击确认。
其中消费组选择默认消费组。
-
返回云产品流转页面,单击充电桩数据流转右侧的启动。
启动数据流转后,您的服务器需通过AMQP客户端消费消息。
AMQP客户端开发示例如下所示。
请将示例代码中的Your_AccessKey_ID和Your_AccessKey_Secret的参数值替换为您自己的阿里云账号AccessKey ID和AccessKey Secret。
将光标定位到您的账号头像上,选择AccessKey管理,进入安全信息管理页,可创建或查看您的AccessKey。
将cn-shanghai替换为您物联网设备所属地域ID。地域ID说明请参见地域和可用区。
String accessKey = "Your_AccessKey_ID";
String accessSecret = "Your_AccessKey_Secret";
try {
DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
} catch (Exception e) {
System.out.println("DefaultProfile exception");
}
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKey, accessSecret);
DefaultAcsClient defaultAcsClient = new DefaultAcsClient(profile);
SetDevicePropertyRequest setDevicePropertyRequest = new SetDevicePropertyRequest();
// 若是企业版实例或新版公共实例,下行代码中传入真实实例ID,然后删除注释符号。
//createProductRequest.setIotInstanceId("iothub-test-xxx");
setDevicePropertyRequest.setProductKey(pk);
setDevicePropertyRequest.setDeviceName(dn);
Map<String, Integer> properties = new HashMap<>();
// key为物模型中属性标识符"acOutMeterIty",value需要遵循属性值规范:int类型,取值范围在0~200之间。
properties.put("acOutMeterIty", 98);
setDevicePropertyRequest.setItems(JSON.toJSONString(properties));
SetDevicePropertyResponse response = null;
try {
response = defaultAcsClient.getAcsResponse(setDevicePropertyRequest);
} catch (Exception e) {
Log.error("执行失败:e:" + e.getMessage());
}
System.out.println("===============");
System.out.println("setDeviceProperty request : " + JSON.toJSONString(setDevicePropertyRequest));
System.out.println("setDeviceProperty response : " + JSON.toJSONString(response.getData()));
System.out.println("setDeviceProperty requestId : " + response.getRequestId());
System.out.println("===============");
通过云产品流转规则过滤后,获取到的数据如下所示。
AMQP receive message, topic = /xxx/xxx/+/thing/downlink/reply/message, messageId = xxx, content = {"code":200,"requestId":"xxx","productKey":"xxx","deviceName":"xxx"}