文档

MNS消息队列消费模式

更新时间:

本文为您介绍MNS消息队列消费模式接收回执消息的示例代码。

相关下载

语言

说明

Java

下载Java MNS SDK安装包拉取消息。

Node.js

下载原Node.js MNS SDK安装包拉取消息。

C#

依赖.NET语言的阿里云SDK及dybaseapi,其中dybaseapi包用于拉取MNS消息。

PHP

依赖OpenAPI PHP Client包和OpenAPI PHP SDK包拉取MNS消息。

Python

依赖Python语言的阿里云SDK核心库及dybaseapi,其中dybaseapi包用于拉取MNS消息。

Go

下载Go SDK安装包拉取MNS消息。

注意事项

使用示例时,您需要注意如下信息:

  • 配置AccessKey ID和AccessKey Secret信息。

    说明

    为避免在代码中硬编码访问密钥(AccessKey)而造成泄露,请通过配置环境变量的方式获取AccessKey。环境变量配置方法,请参见在Linux、macOS和Windows系统配置环境变量

    本文以环境变量名VMS_AK_ENVVMS_SK_ENV为例,进行后续操作。通过环境变量获取AccessKey的代码示例如下:

    String accessKeyId = System.getenv("VMS_AK_ENV");
    String accessKeySecret = System.getenv("VMS_SK_ENV");
  • messageType替换为您需要的消息类型,如订阅呼叫记录消息(VoiceReport)。语音服务支持的回执消息类型,请参见回执消息简介与配置流程

    String messageType="messageType";
  • queueName是MNS消息队列名称,您可以在语音服务控制台通用设置>订阅回执消息页面查看。

    String queueName="queueName";
  • 本文示例代码中您获取到的上行信息内容由dealMessage方法处理,您可以将需要的上行信息内容业务逻辑写在该方法中。arg代表回执消息体参数,可填写的值如:start_time、end_time、duration、status_code等,建议您根据具体场景填写参数。

    // 根据文档中具体的消息格式进行消息体的解析
    String arg = (String) contentMap.get("arg");
    // 编写您的业务代码

示例

package com.alicom.mns.sample;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


import com.alicom.mns.tools.DefaultAlicomMessagePuller;
import com.alicom.mns.tools.MessageListener;
import com.aliyun.mns.model.Message;
import com.google.gson.Gson;

/**
 * 只能用于接收云通信的消息,不能用于接收其他业务的消息
 * MNS消息接收demo
 */

public class ReceiveDemo {

    private static Log logger=LogFactory.getLog(ReceiveDemo.class);

    static class MyMessageListener implements MessageListener{
        private Gson gson=new Gson();

        @Override
        public boolean dealMessage(Message message) {

            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //消息的几个关键值
            System.out.println("message receiver time from mns:" + format.format(new Date()));
            System.out.println("message handle: " + message.getReceiptHandle());
            System.out.println("message body: " + message.getMessageBodyAsString());
            System.out.println("message id: " + message.getMessageId());
            System.out.println("message dequeue count:" + message.getDequeueCount());
            System.out.println("Thread:" + Thread.currentThread().getName());
            try{
                Map<String,Object> contentMap=gson.fromJson(message.getMessageBodyAsString(), HashMap.class);

                // 根据文档中具体的消息格式进行消息体的解析
                String arg = (String) contentMap.get("arg");

                // 这里开始编写您的业务代码

            }catch(com.google.gson.JsonSyntaxException e){
                logger.error("error_json_format:"+message.getMessageBodyAsString(),e);
                //理论上不会出现格式错误的情况,所以遇见格式错误的消息,只能先delete,否则重新推送也会一直报错
                return true;
            } catch (Throwable e) {
                //您自己的代码部分导致的异常,应该return false,这样消息不会被delete掉,而会根据策略进行重推
                return false;
            }

            //消息处理成功,返回true, SDK将调用MNS的delete方法将消息从队列中删除掉
            return true;
        }

    }

