本文介绍通过数据解析任务解析并存储设备上报的ProtoBuf格式数据的完整流程。
前提条件
已创建产品和设备,获取设备证书(ProductKey、DeviceName和DeviceSecret)。具体操作,请参见创建产品和单个创建设备。
已完成开发环境安装与配置。
本文示例使用Python Link SDK开发设备上报ProtoBuf格式数据,使用Linux操作系统(Ubuntu 20.04 64-bit)作为开发环境。Python安装以及SDK配置,请参见环境要求与配置。
本示例操作步骤以普通用户权限为例。如果您在操作过程中涉及到管理员权限才能执行的操作,可尝试使用sudo
命令执行。
安装ProtoBuf
登录已准备的开发环境。
执行以下命令,安装ProtoBuf工具。
pip install protobuf
依此执行以下命令,下载ProtoBuf编译器文件包,并解压缩。
说明编译器文件包版本需要与安装的ProtoBuf版本一致。您可根据上一步安装返回的ProtoBuf版本号,查找对应版本的编译器文件包。具体信息,请参见ProtoBuf版本。
wget https://github.com/protocolbuffers/protobuf/releases/download/v24.2/protoc-24.2-linux-x86_64.zip unzip protoc-24.2-linux-x86_64.zip
解压文件如下:
执行以下命令查看PIP安装路径。
which pip
本示例返回:
/usr/bin/pip
执行以下命令,将
bin/protoc
文件复制到PIP安装路径目录下。cp bin/protoc /usr/bin
执行以下命令,返回如图所示信息,表示编译器安装成功。
protoc
为示例数据生成.desc和.data文件
本示例以设备通过自定义Topic上报ProtoBuf格式数据为例,通过.desc
文件读取对应的二进制数据.data
文件,最终解析输出JSON格式数据,以便数据解析任务后续处理和存储。
本示例解析的JSON格式数据如下:
数据解析任务不支持解析数组类型数据。
{
"pname": "test",
"pid": 1234,
"pemail": "test@example.com"
}
在Linux操作系统中,使用已安装的ProtoBuf,生成对应文件。
根据Proto协议编写
.proto
文件,解析JSON数据示例中字段。执行以下命令,创建并打开
book.proto
文件。vim book.proto
输入以下内容后,保存并退出。
syntax = "proto3"; package tutorial; message Person { optional string pname = 1; optional int32 pid = 2; optional string pemail = 3; } message AddressBook { repeated Person people = 1; }
执行以下命令,生成对应的
book.desc
文件。protoc --descriptor_set_out=./book.desc ./book.proto
执行以下命令生成一个可调用的Python类文件
book_pb2.py
。protoc --python_out=. book.proto
根据定义的
book.proto
文件创建文件person.py
,使用person.SerializeToString()
将数据序列化转化为二进制数据,并写进person.data
文件。执行以下命令,创建并打开
person.py
文件。vim person.py
输入以下内容后,保存并退出。
import book_pb2 person = book_pb2.Person() person.pid = 1234 person.pname = "test" person.pemail = "test@example.com" with open('person.data', "wb") as f: f.write(person.SerializeToString())
执行以下命令,运行
person.py
,生成二进制数据文件person.data
。python3 person.py
将生成的
book.desc
和person.data
下载到本地保存。
配置数据解析任务
本示例将设备上报的ProtoBuf格式数据进行解析后存储到自定义存储表。
开发设备接入并上报数据
返回Linux操作系统,开发设备接入物联网平台,并上报ProtoBuf格式数据。
执行以下命令,创建并打开设备程序文件
mqtt_pub.py
。cd ~ vim mqtt_pub.py
输入以下代码后,保存并退出。
说明代码中
product_key
、device_name
、device_secret
及lk.publish_topic
的/gs*****/device1/user/update
中ProductKey值,都需替换为设备证书真实值。import sys from linkkit import linkkit import logging import os.path # config log __log_format = '%(asctime)s-%(process)d-%(thread)d - %(name)s:%(module)s:%(funcName)s - %(levelname)s - %(message)s' logging.basicConfig(format=__log_format) lk = linkkit.LinkKit( host_name="cn-shanghai", product_key="gs*****", device_name="device1", device_secret="06aec**************728f56" ) lk.enable_logger(logging.DEBUG) def on_device_dynamic_register(rc, value, userdata): if rc == 0: print("dynamic register device success, value:" + value) else: print("dynamic register device fail, message:" + value) def on_connect(session_flag, rc, userdata): print("on_connect:%d,rc:%d" % (session_flag, rc)) pass def on_disconnect(rc, userdata): print("on_disconnect:rc:%d,userdata:" % rc) def on_topic_message(topic, payload, qos, userdata): print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos)) pass def on_subscribe_topic(mid, granted_qos, userdata): print("on_subscribe_topic mid:%d, granted_qos:%s" % (mid, str(','.join('%s' % it for it in granted_qos)))) pass def on_unsubscribe_topic(mid, userdata): print("on_unsubscribe_topic mid:%d" % mid) pass def on_publish_topic(mid, userdata): print("on_publish_topic mid:%d" % mid) def read_into_buffer(filename): buf = bytearray(os.path.getsize(filename)) with open(filename, 'rb') as f: f.readinto(buf) return buf lk.on_device_dynamic_register = on_device_dynamic_register lk.on_connect = on_connect lk.on_disconnect = on_disconnect lk.on_topic_message = on_topic_message lk.on_subscribe_topic = on_subscribe_topic lk.on_unsubscribe_topic = on_unsubscribe_topic lk.on_publish_topic = on_publish_topic lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031") lk.connect_async() lk.start_worker_loop() buf = read_into_buffer('person.data') print(buf) while True: try: msg = input() except KeyboardInterrupt: sys.exit() else: if msg == "1": lk.disconnect() elif msg == "2": lk.connect_async() elif msg == "3": rc, mid = lk.subscribe_topic([(lk.to_full_topic("user/receive/from/jiexi"), 1)]) if rc == 0: print("subscribe multiple topics success:%r, mid:%r" % (rc, mid)) else: print("subscribe multiple topics fail:%d" % rc) elif msg == "4": rc, mid = lk.publish_topic("/gs*****/device1/user/update", buf) if rc == 0: print("publish topic success:%r, mid:%r" % (rc, mid)) else: print("publish topic fail:%d" % rc) else: sys.exit()
执行以下命令,确认激活VirtualEnvironments。
cd work_dir source test_env/bin/activate
执行以下命令,运行设备程序。
cd ~ python3 mqtt_pub.py
设备读取到的数据如下:
bytearray(b'\n\x04test\x10\xd2\t\x1a\x10test@example.com')
输入
4
,触发设备上报消息。4 2023-09-01 11:26:46,149-****** - Paho:client:_easy_log - DEBUG - Sending PUBLISH (d0, q1, r0, m1), 'b'/gs*****/device1/user/update'', ... (43 bytes) publish topic success:0, mid:1 2023-09-01 11:26:46,172-****** - Paho:client:_easy_log - DEBUG - Received PUBACK (Mid: 1) 2023-09-01 11:26:46,172-****** - linkkit:linkkit:debug - DEBUG - post_message :'on_publish' 2023-09-01 11:26:46,172-****** - linkkit:linkkit:debug - DEBUG - post_message success 2023-09-01 11:26:46,173-****** - linkkit:linkkit:debug - DEBUG - thread runnable pop cmd:'on_publish' 2023-09-01 11:26:46,173-****** - linkkit:linkkit:debug - DEBUG - __on_internal_publish message:1 on_publish_topic mid:1
查看数据解析结果
返回物联网平台控制台实例下,在 页面的云端运行日志页签,可查看到设备上线并上报数据。
在 页面的 页签,单击testProtoc表右侧操作列的数据预览,可查看到数据解析任务存储的数据。