ProtoBuf数据解析示例

本文介绍通过数据解析任务解析并存储设备上报的ProtoBuf格式数据的完整流程。

前提条件

  • 已创建产品和设备,获取设备证书(ProductKey、DeviceName和DeviceSecret)。具体操作,请参见创建产品单个创建设备

  • 已完成开发环境安装与配置。

    本文示例使用Python Link SDK开发设备上报ProtoBuf格式数据,使用Linux操作系统(Ubuntu 20.04 64-bit)作为开发环境。Python安装以及SDK配置,请参见环境要求与配置

说明

本示例操作步骤以普通用户权限为例。如果您在操作过程中涉及到管理员权限才能执行的操作,可尝试使用sudo命令执行。

安装ProtoBuf

  1. 登录已准备的开发环境。

  2. 执行以下命令,安装ProtoBuf工具。

    pip install protobuf
  3. 依此执行以下命令,下载ProtoBuf编译器文件包,并解压缩。

    说明

    编译器文件包版本需要与安装的ProtoBuf版本一致。您可根据上一步安装返回的ProtoBuf版本号,查找对应版本的编译器文件包。具体信息,请参见ProtoBuf版本

    wget https://github.com/protocolbuffers/protobuf/releases/download/v24.2/protoc-24.2-linux-x86_64.zip
    unzip protoc-24.2-linux-x86_64.zip

    解压文件如下:

    image.png
  4. 执行以下命令查看PIP安装路径。

    which pip

    本示例返回:

    /usr/bin/pip
  5. 执行以下命令,将bin/protoc文件复制到PIP安装路径目录下。

    cp bin/protoc /usr/bin
  6. 执行以下命令,返回如图所示信息,表示编译器安装成功。

    protoc
    image.png

为示例数据生成.desc和.data文件

本示例以设备通过自定义Topic上报ProtoBuf格式数据为例,通过.desc文件读取对应的二进制数据.data文件,最终解析输出JSON格式数据,以便数据解析任务后续处理和存储。

本示例解析的JSON格式数据如下:

说明

数据解析任务不支持解析数组类型数据。

{
  "pname": "test",
  "pid": 1234,
  "pemail": "test@example.com"
}

在Linux操作系统中,使用已安装的ProtoBuf,生成对应文件。

  1. 根据Proto协议编写.proto文件,解析JSON数据示例中字段。

    1. 执行以下命令,创建并打开book.proto文件。

      vim book.proto
    2. 输入以下内容后,保存并退出。

      syntax = "proto3";
      
      package tutorial;
      
      message Person {
        optional string pname = 1;
        optional int32 pid = 2;
        optional string pemail = 3;
      }
      
      message AddressBook {
        repeated Person people = 1;
      }
  2. 执行以下命令,生成对应的book.desc文件。

    protoc  --descriptor_set_out=./book.desc ./book.proto
  3. 执行以下命令生成一个可调用的Python类文件book_pb2.py

    protoc --python_out=.  book.proto
  4. 根据定义的book.proto文件创建文件person.py,使用person.SerializeToString()将数据序列化转化为二进制数据,并写进person.data文件。

    1. 执行以下命令,创建并打开person.py文件。

      vim person.py
    2. 输入以下内容后,保存并退出。

      import book_pb2
      person = book_pb2.Person()
      person.pid = 1234
      person.pname = "test"
      person.pemail = "test@example.com"
      with open('person.data', "wb") as f:
          f.write(person.SerializeToString())  
         
    3. 执行以下命令,运行person.py,生成二进制数据文件person.data

      python3 person.py
  5. 将生成的book.descperson.data下载到本地保存。

配置数据解析任务

本示例将设备上报的ProtoBuf格式数据进行解析后存储到自定义存储表。

  1. 物联网平台控制台实例概览页面,单击目标企业版实例卡片

  2. 创建自定义存储表表显示名表标识符设置为testProtoc,其他设置使用默认值或自定义。

  3. 创建数据解析任务:任务名称设置为testProtoc

  4. 配置源节点

    1. 数据源类型选择IoT实例TopicTopic类型选择自定义Topic,单击下一步

      image.png
    2. Topic数据格式选择ProtoBuf,上传已生成保存的book.descperson.data文件,单击校验解析

      解析成功,可在解析预览中查看数据。

      image.png
    3. 单击保存

  5. 配置数据过滤:如下图所示,当pid值等于1234时,输出源节点中字段数据。

    image.png
  6. 配置目标节点:将输出数据存储到自定义存储表testProtoc

    image.png
  7. 启动数据解析任务

开发设备接入并上报数据