    public static void main(String[] args) throws Exception, ParseException {

        DefaultAlicomMessagePuller puller=new DefaultAlicomMessagePuller();

        //设置异步线程池大小及任务队列的大小,还有无数据线程休眠时间
        puller.setConsumeMinThreadSize(6);
        puller.setConsumeMaxThreadSize(16);
        puller.setThreadQueueSize(200);
        puller.setPullMsgThreadSize(1);
        //和服务端联调问题时开启,平时无需开启,消耗性能
        puller.openDebugLog(false);

        // 从本地环境变量获取AccessKey ID和AccessKey Secret信息
        String accessKeyId = System.getenv("VMS_AK_ENV");
        String accessKeySecret = System.getenv("VMS_SK_ENV");

        /*
        *  将messageType和queueName替换成您需要的消息类型名称和对应的队列名称
        *云通信产品下所有的回执消息类型:
        *1:呼叫记录:VoiceReport
        *2:呼叫中间状态:VoiceCallReport
        *3:录音记录消息:VoiceRecordReport
        *4:ASR实时消息:VoiceRTASRReport
        */
         //此处应该替换成相应产品的消息类型
        String messageType="messageType";
        //在云通信页面开通相应业务消息后,就能在页面上获得对应的queueName,格式类似Alicom-Queue-******-VoiceReport
        String queueName="queueName"; 
        puller.startReceiveMsg(accessKeyId,accessKeySecret,messageType,queueName,new MyMessageListener());
    }
}
using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Dysmsapi.Model.V20170525;
using Aliyun.Acs.Dysmsapi.MNS;
using Aliyun.Acs.Dysmsapi.MNS.Model;
using System.Threading;
using System.Collections.Generic;
using System.Text;

using System;

using QueryTokenForMnsQueue_MessageTokenDTO = Aliyun.Acs.Dysmsapi.Model.V20170525.QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO;

namespace CommonRpc
{
    class Program
    {
        static void Main(string[] args)
        {
                
            // 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
            // 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
            // 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明,来实现API访问的身份验证。
            IClientProfile profile = DefaultProfile.GetProfile("cn-hangzhou", Environment.GetEnvironmentVariable("VMS_AK_ENV"), Environment.GetEnvironmentVariable("VMS_SK_ENV")); // todo: 补充AK信息
            DefaultProfile.AddEndpoint("cn-hangzhou", "cn-hangzhou", "Dysmsapi", "dysmsapi.aliyuncs.com");
            DefaultAcsClient client = new DefaultAcsClient(profile);

            String queueName = "<QueueName>"; // todo: 补充队列名称
            String messageType = "<MessageType>"; // todo: 补充消息类型

            int maxThread = 2;

            for (int i = 0; i < maxThread; i++)
            {
                TestTask testTask = new TestTask("PullMessageTask-thread-" + i, messageType, queueName, client);
                Thread t = new Thread(new ThreadStart(testTask.Handle));
                //启动线程
                t.Start();
            }
            Console.ReadKey();

            try
            {
                QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest
                {
                    MessageType = messageType,
                    QueueName = queueName
                };

                QueryTokenForMnsQueueResponse response = client.GetAcsResponse(request);
                Console.WriteLine(response.MessageTokenDTO.SecurityToken);
            }
            catch (ServerException ex)
            {
                Console.WriteLine(ex.ToString());
            }
            catch (ClientException ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }
    }

    class TestTask
    {
        object o = new object();
        const int sleepTime = 50;
        const long bufferTime = 60 * 2; // 过期时间小于2分钟则重新获取,防止服务器时间误差
        const String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/"; // 阿里通信消息的endpoint,固定

        public String name { get; private set; }
        public String messageType { get; private set; }
        public String QueueName { get; private set; }
        public int TaskID { get; private set; }
        public IAcsClient AcsClient { get; private set; }

        public TestTask(String name, String messageType, String queueName, IAcsClient acsClient)
        {
            this.name = name;
            this.messageType = messageType;
            this.QueueName = queueName;
            this.AcsClient = acsClient;
        }

        readonly Dictionary<string, QueryTokenForMnsQueue_MessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueue_MessageTokenDTO>();
        readonly Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();

        public QueryTokenForMnsQueue_MessageTokenDTO GetTokenByMessageType(IAcsClient acsClient, String messageType)
        {
            QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest
            {
                MessageType = messageType
            };
            QueryTokenForMnsQueueResponse queryTokenForMnsQueueResponse = acsClient.GetAcsResponse(request);
            QueryTokenForMnsQueue_MessageTokenDTO token = queryTokenForMnsQueueResponse.MessageTokenDTO;
            return token;
        }

