MAL,是MQTT Adapter Layer的简称。MAL组件将外部扩展模组中提供的MQTT协议,通过AT或其他命令的方式,为用户转换为AliOS Things系统中提供的统一MQTT协议API,提高用户应用程序的可移植性和硬件无关性。下图是AliOS Things中MAL套件的架构示意图。
其中,组件包括:
- MAL Core:由AliOS Things提供,MAL核心组件。主要包括MQTT连接管理、数据缓存、协议转换等功能,对上提供MQTT API接口服务,对下提供统一的HAL接口规范(可以对接到不同厂商的AT模组)。
- MAL Driver:驱动,部分由AliOS Things提供,其他由用户自己提供。MAL驱动模块基于具体型号的通信模组提供的AT命令,实现MAL规范的HAL接口功能。
API包括:
- MQTT API:这一层接口提供MQTT标准接口,如subscribe、unsubscribe、publish等。
- MAL HAL API:这一层接口定义MAL核心模块与不同厂商模组驱动之间的统一界面。这一层HAL的对接是模组驱动接入中的主要工作。
API列表
名称 | 说明 |
IOT_MQTT_Construct | 云端建立MQTT连接 |
IOT_MQTT_Destroy | 销毁指定MQTT连接并释放资源 |
IOT_MQTT_CheckStateNormal | 获取当前MQTT连接状态 |
IOT_MQTT_Subscribe | 向云端订阅指定的MQTT Topic |
IOT_MQTT_Subscribe_Sync | 向云端订阅指定的MQTT Topic,该接口为同步接口 |
IOT_MQTT_Unsubscribe | 向云端取消订阅指定的topic |
IOT_MQTT_Publish | 向指定topic推送消息 |
IOT_MQTT_Publish_Simple | 向指定topic推送消息,MQTT句柄可为NULL |
mal_init | MAL模块初始化,包括初始化底层驱动模块。 |
mal_add_dev | 配置驱动参数,例如串口通信串口号、波特率等。 |
使用
添加该组件




