文档

示例二:MQTT数据流模式文件上传

更新时间:

MQTT数据流模式适用于未知文件大小的上传,例如边采集边上传的录音文件,无法确定停止时文件的具体大小,可以通过MQTT数据流模式来上传文件。

操作步骤

  1. 初始化MQTT模块。

  2. 初始化MQTT Upload模块,在mqtt_upload_stream_send_demo.c文件中设置需要上传文件的信息。

        /* 增加MQTT UPLOAD 组件的初始化,创建upload会话实例 */
        void *up_handle = aiot_mqtt_upload_init();
     /* 把Upload会话和MQTT会话关联起来 */
        aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_MQTT_HANDLE, mqtt_handle);
     /* (可选)设置每包数据超时重发的超时时间,默认是5s超时重试 */
        uint32_t rsp_timeout = 10000;
        aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_RSP_TIMEOUT_MS, &rsp_timeout);
     /* (可选)设置每包数据超时重试的次数,默认是超时重试次数是10次 */
        uint32_t rety_count = 5;
        aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_RETRY_COUNT, &rety_count);
     /* (可选)设置每包数据发送的大小,默认是2KB,最小不能小于256Byte,最大不能大于128KB */
        uint32_t block_size = 256;
        aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_DEFAULT_BLOCK_SIZE, &block_size);
    
        /* 配置MQTT需要上传的文件信息,未知文件大小,文件长度设置为-1,回调函数read_data_handler设置为NULL,
           使用aiot_mqtt_upload_send_data接口进行数据发送 */
     /* 当MQTT上传的文件长度为-1时,您可以上传任意大小不超过16M的文件,上传过程中您可以随时将is_commplete参数设置为1,结束并完成文件上传 */
        uint32_t file_size = -1;
        aiot_mqtt_upload_file_opt_t file_option = {
            .file_name = MQTT_UPLOAD_SINGLE_FILE_NAME,
            .file_size = file_size,
            .mode = AIOT_MQTT_UPLOAD_FILE_MODE_OVERWRITE,
            .digest = NULL,
            .read_data_handler = NULL,
            .userdata = NULL
        };
        aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_FILE_OPTION, &file_option);
  3. 请求文件上传。

    请求文件上传aiot_mqtt_upload_open_stream接口是同步接口,您需要等待应答信息后才能开始进行上传。

        aiot_mqtt_upload_recv_t recv_packet = {0};
        res = aiot_mqtt_upload_open_stream(up_handle, MQTT_UPLOAD_SINGLE_FILE_NAME, &recv_packet);
        if (res < STATE_SUCCESS) {
            goto exit;
        }
  4. 上传分片文件。

    您主动调用aiot_mqtt_upload_send_data接口,进行分片数据的上传。

    is_commplete参数是通知物联网平台是否为文件上传的最后一包数据,所以在上传的过程中该参数需要设置为0,上传最后一包数据时is_commplete参数需设置为1。

    if ((upload_file_len + data_len) < MQTT_UPLOAD_SINGLE_FILE_SIZE) {
        /* 发送分包数据 */
        data_len = block_size;
        is_commplete = 0;
    } else {
        /* 发送最后一包文件内容 */
        data_len = MQTT_UPLOAD_SINGLE_FILE_SIZE - upload_file_len;
        is_commplete = 1;
    }
    
    res = aiot_mqtt_upload_send_data(up_handle, MQTT_UPLOAD_SINGLE_FILE_NAME,            &g_upload_single_file_buf[upload_file_len], data_len, is_commplete);
    if (res == STATE_SUCCESS) {
        upload_file_len += data_len;
        printf("upload_file_len:%d\r\n", upload_file_len);
    } 
  5. 结束所有上传文件后销毁会话句柄。

    /* 等待所有文件上传完成后,才可销毁MQTT UPload 会话实例 */
    aiot_mqtt_upload_deinit(&up_handle);