        /// 处理消息
        public void Handle()
        {
            while (true)
            {
                try
                {
                    QueryTokenForMnsQueue_MessageTokenDTO token = null;
                    Queue queue = null;
                    lock (o)
                    {
                        if (tokenMap.ContainsKey(messageType))
                        {
                            token = tokenMap[messageType];
                        }

                        if (queueMap.ContainsKey(QueueName))
                        {
                            queue = queueMap[QueueName];
                        }

                        TimeSpan ts = new TimeSpan(0);

                        if (token != null)
                        {
                            DateTime b = Convert.ToDateTime(token.ExpireTime);
                            DateTime c = Convert.ToDateTime(DateTime.Now);
                            ts = b - c;
                        }

                        if (token == null || ts.TotalSeconds < bufferTime || queue == null)
                        {
                            token = GetTokenByMessageType(AcsClient, messageType);
                            IMNS client = new MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
                            queue = client.GetNativeQueue(QueueName);
                            if (tokenMap.ContainsKey(messageType))
                            {
                                tokenMap.Remove(messageType);
                            }
                            if (queueMap.ContainsKey(QueueName))
                            {
                                queueMap.Remove(QueueName);
                            }
                            tokenMap.Add(messageType, token);
                            queueMap.Add(QueueName, queue);
                        }
                    }

                    BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16);
                    List<Message> messages = batchReceiveMessageResponse.Messages;

                    for (int i = 0; i <= messages.Count - 1; i++)
                    {
                        try
                        {
                            byte[] outputb = Convert.FromBase64String(messages[i].Body);
                            string orgStr = Encoding.UTF8.GetString(outputb);
                            Console.WriteLine(orgStr);
                            // TODO 具体消费逻辑,待客户自己实现.
                            // 消费成功的前提下删除消息
                            // queue.DeleteMessage(messages[i].ReceiptHandle);
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e.ToString());
                        }
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.ToString());
                }
                Thread.Sleep(sleepTime);
            }
        }
    }
}
<?php
use AlibabaCloud\Client\AlibabaCloud;
use AlibabaCloud\Client\Exception\ClientException;
use AlibabaCloud\Client\Exception\ServerException;
use AlibabaCloud\Dybaseapi\MNS\Requests\BatchReceiveMessage;
use AlibabaCloud\Dybaseapi\MNS\Requests\BatchDeleteMessage;

// 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
// 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
// 本示例以将AccessKey ID和AccessKey Secret保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里。
AlibabaCloud::accessKeyClient(getenv("VMS_AK_ENV"), getenv("VMS_SK_ENV"))
    ->regionId('cn-hangzhou')
    ->asGlobalClient();

$queueName = '<QueueName>'; // 队列名称,需要替换成您的队列名称
$messageType = '<MessageType>'; // 需要接收的消息类型,需要替换成您需要接收的消息类型,回执消息更多信息请参见回执消息简介与配置流程。

$response = null;
$token = null;
$i = 0;

do {
    try {
        if (null == $token || strtotime($token['ExpireTime']) - time() > 2 * 60) {
            $response = AlibabaCloud::rpcRequest()
                ->product('Dybaseapi')
                ->version('2017-05-25')
                ->action('QueryTokenForMnsQueue')
                ->method('POST')
                ->host("dybaseapi.aliyuncs.com")
                ->options([
                    'query' => [
                        'MessageType' => $messageType,
                        'QueueName' => $queueName,
                    ],
                ])
                ->request()
                ->toArray();
        }

        $token = $response['MessageTokenDTO'];

        $mnsClient = new \AlibabaCloud\Dybaseapi\MNS\MnsClient(
            "http://1943695596114318.mns.cn-hangzhou.aliyuncs.com",
            $token['AccessKeyId'],
            $token['AccessKeySecret'],
            $token['SecurityToken']
        );
        $mnsRequest = new BatchReceiveMessage(10, 5);
        $mnsRequest->setQueueName($queueName);
        $mnsResponse = $mnsClient->sendRequest($mnsRequest);

        $receiptHandles = Array();
        foreach ($mnsResponse->Message as $message) {
            // 用户逻辑:
            // $receiptHandles[] = $message->ReceiptHandle; // 加入$receiptHandles数组中的记录将会被删除
            $messageBody = base64_decode($message->MessageBody); // base64解码后的JSON字符串
            print_r($messageBody . "\n");
        }

        if (count($receiptHandles) > 0) {
            $deleteRequest = new BatchDeleteMessage($queueName, $receiptHandles);
            $mnsClient->sendRequest($deleteRequest);
        }
    } catch (ClientException $e) {
        echo $e->getErrorMessage() . PHP_EOL;
    } catch (ServerException $e) {
        if ($e->getCode() == 404) {
            $i++;
        }
        echo $e->getErrorMessage() . PHP_EOL;
    }
} while ($i < 3);
#!/usr/bin/env python
# coding=utf8