头文件
对外头文件代码位于include/network/mal
,包括
mal.h
hal_mal.h
使用时只需包含mal.h,hal_mal.h在对接时使用
#include "mal/mal.h"
使用示例
pclient = IOT_MQTT_Construct(&mqtt_params);
if (NULL == pclient) {
EXAMPLE_TRACE("MQTT construct failed");
return -1;
}
res = IOT_MQTT_Subscribe_Sync(pclient, topic, IOTX_MQTT_QOS0, example_message_arrive, NULL);
if (res < 0) {
EXAMPLE_TRACE("subscribe failed");
HAL_Free(topic);
return -1;
}
while (1) {
int rc = IOT_MQTT_Publish(pclient, TOPIC_UPDATE, &topic_msg);
if (rc < 0) {
EXAMPLE_TRACE("IOT_MQTT_Publish fail, ret=%d", rc);
}
IOT_MQTT_Yield(pclient, 200);
}
更详细的例子请参考application/example/example_legacy/mal_app/
API 详情
IOT_MQTT_Construct
原型
void *IOT_MQTT_Construct(iotx_mqtt_param_t *pInitParams)
接口说明
与云端建立MQTT连接, 入参pInitParams
为NULL
时将会使用默认参数建连。
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
pInitParams | iotx_mqtt_param_t * | 输入 | MQTT初始化参数,填写NULL将以默认参数建连 |
返回值说明
值 | 说明 |
---|---|
NULL | 失败 |
非NULL | MQTT句柄 |
接口示例
iotx_mqtt_param_t mqtt_params;
iotx_sign_mqtt_t sign_mqtt;
/* 生成签名 */
gen_aliyun_mqtt_sign(&sign_mqtt);
/* 配置MQTT连接参数 */
mqtt_params.host = sign_mqtt.hostname;
mqtt_params.port = sign_mqtt.port;
mqtt_params.client_id = sign_mqtt.clientid;
mqtt_params.username = sign_mqtt.username;
mqtt_params.password = sign_mqtt.password;
mqtt_params.request_timeout_ms = 2000;
mqtt_params.clean_session = 0;
mqtt_params.keepalive_interval_ms = 60000;
mqtt_params.write_buf_size = 1024;
mqtt_params.read_buf_size = 1024;
mqtt_params.handle_event.h_fp = example_event_handle;
/* 与云端建立MQTT连接 */
pclient = IOT_MQTT_Construct(&mqtt_params);
if (NULL == pclient) {
return -1;
}
IOT_MQTT_Destroy
原型
int IOT_MQTT_Destroy(void **phandle);
接口说明
销毁指定MQTT连接并释放资源
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
phandle | void ** | 输入 | MQTT句柄,可为NULL |
返回值说明
值 | 说明 |
---|---|
0 | 成功 |
< 0 | 失败 |
接口示例
/* 与云端建立MQTT连接 * /
pclient = IOT_MQTT_Construct(&mqtt_params);
/* 销毁指定MQTT连接并释放资源 * /
IOT_MQTT_Destroy(&pclient);
IOT_MQTT_CheckStateNormal
原型
int IOT_MQTT_CheckStateNormal(void *handle);
接口说明
获取当前MQTT连接状态
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
handle | void * | 输入 | MQTT句柄,可为NULL |
返回值说明
值 | 说明 |
---|---|
0 | 未连接 |
1 | 已连接 |
接口示例
/* 获取当前MQTT连接状态 */
int state = IOT_MQTT_CheckStateNormal(pclient);
if (1 == state) {
LOG("MQTT connected");
} else {
LOG("MQTT disconnected");
}
IOT_MQTT_Yield
原型
int IOT_MQTT_Yield(void *handle, int timeout_ms);
接口说明
用于接收网络报文并将消息分发到用户的回调函数中
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
handle | void * | 输入 | MQTT句柄,可为NULL |
timeout_ms | int | 输入 | 尝试接收报文的超时时间 |
返回值说明
值 | 说明 |
---|---|
0 | 成功 |
接口示例
while (1) {
/* 周期上报消息 */
if (0 == loop_cnt % 20) {
example_publish(pclient);
}
/* 接收网络报文并将消息分发到用户的回调函数 * /
IOT_MQTT_Yield(pclient, 200);
loop_cnt += 1;
}
IOT_MQTT_Subscribe
原型
int IOT_MQTT_Subscribe(void *handle,
const char *topic_filter,
iotx_mqtt_qos_t qos,
iotx_mqtt_event_handle_func_fpt topic_handle_func,
void *pcontext);
接口说明
向云端订阅指定的MQTT Topic
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
handle | void * | 输入 | MQTT句柄,可为NULL |
topic_filter | const char * | 输入 | 需要订阅的topic |
qos | iotx_mqtt_qos_t | 输入 | 采用的QoS策略 |
topic_handle_func | iotx_mqtt_event_handle_func_fpt | 输入 | 用于接收MQTT消息的回调函数 |
pcontext | void * | 输入 | 用户Context, 会通过回调函数送回 |
返回值说明
值 | 说明 |
---|---|
0 | 成功 |
< 0 | 失败 |
接口示例
const char *fmt = "/%s/%s/user/get";
char *topic = NULL;
int topic_len = 0;
topic_len = strlen(fmt) + strlen(DEMO_PRODUCT_KEY) + strlen(DEMO_DEVICE_NAME) + 1;
topic = HAL_Malloc(topic_len);
memset(topic, 0, topic_len);
/* 拼接主题 */
HAL_Snprintf(topic, topic_len, fmt, DEMO_PRODUCT_KEY, DEMO_DEVICE_NAME);
/* 订阅主题 */
res = IOT_MQTT_Subscribe_Sync(handle, topic, IOTX_MQTT_QOS0, example_message_arrive, NULL);
if (res < 0) {
EXAMPLE_TRACE("subscribe failed");
HAL_Free(topic);
return -1;
}
HAL_Free(topic);
IOT_MQTT_Subscribe_Sync
原型
int IOT_MQTT_Subscribe_Sync(void *handle,
const char *topic_filter,
iotx_mqtt_qos_t qos,
iotx_mqtt_event_handle_func_fpt topic_handle_func,
void *pcontext,
int timeout_ms);
接口说明
向云端订阅指定的MQTT Topic, 该接口为同步接口
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
handle | void * | 输入 | MQTT句柄,可为NULL |
topic_filter | const char * | 输入 | 需要订阅的topic |
qos | iotx_mqtt_qos_t | 输入 | 采用的QoS策略 |
topic_handle_func | iotx_mqtt_event_handle_func_fpt | 输入 | 用于接收MQTT消息的回调函数 |
pcontext | void * | 输入 | 用户Context, 会通过回调函数送回 |
timeout_ms | int | 输入 | 该同步接口的超时时间 |
返回值说明
值 | 说明 |
---|---|
0 | 成功 |
< 0 | 失败 |
接口示例
const char *fmt = "/%s/%s/user/get";
char *topic = NULL;
int topic_len = 0;
topic_len = strlen(fmt) + strlen(DEMO_PRODUCT_KEY) + strlen(DEMO_DEVICE_NAME) + 1;
topic = HAL_Malloc(topic_len);
memset(topic, 0, topic_len);
/* 拼接主题 */
HAL_Snprintf(topic, topic_len, fmt, DEMO_PRODUCT_KEY, DEMO_DEVICE_NAME);
/* 同步订阅,5000毫秒超时 */
res = IOT_MQTT_Subscribe_Sync(handle, topic, IOTX_MQTT_QOS0, example_message_arrive, NULL,5000);
if (res < 0) {
EXAMPLE_TRACE("subscribe failed");
HAL_Free(topic);
return -1;
}
HAL_Free(topic);
IOT_MQTT_Unsubscribe
原型
int IOT_MQTT_Unsubscribe(void *handle, const char *topic_filter);
接口说明
向云端取消订阅指定的topic
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
handle | void * | 输入 | MQTT句柄,可为NULL |
topic_filter | const char * | 输入 | 需要取消订阅的topic |
返回值说明
值 | 说明 |
---|---|
0 | 成功 |
< 0 | 失败 |
接口示例
/* 订阅主题 */
IOT_MQTT_Subscribe(pclient, TOPIC_GET, IOTX_MQTT_QOS1, _demo_message_arrive, NULL);
/* 取消订阅 */
IOT_MQTT_Unsubscribe(pclient, TOPIC_GET);
IOT_MQTT_Publish
原型
int IOT_MQTT_Publish(void *handle, const char *topic_name, iotx_mqtt_topic_info_pt topic_msg);
接口说明
向指定topic推送消息
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
handle | void * | 输入 | MQTT句柄,可为NULL |
topic_name | const char * | 输入 | 接收此推送消息的目标topic |
topic_msg | iotx_mqtt_topic_info_pt | 输入 | 需要推送的消息 |
返回值说明
值 | 说明 |
---|---|
> 0 | 成功(消息是QoS1时, 返回值就是这个上报报文的MQTT消息ID, 对应协议里的messageId )
|
0 | 成功(消息是QoS0时) |
< 0 | 失败 |
接口示例
iotx_mqtt_topic_info_t topic_msg;
/* 组装消息 */
memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
topic_msg.qos = IOTX_MQTT_QOS1;
topic_msg.retain = 0;
topic_msg.dup = 0;
topic_msg.payload = (void *)ptopic_info->payload;
topic_msg.payload_len = ptopic_info->payload_len;
/* 向主题发布消息 */
int rc = IOT_MQTT_Publish(pclient, TOPIC_UPDATE, &topic_msg);
if (rc < 0) {
EXAMPLE_TRACE("IOT_MQTT_Publish fail, ret=%d", rc);
}
IOT_MQTT_Publish_Simple
原型
int IOT_MQTT_Publish_Simple(void *handle, const char *topic_name, int qos, void *data, int len)
接口说明
向指定topic推送消息
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
handle | void * | 输入 | MQTT句柄,可为NULL |
topic_name | const char * | 输入 | 接收此推送消息的目标topic |
qos | int | 输入 | 采用的QoS策略 |
data | void * | 输入 | 需要发送的数据 |
len | int | 输入 | 数据长度 |
返回值说明
值 | 说明 |
---|---|
> 0 | 成功(消息是QoS1时, 返回值就是这个上报报文的MQTT消息ID, 对应协议里的messageId )
|
0 | 成功(消息是QoS0时) |
< 0 | 失败 |
接口示例
const char *fmt = "/%s/%s/user/get";
char *topic = NULL;
int topic_len = 0;
char *payload = "{\"message\":\"hello!\"}";
topic_len = strlen(fmt) + strlen(DEMO_PRODUCT_KEY) + strlen(DEMO_DEVICE_NAME) + 1;
topic = HAL_Malloc(topic_len);
/* 拼接topic字符串 */
HAL_Snprintf(topic, topic_len, fmt, DEMO_PRODUCT_KEY, DEMO_DEVICE_NAME);
/* 向指定topic推送消息 */
IOT_MQTT_Publish_Simple(0, topic, IOTX_MQTT_QOS0, payload, strlen(payload));
HAL_Free(topic);
mal_init
原型
int mal_init(void);
接口说明
MAL模块初始化,包括初始化底层驱动模块。
参数说明
无
返回值说明
值 | 说明 |
---|---|
0 | 成功 |
负值 | 失败 |
接口示例
mal_device_config_t data = {0};
data.uart_dev.port = 1;
data.uart_dev.config.baud_rate = 115200;
data.uart_dev.config.data_width = DATA_WIDTH_8BIT;
data.uart_dev.config.parity = NO_PARITY;
data.uart_dev.config.stop_bits = STOP_BITS_1;
data.uart_dev.config.flow_control = FLOW_CONTROL_DISABLED;
data.uart_dev.config.mode = MODE_TX_RX;
/* 配置驱动参数 */
if (mal_add_dev("sim800", &data) != 0) {
LOG("Failed to add MAL device!");
return -1;
}
/* 初始化MAL core */
mal_init();
mal_add_dev
原型
int mal_add_dev(char* driver_name, void* data);
接口说明
配置驱动参数,例如串口通信串口号、波特率等。
参数说明
参数 | 数据类型 | 方向 | 说明 |
---|---|---|---|
driver_name | char * | 输入 | 驱动名称 |
data | void* | 输入 | 配置参数指针 |
返回值说明
值 | 说明 |
---|---|
0 | 成功 |
负值 | 失败 |
接口示例
见mal_init示例
配置说明
MAL可配置项包括:
- 最大topic长度,默认128
- 最大消息长度,默认256
- 是否定义WITH_MAL宏,默认定义WITH_MAL宏
- 是否定义NO_TCPIP,默认定义NO_TCPIP宏
移植说明
MAL模块需要实现AT连接HAL、系统HAL。
AT连接HAL
MAL AT连接HAL函数定义见mal_op_t。
接口 | 描述 |
int (*add_dev)(void* data) |
添加设备 data:设备参数 返回值:0成功,-1失败 |
int (*init)(iotx_mqtt_param_t *pInitParams) |
初始化模组,使其处理准备连接状态 pInitParams:连接状态参数 返回值:0成功,-1失败 |
int (*connect)(char *proKey, char *devName, char *devSecret) |
与远端建立MQTT连接 proKey:product key devName:Device Name devSecret:Device Secret 返回值:0成功,-1失败 |
int (*subscribe)(const char *topic, int qos, unsigned int *mqtt_packet_id, int *mqtt_status, int timeout_ms) |
订阅主题 topic:主题字符串地址 qos:QoS等级 mqtt_packet_id:消息包ID mqtt_status:MQTT消息状态 timeout_ms:超时时间 返回值:0成功,-1失败 |
int (*unsubscribe)(const char *topic, unsigned int *mqtt_packet_id, int *mqtt_status) |
取消订阅主题 topic:主题字符串地址 mqtt_packet_id:数据包ID mqtt_status:MQTT消息状态 返回值:0成功,-1失败 |
int (*publish)(const char *topic, int qos, const char *message, unsigned int msg_len) |
发布消息 topic:主题字符串地址 qos:QoS等级 message:消息地址 msg_len:消息长度 返回值:0成功,-1失败 |
int (*conn_state)(void) |
查询连接状态 返回值参考iotx_mc_state_t |
int (*disconn)(void) |
断开连接 返回值:0成功,-1失败 |
int (*deinit)(void) |
去初始化 返回值:0成功,-1失败 |
int (*register_mqtt_data_input_cb)(mqtt_data_input_cb_t cb) |
注册MQTT数据接收回调 cb:回调函数 返回值:0成功,-1失败 |
系统wrapper
锁函数
函数名 | 说明 |
---|---|
void *HAL_MutexCreate(void) | 创建锁 |
void HAL_MutexLock(void *mutex) | 上锁 |
void HAL_MutexUnlock(void *mutex) | 开锁 |
void HAL_MutexDestroy(void *mutex) | 删除锁 |
信号量函数
函数名 | 说明 |
---|---|
void *HAL_SemaphoreCreate(void) | 创建信号量 |
void HAL_SemaphorePost(void *sem) | 释放信号量 |
int HAL_SemaphoreWait(void *sem, uint32_t timeout_ms) | 等待信号量 |
void HAL_SemaphoreDestroy(void *sem) | 删除信号量 |
时间函数
函数名 | 说明 |
---|---|
void HAL_SleepMs(uint32_t ms) | 睡眠函数 |
uint64_t HAL_UptimeMs(void) | 获取系统时间 |
内存函数
函数名 | 说明 |
---|---|
void *HAL_Malloc(uint32_t size) | 分配内存 |
void HAL_Free(void *ptr) | 释放内存 |
打印函数
函数名 | 说明 |
nt HAL_Snprintf(char *str, const int len, const char *fmt, ...) | 格式化打印函数 |
标准宏和结构体说明
结构体iotx_mqtt_param_t定义
typedef struct {
uint16_t port;
const char *host;
const char *client_id;
const char *username;
const char *password;
const char *pub_key;
const char *customize_info;
uint8_t clean_session;
uint32_t request_timeout_ms;
uint32_t keepalive_interval_ms;
uint32_t write_buf_size;
uint32_t read_buf_size;
iotx_mqtt_event_handle_t handle_event;
} iotx_mqtt_param_t, *iotx_mqtt_param_pt;
port
: 云端服务器端口host
: 云端服务器地址client_id
: MQTT客户端IDusername
: 登录MQTT服务器用户名password
: 登录MQTT服务器密码pub_key
: MQTT连接加密方式及密钥clean_session
: 选择是否使用MQTT协议的clean session特性request_timeout_ms
: MQTT消息发送的超时时间keepalive_interval_ms
: MQTT心跳超时时间write_buf_size
: MQTT消息发送buffer最大长度read_buf_size
: MQTT消息接收buffer最大长度handle_event
: 用户回调函数, 用与接收MQTT模块的事件信息customize_info
: 用户自定义上报信息,是以逗号为分隔符kv字符串,如用户的厂商信息,模组信息自定义字符串为"pid=123456,mid=abcd";
pInitParams结构体的成员配置为0或NULL时将使用内部默认参数
结构体mal_op_t定义
typedef struct mal_op_s {
struct mal_op_s * next; //<! Next mal_op_t structure
char *version; //<! Reserved for furture use.
char *name; //<! Drvier name
/**
* Add mal device .
*
* @param[in] data - device parameters
*
* @return 0 - success, -1 - failure
*/
int (*add_dev)(void* data);
/**
* Module low level init so that it's ready to setup mqtt connection.
*
* @param[in] pInitParams - connect parameters which are used to setup
* the MQTT connection.
* @return 0 - success, -1 - failure
*/
int (*init)(iotx_mqtt_param_t *pInitParams);
/**
* Start a socket connection via module.
*
* @param[in] proKey - product key.
* @param[in] devName - device name.
* @param[in] devSecret - device secret.
*
* @note: If the module does not accept the triple, ingore these parameters
*
* @return 0 - success, -1 - failure
*/
int (*connect)(char *proKey, char *devName, char *devSecret);
/**
* Subscribe a topic via the MQTT connection
*
* @param[in] topic - MQTT topic.
* @param[in] qos - QoS used.
* @param[out] mqtt_packet_id - MQTT packet ID.
* @param[out] mqtt_status - MQTT subscribe status.
* @param[out] timeout_ms - time out in milliseconds.
*
* @return 0 - success, -1 - failure
*/
int (*subscribe)(const char *topic, int qos, unsigned int *mqtt_packet_id, int *mqtt_status, int timeout_ms);
/**
* Unsubscribe a topic via the MQTT connection
*
* @param[in] topic - MQTT topic.
* @param[out] mqtt_packet_id - MQTT packet ID.
* @param[out] mqtt_status - MQTT unsubscribe status.
*
* @return 0 - success, -1 - failure
*/
int (*unsubscribe)(const char *topic, unsigned int *mqtt_packet_id, int *mqtt_status);
/**
* Publish MQTT message to the topic via module.
*
* @param[in] topic - MQTT topic
* @param[in] qos - quality of service used
* @param[in] message - message to be published
* @param[in] msg_len - message length
*
* @return 0 - success, -1 - failure
*/
int (*publish)(const char *topic, int qos, const char *message, unsigned int msg_len);
/**
* Query AT MQTT connection status
*
* @return MQTT status value.
* Refer to definition of iotx_mc_state_t in mal.h.
*/
int (*conn_state)(void);
/**
* Disconnect the MQTT connection via module.
*
* @return 0 - success, -1 - failure
*/
int (*disconn)(void);
/**
* Destroy MAL or exit low level state if necessary.
*
* @return 0 - success, -1 - failure
*/
int (*deinit)(void);
/**
* Register mqtt data input function
* Input data from mqtt module.
* This callback should be called when mqtt data is received from the module
* @param[in] topic - topic of the received message.
* @param[in] topic_len - length of topic.
* @param[in] message - received message.
* @param[in] meg_len - length of received message.
*
* @return 0 - success, -1 - failure
*/
int (*register_mqtt_data_input_cb)(mqtt_data_input_cb_t cb);
} mal_op_t;
枚举iotx_mc_state_t定义
/* State of MQTT client */
typedef enum {
IOTX_MC_STATE_INVALID = 0, /* MQTT in invalid state */
IOTX_MC_STATE_INITIALIZED = 1, /* MQTT in initializing state */
IOTX_MC_STATE_CONNECTED = 2, /* MQTT in connected state */
IOTX_MC_STATE_DISCONNECTED = 3, /* MQTT in disconnected state */
IOTX_MC_STATE_DISCONNECTED_RECONNECTING = 4, /* MQTT in reconnecting state */
IOTX_MC_STATE_CONNECT_BLOCK = 5 /* MQTT in connecting state when using async protocol stack */
} iotx_mc_state_t;
在文档使用中是否遇到以下问题
更多建议
匿名提交