MQTT数据流模式适用于未知文件大小的上传,例如边采集边上传的录音文件,无法确定停止时文件的具体大小,可以通过MQTT数据流模式来上传文件。
操作步骤
初始化MQTT模块。
初始化MQTT Upload模块,在
mqtt_upload_stream_send_demo.c
文件中设置需要上传文件的信息。/* 增加MQTT UPLOAD 组件的初始化,创建upload会话实例 */ void *up_handle = aiot_mqtt_upload_init(); /* 把Upload会话和MQTT会话关联起来 */ aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_MQTT_HANDLE, mqtt_handle); /* (可选)设置每包数据超时重发的超时时间,默认是5s超时重试 */ uint32_t rsp_timeout = 10000; aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_RSP_TIMEOUT_MS, &rsp_timeout); /* (可选)设置每包数据超时重试的次数,默认是超时重试次数是10次 */ uint32_t rety_count = 5; aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_RETRY_COUNT, &rety_count); /* (可选)设置每包数据发送的大小,默认是2KB,最小不能小于256Byte,最大不能大于128KB */ uint32_t block_size = 256; aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_DEFAULT_BLOCK_SIZE, &block_size); /* 配置MQTT需要上传的文件信息,未知文件大小,文件长度设置为-1,回调函数read_data_handler设置为NULL, 使用aiot_mqtt_upload_send_data接口进行数据发送 */ /* 当MQTT上传的文件长度为-1时,您可以上传任意大小不超过16M的文件,上传过程中您可以随时将is_commplete参数设置为1,结束并完成文件上传 */ uint32_t file_size = -1; aiot_mqtt_upload_file_opt_t file_option = { .file_name = MQTT_UPLOAD_SINGLE_FILE_NAME, .file_size = file_size, .mode = AIOT_MQTT_UPLOAD_FILE_MODE_OVERWRITE, .digest = NULL, .read_data_handler = NULL, .userdata = NULL }; aiot_mqtt_upload_setopt(up_handle, AIOT_MQTT_UPLOADOPT_FILE_OPTION, &file_option);
请求文件上传。
请求文件上传aiot_mqtt_upload_open_stream接口是同步接口,您需要等待应答信息后才能开始进行上传。
aiot_mqtt_upload_recv_t recv_packet = {0}; res = aiot_mqtt_upload_open_stream(up_handle, MQTT_UPLOAD_SINGLE_FILE_NAME, &recv_packet); if (res < STATE_SUCCESS) { goto exit; }
上传分片文件。
您主动调用aiot_mqtt_upload_send_data接口,进行分片数据的上传。
is_commplete
参数是通知物联网平台是否为文件上传的最后一包数据,所以在上传的过程中该参数需要设置为0,上传最后一包数据时is_commplete
参数需设置为1。if ((upload_file_len + data_len) < MQTT_UPLOAD_SINGLE_FILE_SIZE) { /* 发送分包数据 */ data_len = block_size; is_commplete = 0; } else { /* 发送最后一包文件内容 */ data_len = MQTT_UPLOAD_SINGLE_FILE_SIZE - upload_file_len; is_commplete = 1; } res = aiot_mqtt_upload_send_data(up_handle, MQTT_UPLOAD_SINGLE_FILE_NAME, &g_upload_single_file_buf[upload_file_len], data_len, is_commplete); if (res == STATE_SUCCESS) { upload_file_len += data_len; printf("upload_file_len:%d\r\n", upload_file_len); }
结束所有上传文件后销毁会话句柄。
/* 等待所有文件上传完成后,才可销毁MQTT UPload 会话实例 */ aiot_mqtt_upload_deinit(&up_handle);
日志说明
MQTT连接成功。
[1637138922.457][LK-0313] MQTT user calls aiot_mqtt_connect api, connect [1637138922.457][LK-032A] mqtt host: iot-*******.iot-as-mqtt.unify.aliyuncs.com [1637138922.457][LK-0317] user name: mqtt*******************Dih [1637138922.457][LK-0318] password: 2AE57DEEAADE00***********44BA8F5622507D1EFD6CB7687 success to establish tcp, fd=5 local port: 57**9 [1637138922.488][LK-1000] establish mbedtls connection with server(host='io***********t-as-mqtt.unify.aliyuncs.com', port=[1883]) [1637138922.507][LK-1000] success to establish mbedtls connection, (cost 45247 bytes in total, max used 47983 bytes) [1637138922.654][LK-0313] MQTT connect success in 197 ms AIOT_MQTTEVT_CONNECT
MQTT UPLOAD创建会话并进行subtopic订阅。
[1637138922.654][LK-2013] aiot_mqtt_upload_init init [1637138922.654][LK-0309] sub: /sys/a1*****bDih/mqttupload0001/thing/file/upload/mqtt/init_reply [1637138922.654][LK-0309] sub: /sys/a1*****bDih/mqttupload0001/thing/file/upload/mqtt/send_reply [1637138922.654][LK-0309] sub: /sys/a1*****bDih/mqttupload0001/thing/file/upload/mqtt/cancel_reply
请求物联网平台进行文件上传。
Demo演示请求上传文件,文件长度:-1,文件名:mqttuploadfile.txt,上传模式覆盖模式:overwrite。
[LK-030A] > 7B 22 69 64 22 3A 22 31 22 2C 22 70 61 72 61 6D | {"id":"1","param [LK-030A] > 73 22 3A 7B 22 66 69 6C 65 4E 61 6D 65 22 3A 22 | s":{"fileName":" [LK-030A] > 6D 71 74 74 75 70 6C 6F 61 64 66 69 6C 65 2E 74 | mqttuploadfile.t [LK-030A] > 78 74 22 2C 22 66 69 6C 65 53 69 7A 65 22 3A 2D | xt","fileSize":- [LK-030A] > 31 2C 22 63 6F 6E 66 6C 69 63 74 53 74 72 61 74 | 1,"conflictStart [LK-030A] > 65 67 79 22 3A 22 6F 76 65 72 77 72 69 74 65 22 | egy":"overwrite" [LK-030A] > 2C 22 69 6E 69 74 55 69 64 22 3A 22 31 36 34 34 | ,"initUid":"1644 [LK-030A] > 32 31 39 33 37 33 33 33 39 37 35 22 7D 7D | 2193********5"}} heartbeat response
收到物联网平台应答消息。
[1644219373.441][LK-0309] pub: /sys/a1P*****Dih/mqttupload0002/thing/file/upload/mqtt/init_reply [LK-030A] < 7B 22 63 6F 64 65 22 3A 32 30 30 2C 22 64 61 74 | {"code":200,"dat [LK-030A] < 61 22 3A 7B 22 66 69 6C 65 4E 61 6D 65 22 3A 22 | a":{"fileName":" [LK-030A] < 6D 71 74 74 75 70 6C 6F 61 64 66 69 6C 65 2E 74 | mqttuploadfile.t [LK-030A] < 78 74 22 2C 22 75 70 6C 6F 61 64 49 64 22 3A 22 | xt","uploadId":" [LK-030A] < 64 4E 48 79 33 35 4F 55 4A 66 64 4A 49 31 6B 31 | dNH*********dJI1k1 [LK-030A] < 6D 31 62 55 30 31 30 32 30 30 22 2C 22 6F 66 66 | m1bU010200","off [LK-030A] < 73 65 74 22 3A 30 2C 22 66 69 6C 65 53 69 7A 65 | set":0,"fileSize [LK-030A] < 22 3A 2D 31 2C 22 63 6F 6E 66 6C 69 63 74 53 74 | ":-1,"conflictSt [LK-030A] < 72 61 74 65 67 79 22 3A 22 6F 76 65 72 77 72 69 | rategy":"overwri [LK-030A] < 74 65 22 7D 2C 22 69 64 22 3A 22 31 22 2C 22 6D | te"},"id":"1","m [LK-030A] < 65 73 73 61 67 65 22 3A 22 73 75 63 63 65 73 73 | essage":"success [LK-030A] < 22 7D | "} [1644219373.441][LK-210B] Recv file name:mqttuploadfile.txt
分送分片文件。
循环调用aiot_mqtt_upload_send_data函数进行数据上传,该函数是同步发送接口,等待应答STATE_SUCCESS后才能发起上传新的一包数据。
[1644219373.462][LK-0309] pub: /sys/a1*****Dih/mqttupload0002/thing/file/upload/mqtt/send [LK-030A] > 00 67 7B 22 69 64 22 3A 22 32 22 2C 22 70 61 72 | .g{"id":"2","par [LK-030A] > 61 6D 73 22 3A 7B 22 75 70 6C 6F 61 64 49 64 22 | ams":{"uploadId" [LK-030A] > 3A 22 64 4E 48 79 33 35 4F 55 4A 66 64 4A 49 31 | :"dN*****(fdJI1 [LK-030A] > 6B 31 6D 31 62 55 30 31 30 32 30 30 22 2C 22 6F | k1m1bU010200","o [LK-030A] > 66 66 73 65 74 22 3A 30 2C 22 62 53 69 7A 65 22 | ffset":0,"bSize" [LK-030A] > 3A 32 35 36 2C 22 69 73 43 6F 6D 70 6C 65 74 65 | :256,"isComplete [LK-030A] > 22 3A 66 61 6C 73 65 7D 7D 00 00 00 00 00 00 00 | ":false}}....... [LK-030A] > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | ................ [LK-030A] > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | ................ [LK-030A] > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | ................ [LK-030A] > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | ................ ...... ...... ......
结束文件上传。
上传最后一包数据时,您需要将
isComplete
字段设置为1,通知物联网平台这是最后一包上传数据,完成文件上传。[1644219373.734][LK-0309] pub: /sys/a1*****Dih/mqttupload0002/thing/file/upload/mqtt/send [LK-030A] > 00 68 7B 22 69 64 22 3A 22 35 22 2C 22 70 61 72 | .h{"id":"5","par [LK-030A] > 61 6D 73 22 3A 7B 22 75 70 6C 6F 61 64 49 64 22 | ams":{"uploadId" [LK-030A] > 3A 22 64 4E 48 79 33 35 4F 55 4A 66 64 4A 49 31 | :"dN*******dJI1 [LK-030A] > 6B 31 6D 31 62 55 30 31 30 32 30 30 22 2C 22 6F | k1m1bU010200","o [LK-030A] > 66 66 73 65 74 22 3A 37 36 38 2C 22 62 53 69 7A | ffset":768,"bSiz [LK-030A] > 65 22 3A 32 35 36 2C 22 69 73 43 6F 6D 70 6C 65 | e":256,"isComple [LK-030A] > 74 65 22 3A 74 72 75 65 7D 7D 00 00 00 00 00 00 | te":true}}...... [LK-030A] > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | ................ ...... ...... ...... [1644219373.822][LK-0309] pub: /sys/a1*******Dih/mqttupload0002/thing/file/upload/mqtt/send_reply [LK-030A] < 7B 22 63 6F 64 65 22 3A 32 30 30 2C 22 64 61 74 | {"code":200,"dat [LK-030A] < 61 22 3A 7B 22 66 69 63 4D 6F 64 65 22 3A 22 63 | a":{"ficMode":"c [LK-030A] < 72 63 36 34 22 2C 22 75 70 6C 6F 61 64 49 64 22 | rc64","uploadId" [LK-030A] < 3A 22 64 4E 48 79 33 35 4F 55 4A 66 64 4A 49 31 | :"dNHy*******JI1 [LK-030A] < 6B 31 6D 31 62 55 30 31 30 32 30 30 22 2C 22 6F | k1m1bU010200","o [LK-030A] < 66 66 73 65 74 22 3A 37 36 38 2C 22 63 6F 6D 70 | ffset":768,"comp [LK-030A] < 6C 65 74 65 22 3A 74 72 75 65 2C 22 66 69 63 56 | lete":true,"ficV [LK-030A] < 61 6C 75 65 53 65 72 76 65 72 22 3A 22 63 33 37 | alueServer":"c37 [LK-030A] < 38 36 33 39 37 32 30 36 39 32 37 30 63 22 2C 22 | 863972069270c"," [LK-030A] < 62 53 69 7A 65 22 3A 32 35 36 7D 2C 22 69 64 22 | bSize":256},"id" [LK-030A] < 3A 22 35 22 2C 22 6D 65 73 73 61 67 65 22 3A 22 | :"5","message":" [LK-030A] < 73 75 63 63 65 73 73 22 7D | success"} [1644219373.822][LK-2125] Complete upload total size:1024 upload_file_len:1024 [1644219373.830][LK-2100] Finish upload,Deleted upload task.MQTT Upload file(mqttuploadfile.txt) uploadid(dNHy35OU*********1bU010200) success.
上传完成后销毁句柄释放资源。
[1644219378.827][LK-210F] aiot_mqtt_upload_deinit [1644219378.827][LK-0309] unsub: /sys/a1P8*****ih/mqttupload0002/thing/file/upload/mqtt/init_reply [1644219378.827][LK-0309] unsub: /sys/a1P8*****ih/mqttupload0002/thing/file/upload/mqtt/send_reply [1644219378.827][LK-0309] unsub: /sys/a1P8*****ih/mqttupload0002/thing/file/upload/mqtt/cancel_reply [1644219378.827][LK-1000] adapter_network_deinit
文档内容是否对您有帮助?