import os
import time
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkdybaseapi.request.v20170525.QueryTokenForMnsQueueRequest import QueryTokenForMnsQueueRequest
from aliyunsdkcore.profile import region_provider
from datetime import datetime
from aliyunsdkdybaseapi.mns.account import Account
from aliyunsdkdybaseapi.mns.queue import *
from aliyunsdkdybaseapi.mns.mns_exception import *

try:
    import json
except ImportError:
    import simplejson as json


# TODO 需要替换成您需要接收的消息类型
message_type = "<MessageType>"
# TODO 需要替换成您的队列名称。在云通信页面开通相应业务消息后,就能在页面上获得对应的queueName
queue_name = "<QueueName>"

# 云通信固定的endpoint地址
endpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/"
# 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
# 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
# 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明,来实现API访问的身份验证。
acs_client = AcsClient(os.getenv("VMS_AK_ENV"), os.getenv("VMS_SK_ENV"), "cn-hangzhou")
region_provider.add_endpoint("Dybaseapi", "dybaseapi.aliyuncs.com", "cn-hangzhou")


# 云通信业务token存在失效时间,需动态更新。
class Token():
    def __init__(self):
        self.token = None
        self.tmp_access_id = None
        self.tmp_access_key = None
        self.expire_time = None

    def is_refresh(self):
        if self.expire_time is None:
            return 1
        # 失效时间与当前系统时间比较,提前2分钟刷新token
        now = datetime.now()
        expire = datetime.strptime(self.expire_time, "%Y-%m-%d %H:%M:%S")
        if expire <= now or (expire - now).seconds < 120:
            return 1
        return 0

    def refresh(self):
        print("start refresh token...")
        request = QueryTokenForMnsQueueRequest()
        request.set_MessageType(message_type)
        request.set_QueueName(queue_name)
        response = acs_client.do_action_with_exception(request)
        # print response
        if response is None:
            raise ServerException("GET_TOKEN_FAIL", "获取token时无响应")

        response_body = json.loads(response)

        if response_body.get("Code") != "OK":
            raise ServerException("GET_TOKEN_FAIL", "获取token失败")

        sts_token = response_body.get("MessageTokenDTO")
        self.tmp_access_key = sts_token.get("AccessKeySecret")
        self.tmp_access_id = sts_token.get("AccessKeyId")
        self.expire_time = sts_token.get("ExpireTime")
        self.token = sts_token.get("SecurityToken")

        print("finish refresh token...")


# 初始化 token, my_account, my_queue
token, my_account, my_queue = Token(), None, None

# 循环读取删除消息直到队列空
# receive message请求使用long polling方式,通过wait_seconds指定长轮询时间为3秒

## long polling 解析:
### 当队列中有消息时,请求立即返回;
### 当队列中没有消息时,请求在MNS服务器端挂3秒钟,在这期间,有消息写入队列,请求会立即返回消息,3秒后,请求返回队列没有消息;

wait_seconds = 3
print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (
    10 * "=", 10 * "=", queue_name, wait_seconds))

while True:
    receipt_handles = []
    # 读取消息
    try:
        # token过期是否需要刷新
        if token.is_refresh() == 1:
            # 刷新token
            token.refresh()
            if my_account:
                my_account.mns_client.close_connection()
                my_account = None

        if not my_account:
            my_account = Account(endpoint, token.tmp_access_id, token.tmp_access_key, token.token)
            my_queue = my_account.get_queue(queue_name)

        # 接收消息
        recv_msgs = my_queue.batch_receive_message(10, wait_seconds)

        for recv_msg in recv_msgs:
            # TODO 业务处理

            # receipt_handles.append(recv_msg.receipt_handle)
            print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
                recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id))

    except MNSExceptionBase as e:
        if e.type == "QueueNotExist":
            print("Queue not exist, please create queue before receive message.")
            break
        elif e.type == "MessageNotExist":
            print("Queue is empty! sleep 10s")
            time.sleep(10)
            continue
        print("Receive Message Fail! Exception:%s\n" % e)
        break

    # 删除消息
    try:
        if len(receipt_handles) > 0:
            # my_queue.batch_delete_message(receipt_handles)
            print("Delete Message Succeed!  ReceiptHandles:%s" % receipt_handles)
    except MNSExceptionBase as e:
        print("Delete Message Fail! Exception:%s\n" % e)