日志说明

  1. MQTT连接成功。

    [1637138922.457][LK-0313] MQTT user calls aiot_mqtt_connect api, connect
    [1637138922.457][LK-032A] mqtt host: iot-*******.iot-as-mqtt.unify.aliyuncs.com
    [1637138922.457][LK-0317] user name: mqtt*******************Dih
    [1637138922.457][LK-0318] password: 2AE57DEEAADE00***********44BA8F5622507D1EFD6CB7687
    success to establish tcp, fd=5
    local port: 57**9
    [1637138922.488][LK-1000] establish mbedtls connection with server(host='io***********t-as-mqtt.unify.aliyuncs.com', port=[1883])
    [1637138922.507][LK-1000] success to establish mbedtls connection, (cost 45247 bytes in total, max used 47983 bytes)
    [1637138922.654][LK-0313] MQTT connect success in 197 ms
    AIOT_MQTTEVT_CONNECT
  2. MQTT UPLOAD创建会话并进行subtopic订阅。

    [1637138922.654][LK-2013] aiot_mqtt_upload_init init
    [1637138922.654][LK-0309] sub: /sys/a1*****bDih/mqttupload0001/thing/file/upload/mqtt/init_reply
    [1637138922.654][LK-0309] sub: /sys/a1*****bDih/mqttupload0001/thing/file/upload/mqtt/send_reply
    [1637138922.654][LK-0309] sub: /sys/a1*****bDih/mqttupload0001/thing/file/upload/mqtt/cancel_reply
  3. 请求物联网平台进行文件上传。

    Demo演示请求上传文件,文件长度:-1,文件名:mqttuploadfile.txt,上传模式覆盖模式:overwrite。

    [LK-030A] > 7B 22 69 64 22 3A 22 31  22 2C 22 70 61 72 61 6D | {"id":"1","param
    [LK-030A] > 73 22 3A 7B 22 66 69 6C  65 4E 61 6D 65 22 3A 22 | s":{"fileName":"
    [LK-030A] > 6D 71 74 74 75 70 6C 6F  61 64 66 69 6C 65 2E 74 | mqttuploadfile.t
    [LK-030A] > 78 74 22 2C 22 66 69 6C  65 53 69 7A 65 22 3A 2D | xt","fileSize":-
    [LK-030A] > 31 2C 22 63 6F 6E 66 6C  69 63 74 53 74 72 61 74 | 1,"conflictStart
    [LK-030A] > 65 67 79 22 3A 22 6F 76  65 72 77 72 69 74 65 22 | egy":"overwrite"
    [LK-030A] > 2C 22 69 6E 69 74 55 69  64 22 3A 22 31 36 34 34 | ,"initUid":"1644
    [LK-030A] > 32 31 39 33 37 33 33 33  39 37 35 22 7D 7D       | 2193********5"}}
    
    heartbeat response
  4. 收到物联网平台应答消息。

    [1644219373.441][LK-0309] pub: /sys/a1P*****Dih/mqttupload0002/thing/file/upload/mqtt/init_reply
    
    [LK-030A] < 7B 22 63 6F 64 65 22 3A  32 30 30 2C 22 64 61 74 | {"code":200,"dat
    [LK-030A] < 61 22 3A 7B 22 66 69 6C  65 4E 61 6D 65 22 3A 22 | a":{"fileName":"
    [LK-030A] < 6D 71 74 74 75 70 6C 6F  61 64 66 69 6C 65 2E 74 | mqttuploadfile.t
    [LK-030A] < 78 74 22 2C 22 75 70 6C  6F 61 64 49 64 22 3A 22 | xt","uploadId":"
    [LK-030A] < 64 4E 48 79 33 35 4F 55  4A 66 64 4A 49 31 6B 31 | dNH*********dJI1k1
    [LK-030A] < 6D 31 62 55 30 31 30 32  30 30 22 2C 22 6F 66 66 | m1bU010200","off
    [LK-030A] < 73 65 74 22 3A 30 2C 22  66 69 6C 65 53 69 7A 65 | set":0,"fileSize
    [LK-030A] < 22 3A 2D 31 2C 22 63 6F  6E 66 6C 69 63 74 53 74 | ":-1,"conflictSt
    [LK-030A] < 72 61 74 65 67 79 22 3A  22 6F 76 65 72 77 72 69 | rategy":"overwri
    [LK-030A] < 74 65 22 7D 2C 22 69 64  22 3A 22 31 22 2C 22 6D | te"},"id":"1","m
    [LK-030A] < 65 73 73 61 67 65 22 3A  22 73 75 63 63 65 73 73 | essage":"success
    [LK-030A] < 22 7D                                            | "}
    
    [1644219373.441][LK-210B] Recv file name:mqttuploadfile.txt
  5. 分送分片文件。

    循环调用aiot_mqtt_upload_send_data函数进行数据上传,该函数是同步发送接口,等待应答STATE_SUCCESS后才能发起上传新的一包数据。

    [1644219373.462][LK-0309] pub: /sys/a1*****Dih/mqttupload0002/thing/file/upload/mqtt/send
    
    [LK-030A] > 00 67 7B 22 69 64 22 3A  22 32 22 2C 22 70 61 72 | .g{"id":"2","par
    [LK-030A] > 61 6D 73 22 3A 7B 22 75  70 6C 6F 61 64 49 64 22 | ams":{"uploadId"
    [LK-030A] > 3A 22 64 4E 48 79 33 35  4F 55 4A 66 64 4A 49 31 | :"dN*****(fdJI1
    [LK-030A] > 6B 31 6D 31 62 55 30 31  30 32 30 30 22 2C 22 6F | k1m1bU010200","o
    [LK-030A] > 66 66 73 65 74 22 3A 30  2C 22 62 53 69 7A 65 22 | ffset":0,"bSize"
    [LK-030A] > 3A 32 35 36 2C 22 69 73  43 6F 6D 70 6C 65 74 65 | :256,"isComplete
    [LK-030A] > 22 3A 66 61 6C 73 65 7D  7D 00 00 00 00 00 00 00 | ":false}}.......
    [LK-030A] > 00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00 | ................
    [LK-030A] > 00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00 | ................
    [LK-030A] > 00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00 | ................
    [LK-030A] > 00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00 | ................
    ......
    ......
    ......
  6. 结束文件上传。

    上传最后一包数据时,您需要将isComplete字段设置为1,通知物联网平台这是最后一包上传数据,完成文件上传。

    [1644219373.734][LK-0309] pub: /sys/a1*****Dih/mqttupload0002/thing/file/upload/mqtt/send
    
    [LK-030A] > 00 68 7B 22 69 64 22 3A  22 35 22 2C 22 70 61 72 | .h{"id":"5","par
    [LK-030A] > 61 6D 73 22 3A 7B 22 75  70 6C 6F 61 64 49 64 22 | ams":{"uploadId"
    [LK-030A] > 3A 22 64 4E 48 79 33 35  4F 55 4A 66 64 4A 49 31 | :"dN*******dJI1
    [LK-030A] > 6B 31 6D 31 62 55 30 31  30 32 30 30 22 2C 22 6F | k1m1bU010200","o
    [LK-030A] > 66 66 73 65 74 22 3A 37  36 38 2C 22 62 53 69 7A | ffset":768,"bSiz
    [LK-030A] > 65 22 3A 32 35 36 2C 22  69 73 43 6F 6D 70 6C 65 | e":256,"isComple
    [LK-030A] > 74 65 22 3A 74 72 75 65  7D 7D 00 00 00 00 00 00 | te":true}}......
    [LK-030A] > 00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00 | ................
    ......
    ......
    ......
    
    [1644219373.822][LK-0309] pub: /sys/a1*******Dih/mqttupload0002/thing/file/upload/mqtt/send_reply
    
    [LK-030A] < 7B 22 63 6F 64 65 22 3A  32 30 30 2C 22 64 61 74 | {"code":200,"dat
    [LK-030A] < 61 22 3A 7B 22 66 69 63  4D 6F 64 65 22 3A 22 63 | a":{"ficMode":"c
    [LK-030A] < 72 63 36 34 22 2C 22 75  70 6C 6F 61 64 49 64 22 | rc64","uploadId"
    [LK-030A] < 3A 22 64 4E 48 79 33 35  4F 55 4A 66 64 4A 49 31 | :"dNHy*******JI1
    [LK-030A] < 6B 31 6D 31 62 55 30 31  30 32 30 30 22 2C 22 6F | k1m1bU010200","o
    [LK-030A] < 66 66 73 65 74 22 3A 37  36 38 2C 22 63 6F 6D 70 | ffset":768,"comp
    [LK-030A] < 6C 65 74 65 22 3A 74 72  75 65 2C 22 66 69 63 56 | lete":true,"ficV
    [LK-030A] < 61 6C 75 65 53 65 72 76  65 72 22 3A 22 63 33 37 | alueServer":"c37
    [LK-030A] < 38 36 33 39 37 32 30 36  39 32 37 30 63 22 2C 22 | 863972069270c","
    [LK-030A] < 62 53 69 7A 65 22 3A 32  35 36 7D 2C 22 69 64 22 | bSize":256},"id"
    [LK-030A] < 3A 22 35 22 2C 22 6D 65  73 73 61 67 65 22 3A 22 | :"5","message":"
    [LK-030A] < 73 75 63 63 65 73 73 22  7D                      | success"}
    
    [1644219373.822][LK-2125] Complete upload total size:1024
    upload_file_len:1024
    [1644219373.830][LK-2100] Finish upload,Deleted upload task.MQTT Upload file(mqttuploadfile.txt) uploadid(dNHy35OU*********1bU010200) success.
  7. 上传完成后销毁句柄释放资源。

    [1644219378.827][LK-210F] aiot_mqtt_upload_deinit
    [1644219378.827][LK-0309] unsub: /sys/a1P8*****ih/mqttupload0002/thing/file/upload/mqtt/init_reply
    [1644219378.827][LK-0309] unsub: /sys/a1P8*****ih/mqttupload0002/thing/file/upload/mqtt/send_reply
    [1644219378.827][LK-0309] unsub: /sys/a1P8*****ih/mqttupload0002/thing/file/upload/mqtt/cancel_reply
    [1644219378.827][LK-1000] adapter_network_deinit
  • 本页导读 (1)
文档反馈