返回Linux操作系统,开发设备接入物联网平台,并上报ProtoBuf格式数据。

  1. 执行以下命令,创建并打开设备程序文件mqtt_pub.py

    cd ~
    vim mqtt_pub.py
  2. 输入以下代码后,保存并退出。

    说明

    代码中product_keydevice_namedevice_secretlk.publish_topic/gs*****/device1/user/update中ProductKey值,都需替换为设备证书真实值。

    import sys
    from linkkit import linkkit
    import logging
    import os.path
    
    # config log
    __log_format = '%(asctime)s-%(process)d-%(thread)d - %(name)s:%(module)s:%(funcName)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=__log_format)
    
    lk = linkkit.LinkKit(
        host_name="cn-shanghai",
        product_key="gs*****",
        device_name="device1",
        device_secret="06aec**************728f56"
         )
    lk.enable_logger(logging.DEBUG)
    
    
    def on_device_dynamic_register(rc, value, userdata):
        if rc == 0:
            print("dynamic register device success, value:" + value)
        else:
            print("dynamic register device fail, message:" + value)
    
    
    def on_connect(session_flag, rc, userdata):
        print("on_connect:%d,rc:%d" % (session_flag, rc))
        pass
    
    
    def on_disconnect(rc, userdata):
        print("on_disconnect:rc:%d,userdata:" % rc)
    
    
    def on_topic_message(topic, payload, qos, userdata):
        print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
        pass
    
    
    def on_subscribe_topic(mid, granted_qos, userdata):
        print("on_subscribe_topic mid:%d, granted_qos:%s" %
              (mid, str(','.join('%s' % it for it in granted_qos))))
        pass
    
    
    def on_unsubscribe_topic(mid, userdata):
        print("on_unsubscribe_topic mid:%d" % mid)
        pass
    
    
    def on_publish_topic(mid, userdata):
        print("on_publish_topic mid:%d" % mid)
    
    def read_into_buffer(filename):
        buf = bytearray(os.path.getsize(filename))
        with open(filename, 'rb') as f:
            f.readinto(buf)
        return buf
    
                                  
    lk.on_device_dynamic_register = on_device_dynamic_register
    lk.on_connect = on_connect
    lk.on_disconnect = on_disconnect
    lk.on_topic_message = on_topic_message
    lk.on_subscribe_topic = on_subscribe_topic
    lk.on_unsubscribe_topic = on_unsubscribe_topic
    lk.on_publish_topic = on_publish_topic
    
    
    lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031")
    lk.connect_async()
    lk.start_worker_loop()
    buf = read_into_buffer('person.data')
    print(buf)
    
    
    while True:
        try:
            msg = input()
        except KeyboardInterrupt:
            sys.exit()
        else:
            if msg == "1":
                lk.disconnect()
            elif msg == "2":
                lk.connect_async()
            elif msg == "3":
                rc, mid = lk.subscribe_topic([(lk.to_full_topic("user/receive/from/jiexi"), 1)])
               
                if rc == 0:
                    print("subscribe multiple topics success:%r, mid:%r" % (rc, mid))
                else:
                    print("subscribe multiple topics fail:%d" % rc)
            elif msg == "4":
                rc, mid = lk.publish_topic("/gs*****/device1/user/update", buf)
                if rc == 0:
                    print("publish topic success:%r, mid:%r" % (rc, mid))
                else:
                    print("publish topic fail:%d" % rc)
            else:
                sys.exit()
                
  3. 执行以下命令,确认激活VirtualEnvironments。

    cd work_dir
    source test_env/bin/activate
  4. 执行以下命令,运行设备程序。

    cd ~
    python3 mqtt_pub.py

    设备读取到的数据如下:

    bytearray(b'\n\x04test\x10\xd2\t\x1a\x10test@example.com')
  5. 输入4,触发设备上报消息。

    4
    2023-09-01 11:26:46,149-****** - Paho:client:_easy_log - DEBUG - Sending PUBLISH (d0, q1, r0, m1), 'b'/gs*****/device1/user/update'', ... (43 bytes)
    publish topic success:0, mid:1
    2023-09-01 11:26:46,172-****** - Paho:client:_easy_log - DEBUG - Received PUBACK (Mid: 1)
    2023-09-01 11:26:46,172-****** - linkkit:linkkit:debug - DEBUG - post_message :'on_publish' 
    2023-09-01 11:26:46,172-****** - linkkit:linkkit:debug - DEBUG - post_message success
    2023-09-01 11:26:46,173-****** - linkkit:linkkit:debug - DEBUG - thread runnable pop cmd:'on_publish'
    2023-09-01 11:26:46,173-****** - linkkit:linkkit:debug - DEBUG - __on_internal_publish message:1
    on_publish_topic mid:1

查看数据解析结果

  1. 返回物联网平台控制台实例下,在监控运维 > 日志服务页面的云端运行日志页签,可查看到设备上线并上报数据。

    image.png
  2. 数据服务 > 数据存储页面的离线存储 > 自定义存储表页签,单击testProtoc表右侧操作列的数据预览,可查看到数据解析任务存储的数据

    image.png