package main

import (
    "os"
    "encoding/base64"
    "fmt"
    "github.com/aliyun/alibaba-cloud-sdk-go/sdk/endpoints"
    "github.com/aliyun/alibaba-cloud-sdk-go/services/dybaseapi"
    "github.com/aliyun/alibaba-cloud-sdk-go/services/dybaseapi/mns"
    "time"
)

const (
    mnsDomain = "1943695596114318.mns.cn-hangzhou.aliyuncs.com"
)

func main() {
    endpoints.AddEndpointMapping("cn-hangzhou", "Dybaseapi", "dybaseapi.aliyuncs.com")

    // 阿里云账号AccessKey ID拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
    // 此处以把AccessKey ID和AccessKey Secret保存在环境变量为例说明。 您也可以根据业务需要,保存到配置文件里
    // 强烈建议不要把AccessKey ID和AccessKey Secret保存到代码里,会存在密钥泄漏风险
    // 创建client实例
    client, err := dybaseapi.NewClientWithAccessKey(
    "cn-hangzhou",           // 您的可用区ID
    os.Getenv("VMS_AK_ENV"), 
    os.Getenv("VMS_SK_ENV")) 
if err != nil {
    // 异常处理
    panic(err)
}

    queueName := "<QueueName>"//需要替换成您的队列名称
    messageType := "<MessageType>"//需要替换成您需要接收的消息类型,回执消息更多信息请参见回执消息简介与配置流程。


    var token *dybaseapi.MessageTokenDTO

    for {
        if token == nil || expireTime.Unix()-time.Now().Unix() < 2*60 {
            // 创建 API 请求并设置参数
            request := dybaseapi.CreateQueryTokenForMnsQueueRequest()
            request.MessageType = messageType
            request.QueueName = queueName
            // 发起请求并处理异常
            response, err := client.QueryTokenForMnsQueue(request)
            if err != nil {
                panic(err)
            }

            token = &response.MessageTokenDTO
        }
        expireTime, err = time.ParseInLocation("2006-01-02 15:04:05", token.ExpireTime, time.Local)
        if err != nil {
            panic(err)
        }

        mnsClient, err := mns.NewClientWithStsToken(
            "cn-hangzhou",
            token.AccessKeyId,
            token.AccessKeySecret,
            token.SecurityToken,
        )

        if err != nil {
            panic(err)
        }

        mnsRequest := mns.CreateBatchReceiveMessageRequest()
        mnsRequest.Domain = mnsDomain
        mnsRequest.QueueName = queueName
        mnsRequest.NumOfMessages = "10"
        mnsRequest.WaitSeconds = "5"

        mnsResponse, err := mnsClient.BatchReceiveMessage(mnsRequest)
        if err != nil {
            panic(err)
        }
        // fmt.Println(mnsResponse)

        receiptHandles := make([]string, len(mnsResponse.Message))
        for i, message := range mnsResponse.Message {
            messageBody, decodeErr := base64.StdEncoding.DecodeString(message.MessageBody)
            if decodeErr != nil {
                panic(decodeErr)
            }
            fmt.Println(string(messageBody))
            receiptHandles[i] = message.ReceiptHandle
        }
        if len(receiptHandles) > 0 {
            mnsDeleteRequest := mns.CreateBatchDeleteMessageRequest()
            mnsDeleteRequest.Domain = mnsDomain
            mnsDeleteRequest.QueueName = queueName
            mnsDeleteRequest.SetReceiptHandles(receiptHandles)
            //_, err = mnsClient.BatchDeleteMessage(mnsDeleteRequest) // 取消注释将删除队列中的消息
            if err != nil {
                panic(err)
            }
        }
    }
}